back to top

Node.js Stream: Cosa sono e perché usarli

In questa lezione parleremo di uno dei concetti fondamentali di Node.js, ovvero gli Stream. Vengono ampliamente usati e permettono fra l’altro di ottimizzare le prestazioni delle applicazioni in cui si lavora con file e socket. Vedremo che gli Stream saranno particolarmente utili quando si lavora con file di grandi dimensioni. Parleremo quindi delle Pipe che useremo insieme agli Stream.

Schema Node.js stream

Cosa sono gli Stream

Uno Stream può essere visto come un flusso di dati, suddivisi in diversi pezzi (chunk), che scorre da un produttore a un consumatore in un certo arco di tempo. I diversi blocchi che costituiscono il flusso finale potrebbero essere di dimensione complessiva superiore a quella della memoria disponibile. Man mano che i diversi blocchi arrivano al consumatore, si può iniziare subito a lavorare sui pezzi di dati ricevuti senza dover necessariamente attendere la lettura/scrittura dell’intero blocco.

In Node.js esiste la ‘classe’ Stream che permette di lavorare con questo flusso di dati. Tutti gli oggetti di tipo Stream sono istanze di EventEmitter. Ciò significa, come abbiamo già visto, che possono emettere degli eventi e registrare delle funzioni listener con lo stesso meccanismo descritto negli articoli precedenti. Tutti gli Stream ‘nativi’ presenti in Node.js operano su oggetti di tipo String o Buffer. È possibile creare però degli Stream per lavorare su altri tipi di dati Javascript. Per far ciò, in fase di implementazione di un nuovo Stream dovremo esplicitamente indicare che opererà in modalità objectMode. (Vedremo a breve un esempio)

In Node.js esistono quattro tipi di Stream:

Tipi di stream in Node.js
  • Readable, ovvero degli stream da cui è possibile leggere i dati;
  • Writable, stream da usare per la scrittura dei dati;
  • Duplex, cioè degli stream che sono sia Readable che Writeble;
  • Transform, un caso particolare di Duplex Stream usato per trasformare i dati che lo attraversano.

Gli Stream di tipo Writable e Readable mantengono internamente un buffer che usano per regolare il flusso dei dati fra produttore e consumatore. Il numero massimo di byte (o oggetti in modalità objectMode) che possono essere potenzialmente inseriti nel buffer prima che sia saturo, è determinato dal valore dell’opzione highWaterMark passata come argomento in fase di creazione di un oggetto di tipo Stream.

Node.js schema funzionamento stream produttore consumatore

Perché usare gli Stream in Node.js

Le ragioni per cui conviene usare gli Stream sono diverse. Gli Stream, specie se usati con il meccanismo delle Pipe, permettono di scrivere i nostri programmi in maniera sintetica ed espressiva. Ma il motivo principale per cui gli Stream risultano veramente vantaggiosi, è la possibilità di leggere o scrivere dei dati in piccoli blocchi. (chunk) Quando infatti usiamo un metodo come fs.ReadFile(), l’intero contenuto del file viene caricato in memoria. Se si lavora con un numero elevato di file o se questi sono di grandi dimensioni, si rischia di usare una quantità notevole di memoria. Nei casi più estremi può capitare che l’applicazione necessiti di una quantità di memoria superiore alla dimensione massima consentita in Node.js (Ogni istanza di tipo Buffer può avere una dimensione massima pari a buffer.constants.MAX_LENGTH che assume valori differenti su computer con architettura a 32bit o 64bit, circa 1GB e 2GB rispettivamente). In questi casi, non sarebbe quindi possibile leggere l’interno file usando fs.ReadFile() (viene mostrato un messaggio di errore come quello mostrato in basso), mentre non ci sono assolutamente problemi ad eseguire la lettura usando gli Stream.

// Messaggio di Errore per file di elevata dimensione

RangeError: File size is greater than possible Buffer: 0x7fffffff bytes
at FSReqWrap.readFileAfterStat [as oncomplete] (fs.js:392:11)

Vediamo ora alcuni esempi in cui useremo gli stream di tipo Readable, Writable e Transform messi a disposizione dall’API di Node.js. Realizzeremo poi degli esempi in cui creeremo degli Stream che estendono le ‘classi’ base Readable, Writable e Transform.

Stream di tipo Readable

Negli articoli precedenti abbiamo usato in più occasioni il metodo asincrono fs.readFile() per la lettura di un file. Il metodo fs.readFile(), come abbiamo appena visto, legge un intero file in memoria. Per risolvere questo inconveniente possiamo adoperare gli Stream. In particolare useremo il metodo fs.createReadStream() per creare un oggetto di tipo fs.ReadStream.

Modalità Readable Stream: Paused Mode e Flowing Mode

Gli Stream di tipo Readable possono operare in due modalità diverse: una detta Paused Mode e una denominata Flowing Mode. Tutti gli stream di tipo Readable vengono avviati in Paused Mode. In questo caso bisogna invocare esplicitamente una funzione oggettoReadStream.read() per leggere dei blocchi di dati (chunk). In modalità Flowing Mode invece i dati vengono letti direttamente dal buffer interno e passati al consumatore usando il sistema degli eventi tipico degli Event Emitter. Si può passare in modalità Flowing Mode in uno dei seguenti modi:

  • Registrando una funzione listener per l’evento ‘data’ emesso dallo stream;
  • Invocando il metodo oggettoReadStream.resume();
  • Invocando il metodo oggettoReadStream.pipe(oggettoWriteStream) (parleremo a breve del metodo pipe);

Si può tornare nuovamente in modalità Paused Mode in uno dei seguenti modi:

  • invocando il metodo stream.pause(), se non è stato invocato precedentemente il metodo pipe();
  • se è stato invocato precedentemente il metodo pipe(), invocando oggettoStream.unpipe() o rimuovendo eventuali funzioni listener registrate per l’evento ‘data’ emesso dallo stream;

Esempi con Readable Stream

Vediamo subito un esempio. All’interno di una nuova cartella creiamo un file app.js in cui inseriremo il codice Javascript per la lettura di un secondo file ausiliare testo.txt (la dimensione di testo.txt è 141KB) nel quale abbiamo inserito del testo casuale.

// file app.js

const fs = require('fs');

let count = 0;

const readStream = fs.createReadStream(
  'testo.txt', 
  {highWaterMark: 16 * 1024}
);

readStream.on('data', (chunk) => {
  console.log(`Dimensione chunk #${count}: ${chunk.length} Bytes`);
  count++;
});

readStream.on('end', () => console.log('Fine Lettura!'));

Abbiamo importato il modulo nativo fs. Abbiamo quindi creato un oggetto di tipo fs.ReadStream invocando il metodo fs.createReadStream() che restituisce uno stream caratterizzato da un valore di highWaterMark (dimensione del buffer interno) pari a 64KB. Noi decidiamo di settare il valore di highWaterMark pari a 16KB. Dal momento che uno stream è un EventEmitter, registriamo una funzione listener per l’evento ‘data’. In questo modo lo stream passerà in modalità Flowing Mode e riceveremo i dati il più velocemente possibile. Al termine della lettura, stampiamo un messaggio nella shell. Lanciamo il programma con il comando node app.js e all’interno della shell vedremo un output come quello riportato in basso.

Dimensione chunk #0: 16384 Bytes
Dimensione chunk #1: 16384 Bytes
Dimensione chunk #2: 16384 Bytes
Dimensione chunk #3: 16384 Bytes
Dimensione chunk #4: 16384 Bytes
Dimensione chunk #5: 16384 Bytes
Dimensione chunk #6: 16384 Bytes
Dimensione chunk #7: 16384 Bytes
Dimensione chunk #8: 10102 Bytes
Fine Lettura!

Possiamo anche usare gli Stream insieme al modulo readline per leggere un file di testo una linea alla volta.

const fs = require('fs');
const readline = require('readline');

let count = 1;

const readLine = readline.createInterface({
  input: fs.createReadStream('testo.txt'),
});

readLine.on('line', (linea) => {
  console.log(`${count})t ${linea}`);
  count++;
});

Dopo aver importato i moduli fs e readline, otteniamo un’istanza di readline.Interface che riceverà il contenuto del file testo.txt usando un oggetto di tipo ReadStream. Verrà quindi stampata nella shell ogni linea letta dal file. (Il file viene letto una linea alla volta)

// output

1)       Nel mezzo del cammin di nostra vita
2)       mi ritrovai per una selva oscura,
3)       ché la diritta via era smarrita.
4)
5)       Ahi quanto a dir qual era è cosa dura
6)       esta selva selvaggia e aspra e forte
7)       che nel pensier rinova la paura!

Vediamo ora come realizzare una nostra implementazione di Readable Stream. In questo caso creeremo tre file all’interno di una nuova cartella. Un primo file stream_giocatori.js contiene l’implementazione del nuovo stream di tipo Readable. All’interno di un secondo file app.js useremo lo stream creato. Infine, utilizziamo i dati presenti all’interno del file giocatori.json relativi ai sei acquisti più costosi della storia del calciomercato. All’interno del file app.js useremo lo stream in modalità Pause mode.

// giocatori.json
[
  {
    "nome": "Neymar",
    "ruolo": "Attaccante",
    "dataNascita": "1992-02-05",
    "nazione": "Brasile",
    "prezzo": "222M",
    "squadraOrigine": {
      "campionato": "La Liga",
      "nazione": "Spagna",
      "nomeSquadra": "Barcellona"
    },
    "squadraDestinazione": {
      "campionato": "Ligue 1",
      "nazione": "Francia",
      "nomeSquadra": "Paris Saint-Germain"
    }
  },
  {
    "nome": "Kylian Mbappé",
    "ruolo": "Attaccante",
    "dataNascita": "1998-12-20",
    "nazione": "Francia",
    "prezzo": "180M",
    "squadraOrigine": {
      "campionato": "Ligue 1",
      "nazione": "Francia",
      "nomeSquadra": "Monaco"
    },
    "squadraDestinazione": {
      "campionato": "Ligue 1",
      "nazione": "Francia",
      "nomeSquadra": "Paris Saint-Germain"
    }
  },
  {
    "nome": "Ousmane Dembélé",
    "ruolo": "Attaccante",
    "dataNascita": "1997-05-15",
    "nazione": "Francia",
    "prezzo": "105M",
    "squadraOrigine": {
      "campionato": "Bundesliga",
      "nazione": "Germania",
      "nomeSquadra": "Borussia Dortmund"
    },
    "squadraDestinazione": {
      "campionato": "La Liga",
      "nazione": "Spagna",
      "nomeSquadra": "Barcellona"
    }
  },
  {
    "nome": "Paul Pogba",
    "ruolo": "Centrocampista",
    "dataNascita": "1993-03-15",
    "nazione": "Francia",
    "prezzo": "105M",
    "squadraOrigine": {
      "campionato": "Serie A",
      "nazione": "Italia",
      "nomeSquadra": "Juventus"
    },
    "squadraDestinazione": {
      "campionato": "Premier League",
      "nazione": "Inghilterra",
      "nomeSquadra": "Manchester United"
    }
  },
  {
    "nome": "Gareth Bale",
    "ruolo": "Attaccante",
    "dataNascita": "1989-07-16",
    "nazione": "Galles",
    "prezzo": "100M",
    "squadraOrigine": {
      "campionato": "Premier League",
      "nazione": "Inghilterra",
      "nomeSquadra": "Tottenham Hotspur"
    },
    "squadraDestinazione": {
      "campionato": "La Liga",
      "nazione": "Spagna",
      "nomeSquadra": "Real Madrid"
    }
  },
  {
    "nome": "Cristiano Ronaldo",
    "ruolo": "Attaccante",
    "dataNascita": "1985-02-05",
    "nazione": "Portogallo",
    "prezzo": "94M",
    "squadraOrigine": {
      "campionato": "Premier League",
      "nazione": "Inghilterra",
      "nomeSquadra": "Manchester United"
    },
    "squadraDestinazione": {
      "campionato": "La Liga",
      "nazione": "Spagna",
      "nomeSquadra": "Real Madrid"
    }
  }
]

Chiameremo il nostro stream StreamGiocatori che estenderà la ‘classe’ base fs.Readable. Useremo la sintassi di ES2015.

// stream_giocatori.js

const { Readable } = require('stream');
const arrayGiocatori = require('./giocatori.json');

class StreamGiocatori extends Readable {
  constructor() {
    super({objectMode: true});

    this.giocatori = arrayGiocatori;
    this.numeroGiocatori = this.giocatori.length;
    this.indiceGiocatoreCorrente = 0;

  }
  /*
  * Il metodo _read() viene usato internamente da StreamGiocatori.
  * Non deve mai essere invocato dall'esterno. Tuttle le 'classi' che
  * estendono stream.Readable, devono implementare il metodo _read()
  */
  _read() {
    if (this.indiceGiocatoreCorrente < this.numeroGiocatori) {

      const giocatore = this.giocatori[this.indiceGiocatoreCorrente];

      console.log(`> Giocatore prelevato: [${this.indiceGiocatoreCorrente}] ${giocatore.nome}`);

      this.indiceGiocatoreCorrente++;

      // La funzione push() viene ereditata dalla classe base
      this.push(giocatore);

    } else {

      this.push(null);

    }
  }
}

module.exports = StreamGiocatori;

Importiamo il modulo stream e il file giocatori.json contenente i dati dei calciatori. La costante arrayGiocatori conterrà un riferimento a un array di oggetti javascript. (Node.js effettua automaticamente il parsing del file JSON) All’interno di StreamGiocatori definiamo il costruttore di stream.Readable, specificando che il nostro stream dovrà lavorare in modalità objectMode (Lavorerà con oggetti Javascript piuttosto che stringhe o buffer). Affinché StreamGiocatori possa funzionare, dovremo implementare una funzione particolare ereditata da stream.Readable. Si tratta del metodo _read() che non deve essere mai invocato al di fuori di StreamGiocatori. Sarà usato internamente dallo stream. Nel metodo _read() usiamo il metodo push(chunk), ereditato dalla classe base, per inserire i nuovi dati nel buffer interno da cui il consumatore preleverà i dati usando il metodo oggettoStream.read(). Quando la dimensione del buffer interno raggiunge il valore specificato da highWaterMark (ricordiamo che si tratta di un parametro dello Stream), lo stream smetterà temporaneamente di inserire nuovi dati nel buffer in attesa che il consumatore li prelevi e faccia posto per nuovi blocchi.

Schema funzionamento readable stream

All’intero del file app.js abbiamo invece inserito il codice del ‘consumatore’. In questo caso operiamo in modalità Pause Mode. Se avessimo voluto lavorare in modalità Flowing mode, sarebbe bastato registrare una funzione listener per l’evento ‘data’. In quest’ultimo caso non sarebbe stato necessario invocare streamGiocatori.read().

// app.js

const StreamGiocatori = require('./stream_giocatori');

const streamGiocatori = new StreamGiocatori();

let count = 0;

streamGiocatori.on('readable', () => {
  let giocatore;

  giocatore = streamGiocatori.read();

  while (giocatore !== null) {
    console.log(`## app.js ha ricevuto: [${count++}] ${giocatore.nome}`);
    giocatore = streamGiocatori.read();
  }
});

streamGiocatori.on('end', () => {
  console.log('Fine lettura!')
})

In app.js creiamo un’istanza di StreamGiocatori che inizierà a inserire nel buffer interno i dati letti dal file.json. Quando ci sono abbastanza dati all’interno del buffer, viene emesso un evento ‘readable’. A quel punto iniziamo a leggere i dati dal buffer interno con la funzione streamGiocatori.read() (notate che NON è presente il simbolo _ davanti al nome read()). Quando non ci sono più oggetti all’interno del buffer, viene emesso un nuovo evento ‘end’ che useremo per segnalare la fine della lettura. Lanciando node app.js, visualizzeremo nella shell una sequenza simile a quella sottostante.

> Giocatore prelevato: [0] Neymar
> Giocatore prelevato: [1] Kylian Mbappé
## app.js ha ricevuto: [0] Neymar
> Giocatore prelevato: [2] Ousmane Dembélé
## app.js ha ricevuto: [1] Kylian Mbappé
> Giocatore prelevato: [3] Paul Pogba
## app.js ha ricevuto: [2] Ousmane Dembélé
> Giocatore prelevato: [4] Gareth Bale
## app.js ha ricevuto: [3] Paul Pogba
> Giocatore prelevato: [5] Cristiano Ronaldo
## app.js ha ricevuto: [4] Gareth Bale
## app.js ha ricevuto: [5] Cristiano Ronaldo
Fine lettura!

Stream di tipo Writable

È possibile creare uno stream di tipo Writable in maniera simile a quanto abbiamo già visto per gli stream di tipo Readable. Esiste, per esempio, il metodo fs.createWriteStream() che può essere usato per creare un nuovo oggetto di tipo WriteStream e scrivere dati in un file usando gli Stream.

var fs = require('fs');
var writeStream = fs.createWriteStream('output.txt');
writeStream.write('Prima rigan');
writeStream.write('Seconda rigan');
writeStream.write('Terza rigan');
writeStream.end('-- FINE DEL FILE --');

Nell’esempio in alto verrà creato un nuovo file con il contenuto specificato attraverso le chiamate ai metodi write() e end()

Anche in questo caso è possibile realizzare uno Stream di tipo Writable personalizzato estendendo stream.Writable. Useremo ancora una volta il file giocatori.json, ma modifichiamo leggermente il file stream_giocatori.js per segnalare quando sono stati letti tutti i giocatori. Creiamo poi un nuovo file writable_stream_giocatori.js in cui implementiamo uno stream di tipo Writable.

// stream_giocatori.js

const { Readable } = require('stream');
const arrayGiocatori = require('./giocatori.json');

class StreamGiocatori extends Readable {
  constructor() {
    super({objectMode: true});

    this.giocatori = arrayGiocatori;
    this.numeroGiocatori = this.giocatori.length;
    this.indiceGiocatoreCorrente = -1;

  }
  /*
  * La funzione _read() viene usata internamente da StreamGiocatori.
  * Non deve mai essere invocata dall'esterno. Tuttle le 'classi' che
  * estendono stream.Readable devono implementare il metodo _read()
  */
  _read() {

    this.indiceGiocatoreCorrente++;

    if (this.indiceGiocatoreCorrente < this.numeroGiocatori) {

      const giocatore = this.giocatori[this.indiceGiocatoreCorrente];

      console.log(`READ > Giocatore prelevato: [${this.indiceGiocatoreCorrente}] ${giocatore.nome}`);

      // La funzione push() viene ereditata dalla classe base
      this.push(giocatore);

    } else if (this.indiceGiocatoreCorrente === this.numeroGiocatori) {
      console.log(`--- Letti tutti i Giocatori ---`);

      this.push({done: true});
    } else {
      this.push(null);
    }
  }
}

module.exports = StreamGiocatori;

Così come abbiamo dovuto implementare il metodo _read() per gli stream di tipo Readable, allo stesso modo dobbiamo implementare il metodo _write() per gli stream di tipo Writable. In questo particolare caso creiamo un nuovo oggetto contenente il nome e il prezzo del cartellino del giocatore e ci limitiamo a stamparlo nella console. Per segnalare che abbiamo completato una scrittura, dobbiamo invocare la funzione callback che Node.js passa come terzo argomento. Il metodo _final() verrà invocato solo al termine dell’intera operazione di scrittura.

// writable_stream_giocatori.js

const { Writable } = require('stream');

class WritableStreamGiocatori extends Writable {
  constructor() {
    super({objectMode: true});
  }
  _write(chunk, encoding, callback) {
    // chunk è un oggetto Javascript 
    // perché abbiamo settato objectMode: true nel costruttore
    if (!chunk.done) {

      const giocatore = {nome: chunk.nome, prezzo: chunk.prezzo};

      console.log(`WRITE > ${JSON.stringify(giocatore)}`);
    }
    // invochiamo la callback per segnalare che abbiamo 
    // effettuato una scrittura
    callback();
  }
  _final(callback) {
    console.log('FINAL > Sto per terminare la scrittura!');
    callback();
  }
}

module.exports = WritableStreamGiocatori;

Creiamo quindi un file main.js all’interno del quale inseriamo il seguente frammento di codice.

// main.js

const WritableStreamGiocatori = require('./writable_stream_giocatori');
const StreamGiocatori = require('./stream_giocatori');

const writableStreamGiocatori = new WritableStreamGiocatori();
const streamGiocatori = new StreamGiocatori();

writableStreamGiocatori.on('finish', () => {
  console.log('## FINE SCRITTURA ##');
});

streamGiocatori.on('end', () => {
  console.log('Fine lettura!')
})

streamGiocatori.on('data', (giocatore) => {
  if (giocatore.done) {
    writableStreamGiocatori.end();
  } else {
    writableStreamGiocatori.write(giocatore);
  }
});

Avviamo la lettura in modalità Flowing mode. Ogni volta che viene letto un oggetto contenente le informazioni di un giocatore, usiamo il metodo writableStreamGiocatori.write(giocatore) per stampare le informazioni nella shell. Quando sono stati letti tutti gli oggetti, invochiamo il metodo writableStreamGiocatori.end(). Di seguito riportiamo l’output dell’esecuzione.

READ > Giocatore prelevato: [0] Neymar
READ > Giocatore prelevato: [1] Kylian Mbappé
WRITE > {"nome":"Neymar","prezzo":"222M"}
READ > Giocatore prelevato: [2] Ousmane Dembélé
WRITE > {"nome":"Kylian Mbappé","prezzo":"180M"}
READ > Giocatore prelevato: [3] Paul Pogba
WRITE > {"nome":"Ousmane Dembélé","prezzo":"105M"}
READ > Giocatore prelevato: [4] Gareth Bale
WRITE > {"nome":"Paul Pogba","prezzo":"105M"}
READ > Giocatore prelevato: [5] Cristiano Ronaldo
WRITE > {"nome":"Gareth Bale","prezzo":"100M"}
--- Letti tutti i Giocatori ---
WRITE > {"nome":"Cristiano Ronaldo","prezzo":"94M"}
Fine lettura!
FINAL > Sto per terminare la scrittura!
## FINE SCRITTURA ##

Node.js Stream e il meccanismo delle Pipe

Nell’esempio appena visto abbiamo usato lo stream streamGiocatori in modalità Flowing mode. Abbiamo registrato una funzione listener per l’evento ‘data’ che viene invocata ogni volta è disponibile un nuovo oggetto contenente le informazioni di un calciatore. All’interno di questa funzione passiamo i nuovi dati ricevuti allo stream writableStreamGiocatori che invoca il metodo write(). Questo tipo di operazioni è piuttosto frequente quando si lavora con gli Stream. Per questo motivo gli stream di tipo Readable contengono il metodo readableStream.pipe(writableStream). In basso riportiamo quindi due frammenti di codice che sono sostanzialmente equivalenti. (ovviamente l’implementazione del metodo pipe() è più sofisticata. Chi fosse interessato può visualizzare il codice su Github)

// ...

// evento 'data'
readableStream.on('data', (chunk) => {
  writableStream.write(chunk);
});

// pipe()
readableStream.pipe(writableStream);

// ...

Come detto, il metodo readable.pipe(writable[, options]) è disponibile per tutti gli oggetti di tipo Readable Stream e accetta come primo argomento uno stream di tipo Writable (stream Duplex e Transform sono ovviamente consentiti dal momento che sono sia Readable che Writable). È importante sottolineare che il metodo pipe() restituisce un riferimento allo stream passato come primo argomento. Possiamo così concatenare invocazioni consecutive del metodo pipe();

const readable = ottieniReadableStream();
const writable = ottieniWritableStream();
const transform = ottieniTransformStream();

readable.pipe(transform).pipe(writable);

// o in maniera equivalente

readable.pipe(transform);
transform.pipe(writable);

Stream di tipo Transform

schema Transform stream

Gli Stream di tipo Transform sono un caso particolare degli Stream di tipo Duplex. Li possiamo usare per modificare dei dati man mano che attraversano lo stream. Node.js fornisce anche la ‘classe’ stream.PassThrough la quale è una banale implementazione di Transform stream, che si limita a passare semplicemente i byte che riceve in ingresso in uscita. Oggetti di tipo stream.PassThrough possono essere utili in fase di test dell’applicazione.

Vediamo subito un semplice esempio in cui impieghiamo il meccanismo delle pipe insieme a tre diversi stream. Leggiamo il contenuto di un file attraverso uno Stream di tipo Readable, comprimiamo il contenuto del file letto usando uno Stream di tipo Transform e infine creiamo un nuovo file contenente il contenuto compresso grazie a uno stream di tipo Writable.

const zlib = require('zlib');
const fs = require('fs');

const gzip = zlib.createGzip();

const readable = fs.createReadStream('testo.txt');
const writable = fs.createWriteStream('testo.txt.gz');

readable.pipe(gzip).pipe(writable);

In poche linee di codice abbiamo generato un file compresso ‘testo.txt.gz’ (3KB) a partire da un file di testo che nel nostro esempio aveva una dimensione di 155KB.

Per concludere, riprendiamo l’esempio visto in precedenza che usava il file giocatori.json. Dopo aver creato un Readable Stream che legge dal file JSON un oggetto Javascript alla volta e uno stream di tipo Writable che si limita a stampare il nome e il prezzo del giocatore nella shell, creiamo uno stream di tipo Transform che seleziona solo i calciatori di nazionalità francese e uno che trasforma il nome di ogni calciatore in sole lettere maiuscole. Per far ciò estendiamo stream.Transform e implementiamo il metodo _transform() derivato dalla classe base.

// NomeMaiuscolo.js

const { Transform } = require('stream');

class NomeMaiuscolo extends Transform {
  constructor() {
    super({objectMode: true});
  }
  _transform(chunk, encoding, callback) {
    if (!chunk.done) {
      chunk.nome = chunk.nome.toUpperCase();
      this.push(chunk);
    }
    callback();
  }
}

module.exports = NomeMaiuscolo;

Abbiamo creato il primo stream di tipo Transform che andremo a usare nel nostro esempio. Il metodo _transform() si occupa di ricevere i dati che vengono passati allo Stream, li modifica e li inserisce in un buffer interno attraverso il metodo push(). I dati così inseriti nel buffer verranno letti da un altro stream. In questo specifico esempio, riceviamo un oggetto Javascript alla volta, modifichiamo i caratteri del nome in modo che siano presenti solo lettere maiuscole e invochiamo il metodo push() a cui passiamo come argomento l’oggetto modificato.

// SelezionaGiocatoriFrancesi.js

const { Transform } = require('stream');

class SelezionaGiocatoriFrancesi extends Transform {
  constructor() {
    super({objectMode: true});
  }
  _transform(chunk, encoding, callback) {
    if (!chunk.done && chunk.nazione === "Francia") {
      this.push(chunk);
    }
    callback();
  }
}

module.exports = SelezionaGiocatoriFrancesi;

Nell’esempio in alto, creiamo uno stream che permetterà di filtrare i calciatori e selezionare solo quelli di nazionalità francese.

// index.js

const WritableStreamGiocatori = require('./writable_stream_giocatori');
const StreamGiocatori = require('./stream_giocatori');
const SelezionaGiocatoriFrancesi = require('./SelezionaGiocatoriFrancesi');
const NomeMaiuscolo = require('./NomeMaiuscolo');

const writableStreamGiocatori = new WritableStreamGiocatori();
const streamGiocatori = new StreamGiocatori();
const selezionaGiocatoriFrancesi = new SelezionaGiocatoriFrancesi();
const nomeMaiuscolo = new NomeMaiuscolo();

const onPipe = (produttore) => {
  const streamObjectName = produttore.constructor.name;
  console.log(
    '|~| ' + streamObjectName + 
    ' sta per inviare dati usando il metodo pipe()'
  );
};

nomeMaiuscolo.on('pipe', onPipe);

writableStreamGiocatori.on('pipe', onPipe);

writableStreamGiocatori.on('finish', () => {
  console.log('### FINE SCRITTURA ###');
});

streamGiocatori.on('end', () => {
  console.log('### FINE LETTURA ###')
})

streamGiocatori
  .pipe(selezionaGiocatoriFrancesi)
  .pipe(nomeMaiuscolo)
  .pipe(writableStreamGiocatori);

Nel file index.js importiamo tutti gli Stream implementati finora. Creiamo un’istanza di ognuno. Con il metodo pipe() concateniamo i vari Stream. Lo stream di tipo Readable streamGiocatori legge un oggetto alla volta e lo passa al primo stream di tipo Transform che lascerà passare solo gli oggetti relativi ai giocatori francesi i cui nomi saranno poi modificati dallo strem nomeMaiuscolo. Infine writableStreamGiocatori stampa i giocatori selezionati nella console in cui possiamo vedere tutti i vari passaggi che si verificano nel corso dell’esecuzione.

// node index.js

|~| SelezionaGiocatoriFrancesi sta per inviare dati usando il metodo pipe()
|~| NomeMaiuscolo sta per inviare dati usando il metodo pipe()
READ > Giocatore prelevato: [0] Neymar
READ > Giocatore prelevato: [1] Kylian Mbappé
READ > Giocatore prelevato: [2] Ousmane Dembélé
WRITE > {"nome":"KYLIAN MBAPPÉ","prezzo":"180M"}
READ > Giocatore prelevato: [3] Paul Pogba
WRITE > {"nome":"OUSMANE DEMBÉLÉ","prezzo":"105M"}
READ > Giocatore prelevato: [4] Gareth Bale
WRITE > {"nome":"PAUL POGBA","prezzo":"105M"}
READ > Giocatore prelevato: [5] Cristiano Ronaldo
--- Letti tutti i Giocatori ---
### FINE LETTURA ###
FINAL > Sto per terminare la scrittura!
### FINE SCRITTURA ###

Per comodità possiamo usare il meccanismo delle pipe nella shell e il comando grep per semplificare l’output e verificare che writableStreamGiocatori stampa solo i nomi di giocatori francesi.

// node index.js | grep WRITE

WRITE > {"nome":"KYLIAN MBAPPÉ","prezzo":"180M"}
WRITE > {"nome":"OUSMANE DEMBÉLÉ","prezzo":"105M"}
WRITE > {"nome":"PAUL POGBA","prezzo":"105M"}

Conclusioni

Dopo aver parlato degli Stream, nel prossimo inizieremo a parlare dell’oggetto global ed esploreremo altre interessanti funzionalità di Node.js.

Pubblicitร