標題誇張了點,不過我在寫 Async Stream 範例的過程中突發奇想把它移植到一般的 .NET Framework 環境中使用。
開發環境 Visual Studio 2019 Preview 1 (16.0.0 Preview 1)
框架 .NET Framework 4.7.2
編譯器 C# 8.0 beta
有沒有可能把這個行為移植到傳統的 .NET Framework 環境中?還真的有可能,只不過需要一點點小小的改造,首先因為用到了 ValueTask,所以先使用 nuget 加入 System.Threading.Tasks.Extensions 套件,我在測試所使用的版本是 v4.6.0-preview.18751.3,nuget 同時會幫你安裝 System.Runtime.CompilerServices.Unsafe 套件。
安裝完成後,要自己寫程式碼先補上 Async Streams 的三個主要 interfaces:
namespace System.Collections.Generic
{
public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetAsyncEnumerator();
}
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
T Current { get; }
ValueTask<bool> MoveNextAsync();
}
}
namespace System
{
public interface IAsyncDisposable
{
ValueTask DisposeAsync();
}
}
為了 yield return ,照著前一篇文自行寫程式碼加上編譯器需要的兩個型別:
namespace System.Threading.Tasks
{
using System.Runtime.CompilerServices;
using System.Threading.Tasks.Sources;
internal struct ManualResetValueTaskSourceLogic<TResult>
{
private ManualResetValueTaskSourceCore<TResult> _core;
public ManualResetValueTaskSourceLogic(IStrongBox<ManualResetValueTaskSourceLogic<TResult>> parent) : this() { }
public short Version => _core.Version;
public TResult GetResult(short token) => _core.GetResult(token);
public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token);
public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _core.OnCompleted(continuation, state, token, flags);
public void Reset() => _core.Reset();
public void SetResult(TResult result) => _core.SetResult(result);
public void SetException(Exception error) => _core.SetException(error);
}
}
namespace System.Runtime.CompilerServices
{
internal interface IStrongBox<T> { ref T Value { get; } }
}
步驟還沒完呢,這時候會發現還缺少一個型別 ManualResetValueTaskSourceCore<TResult>,這個型別在 ManualResetValueTaskSourceLogic<TResult> 擔任一個很重要的角色 -- 它才是真正執行邏輯的型。很不幸的,這個型別並不存在於 .NET Framework 4.7.2,於是我去微軟的 git hub 找到了這個型別在 .NET Core 裡的原始程式碼;問題來了, 這個程式碼的內用了一些 .NET Core 才有的東西,此時我面臨了兩個抉擇 (1) 一層層追原始碼全部實作 (2) 試著改看看,讓原來呼叫 .NET Core 獨有的部分轉換成 .NET Framework 可以使用的形式。基於我只是覺得好玩,想要早點做完這個實驗,所以選了 (2),修改了一部分的程式碼來適應 .NET Framework 4.7.2,在此做一個免責宣告 -- 我不知道這樣搞會有甚麼奇妙的副作用,總之請各位以有趣的角度來看待這件事。
我改好的 ManualResetValueTaskSourceCore<TResult> 如下 (這裡還包含有另一個 ManualResetValueTaskSourceCoreShared class ):
namespace System.Threading.Tasks.Sources
{
/// <summary>
/// reference from https://github.com/dotnet/coreclr/blob/master/src/System.Private.CoreLib/shared/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs
/// </summary>
/// <typeparam name="TResult"></typeparam>
[StructLayout(LayoutKind.Auto)]
public struct ManualResetValueTaskSourceCore<TResult>
{
private Action<object> _continuation;
private object _continuationState;
private ExecutionContext _executionContext;
private object _capturedContext;
private bool _completed;
private TResult _result;
private ExceptionDispatchInfo _error;
private short _version;
public bool RunContinuationsAsynchronously { get; set; }
public void Reset()
{
_version++;
_completed = false;
_result = default;
_error = null;
_executionContext = null;
_capturedContext = null;
_continuation = null;
_continuationState = null;
}
public void SetResult(TResult result)
{
_result = result;
SignalCompletion();
}
public void SetException(Exception error)
{
_error = ExceptionDispatchInfo.Capture(error);
SignalCompletion();
}
public short Version => _version;
public ValueTaskSourceStatus GetStatus(short token)
{
ValidateToken(token);
return
!_completed ? ValueTaskSourceStatus.Pending :
_error == null ? ValueTaskSourceStatus.Succeeded :
_error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :
ValueTaskSourceStatus.Faulted;
}
public TResult GetResult(short token)
{
ValidateToken(token);
if (!_completed)
{
ManualResetValueTaskSourceCoreShared.ThrowInvalidOperationException();
}
_error?.Throw();
return _result;
}
public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
if (continuation == null)
{
throw new ArgumentNullException(nameof(continuation));
}
ValidateToken(token);
if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
{
_executionContext = ExecutionContext.Capture();
}
if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
{
SynchronizationContext sc = SynchronizationContext.Current;
if (sc != null && sc.GetType() != typeof(SynchronizationContext))
{
_capturedContext = sc;
}
else
{
TaskScheduler ts = TaskScheduler.Current;
if (ts != TaskScheduler.Default)
{
_capturedContext = ts;
}
}
}
object oldContinuation = _continuation;
if (oldContinuation == null)
{
_continuationState = state;
oldContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null);
}
if (oldContinuation != null)
{
if (!ReferenceEquals(oldContinuation, ManualResetValueTaskSourceCoreShared.s_sentinel))
{
ManualResetValueTaskSourceCoreShared.ThrowInvalidOperationException();
}
switch (_capturedContext)
{
case null:
if (_executionContext != null)
{
ThreadPool.QueueUserWorkItem(new WaitCallback(continuation), state);
}
else
{
ThreadPool.UnsafeQueueUserWorkItem(new WaitCallback(continuation), state);
}
break;
case SynchronizationContext sc:
sc.Post(s =>
{
var tuple = (Tuple<Action<object>, object>)s;
tuple.Item1(tuple.Item2);
}, Tuple.Create(continuation, state));
break;
case TaskScheduler ts:
Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
break;
}
}
}
private void ValidateToken(short token)
{
if (token != _version)
{
ManualResetValueTaskSourceCoreShared.ThrowInvalidOperationException();
}
}
private void SignalCompletion()
{
if (_completed)
{
ManualResetValueTaskSourceCoreShared.ThrowInvalidOperationException();
}
_completed = true;
if (_continuation != null || Interlocked.CompareExchange(ref _continuation, ManualResetValueTaskSourceCoreShared.s_sentinel, null) != null)
{
if (_executionContext != null)
{
ExecutionContext.Run(
_executionContext,
(s) => ((ManualResetValueTaskSourceCore<TResult>)s).InvokeContinuation(),
this);
}
else
{
InvokeContinuation();
}
}
}
private void InvokeContinuation()
{
switch (_capturedContext)
{
case null:
if (RunContinuationsAsynchronously)
{
if (_executionContext != null)
{
ThreadPool.QueueUserWorkItem(new WaitCallback(_continuation), _continuationState);
}
else
{
ThreadPool.UnsafeQueueUserWorkItem(new WaitCallback(_continuation), _continuationState);
}
}
else
{
_continuation(_continuationState);
}
break;
case SynchronizationContext sc:
sc.Post(s =>
{
var state = (Tuple<Action<object>, object>)s;
state.Item1(state.Item2);
}, Tuple.Create(_continuation, _continuationState));
break;
case TaskScheduler ts:
Task.Factory.StartNew(_continuation, _continuationState, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
break;
}
}
}
internal static class ManualResetValueTaskSourceCoreShared
{
internal static void ThrowInvalidOperationException() => throw new InvalidOperationException();
internal static readonly Action<object> s_sentinel = CompletionSentinel;
private static void CompletionSentinel(object _)
{
Debug.Fail("The sentinel delegate should never be invoked.");
ThrowInvalidOperationException();
}
}
}
然後複製前一篇文章的例子:
public class AsyncEnumerableProcess
{
async static public IAsyncEnumerable<string> ReadLineAsync(string path)
{
var enumerator = new AsyncEnumerator(path);
try
{
while (await enumerator.MoveNextAsync())
{
await Task.Delay(100);
yield return enumerator.Current;
}
}
finally
{
await enumerator.DisposeAsync();
}
}
}
public class AsyncEnumerator : IAsyncEnumerator<string>
{
private readonly StreamReader _reader;
private bool _disposed;
public string Current { get; private set; }
public AsyncEnumerator(string path)
{
_reader = File.OpenText(path);
_disposed = false;
}
async public ValueTask<bool> MoveNextAsync()
{
Current = await _reader.ReadLineAsync();
return Current != null;
}
async public ValueTask DisposeAsync()
{
await Task.Run(() => Dispose());
}
private void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
private void Dispose(bool disposing)
{
if (!this._disposed)
{
if (_reader != null)
{
_reader.Dispose();
}
_disposed = true;
}
}
}
class Program
{
async static Task Main(string[] args)
{
var path = "SourceFile.txt";
await foreach (var item in AsyncEnumerableProcess.ReadLineAsync(path))
{
Console.WriteLine(item);
};
Console.ReadLine();
}
}
嗯,真的會動,沒想到這樣就可以在 .NET Framework 4.7.2 用 Async Stream 了。