Rx Workshop: Programming the Cloud

Description

Learn about the special challenges of distributed reactive applications and how to use distributed schedulers to write powerful reactive programs.

    The Discussion

    • Simply Ged

      I got this to work, although I'm not sure if I could have made the action into an anonymous method?

      public IDisposable Subscribe(IObserver<R> observer)
      {
          GenerateState state = new GenerateState
          {
              condition = this.condition,
              current = this.initial,
              iterate = this.iterate,
              observer = observer,
              resultSelector = this.resultSelector
          };
      
          Action<GenerateState, Action<GenerateState>> action = (_state, self) =>
          {
              if (_state.condition(_state.current))
              {
                  var result = _state.resultSelector(_state.current);
                  _state.observer.OnNext(result);
                  _state.current = _state.iterate(_state.current);
      
                  self(_state);
              }
              else
                  _state.observer.OnCompleted();
          };
      
          return scheduler.Schedule(state, action);
      }
      

    • dmarsh
      Great stuff. Have you guys spoken with the compiler teams about potentially just adding the SerializableAttribute to closure classes and/or anonymous types in their next versions? Having to manually introduce an MBV wrapper class for scenarios where multiple values need to be passed in seems like it's going to turn into an exercise in typing.
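
      A minimal sketch of the point above, not taken from the workshop: it compares the class the compiler generates for a capturing lambda with a hand-written state class. CounterState and ClosureDemo are hypothetical names chosen for illustration. As far as I know, current C# compilers do not mark the generated closure class as serializable, which is why the hand-written wrapper is needed, but treat that as an assumption and run it against your own toolchain.

      ```csharp
      using System;

      // Hypothetical hand-written state holder; it can carry [Serializable]
      // because we control its declaration.
      [Serializable]
      public sealed class CounterState
      {
          public int Current;
          public int Limit;
      }

      public static class ClosureDemo
      {
          // Returns the compiler-generated type that holds the captured local.
          public static Type CapturedStateType()
          {
              int limit = 10;
              Func<int, bool> condition = x => x < limit; // captures 'limit'
              return condition.Target.GetType();
          }

          public static void Main()
          {
              Type closureType = CapturedStateType();
              Console.WriteLine("{0}: IsSerializable = {1}",
                  closureType.Name, closureType.IsSerializable);
              Console.WriteLine("{0}: IsSerializable = {1}",
                  typeof(CounterState).Name, typeof(CounterState).IsSerializable);
          }
      }
      ```

      Passing an explicit state object to Schedule, as the solutions in this thread do, keeps the data that crosses the boundary in a type you can annotate yourself.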
    • AceHack

      |requires observer != null
      |ensures result != null
      public IDisposable Subscribe(IObserver<R> observer)
      {
          var generateState = new GenerateState()
          {
              condition = condition,
              current = initial,
              iterate = iterate,
              observer = observer,
              resultSelector = resultSelector,
          };
          return scheduler.Schedule(generateState, (state, self) =>
          {
              if (state.condition(state.current))
              {
                  var result = state.resultSelector(state.current);
                  state.observer.OnNext(result);
                  state.current = state.iterate(state.current);
                  self(state);
              }
              else
                  state.observer.OnCompleted();
          });
      }
      

      This is what I came up with. It runs on an AppDomainScheduler named scheduler. I want to build one of these schedulers to send remote Entity Framework queries to different machines, using WCF underneath. Is this possible? Are there any good getting-started guides?

      Also, I ran the Code Contracts static checker and it complains that I might return null. I tried a few things, such as returning Disposable.Empty when the IDisposable returned from Schedule is null, but that failed too. What is the correct way to satisfy the contract here? Thanks so much.

    • Mouhammad Fakhoury

      Thanks for the intro series on Rx; the videos are very helpful! I was very excited to start using it, and the first thing I tried failed miserably! I just did not have enough examples showing how to hook up Rx to an svc service.

      I just exposed an IEnumerable from an svc service and tried to subscribe to it from a client. I got a "service abnormally disconnected" error.

      Can you show us an example of how to hook up an Rx client to a web service?
      Thanks!

    • gt

      @dmarsh: good point - compiler could figure out serializable & mbr types

      Wonder why the remote queries weren't using the QObservable "provider" where expressions & values were remoted rather than type references...

    • Bnaya

      The following challenge solution uses the
      (T state, Func<IScheduler,T,IDisposable> f) signature.
      It is more complex than the Action signature,
      but may be better for real cloud scenarios.

      class GenerateObservable<T, R> : IObservable<R>
      {
          private readonly IScheduler scheduler;
          private readonly GenerateState _state;

          public GenerateObservable(
              T initial,
              Func<T, bool> condition,
              Func<T, T> iterate,
              Func<T, R> resultSelector,
              IScheduler scheduler)
          {
              _state = new GenerateState
              {
                  current = initial,
                  condition = condition,
                  iterate = iterate,
                  resultSelector = resultSelector
              };
              this.scheduler = scheduler;
          }

          public IDisposable Subscribe(IObserver<R> observer)
          {
              // Each subscription works on its own copy of the state.
              var state = _state.Clone(observer);

              // Declared before assignment so the lambda can refer to itself.
              Func<IScheduler, GenerateState, IDisposable> f = null;
              f = (scd, st) =>
              {
                  string domainName = string.Format("Domain = {0}", AppDomain.CurrentDomain.FriendlyName);
                  Trace.WriteLine(domainName);

                  if (st.condition(st.current))
                  {
                      var result = st.resultSelector(st.current);
                      st.observer.OnNext(result);
                      st.current = st.iterate(st.current);

                      // Schedule the next iteration once, on the scheduler passed in.
                      return scd.Schedule(st, f);
                  }

                  st.observer.OnCompleted();
                  return Disposable.Empty;
              };

              return scheduler.Schedule(state, f);
          }

          [Serializable]
          class GenerateState
          {
              public T current;
              public Func<T, bool> condition;
              public Func<T, T> iterate;
              public Func<T, R> resultSelector;
              public IObserver<R> observer;

              // Copies the generator settings and attaches the given observer.
              public GenerateState Clone(IObserver<R> observer)
              {
                  return new GenerateState
                  {
                      current = current,
                      condition = condition,
                      iterate = iterate,
                      resultSelector = resultSelector,
                      observer = observer
                  };
              }
          }
      }

    • Madu Alikor

      Could somebody update this series to reflect the changes in the Rx framework, especially the changes to the IScheduler interfaces?

    • AggelosBiboudis

      I endorse @dmarsh's thought!

    • maciejw

      I think Erlang is much, much simpler... this stuff... it simply smells from a mile away :)

      I know that it's possible to do, but is it easy? Can you explain it to a junior quickly?

      What you are trying to do with this Azure scheduler is similar to what Erlang can do out of the box. You have a problem passing stuff across boundaries; in Erlang you can send new code to execute on a remote machine and upgrade on the fly, and I don't expect that behavior here. Passing a message between processes on a single node is almost indistinguishable from passing one between processes on different nodes. I know that the only form of cross-process communication in Erlang is message passing, and the whole VM and the language semantics are built on that notion.

      This code is self-explanatory:

      Pid ! {self(), Message}.

      or

      {process_name, Node} ! {self(), Message}.

      You see the code and you immediately say, "I got it"; the only thing you have to know is that "!" sends a message. Whereas in your example it's, "aaah, OK, I have to remember that we go remote, and we have to pass values in a certain way, and we have to remember that this work should be done on the remote scheduler," etc. It is not simple... there's a very long way ahead of you... Locally, Rx is great stuff, and I'm fascinated by it, but this Azure scheduler stuff is, to put it simply, over the top.
