Bart De Smet: Interactive Extensions (Ix)


The Discussion

• Love the entire Rx concept, and Ix sounds equally cool. It would be great to have some more documentation on all of these cool features, like IQbservable and the providers... Maybe samples (Twitter?)

• Great video! My mind is exploding with knowledge right now. But it's great to see how Rx and Ix work together.

• Interesting video (I'll most likely need multiple viewings to understand everything). So far it has raised, for me, a missing ability: could we have an observable(?) that issues onto multiple publishers? Something for Bart to think about on his next plane trip.

Would that be a dual of an ISubscriber or IBuffer?

I call the process a MultiYield.

```
def MultiYield: EvensAndOdds xs
  IE[T] -> { Evens: IE[T]
             Odds:  IE[T] }  // The output could be a named tuple.
  f(xs){ foreach (x in xs)
           match (x Mod 2)
             | 0 : Yield Evens x  // Read as: yield the value x onto Evens
             | 1 : Yield Odds x   // Read as: yield the value x onto Odds
       }

def MultiYield: LT_EQ_GT xs value
  IE[T] -> { LT: IE[T]
             EQ: IE[T]
             GT: IE[T] }  where T is IComparable[T]
  f(xs){ foreach (x in xs)
           match x.ComparedTo(value)
             | < 0 : Yield LT x
             | = 0 : Yield EQ x
             | > 0 : Yield GT x
       }
```

Example of QuickSort using the MultiYield.

```
def fn: QSort xs
  // Quick Sort using the multi-yielder
  [Collection] -> [AscendingCollection]  // Note: these are traits. The input must have them, and the output has them.
  f(xs){ xs.count
         | = 0 : []
         | = 1 : xs
         | > 1 : { pivot := xs[Math.Random[xs.count]]
                   m := LT_EQ_GT xs pivot               // use the MultiYield
                   QSort( m.LT ) + m.EQ + QSort( m.GT ) // concatenate the results
                 }
       }
```

The xs sequence is iterated over just once per partitioning step (rather than the usual three times).

Bart: I (currently) don't have any idea how to implement them.

Would the MultiYield be useful?
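A minimal Python sketch of what the MultiYield buys QSort, assuming an eager approximation: `partition3` and `qsort` are hypothetical names, and the three "yield targets" are plain lists filled during a single pass over the input, so this buffers where a real multi-yield would stay lazy.

```python
import random
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def partition3(xs: Sequence[T], pivot: T) -> Tuple[List[T], List[T], List[T]]:
    """Single pass over xs, 'yielding' each element onto LT, EQ or GT."""
    lt: List[T] = []
    eq: List[T] = []
    gt: List[T] = []
    for x in xs:
        if x < pivot:
            lt.append(x)
        elif x == pivot:
            eq.append(x)
        else:
            gt.append(x)
    return lt, eq, gt


def qsort(xs: Sequence[T]) -> List[T]:
    """Three-way quicksort with a random pivot and one partitioning pass per level."""
    if len(xs) <= 1:
        return list(xs)
    pivot = xs[random.randrange(len(xs))]
    lt, eq, gt = partition3(xs, pivot)
    return qsort(lt) + eq + qsort(gt)
```

Because the EQ bucket keeps every copy of the pivot, the recursion only ever runs on strictly smaller or strictly larger elements, so duplicates cannot cause unbounded recursion.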

• Of course, in the case of randomness you could simply parameterize on a seed value (although randomness was obviously chosen for the sake of showcasing how to deal with side effects):

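```
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Linq;

// Infinite sequence of rolls in [min, max]; callers must bound it, e.g. with Take.
public static IEnumerable<int> Roll(int min, int max, int? seed = null)
{
    Contract.Requires(min < max);
    Contract.Ensures(Contract.Result<IEnumerable<int>>() != null);
    Contract.Ensures(Contract.Result<IEnumerable<int>>().All(x => min <= x && x <= max));

    // A fixed seed makes the sequence reproducible; without one, Random picks its own seed.
    var r = seed.HasValue ? new Random(seed.Value) : new Random();
    while (true)
        yield return r.Next(min, max + 1); // the upper bound of Next is exclusive
}
```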
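For comparison, a rough Python analogue of `Roll`, assuming `random.Random` as the seedable generator. The Code Contracts precondition becomes a plain `ValueError`, checked in an outer function so it fires immediately rather than on the first `next` call (a check inside a generator body is deferred, much like code in a C# iterator block). The caller bounds the infinite sequence with `itertools.islice`.

```python
import itertools
import random
from typing import Iterator, Optional


def roll(min_value: int, max_value: int, seed: Optional[int] = None) -> Iterator[int]:
    """Infinite stream of rolls in [min_value, max_value], reproducible when seeded."""
    if not min_value < max_value:
        # Checked eagerly, before any generator object exists.
        raise ValueError("min_value must be less than max_value")
    rng = random.Random(seed)  # seed=None lets Random seed itself, like new Random()

    def rolls() -> Iterator[int]:
        while True:
            yield rng.randint(min_value, max_value)  # randint includes both bounds

    return rolls()


first_ten = list(itertools.islice(roll(1, 6, seed=42), 10))
```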

I really like both Publish and Share. In particular Publish sounds like a clever alternative to Memoize.

Must. Use. More. Rx. &. Ix. Vacation. Awaits.

• In case you want to use NuGet to install Ix, look for the Ix_Experimental-Main and Ix_Experimental-Providers packages (following our naming convention used in Rx).

• CKurt wrote:

Great video! My mind is exploding with knowledge right now. But it's great to see how Rx and Ix work together.

I was just thinking that Rx is all shiny and new and is perhaps stealing the limelight from our old workhorse IEnumerable, even though it's just as important as IObservable.

Great to see some innovation happening in the Interactive space as well.

Oh, and I can't pass up a challenge like that. The answers should be:

• Publish: The result should be correct if Zip calls GetEnumerator for both IEs before it starts calling MoveNext on them; if not, it won't be correct.
• Memoize: Since it always returns [a view of] the original IE from the start, the result should be correct.
• Share: The result should be incorrect (every value will be divided by the next value, not by itself) because the internal IE is being consumed by both sides of the Zip.
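The Memoize and Share answers can be checked with a small Python model, under two simplifying assumptions: `Memoized` (a hypothetical class, not the Ix implementation) buffers every element so each new iteration replays from the start, and "share" is modeled by handing the same iterator object to both sides of `zip`. Publish is left out, since its outcome depends on when each side starts enumerating.

```python
from typing import Generic, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


class Memoized(Generic[T]):
    """Enumerates the source at most once; every iterator replays from the beginning."""

    def __init__(self, source: Iterable[T]) -> None:
        self._source = iter(source)
        self._buffer: List[T] = []
        self._done = False

    def __iter__(self) -> Iterator[T]:
        index = 0
        while True:
            if index < len(self._buffer):
                yield self._buffer[index]  # replay an element another consumer already pulled
            elif self._done:
                return
            else:
                try:
                    value = next(self._source)
                except StopIteration:
                    self._done = True
                    return
                self._buffer.append(value)
                yield value
            index += 1


def ratios_memoized(xs: Iterable[float]) -> List[float]:
    m = Memoized(xs)
    return [l / r for l, r in zip(m, m)]  # both sides see x0, x1, ... so every ratio is 1.0


def ratios_shared(xs: Iterable[float]) -> List[float]:
    shared = iter(xs)
    return [l / r for l, r in zip(shared, shared)]  # the sides steal elements: x0/x1, x2/x3, ...
```

With the shared iterator, each Zip step consumes two source elements, which is why the result is both wrong and half as long.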

Another interesting twist would be (I think):

a = xs.Publish(xs_ => xs_.Skip( 1 ).Zip( xs_, (l,r) => l / r ) )

and

b = xs.Publish(xs_ => xs_.Zip( xs_.Skip( 1 ), (l,r) => r / l ) )

Would a and b be the same sequence? Will they even be the same length? (My bet is on 'no', but can you figure out why?)

Really interesting stuff; looking forward to hearing more about Ix, Bart, and the Rx team.

• I think that IObservable<T> is not really the dual of IEnumerable<T>. IObservable<T> is really dual to IEnumerable<Tuple<Time,T>>. Now you *can* write all operators for both.

• @Jules: Thanks for your post. First things first: to be clear, you can write the time-based operations for IE<T> without any change to the type. Where we disagree is in the interpretation of time. Let's go into more details.

First, a quick reminder about the duality between IE<T> and IO<T>. If you distill the essence of the interfaces using arrows, you get the following isomorphisms (ignoring the aspect of resource management, captured by IDisposable, since we only care about the data flow aspect):

• IE<T> ~ () -> (() -> T | Exception | void)
• IO<T> ~ ((T | Exception | void) -> ()) -> ()

The only thing we did is reverse the arrows, and we end up with the dual type. Interfaces are a mere manifestation of the lack of discriminated union types, and they have a particular imperative feel to them that some consider to be "more clear".
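To make the arrow reversal concrete, a small Python sketch that is not tied to either library: the pull side is an ordinary iterator, where the consumer calls `next` and gets back a value, an exception, or `StopIteration` (the "void" case); the push side is a hypothetical `subscribe` function taking three callbacks that mirror OnNext, OnError and OnCompleted. `to_push` converts the former into the latter by running the pull loop on the consumer's behalf.

```python
from typing import Any, Callable, Iterable


def to_push(source: Iterable[Any]) -> Callable[..., None]:
    """Reverse the arrows: turn a pull-based iterable into a push-based subscribe function."""

    def subscribe(
        on_next: Callable[[Any], None],
        on_error: Callable[[Exception], None],
        on_completed: Callable[[], None],
    ) -> None:
        it = iter(source)
        while True:
            try:
                value = next(it)  # pull: () -> T | Exception | void
            except StopIteration:
                on_completed()  # void      -> pushed as on_completed()
                return
            except Exception as error:
                on_error(error)  # Exception -> pushed as on_error(error)
                return
            on_next(value)  # T         -> pushed as on_next(value)

    return subscribe
```

Each outcome of a pull becomes an argument of a push callback, which is the `(T | Exception | void) -> ()` shape of IO<T> above.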

Now, with regard to the notion of time: your interpretation of time is one that treats time as data. The universe you describe is one where Tuple<Time, T> is a coordinate in an (n+1)-dimensional space with n being the number of spatial coordinates and 1 reflecting the time dimension. The spatial coordinates are defined by the CLR type system and heap here: typeof(T) and the particular instance of the type. This approach is totally valid, and in fact we have it in Rx in the form of Timestamped<T>, which is isomorphic to Tuple<Time, T>.

However, there's a different interpretation of time that relates to the control flow aspect of computation. Rather than associating time with data (by treating it as data itself), time can be related to the execution itself. In Rx, this is embodied by the IScheduler interface which carries a clock. In this world, the scheduler defines the universe of space (where things happen, e.g. on the thread pool), and time (when things happen, related to the scheduler's clock). Going back to machine architecture analogies, you could see the number of instructions executed (or the number of CPU clock ticks) as the clock.

Operators like Timeout have been defined in terms of the latter interpretation, where time is extrinsic to the data (observe the IScheduler parameter passed in). For example, upon arrival of an OnNext message, the scheduler can be invoked to run a timeout timer to monitor the next OnNext message coming down the pipe. In this worldview, time is associated with the execution of method calls. You can do a very similar thing for IE<T>, now based on the dual MoveNext method. Again, you observe the time behavior of the environment that's sending you the data by running a local timer (ignoring relativistic effects), measuring the time that elapses between the incoming MoveNext call and the environment's response to the outgoing MoveNext request. To do so, you need to rely on a clock which is extrinsic to the data, e.g. provided by the IScheduler interface.
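A rough Python sketch of this extrinsic view for pull-based sequences, assuming the clock is injected as a zero-argument function standing in for the IScheduler's clock. `time_interval` is a hypothetical name, modeled loosely on Rx's TimeInterval operator: it measures how long the environment takes to answer each outgoing `next` request. A Timeout operator would compare the same measurement against a due time; actually abandoning a slow `next` call would additionally require a second thread, which is left out here.

```python
import time
from typing import Callable, Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def time_interval(
    source: Iterable[T], clock: Callable[[], float] = time.monotonic
) -> Iterator[Tuple[float, T]]:
    """Yield (elapsed, item), where elapsed is the time spent inside each next() call."""
    it = iter(source)
    while True:
        started = clock()  # our outgoing MoveNext request starts here
        try:
            item = next(it)
        except StopIteration:
            return
        yield clock() - started, item  # the environment's response time, measured extrinsically
```

Passing a fake clock makes the measurement deterministic, which plays the role virtual time scheduling plays in Rx.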

That's not to say you can't define time-based operations using an intrinsic notion of time, e.g. based on Timestamped<T>. In that case, you're performing distance measurements and coordinate transformations on the data itself, which happens to carry time information. I invite the niners to think about all kinds of operators in terms of this (e.g. Select translates an object in space). This approach works very well for analysis of historical data, e.g. log data. However, you could do this too by using virtual time scheduling.
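In the intrinsic view no clock is consulted at all; operators just do arithmetic on the time coordinates the data already carries. A minimal Python sketch, assuming log records shaped as `(timestamp_seconds, value)` pairs as a stand-in for Timestamped<T>: the hypothetical `gaps` measures the distance in time between consecutive records, and `shift` translates every record along the time axis, the temporal counterpart of Select translating an object in space.

```python
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def gaps(records: Sequence[Tuple[float, T]]) -> List[Tuple[float, T]]:
    """For each record after the first, the time elapsed since the previous record."""
    return [(t2 - t1, v2) for (t1, _), (t2, v2) in zip(records, records[1:])]


def shift(records: Sequence[Tuple[float, T]], offset: float) -> List[Tuple[float, T]]:
    """Translate every record along the time axis; the values are untouched."""
    return [(t + offset, v) for t, v in records]
```

Both functions give the same answer no matter when they run, which is what makes this style a good fit for historical data such as logs.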

The discussion of where to put time is very similar to the one on where to put other aspects of the control flow, such as exceptions. Do you treat those as code or as data? In Rx and Ix, we treat exceptions as control flow aspects (cf. OnError and the dual of MoveNext throwing) at the interface level (note for Ix: the C# language doesn't provide checked exceptions, hence they don't show up in the IR<T> interface explicitly). However, one can employ reification to treat those control flow mechanisms as part of the data flow, using operators like Materialize. Similarly, the time aspect can be moved from control flow to data flow using the Timestamp operator. Both treatments are valid, and which one is more convenient depends on your scenario.
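The reification idea can be sketched in a few lines of Python. This is an illustration under simplifying assumptions, not the Ix code: notifications are plain tuples tagged `"next"`, `"error"` or `"completed"`, `materialize` moves control flow (values, an exception, normal termination) into data, and `dematerialize` turns the data back into control flow.

```python
from typing import Any, Iterable, Iterator, Tuple

Notification = Tuple[str, Any]


def materialize(source: Iterable[Any]) -> Iterator[Notification]:
    """Turn control flow (values, an exception, normal termination) into data."""
    it = iter(source)
    while True:
        try:
            value = next(it)
        except StopIteration:
            yield ("completed", None)
            return
        except Exception as error:
            yield ("error", error)  # the exception becomes an ordinary element
            return
        yield ("next", value)


def dematerialize(notifications: Iterable[Notification]) -> Iterator[Any]:
    """Inverse of materialize: re-raise errors and stop at 'completed'."""
    for kind, payload in notifications:
        if kind == "next":
            yield payload
        elif kind == "error":
            raise payload
        else:
            return
```

Between the two calls, an error is just another element, so ordinary query operators can filter, count or log it.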

Finally, the discussion on extrinsic or intrinsic notion of time with regards to the data is one of the essential differences between Rx and StreamInsight. In the latter technology, every event carries its own notion of time (cf. CepStream<T>), including duration. In Rx, we model those things differently using the concept of reactive coincidence. Again, because both notions make sense, there are conversions possible between both worlds; e.g. StreamInsight has an IO<T>-based programming model exposed.

• Just wondering how much overlap, if any, your team has with the TPL Dataflow team. There seems to be quite a bit of common ground, with you guys coming at the problem space from the high level down and TPL from the low level up. For example, you have .Zip and they have JoinBlock<T1,T2...>. I also notice they have direct support for AsObservable and AsObserver on their source and target types.

• @scyonx: We'll work on providing more samples on the subject going forward. Stay tuned.

• @AdamSpeight2008: You're lucky I have some flights coming up pretty soon. In the meantime, here are a few things to think about. How do grouping and windowing operators compare to the idea of a multi-yield? What's the lifetime of the underlying enumerator in case you have multiple sequences that are artifacts of applying an operator over a shared sequence (such as multi-yield)? Think about both the acquisition of the enumerator as well as the disposal thereof.
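One way to explore these questions in Python is `itertools.tee`, which hands out several lazy views over one shared iterator. A hedged sketch of a lazy multi-yield built on it (`multi_yield` and `_matching` are hypothetical names): the source is pulled only once, but whichever view runs ahead forces tee to buffer elements for the others, and the shared iterator stays alive as long as any view is still referenced. An abandoned but still referenced view makes tee keep buffering on its behalf, which is the acquisition/disposal problem in miniature.

```python
import itertools
from typing import Callable, Hashable, Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def _matching(view: Iterator[T], key: Callable[[T], Hashable], wanted: Hashable) -> Iterator[T]:
    # Each view walks its own copy of the shared stream and keeps only its bucket.
    for x in view:
        if key(x) == wanted:
            yield x


def multi_yield(
    source: Iterable[T], key: Callable[[T], Hashable], keys: Tuple[Hashable, ...]
) -> Tuple[Iterator[T], ...]:
    """Split one source into len(keys) lazy sequences that share a single underlying iterator."""
    views = itertools.tee(source, len(keys))  # tee buffers items until every view has seen them
    return tuple(_matching(view, key, k) for view, k in zip(views, keys))


evens, odds = multi_yield(range(10), lambda x: x % 2, (0, 1))
```

Compared with grouping, the set of output sequences is fixed up front here, while a GroupBy-style operator discovers its groups as the keys show up.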

• @PerfectPhase: TPL Dataflow has some overlap for sure, but at the same time the two technologies can work together using - as you mention - bridging facilities.

The main difference is this: with Rx you get a very high degree of compositionality thanks to the composition taking place at the IObservable<T> level. If you look at that layer as the declarative layer, you can think of deeper layers in Rx as operational layers. Concepts that fit in there include subjects and schedulers.

At that level, it's much harder to compose because of the much more stateful nature of the building blocks. You can already see this being the case in the Rx API where the binding operators meet their subjects, requiring different - imperative-style - interaction through the notion of IConnectableObservable<T>.

Similarly, composition of things like "backpressure" on an observable channel using queues and producer/consumer interaction is much harder. Say you have multiple sources coming together and you need to throttle the rate at which things are being sent. How do operators propagate the signals required to apply backpressure etc? Typically you deliver those out-of-band with regards to the composed query operators, requiring quite some plumbing.
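A minimal Python sketch of the queue-based, producer/consumer style of backpressure, assuming a bounded `queue.Queue` (this is neither Rx nor TPL Dataflow code, and `run_pipeline` is a hypothetical name): the producer blocks in `put` whenever `capacity` items are already waiting, so the throttling signal travels out-of-band through the queue rather than through any query operator.

```python
import queue
import threading
import time
from typing import List, Tuple

_DONE = object()  # sentinel marking the end of the stream


def run_pipeline(items: List[int], capacity: int, consumer_delay: float) -> Tuple[List[int], int]:
    """Producer thread feeds a bounded queue; a slow consumer applies backpressure."""
    channel: "queue.Queue[object]" = queue.Queue(maxsize=capacity)
    received: List[int] = []
    depths: List[int] = []

    def produce() -> None:
        for item in items:
            channel.put(item)  # blocks while `capacity` items are already waiting
            depths.append(channel.qsize())  # how many items are buffered right now
        channel.put(_DONE)

    producer = threading.Thread(target=produce, daemon=True)
    producer.start()
    while True:
        item = channel.get()
        if item is _DONE:
            break
        time.sleep(consumer_delay)  # simulate a slow consumer
        received.append(item)
    producer.join()
    return received, max(depths, default=0)
```

The deepest buffer the producer ever observes is bounded by `capacity`, however slow the consumer is; composing several such channels is where the extra plumbing comes in.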

All in all, your analysis of "high level down" and "low level up" is pretty accurate. It's much (to use an analogy with a different domain) like the contrast between the querying experience in an RDBMS versus leveraging ISAM style access. The level at which you glue things together is different, and hence you have different levels of high-level expressiveness and/or low-level control.

The nice thing is you can mix and match the technologies depending on your needs. Also, preferences in programming style contribute to those decisions: functional declarative state-poor combinator-centric query style, versus a more imperative state-rich approach based on message-passing actor/agent style. And thanks to the bridges available on the building blocks, you can sandwich Rx inside TPL Dataflow and vice versa, if needed.

Also make sure to have a look at the documents available on the TPL Dataflow website. Also see this discussion that summarizes some of the observations (pun intended) made here.

• @bdesmet: This is the closest I can get to a MultiYield; I've used it to implement QSort:

```
Imports System.Reactive.Linq ' Rx 1.0+ namespace for the Observable operators

' Hypothetical module name; VB extension methods must be declared in a Module.
Module ObservableSortExtensions

    ' def multiYield: LEG *xs value
    '   IO<T> -> { L: IO<T>, E: IO<T>, G: IO<T> }
    '   f(xs, value){ for x in xs
    '                   match x.ComparedTo(value)
    '                     | < 0 : Yield x On L
    '                     | = 0 : Yield x On E
    '                     | > 0 : Yield x On G
    '               }
    <Runtime.CompilerServices.Extension()>
    Public Function LEG(Of T As IComparable(Of T))(ByVal xs As IObservable(Of T), ByVal value As T) _
        As Tuple(Of IObservable(Of T), IObservable(Of T), IObservable(Of T))
        ' Group on the sign of the comparison: -1 (less), 0 (equal), 1 (greater).
        Dim xs_g = xs.GroupBy(Function(x) Math.Sign(x.CompareTo(value)))
        Return New Tuple(Of IObservable(Of T), IObservable(Of T), IObservable(Of T))(
            (From i In xs_g Where i.Key < 0).SelectMany(Function(ii) ii),
            (From i In xs_g Where i.Key = 0).SelectMany(Function(ii) ii),
            (From i In xs_g Where i.Key > 0).SelectMany(Function(ii) ii))
    End Function

    ' def fn: QSort *xs
    '   IO<T> -> IO<T>
    '   f(xs){ match xs.IsEmpty.FirstOrDefault
    '            |  True : xs
    '            | False : { Pivot = Random(xs.Count.FirstOrDefault)
    '                        ios = xs.LEG(Pivot)
    '                        ios.L.QSort + ios.E + ios.G.QSort
    '                      }
    '        }
    <Runtime.CompilerServices.Extension()>
    Public Function QSort(Of T As IComparable(Of T))(ByVal xs As IObservable(Of T)) As IObservable(Of T)
        ' Uses blocking operators and re-subscribes to xs, so it only suits finite, cold observables.
        If xs.IsEmpty().FirstOrDefault() Then Return xs
        Dim r = New Random()
        Dim pivot = xs.Skip(r.Next(xs.Count().Single())).FirstOrDefault()
        Dim q = xs.LEG(pivot)
        Return q.Item1.QSort().Concat(q.Item2).Concat(q.Item3.QSort())
    End Function

End Module
```

• @bdesmet: Thanks for the explanation, that's interesting stuff

• The function "Return" needs a new name. This is a keyword in VB, which means you cannot write

x = Return(y)

I know this isn't as much of a problem in C#, since you would have to write EnumerableEx.Return anyway, but it is still bad form.

• The Interactive Extensions (Ix) link is broken.

Luckily it can be found in nuget.

• @evarlast: Thanks for bringing the broken link to our attention. We'll work on fixing it. In fact, we have a new version of Ix available (v1.1.10823). Hiding the old version from the Download Center caused the link breakage. Sorry for the inconvenience.