Rx: Event composition – single-valued

In a first blog post on the topic „event composition with Rx“ I presented operators that take several event input streams and create a single output streams, whose elements contain values from each input event („multi-valued“ event composition operators). This second blog post instead treats “single-valued” event composition. Multiple streams go into those composition operators and one single stream comes out, where each element of the output stream matches exactly one event/value from any input stream!

Example solution, PPT slides and general information

In general, all introductory information on composition, description of the „multi-valued“ event composition operators and more bits-and-pieces can be found in my first blog post.

I’ve created a SL3 demo project where you can experiment with each composition operator. You can find the sourcecode (SL3 VS2008 project) here and the live demo on Windows Azure here.

The PPT slides that contain all diagrams of the composition operators can be downloaded here.
I would be glad if you could leave my name and/or the blog URL (www.minddriven.de) on every diagram.

Merge()

The operator Merge does serialization of all input event streams by firing an event everytime one of the input streams fires. The event/value of the output stream will be the same as the event/value on the input stream that has fired.

The following code shows the simple usage of Merge:

    var composition = 
        Observable.Merge(stream1, stream2, stream3);

And the following example flow diagram shows the behavior of the operator under this code. Note how the incoming events are written to the output stream in exactly the same order as they come in:
Rx Event Composition - Merge

Amb()

Amb comes from ambiguous and has the following semantics. If you combine several input event streams with Amb, the output stream will only contain the events from the input stream that fires first. The other input streams are not taken into account afterwards. That’s a kind of winner-takes-all-strategy and could be valuable in some situations.

Amb can be used as follows:

    var composition = 
       stream1.Amb(stream2).Amb(stream3);

And the following diagram shows an example composition situation. Note that stream 1 fires first and consequently Amb only forwards events from this stream:
Rx Event Composition - Amb

Concat()

Concat does (the name says it all) concatenation of the input event streams. But wait: how can this work? In fact Concat only works when you have finite event streams! On infinite streams, Concat watches the very first stream forever and the other streams don’t come to their turn. Thus use Concat on finite event streams, when serialization of the input streams in the right order is crucial.

The following code shows the use of Concat on three finite input streams:

    var composition = 
        stream1.Concat(stream2).Concat(stream3);

And here’s the concatenation done by this code in an example situation (the square brackets illustrate finite streams):
Rx Event Composition - Concat

Until()

The operator name says it all: Until forwards events from an input stream to the output stream until an event on a second stream occurs.

The following code shows the usage:

    var composition = 
        stream1.Until(stream2);

And the following diagram shows the behavior of Until on 2 streams based on this code:
Rx Event Composition - Until

WaitUntil()

Again the operator name speaks for itself: WaitUntil does not forward events from an input stream to an output stream until an event on a second stream occurs. From then on every event of the first input stream will be forwarded.

Here’s some code that shows how WaitUntil works:

    var composition = 
        stream1.WaitUntil(stream2);

And the following diagram shows WaitUntil’s behavior based on this code in an example situation:

Rx Event Composition - WaitUntil

That’s it again. I hope you finde this information useful. As in the blog post on multi-valued event composition, I want to encourage you to give me active feedback and to improve the examples and descriptions of the composition operators above. Thanks!

kick it on DotNetKicks.com

Rx: Event composition – multi-valued

In preparation of some presentation on the Reactive Extensions (Rx) last week I’ve investigated the composition methods for several event streams on the Observable class. What methods are existing and what are they doing? Let’s take a look…

This first blog post treats „multi-valued“ event composition in terms of multiple streams going in and one stream comes out, but each element of the output stream is composed by sub-elements from each input stream! There is a second blog post that treats „single-valued“ event composition as well.

Example solution

I’ve written a little SL3 application where you can discover the behavior of those composition operators. It builds on example code that first has been published by Paul Batum some months ago and extends his code with some more possibilities. I’ve hosted the SL3 on Windows Azure where you can test it directly:

Rx Composition Demo

You can download the sourcecode of the Visual Studio 2008 solution as well: [Event Composition Sourcecode (ZIP)]
All you need to run the solution is the latest Rx for SL3 build, Visual Studio 2008 and the Silverlight 3 SDK.

Powerpoint Sources

If you want to use my diagrams in your presentations or somewhere else, here is the source PPT file: [Rx Event Composition PPT]
I would be glad if you could leave my name and/or the blog URL (www.minddriven.de) on every diagram.

How to read the diagrams and code

The general visualization concept is taken from Eric Meijer’s PDC session about Rx. Thanks Eric for your great work!

Rx Event Composition - General Visualization

The visualization shows an Rx composing operator in the middle. On the left there are event input streams, on the right is the composed output stream. The order of the events on a timeline is from right to left: the most right event goes first into the operator and comes first out in composed form. This is visualized by the t1, t2, t3 and t1‚, t2‚, t3‚ points.

The sourcecode snippets below are kind of abstract. They only show how the operators are used. They are based on 3 streams stream1, stream2 and stream3. Those streams can be seen as general abstract event streams, no matter which datatype they’re really using.

Now let’s come to our multi-valued composition operators…

SelectMany()

The SelectMany operator should be known from LINQ on IEnumerable or others and is reflected by the from keyword in LINQ queries as well. It projects a foreach loop or several cascaded foreach loops, if you execute multiple SelectMany calls after another. Thus the order in which SelectMany is executed is relevant!

The same is true for SelectMany on IObservable. Take the following code:

var composition = 
    from element1 in stream1
    from element2 in stream2
    from element3 in stream3
    select new[] { element1, element2, element3 };

This is equal to the following direct use of SelectMany:

var composition = 
    stream1.SelectMany(
        element1 => stream2.SelectMany(
        element2 => stream3.SelectMany(
        element3 => Observable.Return(new[] { element1, element2, element3 }))));

With this code an example composition scenario could be:

Rx Event Composition - SelectMany

Note that the event order is crucial. The first event on every input stream is in the right order, thus yielding to a first composed event. The second event on input stream 2 first fires when the third event on stream 3 is fired. The resulting event in this case contains the first event from stream 1, since it retains the right event order.

CombineLatest()

CombineLatest is a very useful operator. It fires everytime when one input stream fires. Then the corresponding output stream contains the latest values/events from each input stream.

Here’s the code how to use the operator:

var composition = 
    stream1.CombineLatest(
        stream2, (element1, element2) => new[] { element1, element2 })
    .CombineLatest(
        stream3, (elements12, element3) => new[] { elements12[0], elements12[1], element3 }));

And an example composition follows:

Rx Event Composition - CombineLatest

Zip()

In comparison to CombineLatest and others the Zip operator doesn’t use any results from an event stream multiple times. However it builds a queue of the events on every input stream. When every input stream queue contains at least one event, the Zip composition operator fires. Thus it synchronizes events, whereas every event on an input stream is waiting for an event on the other input streams.

The following code shows the usage of Zip():

var composition = 
    stream1
        .Zip(stream2, (element1, element2) => new[] { element1, element2 })
        .Zip(stream3, (elements12, element3) => new[] { elements12[0], elements12[1], element3 });

And the following diagram shows an example composition. Note that no event is used twice in the output stream and compare this with CombineLatest and the other operators:

Rx Event Composition - Zip

Note that the last event on stream 3 has no „partner“ on the other streams and thus it doesn’t appear on the output stream in this example.

Also notice that Zip() is equal to Join() if used with a conjunction of all input event streams as parameter:

var composition = 
    Observable.Join(
        stream1.And(stream2).And(stream3)
            .Then((element1, element2, element3) => new[] { element1, element2, element3 }));

Join() – 2 out of 3

The Join operator can be used in many cases and is very flexible. The method Join() can take several event compositions as parameters. Everytime one composition fires, the whole operator fires. That said, if you create a conjunction of all your event streams, then the composed output stream will be equal to Zip().

But Join gives you more power and flexibility. One example is a „2 out of 3“-situation. Imagine you have 3 input streams and want Join to fire everytime when 2 input streams contain an event. This can be done by passing Join the appropriate partial compositions as shown in the following code:

var composition = 
    Observable.Join(
        stream1.And(stream2)
            .Then((element1, element2) => new[] { element1, element2 }),
        stream1.And(stream3)
            .Then((element1, element3) => new[] { element1, element3 }),
        stream2.And(stream3)
            .Then((element2, element3) => new[] { element2, element3 }));

This yields to the following resulting composition for example input streams:

Rx Event Composition - Join 2 out of 3

Again in this example, the last event on stream 3 has no partner and doesn’t appear on the output stream (due to an odd amount of events over all input event streams).

ForkJoin()

ForkJoin can be used if you want a composition to fire exactly once. When all input event streams contain at least one event/value, the ForkJoin operator fires with the first event on every stream. Afterwards it doesn’t react to any changes/event on the input streams again.

Here’s the code how ForkJoin can be used:

var composition = 
    Observable.ForkJoin(stream1, stream2, stream3);

And here’s the diagram of a composition example:

Rx Event Composition - ForkJoin

That’s it for now, guys. I hope it’s valuable for you. Since these are just my initial ideas on this topic, I’m reliant on your feedback. Together we could make all those descriptions even more useful.

kick it on DotNetKicks.com