back to top

RxJS: gli operatori più utili e comuni

Analizziamo ora il funzionamento di alcuni degli operatori più utili e comuni degli Observable. La lista degli operatori disponibili è piuttosto lunga e articolata. Per questo motivo risulta impossibile trattarli tutti in un’unica lezione. Per maggiori dettagli e approfondimenti può quindi essere utile consultare la documentazione ufficiale.

Cosa sono gli operatori

Gli operatori sono delle funzioni pure che ricevono in ingresso un Observable e restituiscono un nuovo oggetto sempre di tipo Observable dopo aver eventualmente apportato delle modifiche ai dati che verranno emessi da quest’ultimo. L’Observable sorgente resta invariato. Un caso particolare è costituito dagli operatori che consentono di creare un Observable a partire da un’altra struttura dati o da eventi (per esempio gli eventi del DOM).

Iniziamo allora ad illustrare alcuni di questi operatori che danno la possibilità di generare un nuovo oggetto di tipo Observable più rapidamente di quanto visto in predenza con il metodo Observable.create().

Prima di proseguire ad analizzare il funzionamento dei diversi operatori, è tuttavia opportuno soffermarci a descrivere uno strumento che può risultare particolarmente utile per capire l’andamento e il comportamento di un Observable, specialmente dopo aver applicato uno o più operatori.

Cosa sono e come interpretare i Marble Diagrams

Vediamo dunque come possiamo avvalerci dei cosiddetti Marble Diagrams (letteralmente Diagramma a biglie) per capire meglio e visualizzare l’effetto di un certo operatore sul flusso dei dati in relazione al trascorrere del tempo.

I Marble Diagrams sono dei diagrammi che forniscono una rappresentazione visiva del comportamento di un Observable, delle notifiche emesse nel corso del tempo e dell’effetto che ciascun operatore ha quando viene applicato. Sulla documentazione ufficiale è solitamente presente un’immagine rapprensentante il diagramma relativo ad un certo operatore.

esempio marble diagram operatore interval

Nell’immagine riportata sopra viene mostrato il diagramma dell’operatore interval(n) che emette valori crescenti, un nuovo valore ogni n secondi.

Esiste un modo alternativo di disegnare un Marble diagram, ovvero in forma testuale. Questo tipo di diagrammi prende il nome di ASCII Marble diagrams e può risultare utile per documentare una porzione di codice o addirittura testare un’applicazione che fa uso della libreria RxJS grazie a vari strumenti che sono stati realizzati al fine di semplificare i test di unità.

---a--b--c--d----e----X----|->
(abcd)--e--f--|->

+ I caratteri alfanumerici compresi nei gruppi [a-z] o [0-9] 
  rappresentano valori che vengono emessi nel corso del tempo.
+ Il carattere '|' rappresenta la fine dell'esecuzione dell'observable.
+ Il carattere 'X' indica che si è verificato un errore.
+ Tra parentesi tonde vengono racchiusi i valori che vengono 
  inviati in maniera sincrona.
+ '--->' è la sequenza temporale di una singola esecuzione di un Observable.
+ '-' costituisce un'unità temporale

Operatori per la creazione di un Observable

Operatore of()

Vediamo ora alcuni operatori per la creazione di un Observable. Partiamo dall’operatore statico of() il quale crea un nuovo Observable che emette immediatamente i valori passati come argomento uno dopo l’altro. Al termine di questo processo notifica il completamento dell’esecuzione.

import { Observable, PartialObserver, of } from 'rxjs'; 

// (123)|
const observable: Observable<number> = of(1,2,3);

const observer: PartialObserver<number> = {
  next(value) {console.log(value)},
  complete() {console.log('complete')}
};

observable.subscribe(observer);

Operatore from()

Crea un nuovo Obervable a partire da altre strutture dati come degli Array o delle Promise.

import { Observable, PartialObserver, from} from 'rxjs'; 

// (123)|
const observable: Observable<number> = from([1,2,3])

const observer: PartialObserver<number> = {
  next(value) {console.log(value)},
  complete() {console.log('complete')}
};

observable.subscribe(observer);

Operatore fromEvent()

Crea un Observable a partire da un evento, per esempio un evento del DOM. Il primo argomento è l’elemento target che emette l’evento, il secondo argomento è il tipo di evento.

import { Observable, PartialObserver, fromEvent} from 'rxjs'; 

// Supponendo che vengano effettuati 
// due click nella pagina
// --c----c-|
const observable = fromEvent(document, 'click');

const observer: PartialObserver<MouseEvent> = {
  next(event: MouseEvent) {console.log(event.clientX)},
  complete() {console.log('complete')}
};

observable.subscribe(observer);

Facendo riferimento all’esempio riportato sopra, tutte le volte che si registra un click nella pagina, viene invocato il metodo observer.next() a cui viene passato come argomento un oggetto di tipo MouseEvent da cui estraiamo la coordinata lungo l’asse X del punto in cui si è verificato il click del mouse.

Operatore interval

L’operatore interval(period: number) crea un nuovo Observable che emette una sequenza infinita di numeri crescenti separati da un intervallo period espresso in millisecondi.

import { Observable, PartialObserver, interval} from 'rxjs'; 

// --0--1--2--3--4--...
const observable = interval(1000);

const observer: PartialObserver<number> = {
  next(value: number) {console.log(value)},
  complete() {console.log('complete')}
};

observable.subscribe(observer);

Il metodo pipe

A partire da RxJS 5.5 è stato introdotto il metodo Observable.pipe() che è lo strumento consigliato per applicare degli operatori che attuano delle trasformazioni sui dati emessi da un Observable, specialmente nel caso in cui si vogliano combinare più operatori. Il metodo pipe() prende come argomenti i diversi operatori che si vogliono eseguire in sequenza. I dati emessi da un Observable saranno sottoposti quindi alle trasformazioni specificate attraverso gli operatori nell’ordine in cui vengono passati alla funzione pipe().

Gli operatori map() e filter()

Vediamo attraverso un esempio come utilizzare gli operatori map() e filter() che consentono rispettivamente di trasformare i dati emessi da un Observable e filtrarli in base a un certo criterio espresso attraverso una funzione che viene passata come argomento.

import { Observable, PartialObserver, interval} from 'rxjs'; 
import { filter, map } from 'rxjs/operators'

// --0--1--2--3--4--...
const source = interval(1000);

const newObservable = source.pipe(
  filter(value => value % 2 === 0),
  map(value => value * 2)
)

// source: --0--1--2--3--4--...
// vvvvvv filter(valori pari) vvvvvv
//         --0-----2-----4--...
// vvvvvvv map(value * 2) vvvvvvvv
//         --0-----4-----8--...

const observer: PartialObserver<number> = {
  next(value: number) {console.log(value)},
  complete() {console.log('complete')}
};

newObservable.subscribe(observer);

Nell’esempio riportato sopra, source è un Observable che emette un valore numerico crescente ogni secondo, tale valore "passa" attraverso l’operatore filter() che lascia passare solo i numeri pari. Il valori che non vengono scartati, vengono moltiplicati per 2. Il metodo next() dell’observer riceve quindi solo i valori dopo che sono stati filtrati e moltiplicati per 2.

Altri operatori utili

Analizziamo ora alcuni degli operatori più utili presenti in RxJS.

Operatore tap()

L’operatore tap(), precedentemente noto come do(), è stato rinominato dalla versione 5.5 di RxJS in coincidenza con l’introduzione del metodo pipe. Permette di eseguire delle azioni che causano degli effetti collaterali (side effects) ad ogni emissione di un valore da parte dell’Observable, senza però alterare quest’ultimo. Restituisce quindi un Observable che è identico a quello di input. Per questo motivo risulta utile in fase di debugging.

import { Observable, PartialObserver, of} from 'rxjs'; 
import { filter, map, tap } from 'rxjs/operators'

const source = of(0,1,2,3,4,5);

const newObservable = source.pipe(
  tap(value => console.log('valore prima di filter: ', value)),
  filter(value => value % 2 === 0),
  tap(value => console.log('valore dopo filter: ', value)),
  map(value => value * 2),
  tap(value => console.log('valore dopo map: ', value)),
)

const observer: PartialObserver<number> = {
  next(value: number) {console.log(value)},
  complete() {console.log('complete')}
};

newObservable.subscribe(observer);

Eseguendo il codice riportato sopra, l’operatore tap() stamperà nella console dei messaggi. Ovviamente per i valori che vengono scartati dall’operatore filter(), verrà solo stampato il messaggio del operatore tap() invocato prima dello stesso operatore filter().

Gli operatori take() e first()

Un altro operatore utile è take(n: number) che prende solo i primi n valori emessi dall’Observable sorgente e quindi invia una notifica di completamento.

import { Observable, PartialObserver, interval} from 'rxjs'; 
import { filter, take } from 'rxjs/operators'

// --0--1--2--3--4--5--6--7--8--...
const source = interval(1000);

const newObservable = source.pipe(
  take(5),
  filter(value => value % 2 === 0)
)

// source: --0--1--2--3--4--5--6--7--8--...
// vvvvvv     take(5)         vvvvvv
//         --0--1--2--3--4|
// vvvvvv filter(valori pari) vvvvvv
//         --0-----2-----4|

const observer: PartialObserver<number> = {
  next(value: number) {console.log(value)},
  complete() {console.log('complete')}
};

newObservable.subscribe(observer);

Nell’esempio riportato sopra abbiamo un Observable source che emette un valore ogni secondo. Grazie all’operatore take() prendiamo solo i primi 5 valori ed emettiamo una notifica di completamento. Ovviamente possiamo combinare questo operatore con altri, così come abbiamo fatto nell’esempio in cui abbiamo filtrato i 5 valori lasciati passare da take() e abbiamo selezionato solo i valori pari.

Nella sua forma più semplice l’operatore first() non prevede nessun parametro ed ha lo stesso effetto di take(1) visto che prende solo il primo valore ed emette una notifica di completamento

Operatore skip()

L’operatore skip(n: number) restituisce un Observable che scarta i primi ‘n’ valori emessi dall’Observable sorgente.

import { Observable, PartialObserver, interval} from 'rxjs'; 
import { skip } from 'rxjs/operators'

// --0--1--2--3--4--...
const source = interval(1000);

const newObservable = source.pipe(
  skip(5)
)

// source: --0--1--2--3--4--...
// vvvvvv     skip(5)         vvvvvv
//         -----------------5--6--7...

const observer: PartialObserver<number> = {
  next(value: number) {console.log(value)},
  complete() {console.log('complete')}
};

newObservable.subscribe(observer);

Gli operatori takeLast() e last()

L’operatore takeLast(n: number) ricorda gli ultimi ‘n’ valori emessi dall’Observable di origine e, quando quest’ultimo notifica il suo completamento, li invia agli Observer.

import { Observable, PartialObserver, interval} from 'rxjs'; 
import { take, takeLast } from 'rxjs/operators'

// --0--1--2--3--4--...
const source = interval(1000);

const newObservable = source.pipe(
  take(3),
  takeLast(2)
)

// source: --0--1--2--3--4--...
// vvvvvv     take(3)         vvvvvv
//         --0--1--2|
// vvvvvv     takeLast(2)         vvvvvv
//         ---------(12|)

const observer: PartialObserver<number> = {
  next(value: number) {console.log(value)},
  complete() {console.log('complete')}
};

newObservable.subscribe(observer);

Nell’esempio riportato sopra prendiamo solo i primi 3 valori emessi dall’Observable sorgente grazie all’operatore take(3) che restituisce un Observable il quale invia a sua volta una notifica di completamento dopo aver emesso i tre valori. L’operatore takeLast(2) restituisce quindi un Observable che ricorda gli ultimi 2 valori emessi dall’Observable di ingresso e li consegna agli Observer in modo sincrono seguiti da una notifica di completamento.

L’operatore last() senza alcun argomento è equivalente a takeLast(1).

Operatore takeUntil()

L’operatore takeUntil(notifier: Observable) restituisce un Observable che emette i dati inviati dall’Observable sorgente fin quando un secondo Observable notifier, passato come argomento, emette un valore.

import { 
  Observable,
  PartialObserver, 
  interval,
  timer
} from 'rxjs'; 
import { takeUntil } from 'rxjs/operators'

// --0--1--2--3--4--...
const source = interval(1000);

const timerObs = timer(2500);

const newObservable = source.pipe(
  takeUntil(timerObs)
)

const observer: PartialObserver<number> = {
  next(value: number) {console.log(value)},
  complete() {console.log('complete')}
};

newObservable.subscribe(observer);

L’esempio mostrato sopra stampa a video i primi due valori (0, 1) e poi il messaggio che segnala il completamento dell’Observable.

Operatore startWith()

L’operatore startWith() restituisce un Observable che emette gli elementi specificati come argomenti prima di continuare ad inviare i dati provenienti dall’Observable sorgente.

import { Observable, PartialObserver, range} from 'rxjs'; 
import { startWith} from 'rxjs/operators';

// (345|)
const source = range(3, 3);

const newObservable = source.pipe(
  startWith(0, 1, 2)
)

// source: (345|)
// vvvvvv startWith(0, 1, 2) vvvvvv
//         (012345|)

const observer: PartialObserver<number> = {
  next(value: number) {console.log(value)},
  complete() {console.log('complete')}
};

newObservable.subscribe(observer);

Operatore concat()

L’operatore concat() restituisce un nuovo Observable che concatena più sorgenti emettendo i loro valori in sequenza, un Observable dopo l’altro.

import { 
  Observable,
  PartialObserver, 
  interval
} from 'rxjs'; 
import { concat, take } from 'rxjs/operators'

// ----0----1----2|
const source1 = interval(2000).pipe(take(3));
// --0--1|
const source2 = interval(1000).pipe(take(2));

// ----0----1----2|
// --0--1|
// vv concat() vv
// ----0----1----2--0--1|

const newObservable = source1.pipe(
  concat(source2)
)

const observer: PartialObserver<number> = {
  next(value: number) {console.log(value)},
  complete() {console.log('complete')}
};

newObservable.subscribe(observer);

Operatore merge()

L’operatore statico merge(...Observables) restituisce un nuovo Observable che combina e mescola i valori emessi dai vari Observable passati come argomento.

import { 
  Observable,
  PartialObserver, 
  interval,
  merge
} from 'rxjs'; 
import { take } from 'rxjs/operators'

// ----0----1----2|
const source1 = interval(2000).pipe(take(3));
// --0--1|
const source2 = interval(1000).pipe(take(2));

// ----0----1----2|
// --0--1|
// vv merge() vv
// --0-01---1----2|

const newObservable = merge(
  source1,
  source2
)

const observer: PartialObserver<number> = {
  next(value: number) {console.log(value)},
  complete() {console.log('complete')}
};

newObservable.subscribe(observer);

Operatore debounceTime()

L’operatore debounceTime(timeInMilliseconds) attende timeInMilliseconds millisecondi di silenzio prima di lasciare passare un valore. Se per esempio riceve un valore dall’Observable di ingresso, lo memorizza internamente e attende timeInMilliseconds prima di emettere il valore. Se nessun altro valore arriva in questo arco di tempo, l’operatore lascerà "passare" il valore. Se, invece, durante questo periodo arriva un nuovo valore, debounceTime() cancella il vecchio valore e memorizza internamente il nuovo valore ripetendo la stessa procedura fino alla conclusione dell’Observable sorgente.

Un tipico caso in cui l’operatore debounceTime può risultare utile è quando si ha un campo di input e si vuole eseguire una certa azione quando un utente digita del testo. Per esempio potrebbe essere necessario effettuare una richiesta HTTP ad un server remoto per recuperare delle informazioni relative al testo digitato. Avviare successive richieste senza neanche cancellare le precedenti risulta certamente poco efficiente. Grazie a questo operatore, possiamo limitare in modo semplice il numero di azioni da eseguire, effettuandole solo quando l’utente ha completato di digitare una parola.

Riportiamo quindi nel frammento di codice sottostante un esempio che illustri quanto appena descritto.

import { Observable, PartialObserver, fromEvent} from 'rxjs'; 
import { map, debounceTime } from 'rxjs/operators'

const input = document.querySelector('#city');

const source = fromEvent(input, 'input');

const newObservable = source.pipe(
  map(event => (<HTMLInputElement>event.target).value),
  debounceTime(800)
)

const observer: PartialObserver<string> = {
  next(value: string) {console.log(value)},
  complete() {console.log('complete')}
};

newObservable.subscribe(observer);

Nell’esempio riportato sopra creiamo un Observable che emette un evento ogni volta che digitiamo un carattere all’interno di un campo di testo con id #city. Attraverso il metodo map() estraiamo il valore del campo di input. Grazie al metodo debounceTime() facciamo in modo che venga consegnato un valore all’Observer solo se non si digitano altri caratteri per almeno 800ms dall’ultima immissione.

Operatore distinctUntilChanged()

L’esempio visto sopra permette di ottenere dei vantaggi in termini di prestazioni dato che debounceTime() limita il numero di azioni che vengono completate in seguito all’aggiornamento del valore del campo di input. Possiamo però apportare un’altra modifica che consente di ottimizzare ulteriormente il nostro esempio. Per far ciò ci avvaliamo dell’operatore distinctUntilChanged() che restituisce un Observable il quale confronta ogni volta il nuovo valore ottenuto dall’Observable sorgente con quello ricevuto precedentemente. L’ultimo valore ricevuto viene quindi emesso solo se differisce dal precedente.

Facendo riferimento all’esempio visto in precedenza, supponiamo di digitare il nome della città di ‘Pisa’. Dopo l’ultima vocale verrà stampato nella console il nome della città. A questo punto aggiungiamo le due lettere ‘no’ ottenendo quindi il nome del comune di ‘Pisano’ in provincia di Novara e cancelliamo subito gli ultimi due caratteri (entro gli 800ms). Nella console verrà stampato nuovamente il nome ‘Pisa’. Se avessimo voluto contattare un server remoto, per esempio per ottenere i dati delle previsioni del tempo, avremmo effettuato una nuova richiesta non necessaria. Grazie all’operatore distinctUntilChanged() possiamo risolvere questo tipo di problemi.

Potete testare quanto appena illustrato nel riquadro sottostante o direttamente su stackblitz.

Cold Observable, Hot Observable e l’operatore share()

Abbiamo detto precedentemente che nella maggior parte dei casi viene creata una nuova esecuzione di un Observable ogni volta che un Observer chiede di ricevere delle notifiche attraverso la funzione observable.subscribe(). Ogni Subscriber avvia una propria esecuzione dell’Observable e ottiene il proprio flusso di dati. Ci si riferisce a questa categoria di Observable con il nome di Cold Observable. Si tratta di quel tipo di Observable in cui il flusso dei dati è creato all’interno dello stesso Observable e non vengono inviati dei valori fino a quando non esiste almeno un Subscriber che chiede di riceverli. In precedenza abbiamo incontrato alcuni operatori che permettono di creare dei Cold Observable come interval(), of(), from(). Esiste però un’altra categoria di Observable che prende il nome di Hot Observable in cui il produttore dei valori è in realtà all’esterno dell’Observable stesso che invece si occupa di prelevare i dati e inviarli ai Subscriber. In questo caso i dati vengono prodotti indipendentemente dal fatto che ci sia un Subscriber o meno. Se non ci sono Observer che hanno già richiesto di ricevere delle notifiche tramite il metodo observable.subscribe() prima che vengano emessi dei dati, questi ultimi vengono semplicemente persi e non è possibile recuperarli. Abbiamo visto un esempio di questo tipo di Observable quando abbiamo descritto il funzionamento dell’operatore fromEvent().

In alcuni casi può essere necessario trasformare un Observable da Cold a Hot in modo da condividere lo stesso flusso di dati fra più Observer che quindi non avranno più un esecuzione esclusiva della Subscriber Function. La ragione principale per cui è opportuno eseguire una simile conversione è per questioni di prestazioni ed efficienza. Se per esempio si utilizza un Observable per effettuare delle richieste HTTP al fine di recuperare delle informazioni da un server remoto, può essere utile condividere il flusso di dati fra i vari Observer che lo utilizzano al fine di ridurre il numero di richieste. Potremo così ottimizzare la nostra applicazione e diminuire in modo significativo il rischio di sovraccaricare il server remoto evitando in questo modo di incorrere in problemi di prestazioni lato server o di rischiare di dover sostenere costi eccessivi.

Share() viene in nostro soccorso in simili scenari visto che si tratta di un operatore che trasforma un Observable di tipo Cold in Hot facendo in modo che i diversi Subscriber condividano lo stesso flusso di dati. Finché c’è almeno un Subscriber, l’Observable restituito da share() emetterà dei dati. Quando tutti gli Observer hanno indicato di non voler più ricevere notifiche grazie al metodo observable.unsubscribe(), l’operatore share() procederà ad annullare la ricezione di dati dall’Observable sorgente.

Higher Order Observables

Abbiamo visto nei precedenti esempi che un Observable può emettere diversi tipi di valori. Nulla vieta quindi che questi siano a loro volta di tipo Observable. Si parla in questi casi di Higher Order Observable per indicare dunque degli Observable che emettono dei valori di tipo Observable.

import { interval, Observable } from 'rxjs'; 
import { map, take } from 'rxjs/operators';

const outerInterval: Observable<number> = 
  interval(3000).pipe(
    take(3)
);

const innerInterval: Observable<number> = 
  interval(500).pipe(
    take(4)
);

const higherOrderObservable: Observable<Observable<number>> =
  outerInterval.pipe(
    map(value => innerInterval)
);

higherOrderObservable
  .subscribe((innerObservable: Observable<number>) => {
    console.log(innerObservable);
  });

Se consideriamo il frammento di codice riportato sopra, abbiamo un Observable che per tre volte, a distanza di 3 secondi, emette un nuovo valore. Tramite la funzione map() trasformiamo tale valore in un Observable che invia 4 valori con un intervallo di 500ms fra loro. Nel momento in cui eseguiamo il metodo higherOrderObservable.subscribe() stampiamo nella console 3 oggetti di tipo Observable, ma non viene emesso nessun valore da ciascuno di loro perché non invochiamo il loro metodo subscribe().

Per estrarre i valori degli Observable "interni" dovremo quindi invocare il metodo innerObservable.subscribe() come mostrato sotto.

import { interval, Observable } from 'rxjs'; 
import { map, take } from 'rxjs/operators';

const outerInterval: Observable<number> = 
  interval(3000).pipe(
    take(3)
  );

const innerInterval: Observable<number> = 
  interval(500).pipe(
    take(4)
  );

const higherOrderObservable: Observable<Observable<number>> =
  outerInterval.pipe(
    map(value => innerInterval)
  );

higherOrderObservable
  .subscribe((innerObservable: Observable<number>) => {
    innerObservable.subscribe((value: number) => console.log(value))
  });

A questo punto verrà stampata per 3 volte nella console la sequenza 0,1,2,3. Viene infatti avviato l’Observable esterno che emette per la prima volta un valore il quale viene trasformato in un nuovo Observable che emette in 2 secondi (500ms x 4) l’intera sequenza (0,1,2,3). Quest’ultima viene infine mostrata nella console prima che l’Observable esterno emetta un secondo valore e si ripeta la procedura appena descritta.

Non è tuttavia consigliato annidare delle chiamate al metodo observable.subscribe() come mostrato nel precedente esempio. Per questo motivo esistono degli operatori specifici.

Operatore mergeAll() e mergeMap()

L’operatore mergeAll() permette di ottenere lo stesso risultato dell’esempio visto in precedenza senza dover annidare chiamate ai metodi subscribe() e ottenere i valori dell’observable interno manualmente. Tale operatore si occupa di invocare il metodo subscribe() per ogni Observable "interno" e quindi di estrarre i valori emessi da questi ultimi e presentarli su un’unica linea temporale. L’Observable "esterno" emetterà una notifica di completamento solo se tutti gli Observable interni avranno completato la loro esecuzione.

Possiamo vedere quanto appena descritto nel riquadro riportato sotto.

Facendo riferimento al Marble Diagram presente nel codice, potrebbe accadere, a seconda dell’esecuzione, che due valori che vengono emessi immediatamente uno dopo l’altro (per esempio la prima coppia -02-) vengano presentati in ordine invertito. Ciò può essere riconducibile al modo in cui vengono schedulate le callback della funzione interval().

Tornando al nostro esempio, possiamo vedere che abbiamo usato l’operatore mergeAll() dopo l’operatore map(). Dal momento che si tratta di un’operazione piuttosto frequente, esiste un operatore specifico equivalente alla combinazione dei due, ovvero mergeMap(). (mergeMap = map + mergeAll)

Sia mergeAll() che mergeMap() accettano un argomento opzionale che indica il numero di Observable interni che possono essere eseguiti in modo concorrente. Il valore di default è Number.POSITIVE_INFINITY. Quando viene raggiunto il numero massimo di Observable simultanei, solo se un Observable interno è completo, viene accettato un nuovo Observable da eseguire.

Operatore concatAll() e concatMap()

L’operatore concatAll() ha lo stesso effetto di mergeAll(1) con argomento pari a 1. In questo caso si aspetta che il primo Observable interno finisca di emettere tutti i valori e se arriva un altro Observable, esso sarà salvato internamente e inizierà ad emettere dei valori solo quando termina il primo.

Come nel caso precedente, concatMap() combina i due operatori map() e concatAll(). (concatMap = map + concatAll)

Operatore switch() e switchMap()

L’operatore switchMap() è equivalente alla combinazione degli operatori map e switch(). Al contrario di mergeMap(), questo operatore non combina i valori di più Observable interni, ma cancella un Observable interno in corso di esecuzione non appena un nuovo Observable interno emette un valore come possiamo vedere nell’esempio sottostante. I valori emessi dagli Observable interni saranno quindi presentati su un’unica linea temporale.

Gestione degli errori

RxJS fornisce diversi strumenti per la gestione degli errori. Abbiamo già visto in precedenza che è possibile definire un metodo error() in un Observer per gestire degli errori, ma esistono altri operatori come catchError() che consentono di porre rimedio ad un errore o rilanciare un’eccezione prima che un Observer si registri per la ricezione delle notifiche di un Observable tramite il metodo observable.subscribe()

Operatore catchError()

L’operatore catchError() consente di intercettare un errore. Prevede come unico parametro una funzione a cui a sua volta vengono passati due argomenti. Il primo è l’errore che viene catturato dall’operatore, il secondo è l’Observable sorgente che possiamo restituire riavviando il processo di notifica dei valori come se l’errore non si fosse mai verificato. In alternativa possiamo restituire un nuovo Observable o rilanciare un’eccezione.

import { 
  Observable, 
  Observer,
  throwError
  } from 'rxjs'; 
import { map, catchError } from 'rxjs/operators';


const randomNumbers: Observable<number> = 
  Observable.create(
    (observer: Observer<number>) => {
      const intervalID = setInterval(
        () => {
          let randomNumber = Math.floor(5 * Math.random());
          observer.next(randomNumber);
        },
        1000
      );

      return {
        unsubscribe() {
          clearInterval(intervalID);
        }
      }
    }
  );

const observer: Observer<number> = {
  next(value: number) {console.log(value)},
  error(error: any) {console.error(error)},
  complete() { console.log('complete')}
}

const subscription = randomNumbers.pipe(
  map((value: number) => {
    if (value === 0) {
      throw new Error('Emitted value is 0');
    }
    return value;
  }),
  catchError((error, caught) => throwError(error))
).subscribe(observer);

Nell’esempio mostrato sopra abbiamo creato un nuovo Observable che emette ogni secondo un valore intero casuale compreso fra 0 e 5. Tramite l’operatore map() verifichiamo se il valore generato è pari a 0 o meno. Nel primo caso lanciamo un errore, altrimenti restituiamo il valore senza modificarlo. Al verificarsi dell’errore, quest’ultimo viene intercettato dall’operatore catchError() che in questo caso restituisce un nuovo Observable attraverso l’operatore throwError() il quale non emette nessun valore ma invia immediatamente una notifica di errore. Tale notifica viene ricevuta dal metodo error() dell’observer che stampa un messaggio nella console.

Operatore retry()

Un altro operatore utile in caso di errori è retry(n: number) che restituisce un nuovo Observable identico a quello sorgente con la differenza che quando si verifica un errore proverà fino a un massimo di n volte ad iscriversi nuovamente all’Observable sorgente che può quindi provare a ripetere l’intera sequenza di notifiche interrotta dal verificarsi dell’errore.

// ... resto del codice

const subscription = randomNumbers.pipe(
  map((value: number) => {
    if (value === 0) {
      throw new Error('Emitted value is 0');
    }
    return value;
  }),
  retry(2),
  catchError((error, caught) => throwError(error))
).subscribe(observer);

Possiamo allora modificare l’esempio appena illustrato precedendo l’operatore catchError() da retry() a cui passiamo come argomento il numero di volte che desideriamo ripetere la sequenza di notifiche prima che un nuovo errore venga finalmente intercettato e gestito dall’operatore catchError().

Nel riquadro sottostante riportiamo per intero l’esempio appena illustrato.

RxJS Subject

Finora abbiamo introdotto il tipo di dato Observable che costituisce uno degli elementi chiave di RxJS. Subject rappresenta un altro tipo fondamentale che combina contemporaneamente le funzionalità di un Observable e di un Observer. Per cui può ricevere delle notifiche da un Observable registrandosi tramite il metodo observable.subscribe(), dato che presenta i tre metodi visti per un Observer, ovvero next(), error() e complete().

Allo stesso tempo però anche le istanze di Subject possiedono il metodo subject.subscribe() che permette ad altri Observer di ricevere notifiche da un oggetto di tipo Subject. Al contrario di un normale Observable, gli oggetti di tipo Subject sono multicast e una singola esecuzione viene condivisa da più Observer. Quando si invoca subject.subscribe() viene semplicemente inserito un nuovo Observer in una lista interna. Invocando subject.next() verranno notificati tutti gli Observer registrati. (Quello che sostanzialmente prevede l’Observer pattern)

Come possiamo vedere dall’esempio riportato sopra, i primi due Observer (ObserverA e ObserverB) ricevono le notifiche inviate tramite il metodo subject.next() essendosi registrati prima attraverso il metodo subject.subscribe(). ObserverC al contrario non riceverà mai nessuna notifica essendosi registrato quando ormai le notifiche sono già state inviate.

È anche possibile utilizzare un oggetto di tipo Subject per trasformare un Observable da unicast a multicast.

Nell’esempio riportato sopra observerB si registra per ricevere delle notifiche da subject solo dopo 4 secondi. Per questo motivo non riceve i primi 4 valori, ma solo quelli successivi. Evidenziamo che subject riceve delle notifiche dall’Observable e provvede poi a consegnarle ai diversi observer che si sono registrati tramite il metodo subject.subscribe().

Behavior Subject

La classe BehaviorSubject estende Subject. Gli oggetti di questo tipo devono essere sempre inizializzati con un valore che deve essere passato come argomento del costruttore nel momento in cui si crea una nuova istanza. Hanno inoltre la capacità di ricordare l’ultimo valore emesso che sarà inviato ai subscriber che si registrano con il metodo subject.subscribe() dopo che è stato emesso l’ultimo valore tramite subject.next().

Nell’esempio riportato sopra abbiamo tre Observer: ObserverA riceve tutte le notifiche compreso il valore iniziale passato al costruttore nel momento in cui abbiamo creato l’istanza subject. ObserverB riceve solo il valore 2 perché abbiamo invocato subject.subscribe(observerB) dopo circa un secondo dall’ultima emissione, ma siccome subject è di tipo BehaviorSubject, ricorda l’ultimo valore e lo invia ad ObserverB. Al contrario ObserverC non riceve nessun valore perché nel momento in cui abbiamo chiamato subject.subscribe(observerC), subject ha già emesso la notifica di completamento. Quest’ultima viene comunque recapitata al mentodo complete() di ObserverC.

Replay Subject

ReplaySubject è l’ultimo tipo di Subject che descriveremo brevemente per concludere questa lezione. Gli oggetti di questo tipo consentono di ripetere l’invio di un certo numero di valori agli Observer che si iscrivono dopo la loro emissione. Quando creiamo un’istanza di ReplaySubject dobbiamo quindi indicare quanti valori vogliamo che vengano eventualmente ripetuti. È possibile passare come argomento anche Number.POSITIVE_INFINITY. In questo caso verranno ricordati tutti i valori emessi in passato. Al contrario di BehaviorSubject, gli oggetti di tipo ReplaySubject sono in grado di consegnare dei valori precedenti anche dopo aver emesso una notifica di completamento come è possibile vedere dall’esempio riportato sotto.

Riepilogo

In questa lezione abbiamo analizzato gli elementi fondamentali della libreria RxJS. Abbiamo introdotto il tipo di dato Observable illustrando il suo funzionamento e mettendo in evidenza le differenze rispetto al meccanismo delle Promise. Abbiamo inoltre descritto alcuni degli operatori più utili affindandoci ai cosiddetti Marble Diagrams per mostrare attraverso dei diagrammi l’andamento nel corso del tempo del flusso dei dati associato agli oggetti di tipo Observable. Abbiamo infine parlato del tipo di dato Subject. Nella prossima lezione sposteremo la nostra attenzione sui Servizi che costituiscono uno dei blocchi cardini di Angular.

Pubblicitร