{"id":563,"date":"2009-11-28T13:46:05","date_gmt":"2009-11-28T12:46:05","guid":{"rendered":"http:\/\/www.minddriven.de\/?p=563"},"modified":"2010-07-10T10:24:56","modified_gmt":"2010-07-10T09:24:56","slug":"rx-event-composition-multi-valued","status":"publish","type":"post","link":"https:\/\/www.minddriven.de\/index.php\/technology\/dot-net\/rx-event-composition-multi-valued","title":{"rendered":"Rx: Event composition &#8211; multi-valued"},"content":{"rendered":"<p>While preparing a presentation on the <a href=\"http:\/\/msdn.microsoft.com\/en-us\/devlabs\/ee794896.aspx\">Reactive Extensions (Rx)<\/a> last week, I investigated the methods on the <code>Observable<\/code> class that compose several event streams. Which methods exist and what do they do? Let&#8217;s take a look&#8230;<\/p>\n<p>This first blog post covers &#8220;multi-valued&#8221; event composition: multiple streams go in and one stream comes out, and each element of the output stream is composed of sub-elements from <em>each<\/em> input stream. A <a href=\"http:\/\/www.minddriven.de\/?p=584\">second blog post<\/a> covers &#8220;single-valued&#8221; event composition.<\/p>\n<h3>Example solution<\/h3>\n<p>I&#8217;ve written a small Silverlight 3 (SL3) application in which you can explore the behavior of these composition operators. It builds on example code that was first <a href=\"http:\/\/www.paulbatum.com\/2009\/08\/reacting-to-reactive-framework-part-7.html\">published by Paul Batum<\/a> some months ago and extends it with a few more options. 
I&#8217;ve hosted the application on Windows Azure, where you can try it directly:<\/p>\n<p style=\"text-align: left;\"><a href=\"http:\/\/rxcomposition.cloudapp.net\/\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter size-full wp-image-570\" style=\"border: 1px solid black;\" title=\"Rx Composition Demo\" src=\"http:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_Demo.png\" alt=\"Rx Composition Demo\" width=\"350\" height=\"289\" srcset=\"https:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_Demo.png 350w, https:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_Demo-300x247.png 300w\" sizes=\"auto, (max-width: 350px) 100vw, 350px\" \/><\/a><\/p>\n<p>You can also download the source code of the Visual Studio 2008 solution: [<a href=\"http:\/\/www.minddriven.de\/stuff\/RxEventComposition.zip\">Event Composition Source Code (ZIP)<\/a>]<br \/>\nAll you need to run the solution is the latest Rx for SL3 build, Visual Studio 2008 and the Silverlight 3 SDK.<\/p>\n<h3>PowerPoint sources<\/h3>\n<p>If you want to use my diagrams in your presentations or elsewhere, here is the source PPT file: [<a href=\"http:\/\/www.minddriven.de\/stuff\/RxEventComposition.ppt\">Rx Event Composition PPT<\/a>]<br \/>\nI would be glad if you could leave my name and\/or the blog URL (<a href=\"http:\/\/www.minddriven.de\">www.minddriven.de<\/a>) on every diagram.<\/p>\n<h3>How to read the diagrams and code<\/h3>\n<p>The general visualization concept is taken from Erik Meijer&#8217;s <a href=\"http:\/\/microsoftpdc.com\/Sessions\/VTL04\">PDC session about Rx<\/a>. 
Thanks, Erik, for your great work!<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter size-full wp-image-565\" title=\"Rx Event Composition - General Visualization\" src=\"http:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_General.png\" alt=\"Rx Event Composition - General Visualization\" width=\"650\" height=\"316\" srcset=\"https:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_General.png 650w, https:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_General-300x145.png 300w\" sizes=\"auto, (max-width: 650px) 100vw, 650px\" \/><\/p>\n<p>The visualization shows an Rx <em>composing operator<\/em> in the middle. The event input streams are on the left, and the composed output stream is on the right. Events on a timeline are ordered from right to left: the rightmost event enters the operator first and is the first to come out in composed form. This is visualized by the points t<sub>1<\/sub>, t<sub>2<\/sub>, t<sub>3<\/sub> and t<sub>1<\/sub>&#8242;, t<sub>2<\/sub>&#8242;, t<sub>3<\/sub>&#8242;.<\/p>\n<p>The <em>source code snippets<\/em> below are deliberately abstract. They only show how the operators are used. They are based on 3 streams: <code>stream1<\/code>, <code>stream2<\/code> and <code>stream3<\/code>. These can be seen as generic event streams, no matter which data type they actually carry.<\/p>\n<p>Now let&#8217;s turn to the multi-valued composition operators&#8230;<\/p>\n<h3>SelectMany()<\/h3>\n<p>The <code>SelectMany<\/code> operator should be familiar from LINQ on <code>IEnumerable<\/code> and other providers, and it is what the <code>from<\/code> keyword in LINQ queries translates to. It corresponds to a <code>foreach<\/code> loop, or to several nested <code>foreach<\/code> loops if you chain multiple <code>SelectMany<\/code> calls one after another. 
Thus the <em>order<\/em> in which <code>SelectMany<\/code> is executed is relevant!<\/p>\n<p>The same is true for <code>SelectMany<\/code> on <code>IObservable<\/code>. Take the following code:<\/p>\n<pre class=\"brush:csharp\">\r\nvar composition = \r\n    from element1 in stream1\r\n    from element2 in stream2\r\n    from element3 in stream3\r\n    select new[] { element1, element2, element3 };\r\n<\/pre>\n<p>This is equivalent to the following direct use of <code>SelectMany<\/code>:<\/p>\n<pre class=\"brush:csharp\">\r\nvar composition = \r\n    stream1.SelectMany(\r\n        element1 =&gt; stream2.SelectMany(\r\n        element2 =&gt; stream3.SelectMany(\r\n        element3 =&gt; Observable.Return(new[] { element1, element2, element3 }))));\r\n<\/pre>\n<p>With this code, an example composition scenario could be:<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter size-full wp-image-573\" title=\"Rx Event Composition - SelectMany\" src=\"http:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_SelectMany1.png\" alt=\"Rx Event Composition - SelectMany\" width=\"650\" height=\"250\" srcset=\"https:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_SelectMany1.png 650w, https:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_SelectMany1-300x115.png 300w\" sizes=\"auto, (max-width: 650px) 100vw, 650px\" \/><\/p>\n<p>Note that the event order is crucial. The first events on the input streams arrive in the right order and thus yield a first composed event. The second event on input stream 2 only produces an output once the third event on stream 3 fires. The resulting event in this case contains the first event from stream 1, since that preserves the right event order.<\/p>\n<h3>CombineLatest()<\/h3>\n<p><code>CombineLatest<\/code> is a very useful operator. It fires every time any input stream fires, once each input stream has produced at least one value. 
The output event then contains the <em>latest<\/em> value\/event from each input stream.<\/p>\n<p>Here&#8217;s how to use the operator:<\/p>\n<pre class=\"brush:csharp\">\r\nvar composition = \r\n    stream1.CombineLatest(\r\n        stream2, (element1, element2) =&gt; new[] { element1, element2 })\r\n    .CombineLatest(\r\n        stream3, (elements12, element3) =&gt; new[] { elements12[0], elements12[1], element3 });\r\n<\/pre>\n<p>An example composition follows:<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter size-full wp-image-574\" title=\"Rx Event Composition - CombineLatest\" src=\"http:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_CombineLatest.png\" alt=\"Rx Event Composition - CombineLatest\" width=\"650\" height=\"247\" srcset=\"https:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_CombineLatest.png 650w, https:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_CombineLatest-300x114.png 300w\" sizes=\"auto, (max-width: 650px) 100vw, 650px\" \/><\/p>\n<h3>Zip()<\/h3>\n<p>In contrast to <code>CombineLatest<\/code> and the other operators, <code>Zip<\/code> never uses an event from an input stream more than once. Instead, it builds a <em>queue<\/em> of events for every input stream. When every input stream queue contains at least one event, the <code>Zip<\/code> composition operator fires. It thus synchronizes events: every event on an input stream waits for a matching event on each of the other input streams.<\/p>\n<p>The following code shows the usage of <code>Zip()<\/code>:<\/p>\n<pre class=\"brush:csharp\">\r\nvar composition = \r\n    stream1\r\n        .Zip(stream2, (element1, element2) =&gt; new[] { element1, element2 })\r\n        .Zip(stream3, (elements12, element3) =&gt; new[] { elements12[0], elements12[1], element3 });\r\n<\/pre>\n<p>The following diagram shows an example composition. 
Note that no event is used twice in the output stream; compare this with <code>CombineLatest<\/code> and the other operators:<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter size-full wp-image-576\" title=\"Rx Event Composition - Zip\" src=\"http:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_Zip.png\" alt=\"Rx Event Composition - Zip\" width=\"650\" height=\"250\" srcset=\"https:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_Zip.png 650w, https:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_Zip-300x115.png 300w\" sizes=\"auto, (max-width: 650px) 100vw, 650px\" \/><\/p>\n<p>Note that the last event on stream 3 has no &#8220;partner&#8221; on the other streams and thus doesn&#8217;t appear on the output stream in this example.<\/p>\n<p>Also notice that <code>Zip()<\/code> is equivalent to <code>Join()<\/code> when <code>Join()<\/code> is given a single conjunction of all input event streams as its parameter:<\/p>\n<pre class=\"brush:csharp\">\r\nvar composition = \r\n    Observable.Join(\r\n        stream1.And(stream2).And(stream3)\r\n            .Then((element1, element2, element3) =&gt; new[] { element1, element2, element3 }));\r\n<\/pre>\n<h3>Join() &#8211; 2 out of 3<\/h3>\n<p>The <code>Join<\/code> operator is very flexible and can be used in many cases. The method <code>Join()<\/code> can take several event compositions as parameters. Every time one of these compositions fires, the whole operator fires. As noted, if you pass a single conjunction of all your event streams, the composed output stream will be the same as with <code>Zip()<\/code>.<\/p>\n<p>But <code>Join<\/code> gives you more power and flexibility. One example is a &#8220;2 out of 3&#8221; situation. Imagine you have 3 input streams and want <code>Join<\/code> to fire every time any 2 of the input streams contain an event. 
This can be done by passing <code>Join<\/code> the appropriate partial compositions, as shown in the following code:<\/p>\n<pre class=\"brush:csharp\">\r\nvar composition = \r\n    Observable.Join(\r\n        stream1.And(stream2)\r\n            .Then((element1, element2) =&gt; new[] { element1, element2 }),\r\n        stream1.And(stream3)\r\n            .Then((element1, element3) =&gt; new[] { element1, element3 }),\r\n        stream2.And(stream3)\r\n            .Then((element2, element3) =&gt; new[] { element2, element3 }));\r\n<\/pre>\n<p>For example input streams, this yields the following composition:<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter size-full wp-image-578\" title=\"Rx Event Composition - Join 2 out of 3\" src=\"http:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_Join2o3.png\" alt=\"Rx Event Composition - Join 2 out of 3\" width=\"650\" height=\"314\" srcset=\"https:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_Join2o3.png 650w, https:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_Join2o3-300x144.png 300w\" sizes=\"auto, (max-width: 650px) 100vw, 650px\" \/><\/p>\n<p>Again in this example, the last event on stream 3 has no partner and doesn&#8217;t appear on the output stream (because the total number of events across all input streams is odd).<\/p>\n<h3>ForkJoin()<\/h3>\n<p><code>ForkJoin<\/code> can be used if you want a composition to fire exactly once. When all input event streams contain at least one event\/value, the <code>ForkJoin<\/code> operator fires with the first event on every stream. 
Afterwards it doesn&#8217;t react to any further events on the input streams.<\/p>\n<p>Here&#8217;s how <code>ForkJoin<\/code> can be used:<\/p>\n<pre class=\"brush:csharp\">\r\nvar composition = \r\n    Observable.ForkJoin(stream1, stream2, stream3);\r\n<\/pre>\n<p>And here&#8217;s the diagram of a composition example:<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter size-full wp-image-580\" title=\"Rx Event Composition - ForkJoin\" src=\"http:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_ForkJoin.png\" alt=\"Rx Event Composition - ForkJoin\" width=\"650\" height=\"249\" srcset=\"https:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_ForkJoin.png 650w, https:\/\/www.minddriven.de\/wp-content\/uploads\/2009\/11\/Rx_Composition_ForkJoin-300x114.png 300w\" sizes=\"auto, (max-width: 650px) 100vw, 650px\" \/><\/p>\n<p>That&#8217;s it for now, guys. I hope you find it useful. Since these are just my initial thoughts on this topic, I rely on your feedback. Together we can make these descriptions even more useful.<\/p>\n<p><a href=\"http:\/\/www.dotnetkicks.com\/kick\/?url=http%3a%2f%2fwww.minddriven.de%2f%3fp%3d563\"><img decoding=\"async\" src=\"http:\/\/www.dotnetkicks.com\/Services\/Images\/KickItImageGenerator.ashx?url=http%3a%2f%2fwww.minddriven.de%2f%3fp%3d563\" border=\"0\" alt=\"kick it on DotNetKicks.com\" \/><\/a><\/p>\n","protected":false},"excerpt":{"rendered":"<p>In preparation of some presentation on the Reactive Extensions (Rx) last week I&#8217;ve investigated the composition methods for several event streams on the Observable class. What methods are existing and what are they doing? 
Let&#8217;s take a look&#8230; This first blog post treats &#8222;multi-valued&#8220; event composition in terms of multiple streams going in and one &hellip; <a href=\"https:\/\/www.minddriven.de\/index.php\/technology\/dot-net\/rx-event-composition-multi-valued\" class=\"more-link\"><span class=\"screen-reader-text\">Rx: Event composition &#8211; multi-valued<\/span> weiterlesen<\/a><\/p>\n","protected":false},"author":2,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[6],"tags":[198,197,181,195,179,178],"class_list":["post-563","post","type-post","status-publish","format-standard","hentry","category-dot-net","tag-combining","tag-composition","tag-events","tag-reactive-extensions-for-net","tag-reactive-framework","tag-rx"],"_links":{"self":[{"href":"https:\/\/www.minddriven.de\/index.php\/wp-json\/wp\/v2\/posts\/563","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.minddriven.de\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.minddriven.de\/index.php\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.minddriven.de\/index.php\/wp-json\/wp\/v2\/users\/2"}],"replies":[{"embeddable":true,"href":"https:\/\/www.minddriven.de\/index.php\/wp-json\/wp\/v2\/comments?post=563"}],"version-history":[{"count":18,"href":"https:\/\/www.minddriven.de\/index.php\/wp-json\/wp\/v2\/posts\/563\/revisions"}],"predecessor-version":[{"id":933,"href":"https:\/\/www.minddriven.de\/index.php\/wp-json\/wp\/v2\/posts\/563\/revisions\/933"}],"wp:attachment":[{"href":"https:\/\/www.minddriven.de\/index.php\/wp-json\/wp\/v2\/media?parent=563"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.minddriven.de\/index.php\/wp-json\/wp\/v2\/categories?post=563"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.minddriven.de\/index.php\/wp-json\/wp\/v2\/tags?post=563"}],"curies":[{"name":
"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}