1, Reactive programming

1. What is reactive programming: In .NET, reactive programming usually means Rx.NET (Reactive Extensions), an asynchronous programming model built on the observer pattern and written in LINQ style. Put simply, Rx = Observables + LINQ + Schedulers.

2. Why does this programming model exist? Consider an example that uses events:

var watch = new FileSystemWatcher();
watch.Created += (s, e) =>
{
    // Path.GetExtension returns the extension including its leading dot, e.g. ".jpg".
    var fileType = Path.GetExtension(e.FullPath);
    if (fileType.ToLower() == ".jpg")
    {
        // do something
    }
};

This code creates a FileSystemWatcher and registers an anonymous function on its Created event. Using events this way is an imperative style. Is there a more declarative way to write it? Higher-order functions make code more declarative, and the LINQ extension methods are exactly such a library of higher-order functions. Typical LINQ-style code looks like this:

var list = Enumerable.Range(1, 10)
                .Where(x => x > 8)
                .Select(x => x.ToString());

Can we handle events in the same style?

3. The event stream
LINQ is a set of extension methods on IEnumerable<T>, and IEnumerable<T> can be thought of simply as a collection. If we look at an event over a span of time, its occurrences also form a collection, which we can call an event stream.

Viewing events as a stream is what suggests applying LINQ operations to them.

2, two important types in reactive programming

The event model is essentially the observer pattern, so IObservable<T> and IObserver<T> are the core of the model. Here are the definitions of these two interfaces:

public interface IObservable<out T>
{
    //Notifies the provider that an observer is to receive notifications.
    IDisposable Subscribe(IObserver<T> observer);
}
public interface IObserver<in T>
{
    //Notifies the observer that the provider has finished sending push-based notifications.
    void OnCompleted();
    //Notifies the observer that the provider has experienced an error condition.
    void OnError(Exception error);
    //Provides the observer with new data.
    void OnNext(T value);
}

The two names reflect the two roles accurately: IObservable<T> is the thing being observed, and IObserver<T> is the observer.

IObservable<T> has only one method, Subscribe(IObserver<T> observer), which registers an observer with the event stream. The IDisposable it returns is used to cancel the subscription.

IObserver<T> has three callback methods. When a new event occurs in the stream, OnNext(T value) is called and the observer receives the event's data. OnCompleted() notifies the observer that the stream has ended, and OnError(Exception error) notifies it that the stream has failed.
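To make the roles of the two interfaces concrete, here is a minimal hand-written sketch that uses only the base class library, with no Rx package involved. NumberFeed and RecordingObserver are hypothetical names invented for this illustration; they are not part of Rx, and real Rx observables are considerably more careful about threading and error handling.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable: pushes whatever is passed to Publish to its current observers.
public sealed class NumberFeed : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    // Registers the observer and returns a handle that unregisters it again.
    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    // A new event occurred: call OnNext on every registered observer.
    public void Publish(int value)
    {
        foreach (var observer in _observers.ToArray())
        {
            observer.OnNext(value);
        }
    }

    // The stream has ended: notify the observers once and forget them.
    public void Complete()
    {
        foreach (var observer in _observers.ToArray())
        {
            observer.OnCompleted();
        }
        _observers.Clear();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _unsubscribe;
        public Unsubscriber(Action unsubscribe) { _unsubscribe = unsubscribe; }
        public void Dispose() { _unsubscribe(); }
    }
}

// Hypothetical observer: records every notification it receives.
public sealed class RecordingObserver : IObserver<int>
{
    public List<string> Log { get; } = new List<string>();
    public void OnNext(int value) { Log.Add("OnNext " + value); }
    public void OnError(Exception error) { Log.Add("OnError " + error.Message); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}
```

If an observer subscribes, the feed publishes 1, the subscription is disposed, and the feed then publishes 2, the observer's log contains only "OnNext 1": disposing the value returned by Subscribe is what ends the registration.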

An event stream is clearly observable, so we can rewrite the example above with Rx:

Observable.FromEventPattern<FileSystemEventArgs>(watch, "Created")
                // GetExtension includes the leading dot, so compare against ".jpg"
                .Where(e => Path.GetExtension(e.EventArgs.FullPath).ToLower() == ".jpg")
                .Subscribe(e =>
                {
                    // do something
                });

Note: Using Rx in .NET requires installing the following NuGet package (newer releases of Rx.NET are published under the package name System.Reactive):

Install-Package Rx-main

3, use Reactive programming (Rx) in UI programming

Rx not only makes code more declarative, it can also be used in UI programming.

1, the first piece of reactive programming (Rx) code in UI programming

To show how Rx is used in UI programming, let's take a WinForms Button as an example and see how the event model differs from Rx.

private void BindFirstGroupButtons()
{
    btnFirstEventMode.Click += btnFirstEventMode_Click;
}
void btnFirstEventMode_Click(object sender, EventArgs e)
{
    MessageBox.Show("hello world");
}

This wires up a Button so that a dialog pops up when it is clicked. The same thing with Rx:

//get the Button's Click event as a stream
var clickedStream = Observable.FromEventPattern<EventArgs>(btnFirstReactiveMode, "Click");
//add a subscriber to the stream
clickedStream.Subscribe(e => MessageBox.Show("Hello world"));

A friend pointed out that the magic string "Click" is unpleasant, and it really is a problem. Because Click is an event, its name cannot be extracted through an expression tree. In the end I settled on extension methods:

// Extension methods must be declared in a static class (ButtonExtensions is an illustrative name).
public static class ButtonExtensions
{
    public static IObservable<EventPattern<EventArgs>> FromClickEventPattern(this Button button) =>
        Observable.FromEventPattern<EventArgs>(button, "Click");
    public static IObservable<EventPattern<EventArgs>> FromDoubleClickEventPattern(this Button button) =>
        Observable.FromEventPattern<EventArgs>(button, "DoubleClick");
}

Only a handful of event types are used in practice, so this approach is workable for now. It is not perfect, but it is considerably more elegant than passing strings at every call site:

btnFirstReactiveMode.FromClickEventPattern()
                .Subscribe(e => MessageBox.Show("hello world"));

2. A very common scenario in UI programming: when an event handler blocks the UI thread, the whole interface freezes. Asynchronous programming in .NET evolved through APM, EAP, and TPL, and the arrival of async/await made it much easier to use. Consider this handler:

void btnSecondEventMode_Click(object sender, EventArgs e)
{
    btnSecondEventMode.BackColor = Color.Coral;
    Thread.Sleep(2000);
    lblMessage.Text = "event mode";
}

Thread.Sleep(2000) simulates a long-running operation. After you click the Button, the whole interface freezes and the program cannot respond to any other UI events. The traditional fix is to move the work to another thread:

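BtnSecondEventAsyncModel.BackColor = Color.Coral;
Task.Run(() => // the long-running work happens on a thread-pool thread
{
    Action showMessage = () => lblMessage.Text = "async event mode";
    lblMessage.BeginInvoke(showMessage); // marshal the UI update back to the UI thread
});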

The complication here is that a worker thread cannot touch the UI directly. In WinForms the UI update has to be wrapped in Control.BeginInvoke(Action action); in WPF it has to be wrapped in Dispatcher.BeginInvoke(Action action).

The Rx.NET version:

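btnSecondReactiveMode.FromClickEventPattern()
                .Subscribe(e =>
                {
                    btnSecondReactiveMode.BackColor = Color.Coral; // still on the UI thread
                    Observable.Start(() => { Thread.Sleep(2000); return "reactive mode"; })
                        .SubscribeOn(ThreadPoolScheduler.Instance)
                        .ObserveOn(this)
                        .Subscribe(x => lblMessage.Text = x);
                });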

Observable.Start runs the time-consuming operation in the background, SubscribeOn(ThreadPoolScheduler.Instance) performs the subscription on a thread-pool thread, and ObserveOn(this) makes the observer that follows run on the UI thread.

Note: Using ObserveOn(this) requires Rx-WinForms

Install-Package Rx-WinForms

Although this example works, it is no obvious improvement over the BeginInvoke(Action action) approach, and calling Observable.Start() inside an event stream to start a second observable is even more confusing. This is not a flaw in Rx; rather, the event model itself has limitations in UI programming: it is awkward to combine with asynchrony and hard to test. The UI programming model built around XAML and MVVM will be dominant in the future. Because the UI can bind to a Command in MVVM, it is decoupled from the event model.

The open source project ReactiveUI provides an Rx-based UI programming solution for XAML- and MVVM-centered development such as Xamarin, WPF, and Windows Phone 8.

Note: Using ObserveOn() in WPF requires Rx-WPF to be installed.

Install-Package Rx-WPF

3, another reactive programming (Rx) example, let us feel the charm of reactive programming (Rx)

There are two Buttons on the form, + and -. Clicking + adds 1, clicking - subtracts 1, and the running result is shown in a Label.
With the classic event model this only requires an internal variable that the two Click handlers increment or decrement.
As a functional programming model, Rx emphasizes immutability, that is, it avoids using mutable variables to hold internal state.

var increasedEventStream = btnIncreasement.FromClickEventPattern()
    .Select(_ => 1);
var decreasedEventStream = btnDecrement.FromClickEventPattern()
    .Select(_ => -1);

increasedEventStream.Merge(decreasedEventStream)
    .Scan(0, (result, s) => result + s)
    .Subscribe(x => lblResult.Text = x.ToString());

This example uses operators on IObservable<T> to do the work on the event stream.

  • Select is like its LINQ counterpart: it maps every click on the + button to 1 and every click on the - button to -1, so each button becomes an IObservable<int>;
  • Merge combines the two event streams into one;
  • Scan is a bit more involved: it folds over the event stream, starting from an initial value and combining the accumulated result with each next value through a function.
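The folding behaviour of Scan can be sketched without Rx by running the same accumulator over an ordinary IEnumerable<T>. This is only an analogy using the base class library: ScanSketch, RunningFold and LabelValues are hypothetical names, and the click sequence is made-up sample data standing in for the merged stream of the two buttons.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ScanSketch
{
    // Running fold: yields the accumulator after each element.
    // The seed itself is not emitted, only the results of applying the function.
    public static IEnumerable<TAcc> RunningFold<T, TAcc>(
        this IEnumerable<T> source, TAcc seed, Func<TAcc, T, TAcc> accumulate)
    {
        var result = seed;
        foreach (var item in source)
        {
            result = accumulate(result, item);
            yield return result;
        }
    }

    // Sample data: the merged clicks "+", "+", "-", "+" in the order they happened.
    public static List<int> LabelValues()
    {
        var clicks = new[] { 1, 1, -1, 1 };
        return clicks.RunningFold(0, (result, s) => result + s).ToList();
    }
}
```

For the sample clicks above, LabelValues() returns 1, 2, 1, 2: one running total per click, which is what the Label would display after each click. No variable outside the fold holds the counter.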

Let's take a look at the operators commonly used with IObservable<T>.

4, the operators on IObservable<T> in reactive programming (Rx)

IObservable<T> is inspired by LINQ, so many operators are the same as in LINQ, such as Where, First, Last, Single, Max, Any.
There are also some new operators, such as the Merge and Scan mentioned above. To understand what these operators mean, we can turn to a handy tool, RxSandbox.

1, The Merge operation. As the following figure shows, Merge combines the events of three event streams onto a single timeline.

[Figure: RxSandbox, Merge operation]

2. The Where operation keeps only the events that satisfy a specified condition.

[Figure: RxSandbox, Where operation]

With this tool we can more easily understand how these operators behave.

5, the creation of IObservable<T> of reactive programming (rx)

The Observable class provides many static methods for creating an IObservable<T>. The earlier examples used FromEventPattern to convert events into an IObservable<T>; here are some of the others.

Return creates an IObservable<T> that emits a single given value:

public static void UsingReturn()
{
     var greeting = Observable.Return("Hello world");
}

Create builds an IObservable<T> from a subscribe function, which gives more control:

public static void UsingCreate()
{
     var greeting = Observable.Create<string>(observer =>
     {
         observer.OnNext("Hello world");
         return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
     });
}

The Range method produces an IObservable<int> that emits a specified range of integers:

Observable.Range(1, 10)
          .Subscribe(x => Console.WriteLine(x.ToString()));

The Generate method is the inverse of a fold, also known as unfold:

public static void UsingGenerate()
{
     var range = Observable.Generate(0, x => x < 10, x => x + 1, x => x);
}
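To make the unfold idea concrete, here is a minimal sketch over IEnumerable<T> that uses only the base class library rather than Rx. UnfoldSketch, Unfold and ZeroToNine are hypothetical names, and the parameter order simply mirrors the Generate call above: a starting state, a condition for continuing, a step function, and a projection from state to result.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class UnfoldSketch
{
    // Expands a single starting state into a sequence:
    // while the condition holds, emit the projected state, then step to the next state.
    public static IEnumerable<TResult> Unfold<TState, TResult>(
        TState initialState,
        Func<TState, bool> condition,
        Func<TState, TState> iterate,
        Func<TState, TResult> resultSelector)
    {
        for (var state = initialState; condition(state); state = iterate(state))
        {
            yield return resultSelector(state);
        }
    }

    // Same arguments as the Generate example: start at 0, continue while below 10, add 1 each step.
    public static List<int> ZeroToNine()
    {
        return Unfold(0, x => x < 10, x => x + 1, x => x).ToList();
    }
}
```

ZeroToNine() yields the integers 0 through 9. Where a fold collapses a sequence into one value, an unfold grows one value into a sequence.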

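The Interval method produces an IObservable<long> that emits a value at regular intervals:

// The one-second period is illustrative.
Observable.Interval(TimeSpan.FromSeconds(1))
           .Subscribe(x => Console.WriteLine(x.ToString()));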

The Subscribe method has an overload that also takes callbacks for the observable's error and completion notifications:

Observable.Range(1, 10)
          .Subscribe(x => Console.WriteLine(x.ToString()),
                     e => Console.WriteLine("Error: " + e.Message),
                     () => Console.WriteLine("Completed"));
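As a rough illustration of what the callback overload amounts to, the sketch below adapts three delegates to IObserver<T> by hand, using only the base class library. It is not Rx's actual implementation; CallbackSketch, CallbackObserver, FailingSource and NoopDisposable are hypothetical names. The sketch also shows that OnError and OnCompleted are terminal: once either has been delivered, the observer receives nothing further.

```csharp
using System;
using System.Collections.Generic;

public static class CallbackSketch
{
    // Adapts three callbacks to the IObserver<T> interface.
    private sealed class CallbackObserver<T> : IObserver<T>
    {
        private readonly Action<T> _onNext;
        private readonly Action<Exception> _onError;
        private readonly Action _onCompleted;
        private bool _stopped; // set once OnError or OnCompleted has been delivered

        public CallbackObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            _onNext = onNext;
            _onError = onError;
            _onCompleted = onCompleted;
        }

        public void OnNext(T value) { if (!_stopped) _onNext(value); }
        public void OnError(Exception error) { if (!_stopped) { _stopped = true; _onError(error); } }
        public void OnCompleted() { if (!_stopped) { _stopped = true; _onCompleted(); } }
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }

    // Hypothetical source: emits 1 and 2, then fails.
    private sealed class FailingSource : IObservable<int>
    {
        public IDisposable Subscribe(IObserver<int> observer)
        {
            observer.OnNext(1);
            observer.OnNext(2);
            observer.OnError(new InvalidOperationException("boom"));
            observer.OnCompleted(); // dropped by the observer: the stream already ended with an error
            return new NoopDisposable();
        }
    }

    // Subscribes with three lambdas and returns what they recorded.
    public static List<string> Run()
    {
        var log = new List<string>();
        IObserver<int> observer = new CallbackObserver<int>(
            x => log.Add(x.ToString()),
            e => log.Add("Error: " + e.Message),
            () => log.Add("Completed"));
        new FailingSource().Subscribe(observer);
        return log;
    }
}
```

Run() records "1", "2" and "Error: boom", and never "Completed".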

An IEnumerable<T> can also be converted to an IObservable<T>:

Enumerable.Range(1, 10).ToObservable()
          .Subscribe(x => Console.WriteLine(x.ToString()));

And an IObservable<T> can be converted back to an IEnumerable<T>:

var list = Observable.Range(1, 10).ToEnumerable();

6, Schedulers in reactive programming (Rx)

The core of Rx is the observer pattern plus asynchrony, and Schedulers exist to serve the asynchronous part. We have already run into a few specific Schedulers in the earlier examples, so what exactly do they do?

1, first look at the following code:

public static void UsingScheduler()
{
    Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Create<int>(
    o =>
    {
        Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
        // the emitted values are illustrative
        o.OnNext(1);
        o.OnNext(2);
        o.OnCompleted();
        Console.WriteLine("Finished on threadId:{0}",Thread.CurrentThread.ManagedThreadId);
        return Disposable.Empty;
    });
    source.Subscribe(
    o => Console.WriteLine("Received {1} on threadId:{0}",Thread.CurrentThread.ManagedThreadId,o),
    () => Console.WriteLine("OnCompleted on threadId:{0}",Thread.CurrentThread.ManagedThreadId));
    Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
}

When no Scheduler is used, both the observer and the observable run on the main thread, so nothing executes asynchronously. As the screenshot below shows, every operation runs on the thread with threadId=1.

[Screenshot: reactive programming .NET example 1]

When we use SubscribeOn(NewThreadScheduler.Default) or SubscribeOn(ThreadPoolScheduler.Instance), both the observer and the observable run on the thread with threadId=3.

[Screenshot: reactive programming .NET example 2]

The difference between the two Schedulers is that NewThreadScheduler is intended for long-running operations, while ThreadPoolScheduler is intended for short ones.

2, the difference between SubscribeOn and ObserveOn

The example above only shows SubscribeOn(), but Rx also has an ObserveOn() method. There is a Stack Overflow question on exactly this, "What's the difference between SubscribeOn and ObserveOn", with a simple example that explains the difference very well.

public static void DifferenceBetweenSubscribeOnAndObserveOn()
{
    Thread.CurrentThread.Name = "Main";
    IScheduler thread1 = new NewThreadScheduler(x => new Thread(x) { Name = "Thread1" });
    IScheduler thread2 = new NewThreadScheduler(x => new Thread(x) { Name = "Thread2" });
    Observable.Create<int>(o =>
    {
        Console.WriteLine("Subscribing on " + Thread.CurrentThread.Name);
        o.OnNext(1); // emit one value so the observer has something to report
        return Disposable.Create(() => { });
    })
    .SubscribeOn(thread1)
    .ObserveOn(thread2)
    .Subscribe(x => Console.WriteLine("Observing '" + x + "' on " + Thread.CurrentThread.Name));
}

With both SubscribeOn(thread1) and ObserveOn(thread2) commented out, the result is as follows:

[Screenshot: reactive programming .NET example 3]

  • Both the observer and the observable run on the thread named Main.

With only SubscribeOn(thread1) enabled:

[Screenshot: reactive programming .NET example 4]

  • Both the observable and the observer run on the thread named Thread1.

With SubscribeOn(thread1) commented out and only ObserveOn(thread2) enabled:

[Screenshot: reactive programming .NET example 5]

  • The observable runs on the thread named Main, and the observer runs on the thread named Thread2.

With both SubscribeOn(thread1) and ObserveOn(thread2) enabled:

[Screenshot: reactive programming .NET example 6]

  • The observable runs on the thread named Thread1, and the observer runs on the thread named Thread2.

At this point the conclusion should be clear: SubscribeOn() controls which thread the observable's subscription logic runs on, and ObserveOn() controls which thread the observer receives its notifications on.

7, other reactive programming (Rx) resources

In addition to Rx.NET for .NET, other languages have their own Rx implementations as well.

