GitXplorerGitXplorer
q

AsyncObservable

public
3 stars
0 forks
0 issues

Commits

List of commits on branch master.
Unverified
8f4e1c2c50e8c063eedb59e2432f8dced52f0325

Add UnicastAsyncSubject

qquinmars committed 6 years ago
Unverified
e7b98ba5f0e02bbed7f6b29d2c8f63bcdc459aef

Add Tests for sums intermediate results

qquinmars committed 6 years ago
Unverified
65edd36253c0f413f8015f8cf743bb1ecb019255

Use new aggregating infrastructure for sum

qquinmars committed 6 years ago
Unverified
b24901f34f515eb970fdcbdf654b3ff2cc7232e0

Fix comment

qquinmars committed 6 years ago
Unverified
ca976c117217af26f42f2c3ddc96a7c72d19e815

Remove unused variable

qquinmars committed 6 years ago
Unverified
32450f55ea228458914cb1d897aed44e3591ca5e

Refactor aggregating operators, start with Max

qquinmars committed 6 years ago

README

The README file for this repository.

AsyncObservable

Around one year ago Bart de Smert started to work on IAsyncObservable in public. IAsyncObservable offers some advantages over the classical Rx interface, especially when slow asynchronous consumers are involved. His implementation is a one to one translation of the Rx.NET contract, so he unfortunately omitted the opportunity to correct some deficits of the original synchronous contract. Namely:

  • Synchronous Cancellation: The following will print the numbers from 0 to 999 because there is no way for the Take operator to stop Range.
Range(0,1000)
   .Do(Console.WriteLine($"{i}"))
   .Take(2)
   .Subscribe(); 
  • IDisposable Contract: A resource should not be disposed while it is still in use. In Rx one IDisposable (returned by Subscribe) is used for two conflicting purposes. On the one hand it is used as a cancellation trigger to stop the parent observable production, on the other hand it is used for resource cleanup. Cancellation demands to be fast, so that not to many neglected values are generated. Contrary, resource cleanup requests to be late so that a resource will not be disposed while it is still in use, for example in a still running OnNext call.

In this repo I play with an alternate interface, that tries to circumvent these two points.

IAsyncObservable interface and contract

There are manly two interfaces involved, IAsnycObservable:

public interface IAsyncObservable<out T>
{
    ValueTask SubscribeAsync(IAsyncObserver<T> observer);
}

it offers one method to subscribe an observer to an observable. The returned task should complete when the sequence is finished completely, i.e. after OnFinallyAsync of the observer is called.

public interface IAsyncObserver<in T>
{
    ValueTask OnSubscibeAsync(IDisposable cancelable);
    ValueTask OnNextAsync(T value);
    ValueTask OnCompletedAsync();
    ValueTask OnErrorAsync(Exception error);
    ValueTask OnFinallyAsync();
}

The observer interface may only be used sequentially. All methods must not be called in parallel. The returned tasks have to be awaited before another or the same method is allowed to be called. For an operator it is ok to rely on that producer and consumer play nice. It is not required to defend the contract against a broken producer or consumer. Operators should if possible pass the returned tasks through, so that in an ideal case the task of the final consumer is passed directly to the source. That has the consequence that a task may be in a faulted state. Faulted tasks are explicitly allowed. Consequently an operator that does more than passing exceptions through via OnErrorAsync(), like Catch for example, has to actively protect the upstream from faulted downstream tasks.

OnSubscribeAsync()

The first method to be called is OnSubscribeAsync(). It passes a cancellation trigger to the downstream observer enabling it to stop further upstream production. If an observer needs to acquire any resources, the OnSubscribeAsync() method would be a good place to do that. Some operator may trigger the cancellation immediately like for example Take(0). It is not mandatory for an operator to react on the cancellation and an operator can choose to simply pass the upstream cancelable through. But it is good practice to stop at least any further propagation.

OnNextAsync(), OnCompletedAsync() and OnErrorAsync()

The semantics of those three methods are very similar to the synchronous IObserver contract. If an operator terminates a sequence, like for example Take, it should cancel the upstream observer. If it is simply forwarding a completion or an error it should not call Dispose of the upstream Disposeble.

OnFinallyAsync()

The OnFinallyAsync() method should be called after OnCompletedAsync(), OnErrorAsync() or - in the case of a cancellation - OnNextAsync() are await. An operator can dispose used resources here safely.

Open questions

The contract may and will probably change. There are some open questions.

  • When using await should the tasks be awaited with .ConfigureAwait(true)?
  • How should scheduler look like?