RxJS Frequently Used Features in Angular

Coding is all about problem solving and finding the right tools to fix the right problems. In the case of RxJS, the problem being solved is the ability to handle asynchronous calls with multiple events. But what does that mean exactly?

Imagine that you are writing code that carries data through a series of actions, and one of them fails. If you are just using plain functions to handle the series of requests, the error may have to travel through several unnecessary steps before it is returned. Rather than passing the error through every function, the code should be able to take the error and update the view without running the remaining, now unnecessary, Ajax requests.

You may note that promises were made to solve this type of error handling, but RxJS takes the concept of funneling consecutive actions to the next level. A promise can only handle a single value, which limits its use cases. Additionally, promises are not cancellable, so work that is no longer needed keeps running and uses up resources (an important consideration for resource-constrained devices).

1. Using map() to transform data

Using map() on an Observable is much like using it on an array. On an array, map():

  1. Accepts a callback as an argument;
  2. Executes it on every element of the array you called it on; and
  3. Returns a new array with each element of the original array replaced with the result of calling the callback on it.

The only differences when using map() on Observables are that:

  1. Instead of returning a new array, it returns a new Observable; and
  2. It executes every time the Observable emits a new item, instead of immediately and all at once.

We can use map() to transform our stream of user data into just a list of their website names:

source.
  map((user) => user.website)

Here, we’ve used map to “replace” every user object in the incoming stream with each user’s website.

Older releases (RxJS 4) also allow you to call map() as select(). Both names refer to the same function; the alias was dropped in RxJS 5.

2. Filtering results

Like map(), filter() is much the same on Observables as on arrays. To find every user with either a .net or a .org website address, we’d write:

source.
  map((user) => user.website).
  filter((website) => website.endsWith('net') || website.endsWith('org'))

This keeps only the websites that end with ‘net’ or ‘org’. Because map() runs first, the stream at this point carries website strings, not user objects.

In RxJS 4, filter() also has the alias where().
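To try the map() and filter() callbacks without a server, the same two steps can be run over a plain array. This is only a sketch: the `User` shape and the sample data are made up for illustration, and a regular expression stands in for the two endsWith() checks.

```typescript
// Hypothetical user shape; only the field used in this article matters.
interface User {
  fullName: string;
  website: string;
}

// Made-up sample data standing in for the server response.
const users: User[] = [
  { fullName: "Ann", website: "ann.org" },
  { fullName: "Bob", website: "bob.com" },
  { fullName: "Cy", website: "cy.net" },
];

// Same callbacks as the Observable pipeline, applied to an array.
const websites: string[] = users
  .map((user) => user.website)
  .filter((website) => /(net|org)$/.test(website));
// websites is ["ann.org", "cy.net"]
```

On an Observable the callbacks are identical; the difference is that they run once per emitted item rather than all at once.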

3. Collecting results with reduce()

reduce() allows us to use all of our individual values and turn them into a single result.

reduce() tends to be the most confusing of the basic list operations, because, unlike filter() or map(), it behaves differently from use to use.

In general, reduce() takes a collection of values, and turns it into a single data point. In our case, we’ll feed it a stream of website names, and use reduce() to convert that stream into an object that counts how many websites we’ve found, and the sum of the lengths of their names.

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  reduce((data, website) => {
    return {
      // Build a new accumulator instead of mutating the previous one
      count       : data.count + 1,
      name_length : data.name_length + website.length
    }
  }, { count : 0, name_length : 0 })

Here, we reduce our stream to a single object, which tracks:

  1. How many sites we’ve seen; and
  2. The total length of all their names.

Keep in mind that reduce() only returns a result when the source Observable completes. If you want to know the state of the accumulator every time the stream receives a new item, use scan() instead.
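The difference between reduce() and scan() can be sketched on a plain array. The `scanArray` helper below is a hypothetical stand-in written for this illustration, not an RxJS function, and the website names are made up.

```typescript
// Accumulator shape used in the article's example.
interface Totals {
  count: number;
  name_length: number;
}

// Pure accumulator: returns a new object and leaves `data` untouched.
function addWebsite(data: Totals, website: string): Totals {
  return {
    count: data.count + 1,
    name_length: data.name_length + website.length,
  };
}

// Array stand-in for scan(): records the accumulator after every item.
function scanArray<T, A>(items: T[], step: (acc: A, item: T) => A, seed: A): A[] {
  const states: A[] = [];
  let acc = seed;
  items.forEach((item) => {
    acc = step(acc, item);
    states.push(acc);
  });
  return states;
}

const sites = ["ann.org", "cy.net"]; // made-up sample data
const seed: Totals = { count: 0, name_length: 0 };

// reduce(): a single value, known only once every item has been seen.
const finalTotals = sites.reduce(addWebsite, seed);
// scan(): one accumulator value per item.
const runningTotals = scanArray(sites, addWebsite, seed);
```

Here `finalTotals` is `{ count: 2, name_length: 13 }`, while `runningTotals` holds both intermediate states, `{ count: 1, name_length: 7 }` followed by `{ count: 2, name_length: 13 }`.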

4. Limiting results with take()

take() and takeWhile() round out the basic functions on simple streams.

take(n) reads n values from a stream, and then unsubscribes.

We can use scan() to emit our object every time we receive a website, and only take() the first two values.

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  scan((data, website) => {
      return {
        // New object each time, so earlier emissions are never modified
        count       : data.count + 1,
        name_length : data.name_length + website.length
      }
    }, { count : 0, name_length : 0 }).
  take(2);

RxJS also offers takeWhile(), which allows you to take values for as long as some boolean test holds true, and stops as soon as the test fails. We can write the above stream with takeWhile() like this:

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  scan((data, website) => {
    return {
      // New object each time, so earlier emissions are never modified
      count       : data.count + 1,
      name_length : data.name_length + website.length
    }
  }, { count : 0, name_length : 0 }).
  takeWhile((data) =>  data.count < 3)
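As a rough check that take(2) and takeWhile(count < 3) select the same values here, both can be modelled on an array. `takeWhileArray` is a hypothetical helper written for this sketch, and the emitted objects are made up.

```typescript
// Array stand-in for takeWhile(): keeps items until the test first fails.
function takeWhileArray<T>(items: T[], test: (item: T) => boolean): T[] {
  const kept: T[] = [];
  for (const item of items) {
    if (!test(item)) {
      break; // stop at the first failure, much like unsubscribing
    }
    kept.push(item);
  }
  return kept;
}

// Made-up running totals, as scan() might emit them for four websites.
const emitted = [{ count: 1 }, { count: 2 }, { count: 3 }, { count: 4 }];

const firstTwo = emitted.slice(0, 2); // array stand-in for take(2)
const whileSmall = takeWhileArray(emitted, (data) => data.count < 3);
```

Both `firstTwo` and `whileSmall` contain the objects with count 1 and count 2. The two only agree because count grows by one per item; with a different test, takeWhile() could stop earlier or later than take(2).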

5. Squashing streams with flatMap()

. . . As a matter of fact, we’re already using one!

We made calls to fromPromise() and flatMap() when we defined our source stream:

const source   =
        // Take a Promise and convert it to an Observable
        Rx.Observable.fromPromise(makeRequest(ENDPOINT))
          // Flatten Promise
          .flatMap(Rx.Observable.from); 

This uses three pieces of new machinery:

  1. Rx.Observable.fromPromise();
  2. Rx.Observable.from(); and
  3. flatMap().

Observables from promises

A Promise represents a single future value we’ll get back asynchronously—the result of a call to the server, for instance.

One of the defining features of a Promise is that it represents just one future value. It can’t return multiple asynchronous pieces of data; that’s what Observables do, and is a fundamental difference between the two.

This means that, when we use Rx.Observable.fromPromise(), we get an Observable that emits a single value—either:

  1. The value the Promise resolves to; or
  2. The error the Promise rejects with.

When a Promise returns a string or a number, we don’t need to do anything special. But when it returns an array, as it does in our case, we’d prefer to create an Observable that emits the contents of the array, not the array itself as a single value.

6. Using flatMap()

This process is called flattening, which flatMap() takes care of. It has a number of overloads, but we’ll only use the simplest and the most common of them.

When using flatMap(), we:

  1. Call flatMap() on an Observable that emits the single-value resolution or rejection of a Promise; and
  2. Pass it a function to create a new Observable with.

In our case, we pass Rx.Observable.from(), which creates a sequence from the values of an array:

Rx.Observable.from([1, 2, 3]).
  subscribe(
      // onNext handler: runs once for every emitted value
      (value) => console.log(`Next: ${value}`))

// Prints: 
//  Next: 1
//  Next: 2
//  Next: 3

That covers the code in our little prelude:

const source =
  // Create an Observable emitting the VALUE or REJECTION of a Promise...
  Rx.Observable.fromPromise(makeRequest(ENDPOINT))
    // ...And turn it into a new Observable that emits every item of the
    //  array the Promise resolves to.
    .flatMap(Rx.Observable.from)

In RxJS 4, flatMap() is also available as selectMany(); from RxJS 5 onward the same operator is called mergeMap().
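The flattening step can be modelled on plain arrays to show what changes. `flatMapArray` below is a hypothetical helper for this sketch, not the RxJS operator, and the user objects are made up.

```typescript
// Array stand-in for flatMap(): map each item to an array, then join
// all of the results into one flat sequence.
function flatMapArray<T, R>(items: T[], project: (item: T) => R[]): R[] {
  let flat: R[] = [];
  items.forEach((item) => {
    flat = flat.concat(project(item));
  });
  return flat;
}

// fromPromise() emits ONE value: here, a whole (made-up) array of users.
const emittedOnce = [[{ website: "ann.org" }, { website: "cy.net" }]];

// The identity projection plays the role of Rx.Observable.from.
const individualUsers = flatMapArray(emittedOnce, (usersArray) => usersArray);
```

Before flattening there is one emission containing two users; afterwards `individualUsers` holds the two users as separate items, which is what lets the later map() and filter() callbacks work on one user at a time.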

7. Combining streams with concat() and merge()

Concatenation and merging are two of the most common ways to combine streams.

Concatenation creates a new stream by emitting the values of a first stream until it completes, and then emitting the values of a second stream.

Merging creates a new stream from many streams by emitting each value as soon as any of the source streams produces it.

Think of talking to two people at once on Facebook Messenger. concat() is the scenario where you get messages from both, but finish your conversation with one person before responding to the other. merge() is like creating a group chat and receiving both streams of messages simultaneously.

source1.
  concat(source2).
  subscribe(
    // onNext handler
    (value) => console.log(`Next: ${value}`))
    // Prints 'Source 1' values first, THEN 'Source 2'

source1.
  merge(source2).
  subscribe(
    // onNext handler
    (value) => console.log(`Next: ${value}`))
    // INTERLEAVES 'Source 1' and 'Source 2' values

The concat() stream will print all of the values from source1 first, and only begin printing values from source2 after source1 completes.

The merge() stream will print values from source1 and source2 as it receives them: It won’t wait for the first stream to complete before emitting values from the second.
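The ordering difference can be sketched by attaching a time to each value and comparing the two results. This models ordering only, with made-up timings; `mergeTimed` and `concatTimed` are hypothetical helpers, not RxJS functions.

```typescript
// A value plus the (made-up) time in ms at which its stream emits it.
interface Timed {
  at: number;
  value: string;
}

// Two finite streams whose emissions overlap in time.
const first: Timed[] = [{ at: 0, value: "A1" }, { at: 200, value: "A2" }];
const second: Timed[] = [{ at: 100, value: "B1" }, { at: 300, value: "B2" }];

// merge(): forward every value in the order it arrives.
function mergeTimed(a: Timed[], b: Timed[]): string[] {
  return a
    .concat(b) // new array, so the inputs are not reordered
    .sort((x, y) => x.at - y.at)
    .map((item) => item.value);
}

// concat(): forward all of `a`, and only then everything from `b`.
function concatTimed(a: Timed[], b: Timed[]): string[] {
  return a.concat(b).map((item) => item.value);
}

const merged = mergeTimed(first, second);
const concatenated = concatTimed(first, second);
```

`merged` comes out as A1, B1, A2, B2, while `concatenated` is A1, A2, B1, B2.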

8. Using switch()

Often, we want to listen to an Observable emitting Observables, but only pay attention to the latest emission from the source. For this, RxJS provides switch().

To extend the Facebook Messenger analogy further, switch() is the case where you . . . well, switch who you respond to, based on who’s currently sending messages.

User interfaces furnish several good use cases for switch(). If our app fires off a request every time a user selects what they want to search for, we can assume they only want to see results from the latest selection. So, we use switch() to listen to only the result from the latest selection.

While we’re at it, we should avoid wasting bandwidth by hitting the server only for the last selection a user makes within a given interval, say one second. The function we use for this is called debounce().

If you want to go in the other direction, and only honor the first selection, you’d use throttle(). It has the same API, but the opposite behavior: it lets the first value in each interval through and ignores the ones that follow.
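The contrast between debounce() and throttle() can be modelled on a list of timestamped selections. This is a simplified sketch under stated assumptions: the timings and queries are made up, `debounceModel` and `throttleModel` are hypothetical helpers rather than RxJS operators, and real operators work with live timers instead of a finished list.

```typescript
// A search selection plus the (made-up) time in ms at which it was made.
interface SearchInput {
  at: number;
  query: string;
}

// debounce model: keep an input only if no other input follows it
// within `wait` ms.
function debounceModel(inputs: SearchInput[], wait: number): string[] {
  return inputs
    .filter((input, i) => {
      const next = inputs[i + 1];
      return next === undefined || next.at - input.at >= wait;
    })
    .map((input) => input.query);
}

// throttle model: keep an input only if at least `wait` ms have passed
// since the last input that was kept.
function throttleModel(inputs: SearchInput[], wait: number): string[] {
  const kept: string[] = [];
  let lastKeptAt = -Infinity;
  inputs.forEach((input) => {
    if (input.at - lastKeptAt >= wait) {
      kept.push(input.query);
      lastKeptAt = input.at;
    }
  });
  return kept;
}

// Three quick selections, a pause, then one more.
const selections: SearchInput[] = [
  { at: 0, query: "r" },
  { at: 200, query: "rx" },
  { at: 400, query: "rxjs" },
  { at: 2000, query: "angular" },
];

const debounced = debounceModel(selections, 1000);
const throttled = throttleModel(selections, 1000);
```

With a one-second window, `debounced` keeps "rxjs" and "angular" (the last selection of each burst), while `throttled` keeps "r" and "angular" (the first selection of each burst).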

9. Creating Subject

A Subject, in contrast to a plain observable, is both an observable and an observer at the same time: you can subscribe to it, and you can also push values into it. An observer that is subscribed to a plain observable, by comparison, can only read the values that observable emits.

Let's create a Subject using the project from the previous tutorial, and let's get started with the following code in /src/code.ts:

import { Subject } from "rxjs/Subject";

var subject = new Subject()


// Our handy function for showing the values:
// We will use this shortly...
function addItem(val:any) {
    var node = document.createElement("li");
    var textnode = document.createTextNode(val);
    node.appendChild(textnode);
    document.getElementById("output").appendChild(node);
}

As you can see, creating an actual Subject is ridiculously simple.

Next, we have to .subscribe to the Subject to create an observer:

var subject = new Subject()

subject.subscribe(
    data => addItem('Observer 1: '+ data),
    err => addItem(err),
    () => addItem('Observer 1 Completed')
)

If you save the project, at this point nothing happens. We have to use .next to start emitting values from the Subject:

// Previous code from above removed for brevity

subject.next('The first thing has been sent')

Simple enough, but let's make it more interesting by defining another observer and pushing some more values:

// Previous code from above removed for brevity

var observer2 = subject.subscribe(
    data => addItem('Observer 2: '+ data)
)

subject.next('The second thing has been sent')
subject.next('A third thing has been sent')

10. BehaviorSubject

We've just created a regular Subject, but what about BehaviorSubject?

BehaviorSubject is a special type of Subject whose only difference is that it emits the most recent value to each new observer as soon as that observer subscribes.

For instance, in the above example of a regular Subject, when Observer 2 subscribed, it did not receive the previously emitted value 'The first thing has been sent'. In the case of a BehaviorSubject, it would.

Let's create a BehaviorSubject using our existing code. Simply change lines 1 and 3 to the following:

import { BehaviorSubject } from "rxjs/BehaviorSubject";

var subject = new BehaviorSubject('First')

Also, just after our first call to .next(), make the adjustment:

subject.next('The first thing has been sent')

// Add this
subject.next('...Observer 2 is about to subscribe...')
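To make the difference between the two concrete without the browser project, here is a hand-rolled sketch. `MiniSubject` and `MiniBehaviorSubject` are hypothetical stand-ins that model only subscribe() and next(); they are not the RxJS classes and leave out errors, completion and unsubscription.

```typescript
type Listener<T> = (value: T) => void;

// Stand-in for Subject: forwards each value to the current listeners only.
class MiniSubject<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  next(value: T): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

// Stand-in for BehaviorSubject: also remembers the latest value and
// replays it to every new listener at subscription time.
class MiniBehaviorSubject<T> {
  private listeners: Listener<T>[] = [];
  private latest: T;

  constructor(initial: T) {
    this.latest = initial;
  }

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
    listener(this.latest); // replay the most recent value
  }

  next(value: T): void {
    this.latest = value;
    this.listeners.forEach((listener) => listener(value));
  }
}

// Observer 2 subscribes AFTER the first value has been sent.
const plainLog: string[] = [];
const plain = new MiniSubject<string>();
plain.next("The first thing has been sent");
plain.subscribe((data) => { plainLog.push("Observer 2: " + data); });
plain.next("The second thing has been sent");

const behaviorLog: string[] = [];
const behavior = new MiniBehaviorSubject<string>("First");
behavior.next("The first thing has been sent");
behavior.subscribe((data) => { behaviorLog.push("Observer 2: " + data); });
behavior.next("The second thing has been sent");
```

`plainLog` ends up with only the second message, because the plain stand-in does not replay anything. `behaviorLog` contains both messages: the first one is replayed at subscription time, and the second arrives normally.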

