1
votes

La bibliothèque de "demande" Node.js prend-elle en charge un flux de réponse asynchrone?

Je suis un peu nouveau dans les bibliothèques Node.js et j'essaie de comprendre comment utiliser l'itération asynchrone sur un flux de réponse HTTP. Mon objectif général est de lire un grand flux de réponse et de le traiter au fur et à mesure que les morceaux arrivent, actuellement via une fonction de générateur. Je ne peux pas stocker l'intégralité de la réponse en mémoire pour traitement.

J'utilise la bibliothèque request pour exécuter la requête HTTP comme suit.

const request = require("request");

// contrived chunk-by-chunk stream processing 
async function* getChunks(stream) {
  for await (const chunk of stream) {
    yield chunk[0];
  }
}

async function doWork() {
  var response = request.get("https://pastebin.com/raw/x4Nn0Tby");
  for await (c of getChunks(response)) {
    console.log(c);
  }
}

Quand j'exécute doWork () , j'obtiens une erreur indiquant que la variable stream de getChunks () n'est pas async-iterable. p>

TypeError: le flux n'est pas itérable de manière asynchrone

C'est surprenant, car je pensais que tous les flux lisibles sont généralement itérables de manière asynchrone, et que la bibliothèque de requêtes renvoie un flux lorsqu'aucun rappel n'est fourni. Lorsque je remplace request.get (...) par fs.createReadStream (...) dans un fichier local, tout fonctionne comme prévu.

Peut-être que la bibliothèque request ne prend pas en charge cela. Si tel est le cas, que dois-je faire pour traiter les flux de réponse HTTP via une itération asynchrone?

Utiliser Node.js 11.13 et request 2.88.0.

p>


0 commentaires

4 Réponses :


0
votes

Il semblerait que vous deviez utiliser d'autres alternatives comme elles l'ont mentionné dans la documentation du module request que vous pouvez trouver ici https://www.npmjs.com/package/request

async function doMyWork() {
try {
 const response = await myOwnRequest(url); 
 } catch (e) {
   console.log ('the error', e);
 }  
}

function myOwnRequest(url) {
  return new Promise(function (resolve, reject) {
   const resp = request.get(url);
   if(resp) {
    resolve();
   } else {
     reject();
   }
});
}


4 commentaires

Merci pour la suggestion. Je ne suis pas sûr d'avoir besoin d'une Promise retournée par request.get () car j'ai vraiment juste besoin d'opérer sur le flux résultant (que j'ai supposé pris en charge attendre nativement comme fs.createReadStream () ). Si une Promise est retournée (via request-promise-native), alors l'appel de wait request.get (...) résout le flux de réponse entier en une chaîne.


@Steve Guildi Veuillez voir si ma réponse mise à jour peut vous aider.


@SteveGuidi était la réponse ci-dessus (mise à jour) une aide? faites-moi savoir si vous voulez que je mette à jour avec quoi que ce soit


Il y a une petite erreur dans myOwnRequest - resolution () doit prendre l'objet qui est retourné, à savoir resp . Sinon, la promesse se comporte comme prévu. Malheureusement, le résultat de request.get (...) n'est tout simplement pas du tout asynchrone, qu'il soit renvoyé via Promise ou directement. J'ai fini par utiliser la bibliothèque axios à la place, qui renvoie des flux qui sont itérables de manière asynchrone (voir ma réponse pour un exemple).



2
votes

J'ai encore expérimenté les bibliothèques request et request-promise-native et je ne pense pas que cela soit possible avec l'implémentation actuelle. Le flux résultant ne semble pas du tout itérable de manière asynchrone. De plus, une implémentation correcte devra attendre le retour de la réponse avant de traiter le flux (comme suggéré par @ Réponse de JBone ). Mais si vous appelez wait request.get (...) , vous récupérez tout le contenu de la réponse, ce qui n'est pas souhaitable pour les réponses volumineuses.

const axios = require("axios");

async function doWork() {
  var response = await axios.request({
    method: "GET",
    url: "https://pastebin.com/raw/x4Nn0Tby",
    responseType: "stream",
  });

  for await (c of getChunks(response.data)) {  // async-iteration over response works as expected.
    console.log(c);
  }
}

Ma solution à ce problème était de remplacer l'utilisation de request et request-promise-native par la bibliothèque axios . Les bibliothèques sont fonctionnellement similaires, mais axios vous permet de spécifier qu'une requête doit être résolue en un flux; comme prévu, le flux peut être itéré de manière asynchrone.

const r = require("request");
const rpn = require("request-promise-native");

// contrived chunk-by-chunk stream processing 
async function* getChunks(stream) {
  for await (const chunk of stream) {
    yield chunk[0];
  }
}

async function doWork() {
  const url = "https://pastebin.com/raw/x4Nn0Tby";
  const response = r.get(url);         // returns a non-async-iterable object.
  const response2 = await rp.get(url); // returns the contents of url

  for await (c of getChunks(response)) {  // yields response not async-iterable error.
    console.log(c);
  }
}


2 commentaires

Quelle version d'axios utilisez-vous? Avec 0.19.0 j'obtiens la réponse n'est pas asynchrone . Je n'ai trouvé aucun document indiquant qu'axios soutiendrait cela. Des idées?


@mattpr: J'utilisais 0.18.0 lorsque j'ai écrit l'exemple de code ci-dessus. Si je me souviens bien, vous devez définir le type de réponse sur stream pour que cela fonctionne correctement.



1
votes

Réponse simple: non, ce n'est pas le cas. Vous souhaiterez peut-être utiliser un wrapper basé sur des promesses autour de la requête , tel que request-promise , qui fonctionne alors aussi avec async / await .

Détails: veuillez noter que request a été obsolète par son créateur , et sera donc interrompu. Cela signifie que tôt ou tard, vous devrez probablement passer à une autre solution, telle que axios , superagent ou aiguille , pour n'en nommer que quelques-uns.

Bien sûr, c'est à vous d'évaluer ces modules et de déterminer lequel correspond le mieux à vos besoins, mais ma recommandation personnelle serait de commencer par axios , car j'ai eu de très bonnes expériences avec lui dans le passé, cependant, YMMV.


0 commentaires

0
votes

L'option stream d'axios n'a pas fonctionné pour moi en utilisant l'exemple de code dans la réponse ci-dessus sur axios 0.19.0. Peut-être un problème entre la chaise et le clavier, mais dans tous les cas ... voici une approche alternative utilisant request .

J'ai fini par adapter le streaming de requêtes à un générateur asynchrone (avec un tampon entre les deux bien sûr). Cela permet une interface de type "streaming" où les lectures et écritures des données peuvent être entrelacées ... cela ne garantit pas une faible consommation de mémoire. demandez des tubes ("push") à notre Writable aussi vite que possible et il n'y a pas moyen pour nous de mettre cela en pause ou de le retourner dans une interface de type "pull" (pour autant que je sache). Donc, si nous lisons les données du tampon plus lentement que les données sont écrites: le tampon deviendra très gros et l'utilisation de la mémoire sera élevée.

Donc, s'il est essentiel de réduire l'utilisation de la mémoire et que vous analyser de gros fichiers à partir de sources http ... puis probablement faire un peu de surveillance / rapport sur la taille de la mémoire tampon pendant le "streaming" afin de voir si votre code de consommation est plus rapide ou plus lent que le flux afin que vous sachiez si le tampon deviendra énorme ou restera petit . Bien sûr, si vous testez avec un serveur http très lent ... alors tous les paris sont ouverts.

Cela pourrait éventuellement être résolu en définissant une taille de tampon fixe et en bloquant _write il y a plus de lecture (faisant de la place dans le tampon) ... c'est-à-dire que la requête doit attendre pour écrire plus de données dans le tube. Cependant, la requête peut tamponner en interne ... donc cela n'aidera pas à la consommation de mémoire si les données s'accumulent à la fin des requêtes de toute façon. Devrait vérifier.

Exemple de code:

const httpModule = require('./path/to/above/module');
var httpGen = httpModule('https://www.google.com'),
    chunk;
for await (chunk of httpGen) {
    // do something with chunk.
}

Ensuite, utilisez-le comme ceci:

const request = require('request'),
    Writable = require('stream').Writable,
    EventEmitter = require('events');

module.exports = function (url, MAX_BYTES=1024) {
    var response = new ResponseBuffer(MAX_BYTES);

    request
        .get(url)
        .on('error', function(err) { throw err; })
        .pipe(response)
        .on('error', function(err) { throw err; });

    return response.reader();
};

class ResponseBuffer extends Writable {
    constructor (MAX_BYTES=1024) {
        super();
        this.buffer = '';
        this.open = true;
        this.done = null;  // callback to call when done reading.
        this.MAX_BYTES = MAX_BYTES;
        this.events = new EventEmitter();
    }
    _write(chunk, enc, next) {
        this.buffer += chunk;
        this.events.emit('data');
        next();
    }
    _final(done) {
        this.open = false; // signal to reader to return after buffer empty.
        return done();
    }
    async * reader () {
        while (true) {
            if (this.buffer.length == 0) {
                // buffer empty and Writable !open. return.
                if (!this.open) { return; }
                else { // buffer empty.  wait for data.
                    await new Promise(resolve => this.events.once('data', resolve));
                }
            }
            let read_bytes = this.buffer.length < this.MAX_BYTES ? this.buffer.length : this.MAX_BYTES;
            yield this.buffer.slice(0, read_bytes);
            this.buffer = this.buffer.slice(read_bytes);
        }
    }
}

Une approche alternative (si vous êtes préoccupé par l'utilisation de la mémoire en particulier) consiste simplement à télécharger sur le disque (streaming vers le rédacteur de fichiers), puis à lire de manière incrémentielle hors du disque (vous pouvez asynchroniser un fs.createReadStream (...) )


0 commentaires