mirror of
https://github.com/Alex-Rachel/TEngine.git
synced 2025-08-14 16:51:28 +00:00
Init TEngine4.0.0
Init TEngine4.0.0
This commit is contained in:
97
UnityProject/Packages/UniTask/Runtime/Linq/ToObservable.cs
Normal file
97
UnityProject/Packages/UniTask/Runtime/Linq/ToObservable.cs
Normal file
@@ -0,0 +1,97 @@
|
||||
using Cysharp.Threading.Tasks.Internal;
|
||||
using System;
|
||||
using System.Threading;
|
||||
|
||||
namespace Cysharp.Threading.Tasks.Linq
|
||||
{
|
||||
public static partial class UniTaskAsyncEnumerable
|
||||
{
|
||||
public static IObservable<TSource> ToObservable<TSource>(this IUniTaskAsyncEnumerable<TSource> source)
|
||||
{
|
||||
Error.ThrowArgumentNullException(source, nameof(source));
|
||||
|
||||
return new ToObservable<TSource>(source);
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class ToObservable<T> : IObservable<T>
|
||||
{
|
||||
readonly IUniTaskAsyncEnumerable<T> source;
|
||||
|
||||
public ToObservable(IUniTaskAsyncEnumerable<T> source)
|
||||
{
|
||||
this.source = source;
|
||||
}
|
||||
|
||||
public IDisposable Subscribe(IObserver<T> observer)
|
||||
{
|
||||
var ctd = new CancellationTokenDisposable();
|
||||
|
||||
RunAsync(source, observer, ctd.Token).Forget();
|
||||
|
||||
return ctd;
|
||||
}
|
||||
|
||||
static async UniTaskVoid RunAsync(IUniTaskAsyncEnumerable<T> src, IObserver<T> observer, CancellationToken cancellationToken)
|
||||
{
|
||||
// cancellationToken.IsCancellationRequested is called when Rx's Disposed.
|
||||
// when disposed, finish silently.
|
||||
|
||||
var e = src.GetAsyncEnumerator(cancellationToken);
|
||||
try
|
||||
{
|
||||
bool hasNext;
|
||||
|
||||
do
|
||||
{
|
||||
try
|
||||
{
|
||||
hasNext = await e.MoveNextAsync();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
observer.OnError(ex);
|
||||
return;
|
||||
}
|
||||
|
||||
if (hasNext)
|
||||
{
|
||||
observer.OnNext(e.Current);
|
||||
}
|
||||
else
|
||||
{
|
||||
observer.OnCompleted();
|
||||
return;
|
||||
}
|
||||
} while (!cancellationToken.IsCancellationRequested);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (e != null)
|
||||
{
|
||||
await e.DisposeAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class CancellationTokenDisposable : IDisposable
|
||||
{
|
||||
readonly CancellationTokenSource cts = new CancellationTokenSource();
|
||||
|
||||
public CancellationToken Token => cts.Token;
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (!cts.IsCancellationRequested)
|
||||
{
|
||||
cts.Cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user