1
votes

NodeJS + Graphql: Comment puis-je exécuter du code même après avoir envoyé une réponse?

J'ai un point de terminaison qui reçoit des fichiers et crée une tâche d'arrière-plan pour télécharger ces fichiers sur S3.

Afin de mettre en arrière-plan les téléchargements de fichiers, j'utilise Agenda ( https://github.com/agenda/agenda ). La seule limitation est que je dois stocker le fichier dans un format pris en charge par MongoDB (qui est ce qu'Agenda utilise sous le capot). Pour ce faire, je convertis le fichier en tant que tampon avant de l'envoyer à Agenda.

Voici mon code:

Uploading to S3...
Uploading to S3...
Uploading to S3...
ERROR OCCURRED:  BadRequestError: Request disconnected during file upload stream parsing.
    at IncomingMessage.<anonymous> (/Users/edmundmai/Documents/src/acne-tracker/server/node_modules/graphql-upload/lib/processRequest.js:300:35)
    at Object.onceWrapper (events.js:291:20)
    at IncomingMessage.emit (events.js:203:13)
    at IncomingMessage.EventEmitter.emit (domain.js:471:20)
    at resOnFinish (_http_server.js:614:7)
    at ServerResponse.emit (events.js:208:15)
    at ServerResponse.EventEmitter.emit (domain.js:471:20)
    at onFinish (_http_outgoing.js:649:10)
    at onCorkedFinish (_stream_writable.js:678:5)
    at afterWrite (_stream_writable.js:483:3)
    at processTicksAndRejections (internal/process/task_queues.js:77:11) {
  message: 'Request disconnected during file upload stream parsing.',
  expose: true,
  statusCode: 499,
  status: 499
}
Done!

Ceci est rapide sur mon serveur de développement local, mais prend beaucoup de temps à s'exécuter en production à cause du tampon. Les lignes qui suivent console.log (Configuration du tampon ...) prennent beaucoup de temps.

Ce que je voudrais faire est:

  1. Créer et renvoyer un tableau de progressPics, un pour chaque élément du tableau pics
  2. Faites le travail du tampon après l'envoi de la réponse pour qu'elle ne retarde pas le front-end.

Est-ce possible?

============ UPDATE ==========

Donc si Je n'attends pas la promesse, il se plaint que la requête s'est déconnectée avant la fin du buffer:

const uploadProgressPic = async ({
  stream,
  progressPicId,
  userId,
  bodyPart,
  models,
}) => {
  try {
    console.log("Uploading to S3...");
    const { Location: url, Key: key, Bucket: bucket } = await S3.upload({
      stream,
      folder: userId,
      filename: `${progressPicId}-${bodyPart}.jpg`,
    });

    if (url && key && bucket) {
      await models.ProgressPic.findOneAndUpdate(
        { _id: progressPicId },
        { $set: { url, key, bucket } },
        { new: true, useFindAndModify: false }
      );
      console.log("Done!");
    }
  } catch (error) {
    console.log("ERROR OCCURRED: ", error);
  }
};

export default {
  Mutation: {
    batchCreateProgressPics: combineResolvers(
      isAuthenticated,
      async (parent, { pics }, { models, currentUser }) => {
        return pics.map(async (pic, i) => {
          const { file, bodyPart, localPath } = pic;
          const progressPic = await models.ProgressPic.create({
            bodyPart,
            user: currentUser.id,
            url: localPath,
          });

          const { createReadStream } = await file;
          const stream = createReadStream();

          uploadProgressPic({
            stream,
            progressPicId: progressPic.id,
            userId: currentUser.id,
            bodyPart,
            models,
          });

          return progressPic;
        });
      }
    ),
  },
};

Erreur:

ERROR OCCURRED:  BadRequestError: Request disconnected during file upload stream parsing.
    at IncomingMessage.<anonymous> (/Users/edmundmai/Documents/src/acne-tracker/server/node_modules/graphql-upload/lib/processRequest.js:300:35)
    at Object.onceWrapper (events.js:291:20)
    at IncomingMessage.emit (events.js:203:13)
    at IncomingMessage.EventEmitter.emit (domain.js:471:20)
    at resOnFinish (_http_server.js:614:7)
    at ServerResponse.emit (events.js:208:15)
    at ServerResponse.EventEmitter.emit (domain.js:471:20)
    at onFinish (_http_outgoing.js:649:10)
    at onCorkedFinish (_stream_writable.js:678:5)
    at afterWrite (_stream_writable.js:483:3)
    at processTicksAndRejections (internal/process/task_queues.js:77:11) {
  message: 'Request disconnected during file upload stream parsing.',
  expose: true,
  statusCode: 499,
  status: 499
}

========== UPDATE 2 =============

Même en essayant de 1) le simplifier et 2) déplacer createReadStream () en dehors de uploadProgressPic affiche la même erreur:

const uploadProgressPic = async ({ file, progressPicId, userId, bodyPart }) => {
  try {
    const { createReadStream } = await file;
    const stream = createReadStream();

    console.log("Setting up buffer...");
    const buffer = await new Promise((resolve, reject) => {
      var buffers = [];
      stream.on("data", function(data) {
        buffers.push(data);
      });
      stream.on("end", function() {
        const everything = Buffer.concat(buffers);
        resolve(everything);
      });
      stream.on("error", function(e) {
        reject(e);
      });
    });
    console.log("Done.");

    console.log("Creating backgruond task...");
    Agenda.now("uploadProgressPic", {
      userId,
      progressPicId,
      filename: `${progressPicId}-${bodyPart}.jpg`,
      buffer,
    });
  } catch (error) {
    console.log("ERROR OCCURRED: ", error);
  }
};

export default {
  Mutation: {
    batchCreateProgressPics: combineResolvers(
      isAuthenticated,
      async (parent, { pics }, { models, currentUser }) => {
        return pics.map(async (pic, i) => {
          const { file, bodyPart, localPath } = pic;
          const progressPic = await models.ProgressPic.create({
            bodyPart,
            user: currentUser.id,
            url: localPath,
          });

          uploadProgressPic({
            file,
            progressPicId: progressPic.id,
            userId: currentUser.id,
            bodyPart,
          });

          return progressPic;
        });
      }
    ),
  },
};

Erreur:

  Mutation: {
    batchCreateProgressPics: combineResolvers(
      isAuthenticated,
      async (parent, { pics }, { models, currentUser }) => {
        return await Promise.all(
          pics.map(async (pic, i) => {
            const { file, bodyPart, localPath } = pic;
            const { createReadStream } = await file;
            const stream = createReadStream();

            console.log("Setting up buffer...");
            const buffer = await new Promise((resolve, reject) => {
              var buffers = [];
              stream.on("data", function(data) {
                buffers.push(data);
              });
              stream.on("end", function() {
                const everything = Buffer.concat(buffers);
                resolve(everything);
              });
              stream.on("error", function(e) {
                reject(e);
              });
            });

            const progressPic = await models.ProgressPic.create({
              bodyPart,
              user: currentUser.id,
              url: localPath,
            });

            console.log("Creating backgruond task...");
            Agenda.now("uploadProgressPic", {
              userId: currentUser.id,
              progressPicId: progressPic.id,
              filename: `${progressPic.id}-${bodyPart}.jpg`,
              buffer,
            });
            console.log("Done.");

            return progressPic;
          })
        );
      }
    ),
  },

Ce qui est drôle, c'est que je vois encore quelques Done! dans les journaux même si cela se plaint?


0 commentaires

3 Réponses :


0
votes

Je ne suis pas un expert en la matière mais j'ai une idée qui peut marcher et une théorie:

  • IDÉE : si vous avez affaire à un grand nombre d'images, votre problème peut provenir du fichier await Promise.all (). je vous recommande d'utiliser parallelLimit depuis async pour limiter les fonctions parallèles à être exécuté à la fois, sinon vous aurez un problème de performances.

  • THÉORIE : Vous pouvez peut-être libérer l'allocation de mémoire après chaque utilisation de Buffer pour éviter les problèmes de fuite de mémoire et rendre votre serveur plus performant.

Si je me trompe de toute façon, veuillez me corriger. Je m'intéresse moi-même à l'issue de ce problème.


0 commentaires

0
votes

N'attendez pas la promesse.

uploadProgressPic()

return models.ProgressPic.create({
  bodyPart,
  user: currentUser.id,
  url: localPath,
});

De cette façon, vous lancerez la création des tampons, mais n'attendrez pas réellement que le code s'exécute et créerez immédiatement le ProgressPic et renvoyez-la. Parce que l'appel à Agenda.now nécessite la valeur résolue de la promesse, nous la plaçons dans le rappel puis . Notez qu'il est important d'ajouter un catch également - si vous ne le faites pas, vous pourriez vous retrouver avec un rejet non géré.

Vous pouvez utiliser le catch pour enregistrer l'erreur et effectuer un nettoyage supplémentaire. Par exemple, vous pouvez créer le ProgressPic créé (dans ce cas, vous devez déplacer l'appel create au-dessus de la promesse de tampon afin de pouvoir référencer l'instance créée).

Si vous êtes comme moi et mourez un peu à l'intérieur à chaque fois que vous devez taper puis , vous pouvez extraire toute cette logique dans une fonction distincte:

const uploadProgressPic = async (/* parameters omitted for brevity */) => {
  try {
    const buffer = await new Promise(...)
    Agenda.now(...)
  } catch (error) {
    // Do whatever
  }
}


7 commentaires

Je vais essayer ça! L'exécution du code dans la promesse non attendue continuera-t-elle à s'exécuter même après l'envoi de la réponse?


Il semble donc que cela se déconnecte: ERREUR SURVENUE: BadRequestError: Demande déconnectée lors de l'analyse du flux de téléchargement de fichiers. à IncomingMessage. (/ Users / edmundmai / Documents / src / acne-tracker / server / node_mod‌ ules / graphql-upload / ‌ lib / processRequest.j‌ s: 300: 35)


Hmm c'est problématique. En regardant la documentation , ce qui suit est appelé: "Utilisez uniquement createReadStream () avant le résolveur renvoie; les appels tardifs (par exemple dans une fonction asynchrone ou un rappel inattendu) génèrent une erreur. Les flux existants peuvent toujours être utilisés après l'envoi d'une réponse, bien qu'il y ait peu de raisons valables de ne pas attendre leur achèvement. "


@bigpotato J'essaierais de déplacer les lignes wait file et createReadStream () en dehors de la fonction uploadProgressPic et de simplement passer le flux à la fonction à la place.


Je l'ai essayé mais il montre toujours une erreur. Mise à jour de la question


Crud. Le renvoi d'un tableau de promesses devrait être identique à l'utilisation de Promise.all, mais vous pouvez également essayer d'encapsuler la carte dans Promise.all et voir si cela fait une différence: retournez Promise.all (pics.map (async (pic, i) => {


continuons cette discussion dans le chat .



0
votes

J'ai essayé une variété de choses qui ont fini par ne pas fonctionner parce que la création du tampon était trop lente en production pour une raison quelconque. Ma solution ultime qui fonctionne réellement était de diviser le téléchargement en deux requêtes:

Backend:

Requête n ° 1: Créer une image de progression, en utilisant le chemin du fichier local comme URL Requête n ° 2: Téléchargez le fichier et mettez à jour la photo de progression

  const createAndUploadProgressPics = async photos => {
    const {
      data: { createProgressPics: progressPics },
    } = await createProgressPics({
      variables: {
        pics: photos.map((p, i) => ({
          bodyPart: BODY_PARTS[i],
          localPath: p.uri,
        })),
      },
    });

    updateProgressPics({
      variables: {
        pics: progressPics.map(({ id, bodyPart }, i) => {
          return {
            progressPicId: id,
            filename: `${id}-${bodyPart}.jpg`,
            file: photos[i],
          };
        }),
      },
    });

    onFinish(progressPics);
    navigation.goBack();
  };

Le frontend attendra alors la réponse de la requête n ° 1 et enverra la requête n ° 2 mais ignorera la réponse. il peut simplement revenir immédiatement.

import { combineResolvers } from "graphql-resolvers";
import { isAuthenticated } from "./authorization";

import S3 from "../services/s3";

export default {
  Query: {
    progressPics: combineResolvers(
      isAuthenticated,
      async (parent, args, { models, currentUser }) => {
        return await models.ProgressPic.find({ user: currentUser.id });
      }
    ),
  },
  Mutation: {
    createProgressPics: combineResolvers(
      isAuthenticated,
      async (parent, { pics }, { models, currentUser }) => {
        return pics.map(async (pic, i) => {
          const { bodyPart, localPath } = pic;
          return await models.ProgressPic.create({
            bodyPart,
            user: currentUser.id,
            url: localPath,
          });

          return progressPic;
        });
      }
    ),
    updateProgressPics: combineResolvers(
      isAuthenticated,
      async (parent, { pics }, { models, currentUser }) => {
        return pics.map(async (pic, i) => {
          const { file, filename, progressPicId } = pic;
          const { createReadStream } = await file;
          const stream = createReadStream();

          const { Location: url, Key: key, Bucket: bucket } = await S3.upload({
            stream,
            filename,
            folder: currentUser.id,
          });

          return await models.ProgressPic.findOneAndUpdate(
            { _id: progressPicId },
            { $set: { url, key, bucket } },
            { new: true, useFindAndModify: false }
          );
        });
      }
    ),
  },
};


0 commentaires