back to top

Introduzione a RxJS per le applicazioni Angular

A partire dalla versione 2 del framework, il team Angular ha introdotto numerose novità e, in diversi casi, ha utilizzato un approccio completamente differente rispetto alla versione precedente per la risoluzione degli stessi problemi. In tale contesto rientra l’uso della libreria RxJS (Reactive Extensions for JavaScript) che fornisce un’implementazione del tipo di dato Observable il quale viene spesso impiegato per l’esecuzione di operazioni asincrone. Un esempio sono i form di tipo Reactive i quali hanno delle proprietà che fanno proprio uso degli Observable per monitorare la variazione dei valori dei campi di input. Anche alcuni moduli come quello HTTP utilizzano internamente gli Observable per l’invio di richieste AJAX a server remoti.

La libreria RxJS fornisce quindi gli strumenti necessari per realizzare delle applicazioni che seguono i principi di Reactive Programming, una tipologia di programmazione che cerca di semplificare la manipolazione di flussi di dati asincroni.

In questa lezione cercheremo di sintetizzare brevemente quelli che sono i concetti chiave della libreria RxJS e in particolare del tipo Observable in modo tale da poter affrontare nelle prossime lezioni alcuni argomenti, come quello dei servizi, con più facilità.

Gli Observable

Sulla documentazione ufficiale gli Observable sono definiti come "Lazy Push Collections of Multiple Values". Per capire in pieno il significato e comprendere meglio il loro funzionamento, iniziamo a vedere come creare un oggetto di tipo Observable grazie a RxJS. Esistono tanti modi per testare e prendere dimestichezza con la libreria. Il più semplice consiste nell’usare uno dei tanti servizi online come CodePen, JSFiddle o JSBin che consentono di verificare il funzionamento di piccoli frammenti di codice HTML, JavaScript e CSS e avere immediatamente un riscontro visivo del risultato ottenuto. Per semplicità nel resto della lezione riporteremo i frammenti di codice testati su StackBlitz grazie al quale possiamo iniziare ad usare TypeScript e la versione 6 di RxJS in pochi click.

screenshot pagina iniziale stackblitz

Il metodo Rx.Observable.create() rappresenta un modo per creare una nuova istanza di Observable. Riceve come argomento una funzione (Subscriber Function) che viene eseguita solo nel momento in cui viene invocato il metodo observable.subscribe(). Questo è il motivo per cui gli Observable sono considerati "Lazy", proprio perché l’esecuzione della funzione passata come argomento a Rx.Observable.create() viene rimandata fino a quando non viene chiamato il metodo observable.subscribe(). Fino a quel momento non vengono allocate risorse in memoria né eseguite le operazioni specificate nella Subscriber Function. Solo quando un Observer si registra per ricevere le notifiche di un Observable tramite la funzione observable.subscribe(), vengono eseguite le istruzioni presenti nella Subscriber Function. Solitamente ciascun Observer avrà la propria esecuzione indipendente dagli altri. Semplificando, possiamo vedere gli Observable come i produttori di dati i quali vengono consegnati ai consumatori (Observer) che lo hanno richiesto.

import { Observable, Observer } from 'rxjs'; 
const source = Observable.create((observer: Observer<number>) => { });

Nel frammento di codice riportato sopra, usiamo il metodo Observable.create() per creare un oggetto di tipo Observable. Ad Observable.create() abbiamo passato come argomento una funzione che prevede a sua volta un singolo parametro observer di tipo Observer<number>. In generale Observer<T> è un tipo generico. Nel caso specifico abbiamo invece indicato che i nostri oggetti sono di tipo Observer<number> perché lavoreranno con dei valori numerici. Ciò significa che ci aspettiamo che l’Observable consegni a ciascun Observer dei numeri.

Nel nostro primo esempio abbiamo assegnato alla costante source un riferimento al nuovo Observable creato. Alcuni sviluppatori preferiscono usare una convenzione particolare che prende il nome di Finnish Notation e prevede di rendere al plurale tutti i nomi di variabili che si riferiscono a degli Observable. Visto che solitamente il plurale dei nomi in inglese viene costruito aggiungendo una ‘-s’ alla fine della parola, viene usato il carattere ‘-$’ al posto della ‘-s’.

import { Observable, Observer } from 'rxjs'; 
// nominiamo la costante seguendo la cosiddetta Finnish Notation
const value$ = Observable.create((observer: Observer<number>) => { });

In questo modo è facile individuare all’interno del codice una variabile che contiene un riferimento ad un Observable anche se può risultare scomodo digitare ogni volta il carattere ‘$’. Inoltre non tutti i sostantivi formano il plurale aggiungendo una ‘-s’ finale, per cui l’uso del solo carattere ‘$’ non ha sempre senso. Esistono altri tipi di notazioni ideate per risolvere i problemi appena esposti. In ogni caso si tratta di scelte soggettive, l’importante è sempre mantenere uno stile coerente.

Tornando al nostro esempio, dal momento che Observable.create() è un alias del costruttore, la sintassi sottostante consente di ottenere lo stesso risultato di quella riportata sopra.

const source = new Observable((observer: Observer<number>) => { });

Anche in questo caso passeremo al costruttore una funzione all’interno della quale aggiungeremo la logica del nostro Observable per consegnare dei valori ai diversi Observer che lo richiedono.

Gli Observer sono degli oggetti in grado di ricevere le notifiche inviate da un Observable e presentano, nella loro forma completa, tre diversi metodi in grado di interpretare altrettanti tipi di notifiche inviate da un Observable.

interface Observer<T> {
  closed?: boolean;
  next: (value: T) => void;
  error: (err: any) => void;
  complete: () => void;
}
  • il metodo next(value) deve essere sempre definito visto che viene invocato dall’oggetto Observable per consegnare un nuovo valore agli Observer. (notifiche di tipo Next)
  • il metodo error(error) rappresenta invece la funzione callback attraverso la quale un Observer riceve delle notifiche di tipo Error.
  • il metodo complete() è una funzione callback utilizzata per ricevere una notifica senza valore di tipo Complete dall’Observable. Ad ogni Observer possono essere consegnati un numero non definito di valori. Se viene però ricevuta una notifica di tipo Error o Complete, non può più ricevere altri valori.
  • la proprietà closed è opzionale ed indica se un Observer ha già chiesto di non voler più ricevere notifiche da un Observable o meno. (Vedremo a breve come cancellare l’iscrizione ad un Observable)

In RxJS possiamo definire non solo degli Observer che hanno tutti e tre i metodi appena elencati e rispettano quindi la struttura specificata dall’interfaccia Observer riportata sopra, ma anche degli Observer parziali in cui vengono implementati solo alcuni dei tre metodi. (Vedremo a breve un esempio in cui impieghiamo degli observer che implementano l’interfaccia PartialObserver)

Riprendendo l’esempio visto in precedenza un Observable può consegnare un nuovo valore a ciascun Observer che l’ha richiesto invocando il metodo observer.next() nella Subscriber Function.

import { Observable, Observer } from 'rxjs'; 
const source = Observable.create((observer: Observer<number>) => {
  observer.next(1);
});

In questo modo verrà consegnato il valore 1 a tutti gli Observer che si sono registrati con il metodo Observable.subscribe(). Dal momento che quest’ultimo non è stato ancora invocato, la Subscriber Function non viene mai eseguita. Procediamo quindi a modificare il codice riportato sopra come segue.

import { Observable, Observer, PartialObserver } from 'rxjs'; 

const observable = Observable.create((observer: Observer<number>) => {
  observer.next(1);
});

const observer: PartialObserver<number> = {
  next(value:number) {console.log('next: ', value);}
};

observable.subscribe(observer);

Abbiamo invocato il metodo observable.subscribe() che accetta un argomento opzionale di tipo PartialObserver<T>, union type così definito.

interface NextObserver<T> {
  closed?: boolean;
  next: (value: T) => void;
  error?: (err: any) => void;
  complete?: () => void;
}

interface ErrorObserver<T> {
  closed?: boolean;
  next?: (value: T) => void;
  error: (err: any) => void;
  complete?: () => void;
}

interface CompletionObserver<T> {
  closed?: boolean;
  next?: (value: T) => void;
  error?: (err: any) => void;
  complete: () => void;
}

type PartialObserver<T> = NextObserver<T> | ErrorObserver<T> | CompletionObserver<T>;

Facendo sempre riferimento all’esempio, abbiamo un observer con un solo metodo next() che si limita a stampare a video il valore che gli viene passato. Infatti, dal momento che ora esiste almeno un observer che si è registrato tramite il metodo observable.subscribe() per ricevere delle notifiche dall’Observable, la Subscriber Function viene eseguita e al suo interno viene passato il valore 1 (observer.next(1)) al solo Observer presente finora.

Ecco perché un Observable viene considerato un sistema di tipo Push, visto che il produttore (Observable in questo caso) determina quando inviare i dati al consumatore (Observer) che non è a conoscenza di quando e con che frequenza riceverà tali informazioni. L’Observable svolge un ruolo attivo, mentre l’Observer riceve passivamente i dati senza avere alcun controllo sul loro invio.

È bene sottolineare che la Subscriber Function viene eseguita ogni volta in maniera esclusiva per ogni singolo Observer che si registra attraverso il metodo observable.subscribe().

import { Observable, Observer, PartialObserver } from 'rxjs'; 

const observable = Observable.create((observer: Observer<number>) => {
  observer.next(1);
  setTimeout(() => observer.next(2), 1000)
});

const observer1: PartialObserver<number> = {
  next(value: number) { console.log('observer1.next: ', value);}
}

const observer2: PartialObserver<number> = {
  next(value: number) { console.log('observer2.next: ', value);}
}

const observer3: PartialObserver<number> = {
  next(value: number) { console.log('observer3.next: ', value);}
}

observable.subscribe(observer1);
observable.subscribe(observer2);

Nell’esempio riportato sopra, abbiamo creato un Observable che invia immediatamente il valore 1 agli Observer che si sono registrati con la funzione observable.subscribe() e dopo un secondo invia il valore 2. Solo i due Observer observer1 e observer2 ricevono però i due valori numerici e li stampano a video. Possiamo notare che, sebbene sia stato creato un terzo Observer (observer3), il metodo next() di quest’ultimo non sarà mai invocato visto che non ha richiesto di ricevere delle notifiche dall’Observable. (observer3 non si è registrato con il metodo observable.subscribe()). Ribadiamo ancora una volta che per ciascun Observer che ha richiesto di ricevere le notifiche dell’Observable, viene eseguita in maniera esclusiva la Subscriber Function. Nel nostro caso avremo quindi due distinte esecuzioni dell’observable per ognuno dei due observer. (con il termine "Observable execution", ovvero esecuzione dell’observable, ci si riferisce proprio all’esecuzione del codice presente all’interno della Subscriber Function)

Abbiamo detto in precedenza che un observable può consegnare uno o più valori finché non viene invocato il metodo observer.complete() o si notifica un errore con observer.error().

import { Observable, Observer, PartialObserver } from 'rxjs'; 

const observable = Observable.create((observer: Observer<number>) => {
  const threshold = 3;
  let count = 0;

  while(count < threshold) {
    observer.next(count++);
  }
  observer.complete();
  // 3 non viene più consegnato a causa della
  // precedente chiamata ad observer.complete()
  observer.next(count);

  setTimeout(
    () => observer.next(++count) // 4 non viene consegnato
    , 1000);
});

const observer1: PartialObserver<number> = {
  next(value: number) { console.log('observer1.next: ', value);}
}

const observer2: PartialObserver<number> = {
  next(value: number) { console.log('observer2.next: ', value);}
}


observable.subscribe(observer1);
observable.subscribe(observer2);

Nel frammento di codice riportato sopra, qualsiasi chiamata al metodo observer.next(), dopo aver invocato observer.complete(), non consegna nessun nuovo valore dal momento che observer.complete() completa l’esecuzione dell’Observable.

Un comportamento simile si ha se viene chiamato il metodo observer.error().

import { Observable, Observer, PartialObserver } from 'rxjs'; 

const observable = Observable.create((observer: Observer<number>) => {
  try {
    observer.next(1);
    throw new Error('Errore casuale');
  } catch(error) {
    observer.error(error);
  }
  observer.next(2);
});

const observer1: PartialObserver<number> = {
  next(value: number) { console.log('observer1.next: ', value)},
  error(error: any) { console.error('observer1.error: ', error)}
}

const observer2: PartialObserver<number> = {
  next(value: number) { console.log('observer2.next: ', value)},
  error(error: any) { console.error('observer2.error: ', error)}
}


observable.subscribe(observer1);
observable.subscribe(observer2);

Nell’esempio precedente ciascuno dei due observer riceve solo il valore 1 visto che viene immediatamente lanciata un’eccezione in seguito alla quale viene invocato il metodo observer.error(). Ciascun observer stampa a video il valore ricevuto e riporta successivamente il messaggio dell’errore che è stato generato.

esempio observable con errore

Un Observer, non solo ha la possibilità di ricevere un numero infinito di valori, ma può anche cancellare la propria esecuzione dell’Observable a cui si era registrato in precedenza. A tale scopo, RxJS prevede che il metodo observable.subscribe() restituisca un oggetto di tipo Subscription che rappresenta l’esecuzione in corso e dispone di un metodo subscription.unsubscribe() grazie al quale è possibile interrompere l’esecuzione stessa. Quando creiamo un nuovo Observable con il metodo create(), all’interno della Subscriber Function possiamo restituire una funzione o un oggetto contenente un metodo unsubscribe() in cui è opportuno assicurarsi di rilasciare eventuali risorse, rimuovere gestori di eventi precedentemente registrati o cancellare l’esecuzione di azioni ripetute avviate con funzioni come setInterval() in modo da evitare di sprecare memoria e/o causare in ultimo memory leaks.

import { Observable, Observer, PartialObserver } from 'rxjs'; 

const observable: Observable<number> = Observable.create((observer: Observer<number>) => {
  let count = 0;

  observer.next(count);

  const intervalID = setInterval(() => observer.next(++count), 1000);

  return {
    unsubscribe() {
      clearInterval(intervalID);
    }
  }
});

const observer1: PartialObserver<number> = {
  next(value: number) { console.log('observer1.next: ', value);},
  error(error: any) { console.error('observer1.error ', error)}
}

const observer2: PartialObserver<number> = {
  next(value: number) { console.log('observer2.next: ', value);},
  error(error: any) { console.error('observer2.error ', error)}
}

const subscription1 = observable.subscribe(observer1);
const subscription2 = observable.subscribe(observer2);

setTimeout(() => subscription1.unsubscribe(), 4000);

setTimeout(() => subscription2.unsubscribe(), 8000);

Nell’esempio mostrato sopra, abbiamo un Observable che consegna subito un primo valore, mentre i successivi vengono inviati ad intervalli di circa un secondo uno dall’altro. Abbiamo quindi due Observer che si registrano tramite il metodo observable.subscribe() per ricevere i valori dell’Observable. Per entrambi salviamo un riferimento all’oggetto restituito ogni volta dalla Subscriber Function. In questo modo possiamo chiamare il metodo subscription.unsubscribe() che interrompe un’esecuzione dell’observable. Nel primo caso invochiamo subscription1.unsubscribe() dopo circa 4 secondi. Il secondo observer invece riesce a ricevere i primi nove valori inviati dall’observable e interrompe l’esecuzione di quest’ultimo dopo circa 8 secondi. (Cliccando sul pulsante sottostante ‘Run Project’, è possibile visualizzare il codice e un’anteprima dell’esempio riportato sopra)

Differenze fra Observable e Promise

A questo punto iniziamo ad avere un’idea su come funzionano gli Observable che per certi aspetti rappresentano uno strumento più completo e potente delle Promise. Prima di continuare il nostro viaggio ed illustrare il funzionamento di alcuni degli operatori più utili di RxJS che permettono di lavorare col tipo di dato Observable, riassumiamo in pochi punti quali sono le caratteristiche principali che differenziano gli Observable dalle Promise.

Promise

Un oggetto di tipo Promise rappresenta un valore che potrebbe non essere ancora disponibile, ma potrà esserlo in futuro. Al contrario degli Observable le Promise non sono ‘Lazy’, ma ‘Eager’. Con ciò si indica il fatto che una Promise inizia immediatamente ad eseguire le operazioni per cui è stata costruita, non appena viene invocato il costruttore. Una Promise lavora alla fine su un singolo valore. Quando quest’ultimo non è ancora disponibile, la Promise si trova in uno stato iniziale ‘Pending’ da cui può transitare esclusivamente in uno dei due stati finali: Fulfilled (se il valore finale diventa disponibile) o Rejected (se un errore ha impedito di determinare il valore finale). Una Promise infine non prevede nessun operatore che permette di riportarla dallo stato finale a quello iniziale e ritentare le operazioni necessarie alla determinazione del valore finale che avrebbe dovuto ottenere.

Observable

Al contrario delle Promise, per gli Observable possiamo fare le seguenti considerazioni:

  • gli Observable sono ‘Lazy’ in quanto, come visto in precedenza, la loro esecuzione viene di default rinviata fino al momento in cui un Observer si registra attraverso il metodo observable.subscribe();
  • possono lavorare su più valori nel corso del tempo, il che permette di tener traccia del progresso di una certa operazione;
  • è possibile annullare la loro esecuzione invocando il metodo observable.unsubscribe();
  • presentano numerosi operatori, alcuni dei quali sono simili a quelli di altre strutture dati come gli array.
  • dispongono di operatori che permettono di ripetere l’esecuzione dell’Observable. Ciò può risultare particolarmente utile in caso di errori.
Pubblicitร