Rx Workshop

Rx Workshop: Observables versus Events

Download this episode

Download Video

Description

Learn about the first-class representation of event streams using observable sequences in Rx, and how to use subjects to publish and subscribe to sources.

Download the Challenge

Embed

Format

Available formats for this video:

Actual format may change based on video formats available and browser capability.

    The Discussion

    • Polity

      Great way of introducing RX Guys!

      Unfortunatly, the given challange does not the challange given in the video.
      >>>> using UnifiedProgrammingModel.DictionaryService;

      Did something get mixed up when posting?

      Challenge 2 - Unified Programming Model.csproj

    • Charles

      @Polity: Thanks. Indeed, the challenges were misnumbered. Sorry about the confusion. This should be fixed now!

    • limes

      Great workshop on exciting concepts - congratulations!

      Had been watching Rx "from the distance" for quite a time, but now I decided to really get hands on.

      So, re Challenge1: my first go on it yielded considerably less and more concise code for events than for Observables! I just could not see how this ought to illustrate the (better) composability of Observables; it seemed rather to the contrary... oops.

      Then I realized that my simple-minded solution for events was way off spec (or what I think is the intended semantics) - while with Observables it all comes out just as it should be automatically.

      However, the problem with the challenge at hand is IMHO that the "test case" in Program.cs does not surface any of the relevant bits. That is, I think, because it only tests the case where there is one subscriber for changes in length. Therefore I'd like to propose the following as expected output:

                  // Expected Output:
                  // *** Events ***
                  // The
                  // Reactive
                  // Extensions
      /*  *  */   // 1: 10
                  // are
      /*  *  */   // 1: 3
      /*  +  */   // new
      /*  +  */   // and          // verify: there is NO change in length!
      /*  *  */   // 1: 13
      /*  +  */   // 2: 13        // verify: 2nd handler gets notified, too!
      /*  +  */   // 2: 1
                  // *** Observables ***
                  // The
                  // Reactive
                  // Extensions
      /*  *  */   // 1: 10
                  // Extensions
                  // are
      /*  *  */   // 1: 3
      /*  +  */   // new
      /*  +  */   // and          // verify: there is NO change in length!
      /*  *  */   // 1: 13
      /*  +  */   // 2: 13        // verify: 2nd handler gets notified, too!
      /*  +  */   // 2: 1

      ...produced by the this test code:

                  Console.WriteLine("*** Events ***");
                  var events = new Events();
                  Action<string> textHandler = text => Console.WriteLine(text);
      /*  *  */   Action<int> lengthHandler1 = length => Console.WriteLine("1: " + length);
      /*  +  */   Action<int> lengthHandler2 = length => Console.WriteLine("2: " + length);
                  events.TextChanged += textHandler;
                  events.OnTextChanged("The");
                  events.OnTextChanged("Reactive");
      /*  *  */   events.LengthChanged += lengthHandler1;
                  events.OnTextChanged("Extensions");
                  events.OnTextChanged("are");
      /*  +  */   events.LengthChanged += lengthHandler2;
      /*  +  */   events.OnTextChanged("new");
      /*  +  */   events.OnTextChanged("and");
                  events.TextChanged -= textHandler;
                  events.OnTextChanged("compositional");
      /*  *  */   events.LengthChanged -= lengthHandler1;
                  events.OnTextChanged("!");
      /*  +  */   events.LengthChanged -= lengthHandler2;
      
                  Console.WriteLine("*** Observables ***");
                  var observables = new Observables();
                  var textChangedSubscription = observables.TextChanged.Subscribe(text => Console.WriteLine(text));
                  observables.OnTextChanged("The");
                  observables.OnTextChanged("Reactive");
      /*  *  */   var lengthSubscription1 = observables.LengthChanged.Subscribe(length => Console.WriteLine("1: " + length));
                  observables.OnTextChanged("Extensions");
                  observables.OnTextChanged("are");
      /*  +  */   var lengthSubscription2 = observables.LengthChanged.Subscribe(length => Console.WriteLine("2: " + length));
      /*  +  */   observables.OnTextChanged("new");
      /*  +  */   observables.OnTextChanged("and");
                  textChangedSubscription.Dispose();
                  observables.OnTextChanged("compositional");
                  lengthSubscription1.Dispose();
                  observables.OnTextChanged("!");
      /*  +  */   lengthSubscription2.Dispose();

      The prefixed comments denote my changes/additions relative to the original. Besides the additional length-change subscriber I have added the new text-change events "new" and "and" after "are", which both should NOT yield a length-change event. The whole point of this is that the stream of events as such should really be independent of the subscribers, particularly:

      a) which / how many subscribers there are and

      b) *when* a particular subscriber actually did subscribe.

      ***Note that the above test case still does not account for the one case where we have both,

      a) a length-change subscriber that subscribed even before the very first *text-change* event (missing in test case, SHOULD see an event since (no text ~ length 0) != (some text ~ length > 0)

      b) a length-change subscriber that subscribed right in between the text-change events of "are" and "new" (present in test case, should NOT see an event since "are" and "new" are both three characters long)

      The problem I'm seeing there is that in the event solution you can get either a) or b) to see the right thing(s), but not both - whereas, again, with Observables you get it right for both just naturally. That is, at least given the restriction that the original [Events|Observables].OnTextChanged(string) firing method may not be changed, I'm not even sure if it's possible at all to get it completely right with only events.

      Well, and there we're back at compositionality: adding a new kind of event (stream) should really only require an additive change.

      So that's what I wanted to share; this my train of thought felt somewhat educational to me. Sorry for that it all got soo lengthy Blushing

      ___

      p.s.: please do try yourself a solution for *** based on events-only, it's one of those things which you deem "definitely doable" at first glance but find yourself utterly puzzled at when having to actually do it. At least that's what applies to me.

       

    • stacyh

      The challenge was cool. 1 line of code for the Rx solution. After that I wasn't too motivated to try the events version. I'm curious what the most concise version of the events solution is so far.

      Great job guys!

    • N2Cheval

      Sorry but I'm having a mental block solving the Observables section. I've checked the main site and looked through the links like HOL202 but it's not clicking yet. Where can I either get some more hints or find the solution.

    • staceyw

      Thanks for video. Is this right?

      namespace IntroductionToRx
      {
          class Events
          {
              public event Action<string> TextChanged;
              private event Action<int> lenChanged;
              private int lastLen;
      
              public virtual void OnTextChanged(string text)
              {
                  var t = TextChanged;
                  if (t != null)
                      t(text);
                  if (lastLen != text.Length)
                  {
                      lastLen = text.Length;
                      OnLenChanged(text.Length);
                  }
              }
              public virtual void OnLenChanged(int len)
              {
                  var t = lenChanged;
                  if (t != null)
                      t(len);
              }
              public event Action<int> LengthChanged
              {
                  add
                  {
                      lenChanged = (Action<int>)Delegate.Combine(lenChanged, value);
                  }
                  remove
                  {
                      lenChanged = (Action<int>)Delegate.Remove(lenChanged, value);
                  }
              }
          }
      }
      
      namespace IntroductionToRx
      {
          class Observables
          {
              ISubject<string> textChanged = new Subject<string>();
              int lastLen;
      
              public virtual void OnTextChanged(string text)
              {
                  textChanged.OnNext(text);
              }
      
              public IObservable<string> TextChanged { get { return textChanged; } }
      
              public IObservable<int> LengthChanged
              {
                  get
                  { // Compose new "event" by filtering existing event.
                      return textChanged.Where(s => s.Length != lastLen).Do(s=>lastLen=s.Length).Select(s=>s.Length);
                  }
              }
          }
      }
      
      Output
      ======================
      *** Events ***
      The
      Reactive
      Extensions
      10
      are
      3
      13
      *** Observables ***
      The
      Reactive
      Extensions
      10
      are
      3
      13

    • phulsmeijer

      How about

      class Events
      {
      public event Action<string> TextChanged;

      public virtual void OnTextChanged(string text)
      {
      var t = TextChanged;
      if (t != null)
      t(text);
      }

      private event Action<int> lengthChanged;

      public virtual void OnLengthChanged(string s)
      {
      if (lengthChanged != null)
      {
      lengthChanged(s == null ? 0 : s.Length);
      }
      }

      public event Action<int> LengthChanged
      {
      add
      {
      if (lengthChanged == null)
      {
      TextChanged += OnLengthChanged;
      }
      lengthChanged += value;
      }
      remove
      {
      lengthChanged -= value;
      if (lengthChanged == null)
      {
      TextChanged -= OnLengthChanged;
      }
      }
      }
      }

      class Observables
      {
      ISubject<string> textChanged = new Subject<string>();

      public virtual void OnTextChanged(string text)
      {
      textChanged.OnNext(text);
      }

      public IObservable<string> TextChanged { get { return textChanged; } }

      public IObservable<int> LengthChanged
      {
      get { return TextChanged.Select((s) => s == null ? 0 : s.Length); }
      }
      }

    • jyates

      Why go to all that trouble? Why not just go..

      class Events
      {
      public event Action<string> TextChanged;
      public event Action<int> LengthChanged;

      public virtual void OnTextChanged(string text)
      {
      var t = TextChanged;
      if (t != null)
      t(text);

      var l = LengthChanged;
      if (l != null)
      l(text == null ? 0 : text.Length);
      }
      }

    • staceyw

      @jyates. I think that may not follow the spirt of the challenge.

      1) Your LengthChanged event is always fired after textchanged, even if Length does not change. So it fires too much.

      2) The challenge had a template to follow and not remove getter/setter (i.e. Add, Remove) on the new event afaict.

    • Kurator

      @staceyw

      Nice code, but your Event-Handler Code never prints the final "13", because when the textHandler gets removed, the lenChanged event never gets fired again.

      Here is my code, I'm not really happy with it - but anyway:

        class Events {
          public event Action<string> TextChanged;
      
          public virtual void OnTextChanged(string text) {
            var t = TextChanged;
            if (t != null) {
              t(text);
            }
          }
      
          private int lastLength;
          private Action<int> Length;
      
          public event Action<int> LengthChanged {
            add {
              Length += value;
              TextChanged += (s => {
                if (s.Length != lastLength) {
                  var l = Length;
                  if (l != null) l(s.Length);
                  lastLength = s.Length;
                }
              });
            }
            remove {
              Length -= value;
            }
          }
        }

    • N2Cheval

      I got lost down the rabbit hole of trying to follow the hint. Here's mine:

      class Events {
              public event Action<string> TextChanged;
      
              public virtual void OnTextChanged(string text) {
                  var t = TextChanged;
                  if (t != null)
                      t(text);
              }
      
              private event Action<string> lc; //lengthChanged
              public event Action<int> LengthChanged {
                  add {
                      lc += x => { Console.WriteLine(x == null ? 0 : x.Length); };
                      TextChanged += lc;
                  }
                  remove {
                      TextChanged -= lc;
                  }
              }
          }
      
      class Observables {
              ISubject<string> textChanged = new Subject<string>();
      
              public virtual void OnTextChanged(string text) {
                  textChanged.OnNext(text);
              }
      
              public IObservable<string> TextChanged { get { return textChanged; } }
      
              public IObservable<int> LengthChanged {
                  get {
                      return textChanged.Select(s => s == null ? 0 : s.Length);
                  }
              }
          }

    • c t

      Cool tutorial. Been meaning for the past year to try and learn Rx, and I like the "baby steps" the videos are teaching and the challenges. Here's mines:

      <code>

      class Events
      {
      public event Action<string> TextChanged;

      public virtual void OnTextChanged(string text)
      {
      var t = TextChanged;
      if (t != null)
      t(text);
      }

      private void OnLengthChanged(string text)
      {
      if (_lengthChanged != null)
      _lengthChanged(text.Length);
      }

      private event Action<int> _lengthChanged;

      public event Action<int> LengthChanged
      {
      add
      {
      TextChanged -= OnLengthChanged;
      TextChanged += OnLengthChanged;

      _lengthChanged = (Action<int>)Delegate.Combine(_lengthChanged, value);
      }
      remove
      {
      _lengthChanged = (Action<int>)Delegate.Remove(_lengthChanged, value);
      }
      }
      }

      class Observables
      {
      ISubject<string> textChanged = new Subject<string>();

      public virtual void OnTextChanged(string text)
      {
      textChanged.OnNext(text);
      }

      public IObservable<string> TextChanged
      {
      get
      {
      return textChanged;
      }
      }

      public IObservable<int> LengthChanged
      {
      get
      {
      var lengthChanged = (from t in TextChanged
      select t.Length);

      return lengthChanged;
      }
      }
      }
      </code>

    • staceyw

      @kurator. My code produced the desired output and the 13. It is not dependant on subscribers of TextChanged event. It is dependant on publisher calling OnTextChanged().

    • wiertmir

      I'm not bothering with comparing to previous lengths (as TextChanged doesn't compare to previous text as well).

      The only complication comes from the fact that you have to unsubscribe the proper subscription (not just the last one), otherwise code would be almost as simple and short as observable version is:

      private Dictionary<Action<int>, Action<string>> actions = new Dictionary<Action<int>, Action<string>>();
      
              public event Action<int> LengthChanged
              {
                  add
                  {
                      Action<string> action = text => value(text == null ? 0 : text.Length);
                      actions.Add(value, action);
                      TextChanged += action;
                  }
                  remove
                  {
                      Action<string> action;
                      if (actions.TryGetValue(value, out action))
                      {
                          actions.Remove(value);
                          TextChanged -= action;
                      }
                  }
              }

    • jyates

      @staceyw: Your code blows up if you pass null to OnTextChanged.

      In answer to your points.

      1) Your LengthChanged event is always fired after textchanged, even if Length does not change. So it fires too much.

      As does your TextChanged (and the original).

      2) The challenge had a template to follow and not remove getter/setter (i.e. Add, Remove) on the new event afaict.

      The Challenge is surely to demonstrate the benefits of using Observables over Events. Forcing the unnecessary use of event accessors gives an unfair advantage to Observables in this example.

      So, here's a revision:

      class Events
      {
      public event Action<string> TextChanged;
      public event Action<int> LengthChanged;
      private string lastText;
      private int lastLength;

      public virtual void OnTextChanged(string text)
      {
      if (text != lastText)
      {
      var handler = TextChanged;

      if (handler != null)
      {
      handler(text);
      }

      int length = text == null ? 0 : text.Length;

      if (lastLength != length)
      {
      var lengthChangedHandler = LengthChanged;

      if (lengthChangedHandler != null)
      {
      lengthChangedHandler(length);
      }
      }

      lastText = text;
      lastLength = length;
      }
      }
      }

      And for those who prefer things to be more verbose:

      class Events
      {
      public event Action<string> TextChanged;
      private event Action<int> lengthChanged;
      private string lastText;
      private int lastLength;

      public virtual void OnTextChanged(string text)
      {
      if (text != lastText)
      {
      var handler = TextChanged;

      if (handler != null)
      {
      handler(text);
      }

      int length = text == null ? 0 : text.Length;

      if (lastLength != length)
      {
      OnLenChanged(length);
      }

      lastText = text;
      lastLength = length;
      }
      }

      public virtual void OnLenChanged(int length)
      {
      var handler = lengthChanged;

      if (handler != null)
      {
      handler(length);
      }
      }

      public event Action<int> LengthChanged
      {
      add
      {
      lengthChanged = (Action<int>)Delegate.Combine(lengthChanged, value);
      }
      remove
      {
      lengthChanged = (Action<int>)Delegate.Remove(lengthChanged, value);
      }
      }
      }

      And a correction to The Observables version to take into account duplicate Text Changes.

      class Observables
      {
      ISubject<string> textChanged = new Subject<string>();

      public virtual void OnTextChanged(string text)
      {
      textChanged.OnNext(text);
      }

      public IObservable<string> TextChanged
      {
      get
      {
      return textChanged.DistinctUntilChanged();
      }
      }

      public IObservable<int> LengthChanged
      {
      get
      {
      return TextChanged.Select(s => s == null ? 0 : s.Length).DistinctUntilChanged();
      }
      }
      }

    • staceyw

      "As does your TextChanged (and the original)."

      But calling TextChanged is up to publisher. So publisher would not call if not changed. Changing that behavior was not part of the challenge as I read it.  Was fun series in any event (pun intended). 

    • Niklas

      These guys should seriously move on to another project by now. How long have they been working on this? Must be several years. Try something new guys, MS is a big company!

    • decrypted

      Guess I'm late to the posts, but I think it can be simplified no?

      private Action<string> LengthHandler;
      
              public event Action<int> LengthChanged
              {
                  add
                  {
                      if (LengthHandler == null)
                          LengthHandler = text => value(text.Length);
                      TextChanged += LengthHandler;
                  }
                  remove
                  {
                      TextChanged -= LengthHandler;
                  }
      
              }

       

      and

       

      public IObservable<int> LengthChanged { get { return textChanged.Select(x => x.Length); } }

    • cbae

      I finally got around to doing this.

      class Events
          {
              public event Action<string> TextChanged;
              private event Action<int> TextLengthChanged;
      
              private int _lastlen = 0;
      
              public virtual void OnTextChanged(string text)
              {
                  var t = TextChanged;
                  if (t != null)
                      t(text);
                  OnLengthChange((text ?? String.Empty).Length);
              }
      
              private void OnLengthChange(int len)
              {
                if (len != _lastlen)
                {
                  var t = TextLengthChanged;
                  if (t != null)
                  {
                    t(len);
                  }
                  _lastlen = len;
                }
              }
      
              public event Action<int> LengthChanged
              {
                  add
                  {
                    TextLengthChanged += value;
                  }
                  remove
                  {
                    TextLengthChanged -= value;
                  }
              }
          }
      
          class Observables
          {
              ISubject<string> textChanged = new Subject<string>();
      
              public virtual void OnTextChanged(string text)
              {
                  textChanged.OnNext(text);
              }
      
              public IObservable<string> TextChanged { get { return textChanged; } }
      
              public IObservable<int> LengthChanged
              {
                  get
                  {
                    return textChanged.Select(s => (s ?? String.Empty).Length);
                  }
              }
          }
      

    • IanG

      The "Download the Challenge" link doesn't work - I'm getting a 404.

    • ruby

      @Kurator: Nice one.

    • Valentin​Kuzub

      I dont  think you should be modifying original OnTextChanged, heres my code

       

       class Events
          {
              private string m_LastString = string.Empty;
      
              public event Action<string> TextChanged;
      
              private Action<string> m_ToAttachToTextChanged;
              private Action<int> m_LengthChanged;
              
              public Events()
              {
                  m_ToAttachToTextChanged = text => {
                      if (m_LastString.Length != (text ?? "").Length) {
                          m_LengthChanged((text ?? "").Length);
                      }
                      m_LastString = text ?? "";
                  };
              }
      
              public virtual void OnTextChanged(string text)
              {
                  var t = TextChanged;
                  if (t != null)
                      t(text);
              }
      
              public event Action<int> LengthChanged
              {
                  add
                  {
                      m_LengthChanged += value;
                      TextChanged += m_ToAttachToTextChanged;
                  }
                  remove
                  {
                      m_LengthChanged -= value;
                      TextChanged -= m_ToAttachToTextChanged;
                  }
              }
          }

    • ada

      Events code is implemented correctly by Kurator and Valentinkuzub.
      Observable is implemented correctly by staceyw.
      All others are wrong regardless of whether they give the correct output.

    • andreas_f

      Events code is implemented correctly by Kurator and Valentinkuzub.

      Kurator's implementation leaks memory as it adds a new delegate (lambda) to TextChanged on each call to add_LengthChanged without removing it in remove_LengthChanged. Valentinkuzub's implementation is nice even though it does some unnecessary work by updating the m_LastString variable multiple times if more than one subscriber registers with LengthChanged. staceyw's implementation for Events is fine too as far as I can tell.

       

      Observable is implemented correctly by staceyw.

      All others are wrong regardless of whether they give the correct output.

      staceyw's implementation of Observable breaks down as soon as more than one observer subscribes to LengthChanged. The state of the (shared) lastLen variable is modified when the first subscriber receives the next string. So depending on evaluation order other subscribers may not see the change as for them the value is filtered out in the Where expression. jyates revised solution on the other hand:

       

       return TextChanged.Select(s => s == null ? 0 : s.Length).DistinctUntilChanged();     

       looks perfect imho.

    • andreas_f

      Here is my preferred solution for Events based on Valentinkuzub's implementation:

      class Events
          {
              private int m_LastLength;
              public event Action<string> TextChanged;
              private Action<int> m_LengthChanged;
      
              public Events()
              {
                  TextChanged += s =>
                                     {
                                         var length = (s ?? string.Empty).Length;
                                         if (length != m_LastLength)
                                         {
                                             m_LastLength = length;
                                             m_LengthChanged(length);
                                         }
                                     };
              }
              
              public virtual void OnTextChanged(string text)
              {
                  var t = TextChanged;
                  if (t != null)
                      t(text);
              }
      
              public event Action<int> LengthChanged
              {
                  add { m_LengthChanged += value; }
                  remove { m_LengthChanged -= value; }
              }
          }

       

       

    Comments closed

    Comments have been closed since this content was published more than 30 days ago, but if you'd like to continue the conversation, please create a new thread in our Forums, or Contact Us and let us know.