1 Rx.net in Action:Introduction

An API for asynchronous programming with observable streams.
ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming .

About Reactive , there is a Reactive Manifesto [Reactive Declaration]: Reactive System (Reactive System) has the following characteristics: Responsive, Resilient, Elastic and Message Driven.

Rx.net in Action:Introduction

Rx.net in Action:Introduction

Obviously developing a reactive system is not straightforward.
That article will talk about how to use reactive programming based on Rx.NET to develop a more flexible, loosely coupled, scalable reactive system.

2.Rx.net in Action:Programming paradigm

Before we begin, it is necessary to understand the following programming paradigms: imperative programming, declarative programming, functional programming, and reactive programming.

Imperative programming : The main idea of ​​imperative programming is to focus on the steps performed by the computer, that is, telling the computer step by step what to do first.

//1. var
List<int> results = new List<int>();
//2. loop
foreach(var num in Enumerable.Range(1,10))
    //3. conditon
    if (num > 5)
        //4.  logic

Declarative programming : Declarative programming is the logic that expresses program execution in the form of data structures. Its main idea is to tell the computer what to do, but not specify what to do.

var nums = from num in Enumerable.Range(1,10) where num > 5 select num

Functional programming : The main idea is to write the operation as much as possible into a series of nested function calls.

Enumerable.Range(1, 10).Where(num => num > 5).ToList().ForEach(Console.WriteLine);

reactive Programming : reactive programming is a programming paradigm for data flow and change propagation designed to simplify the implementation of event-driven applications. reactive programming focuses on how to create a data flow that is dependent on the change and respond to changes.

IObservable<int> nums = Enumerable.Range(1, 10).ToObservable();

IDisposable subscription = nums.Where(num => num > 5).Subscribe(Console.WriteLine);


3. Rx.net in Action:Hello Rx.NET

Start with a simple demo.
Suppose we now simulate the electric kettle boiling water and output the current water temperature in real time. Generally we will do this:

Enumerable.Range(1, 100).ToList().ForEach(Console.WriteLine);
// do something else.  block

Assume that the current program is a central control device of a smart home, not only controlling the electric kettle to boil water, but also controlling other devices in order to avoid blocking the main thread. Usually we will create a Thread or Task to do it.

Task.Run(() => Enumerable.Range(1, 100).ToList().ForEach(Console.WriteLine));
// do something else. non-block

Suppose now that we not only want to output in the console but also alarm through the speaker in real time. At this point we should think of commissions and events.

class Heater
    private delegate void TemperatureChanged(int temperature);
    private event TemperatureChanged TemperatureChangedEvent;
    public void BoilWater()
        TemperatureChangedEvent += ShowTemperature;
        TemperatureChangedEvent += MakeAlerm;
            () =>
        Enumerable.Range(1, 100).ToList().ForEach((temperature) => TemperatureChangedEvent(temperature))
    private void ShowTemperature(int temperature)
    private void MakeAlerm(int temperature)
class Program
    static void Main(string[] args)
        Heater heater = new Heater();        

The amount of instant code goes up. But with Rx.NET, we can simplify it into the following code:

var observable = Enumerable.Range(1, 100).ToObservable(NewTheadScheduler.Default);//
Subject<int> subject = new Subject<int>();//Subject
subject.Subscribe((temperature) => Console.WriteLine($"current:{temperature}"));//subscrible subject
subject.Subscribe((temperature) => Console.WriteLine($"current:{temperature}"));//subscrible  subject
observable.Subscribe(subject);//subscrible  observable

Just through the following three steps:

  1. The call ToObservableconverts the enumeration sequence into an observable sequence.
  2. Specify NewTheadScheduler.Defaultto enumerate in a separate thread by specifying .
  3. Call the Subscribemethod to register the event.
  4. With Subjectmulticast transmission

From the above we can see that Rx.NET greatly simplifies the steps of event processing, and this is just the tip of the iceberg of Rx.

4.Rx.net in Action:Rx.NET core

Reactive Extensions (Rx) is a library that provides a reactive programming model for .NET applications to build asynchronous event stream-based applications and System.Reactivereference them by installing Nuget packages. Rx abstracts the event stream into Observable sequences to represent asynchronous data streams, uses LINQ operators to query asynchronous data streams, and uses it Schedulerto control concurrency in asynchronous data streams. Simply put: Rx = Observables + LINQ + Schedulers.

Rx.net in Action:Rx.NET core

Rx.net in Action:Rx.NET core

In a software system, an event is a message used to indicate that something has happened. Events are raised by the Event Source and used by the Event Handler.
In Rx, the event source can be represented by an observable, and the event handler can be represented by an observer.
But how the data used by the application is represented, such as data in a database or data obtained from a web server. In the application, we generally deal with two types of data: static data and dynamic data. But no matter what type of data is used, it can be observed as a stream. In other words, the data stream itself is also observable. This means that we can also use observable to represent the data stream.

Rx.net in Action:Rx.NET core data stream

Rx.net in Action:Rx.NET core data stream

Having said that, the core of Rx.NET is clear at a glance:

  1. Everything is data flow
  2. Observable is an abstraction of the data stream
  3. Observer is a response to Observable

In Rx, the IObservable<T>and the IObserver<T>interface are used to represent observable sequences and observers , respectively . They are preset under the system namespace and are defined as follows:

public interface IObservable<out T>
      //Notifies the provider that an observer is to receive notifications.
      IDisposable Subscribe(IObserver<T> observer);

public interface IObserver<in T>
    //Notifies the observer that the provider has finished sending push-based notifications.
    void OnCompleted();
    //Notifies the observer that the provider has experienced an error condition.
    void OnError(Exception error);
    //Provides the observer with new data.
    void OnNext(T value);
Rx.net in Action:Rx.NET core observable and observer

Rx.net in Action:Rx.NET core observable and observer

5.Rx.net in Action:Create an IObservable

IObservable<T>There are several ways to create it:
1. Directly implement the IObservable<T>interface 
2. Use Observable.Createcreate

    for (int i = 0; i < 5; i++)
    return Disposable.Empty;

3. Use Observable.Defferfor delay creation (created when there is an observer subscription). For
example, if you want to connect to the database for query, if there is no observer, the database connection will always be occupied, which will cause waste of resources. Deffer can be used to solve this problem.

Observable.Defer(() =>
    var connection = Connect(user, password);
    return connection.ToObservable();

4. Use Observable.Generatean observable sequence that creates an iteration type

IObservable<int> observable =
        0,              //initial state
        i => i < 10,    //condition (false means terminate)
        i => i + 1,     //next iteration step
        i => i * 2);      //the value in each iteration

5. Use the Observable.Rangeobservable sequence that creates the specified interval

IObservable<int> observable = Observable.Range (0, 10).Select (i => i * 2);

6. Create special-purpose observable sequences

Observable.Return ("Hello World");//Create a element
Observable.Never<string> ();//Create a list
Observable.Throw<ApplicationException> (
new ApplicationException ("something bad happened"))//Create a exe
Observable.Empty<string> ()//Create a immedialy exit list
7. Use ToObservableconversion IEnumerateand task types
Enumerable.Range(1, 10).ToObservable();
IObservable<IEnumerable<string>> resultsA = searchEngineA.SearchAsync(term).ToObservable();

8. Use Observable.FromEventPattern<T>andObservable.FromEvent<TDelegate, TEventArgs> perform event conversion

public delegate void RoutedEventHandler(object sender,
 System.Windows.RoutedEventArgs e)
IObservable<EventPattern<RoutedEventArgs>> clicks =
                Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(
                    h => theButton.Click += h,
                    h => theButton.Click -= h);
clicks.Subscribe(eventPattern => output.Text += "button clicked" + Environment.NewLine);

9. Use Observable.Usingfor resource release

IObservable<string> lines =
    Observable.Using (
        () => File.OpenText ("TextFile.txt"), // opens the file and returns the stream we work with
        stream =>
        Observable.Generate (
            stream, //initial state
            s => !s.EndOfStream, //we continue until we reach the end of the file
            s => s, //the stream is our state, it holds the position in the file 
            s => s.ReadLine ()) //each iteration will emit the current line (and moves to the next)

10. Use the Observable.IntervalCreate Specified Interval Observable Sequence

Rx.net in Action:Observable.Interval

Rx.net in Action:Observable.Interval

11. Use to Observable.Timercreate an observable timer

Rx.net in Action:Observable.Timer

Rx.net in Action:Observable.Timer

6.Rx.net in Action:RX operator

Create IObservableAfter that, we can apply the series Linq operator to it, query, filter, aggregate, and so on. Rx has the following series of operators built in:

Rx.net in Action:RX operator

Rx.net in Action:RX operator

The following diagram illustrates the role of common operators:

Rx.net in Action:common operators

Rx.net in Action:common operators

7.Rx.net in Action:Multicast transmission depends on: Subject

Based on the above example, we learned that Rx can simplify the implementation of the event model, which is essentially an extension of the observer pattern. Referring to the observer pattern, we know that a Subject can be subscribed by multiple observers to complete the multicast of the message. Similarly, in Rx, a Subject is also introduced for multicast message transmission, but the Subject in Rx has a dual identity—that is, the observer is also the observer.

interface ISubject<in TSource, out TResult> : IObserver<TSource>,IObservable<TResult>

The following four implementations are provided by default in Rx:

  • Subject – Broadcast each notification to all observers
Rx.net in Action:Subject

Rx.net in Action:Subject

AsyncSubject – Yes and only one notification is sent when the observable sequence is complete

Rx.net in Action:Async Subject

ReplaySubject – Cache specified notifications to replay observers for subsequent subscriptions

Rx.net in Action:Replay Subject

Rx.net in Action:Replay Subject

BehaviorSubject – Push the default or latest value to the observer

Rx.net in Action:Behavior Subject

Rx.net in Action:Behavior Subject

But for the first one Subject<T>, it should be pointed out that when there are multiple observer sequences, once one stops sending messages, the Subject stops broadcasting any messages sent subsequently by all other sequences.

Rx.net in Action:Broadcast

Rx.net in Action:Broadcast

8.Rx.net in Action:observable sequence

For Observable, they are temperature-bearing and have hot and cold points. The differences between them are shown in the following figure:

Rx.net in Action:observable sequence

Rx.net in Action:observable sequence

Cold Observable : Notifications are sent only if there is an observer subscription, and each observer has a complete sequence of observers.
Hot Observable : A notification is sent with or without an observer subscription, and all observers share the same observer sequence.

9.Rx.net in Action:Everything is in control: Scheduler

In Rx, use Scheduler to control concurrency. For Scheduler, we can understand program scheduling, and use Scheduler to specify when and where to do something. Rx provides the following types of Scheduler:

  1. NewThreadScheduler: that is executed on the new thread
  2. ThreadPoolScheduler: execute in the thread pool
  3. TaskPoolScheduler: similar to ThreadPoolScheduler
  4. CurrentThreadScheduler: executed in the current thread
  5. ImmediateScheduler: Immediate execution on the current thread
  6. EventLoopScheduler: Create a background thread to perform all operations in order

For example:

.Subscribe(str=>Console.WriteLine($"{str} on ThreadId:{Thread.CurrentThread.ManagedThreadId}")
Console.WriteLine($"Current ThreadId:{Thread.CurrentThread.ManagedThreadId}");

Current ThreadId:1
Hello on ThreadId:4

10.Rx.net in Action:Finally

Rory’s finally combed the contents of the book “Rx.NET In Action” and got a deeper understanding of Rx. Rx extended the observer mode to support data and event sequences, with built-in series operations. The symbols allow us to combine these sequences in a declarative way, without worrying about the underlying implementation for event-driven development: threads, synchronization, thread safety, concurrent data structures, and non-blocking IO.

But things are fine, and it is inevitable to avoid omissions. If you are interested in reactive programming, you may wish to read this book. I believe it will be of great benefit to you.






resources from https://www.cnblogs.com/sheng-jie/p/10399049.html

Rx.NET in Action.pdf 
Reactive Programming in ReactiveX