J'ai un point de terminaison qui reçoit des fichiers et crée une tâche d'arrière-plan pour télécharger ces fichiers sur S3.
Afin de mettre en arrière-plan les téléchargements de fichiers, j'utilise Agenda ( https://github.com/agenda/agenda ). La seule limitation est que je dois stocker le fichier dans un format pris en charge par MongoDB (qui est ce qu'Agenda utilise sous le capot). Pour ce faire, je convertis le fichier en tant que tampon avant de l'envoyer à Agenda.
Voici mon code:
Uploading to S3... Uploading to S3... Uploading to S3... ERROR OCCURRED: BadRequestError: Request disconnected during file upload stream parsing. at IncomingMessage.<anonymous> (/Users/edmundmai/Documents/src/acne-tracker/server/node_modules/graphql-upload/lib/processRequest.js:300:35) at Object.onceWrapper (events.js:291:20) at IncomingMessage.emit (events.js:203:13) at IncomingMessage.EventEmitter.emit (domain.js:471:20) at resOnFinish (_http_server.js:614:7) at ServerResponse.emit (events.js:208:15) at ServerResponse.EventEmitter.emit (domain.js:471:20) at onFinish (_http_outgoing.js:649:10) at onCorkedFinish (_stream_writable.js:678:5) at afterWrite (_stream_writable.js:483:3) at processTicksAndRejections (internal/process/task_queues.js:77:11) { message: 'Request disconnected during file upload stream parsing.', expose: true, statusCode: 499, status: 499 } Done!
Ceci est rapide sur mon serveur de développement local, mais prend beaucoup de temps à s'exécuter en production à cause du tampon. Les lignes qui suivent console.log (Configuration du tampon ...)
prennent beaucoup de temps.
Ce que je voudrais faire est:
pics
Est-ce possible?
============ UPDATE ==========
Donc si Je n'attends pas
la promesse, il se plaint que la requête s'est déconnectée avant la fin du buffer:
const uploadProgressPic = async ({ stream, progressPicId, userId, bodyPart, models, }) => { try { console.log("Uploading to S3..."); const { Location: url, Key: key, Bucket: bucket } = await S3.upload({ stream, folder: userId, filename: `${progressPicId}-${bodyPart}.jpg`, }); if (url && key && bucket) { await models.ProgressPic.findOneAndUpdate( { _id: progressPicId }, { $set: { url, key, bucket } }, { new: true, useFindAndModify: false } ); console.log("Done!"); } } catch (error) { console.log("ERROR OCCURRED: ", error); } }; export default { Mutation: { batchCreateProgressPics: combineResolvers( isAuthenticated, async (parent, { pics }, { models, currentUser }) => { return pics.map(async (pic, i) => { const { file, bodyPart, localPath } = pic; const progressPic = await models.ProgressPic.create({ bodyPart, user: currentUser.id, url: localPath, }); const { createReadStream } = await file; const stream = createReadStream(); uploadProgressPic({ stream, progressPicId: progressPic.id, userId: currentUser.id, bodyPart, models, }); return progressPic; }); } ), }, };
Erreur:
ERROR OCCURRED: BadRequestError: Request disconnected during file upload stream parsing. at IncomingMessage.<anonymous> (/Users/edmundmai/Documents/src/acne-tracker/server/node_modules/graphql-upload/lib/processRequest.js:300:35) at Object.onceWrapper (events.js:291:20) at IncomingMessage.emit (events.js:203:13) at IncomingMessage.EventEmitter.emit (domain.js:471:20) at resOnFinish (_http_server.js:614:7) at ServerResponse.emit (events.js:208:15) at ServerResponse.EventEmitter.emit (domain.js:471:20) at onFinish (_http_outgoing.js:649:10) at onCorkedFinish (_stream_writable.js:678:5) at afterWrite (_stream_writable.js:483:3) at processTicksAndRejections (internal/process/task_queues.js:77:11) { message: 'Request disconnected during file upload stream parsing.', expose: true, statusCode: 499, status: 499 }
========== UPDATE 2 =============
Même en essayant de 1) le simplifier et 2) déplacer createReadStream ()
en dehors de uploadProgressPic
affiche la même erreur:
const uploadProgressPic = async ({ file, progressPicId, userId, bodyPart }) => { try { const { createReadStream } = await file; const stream = createReadStream(); console.log("Setting up buffer..."); const buffer = await new Promise((resolve, reject) => { var buffers = []; stream.on("data", function(data) { buffers.push(data); }); stream.on("end", function() { const everything = Buffer.concat(buffers); resolve(everything); }); stream.on("error", function(e) { reject(e); }); }); console.log("Done."); console.log("Creating backgruond task..."); Agenda.now("uploadProgressPic", { userId, progressPicId, filename: `${progressPicId}-${bodyPart}.jpg`, buffer, }); } catch (error) { console.log("ERROR OCCURRED: ", error); } }; export default { Mutation: { batchCreateProgressPics: combineResolvers( isAuthenticated, async (parent, { pics }, { models, currentUser }) => { return pics.map(async (pic, i) => { const { file, bodyPart, localPath } = pic; const progressPic = await models.ProgressPic.create({ bodyPart, user: currentUser.id, url: localPath, }); uploadProgressPic({ file, progressPicId: progressPic.id, userId: currentUser.id, bodyPart, }); return progressPic; }); } ), }, };
Erreur:
Mutation: { batchCreateProgressPics: combineResolvers( isAuthenticated, async (parent, { pics }, { models, currentUser }) => { return await Promise.all( pics.map(async (pic, i) => { const { file, bodyPart, localPath } = pic; const { createReadStream } = await file; const stream = createReadStream(); console.log("Setting up buffer..."); const buffer = await new Promise((resolve, reject) => { var buffers = []; stream.on("data", function(data) { buffers.push(data); }); stream.on("end", function() { const everything = Buffer.concat(buffers); resolve(everything); }); stream.on("error", function(e) { reject(e); }); }); const progressPic = await models.ProgressPic.create({ bodyPart, user: currentUser.id, url: localPath, }); console.log("Creating backgruond task..."); Agenda.now("uploadProgressPic", { userId: currentUser.id, progressPicId: progressPic.id, filename: `${progressPic.id}-${bodyPart}.jpg`, buffer, }); console.log("Done."); return progressPic; }) ); } ), },
Ce qui est drôle, c'est que je vois encore quelques Done!
dans les journaux même si cela se plaint?
3 Réponses :
Je ne suis pas un expert en la matière mais j'ai une idée qui peut marcher et une théorie:
IDÉE : si vous avez affaire à un grand nombre d'images, votre problème peut provenir du fichier await Promise.all (). je vous recommande d'utiliser parallelLimit depuis async pour limiter les fonctions parallèles à être exécuté à la fois, sinon vous aurez un problème de performances.
THÉORIE : Vous pouvez peut-être libérer l'allocation de mémoire après chaque utilisation de Buffer
pour éviter les problèmes de fuite de mémoire et rendre votre serveur plus performant.
Si je me trompe de toute façon, veuillez me corriger. Je m'intéresse moi-même à l'issue de ce problème.
N'attendez pas la promesse.
uploadProgressPic() return models.ProgressPic.create({ bodyPart, user: currentUser.id, url: localPath, });
De cette façon, vous lancerez la création des tampons, mais n'attendrez pas réellement que le code s'exécute et créerez immédiatement le ProgressPic et renvoyez-la. Parce que l'appel à Agenda.now
nécessite la valeur résolue de la promesse, nous la plaçons dans le rappel puis
. Notez qu'il est important d'ajouter un catch
également - si vous ne le faites pas, vous pourriez vous retrouver avec un rejet non géré.
Vous pouvez utiliser le catch
pour enregistrer l'erreur et effectuer un nettoyage supplémentaire. Par exemple, vous pouvez créer le ProgressPic créé (dans ce cas, vous devez déplacer l'appel create
au-dessus de la promesse de tampon afin de pouvoir référencer l'instance créée).
Si vous êtes comme moi et mourez un peu à l'intérieur à chaque fois que vous devez taper puis
, vous pouvez extraire toute cette logique dans une fonction distincte:
const uploadProgressPic = async (/* parameters omitted for brevity */) => { try { const buffer = await new Promise(...) Agenda.now(...) } catch (error) { // Do whatever } }
Je vais essayer ça! L'exécution du code dans la promesse non attendue continuera-t-elle à s'exécuter même après l'envoi de la réponse?
Il semble donc que cela se déconnecte: ERREUR SURVENUE: BadRequestError: Demande déconnectée lors de l'analyse du flux de téléchargement de fichiers. à IncomingMessage.
Hmm c'est problématique. En regardant la documentation , ce qui suit est appelé: "Utilisez uniquement createReadStream () avant le résolveur renvoie; les appels tardifs (par exemple dans une fonction asynchrone ou un rappel inattendu) génèrent une erreur. Les flux existants peuvent toujours être utilisés après l'envoi d'une réponse, bien qu'il y ait peu de raisons valables de ne pas attendre leur achèvement. "
@bigpotato J'essaierais de déplacer les lignes wait file
et createReadStream ()
en dehors de la fonction uploadProgressPic
et de simplement passer le flux à la fonction à la place.
Je l'ai essayé mais il montre toujours une erreur. Mise à jour de la question
Crud. Le renvoi d'un tableau de promesses devrait être identique à l'utilisation de Promise.all, mais vous pouvez également essayer d'encapsuler la carte
dans Promise.all
et voir si cela fait une différence: retournez Promise.all (pics.map (async (pic, i) => {
continuons cette discussion dans le chat .
J'ai essayé une variété de choses qui ont fini par ne pas fonctionner parce que la création du tampon était trop lente en production pour une raison quelconque. Ma solution ultime qui fonctionne réellement était de diviser le téléchargement en deux requêtes:
Backend:
Requête n ° 1: Créer une image de progression, en utilisant le chemin du fichier local comme URL Requête n ° 2: Téléchargez le fichier et mettez à jour la photo de progression
const createAndUploadProgressPics = async photos => { const { data: { createProgressPics: progressPics }, } = await createProgressPics({ variables: { pics: photos.map((p, i) => ({ bodyPart: BODY_PARTS[i], localPath: p.uri, })), }, }); updateProgressPics({ variables: { pics: progressPics.map(({ id, bodyPart }, i) => { return { progressPicId: id, filename: `${id}-${bodyPart}.jpg`, file: photos[i], }; }), }, }); onFinish(progressPics); navigation.goBack(); };
Le frontend attendra alors la réponse de la requête n ° 1 et enverra la requête n ° 2 mais ignorera la réponse. il peut simplement revenir immédiatement.
import { combineResolvers } from "graphql-resolvers"; import { isAuthenticated } from "./authorization"; import S3 from "../services/s3"; export default { Query: { progressPics: combineResolvers( isAuthenticated, async (parent, args, { models, currentUser }) => { return await models.ProgressPic.find({ user: currentUser.id }); } ), }, Mutation: { createProgressPics: combineResolvers( isAuthenticated, async (parent, { pics }, { models, currentUser }) => { return pics.map(async (pic, i) => { const { bodyPart, localPath } = pic; return await models.ProgressPic.create({ bodyPart, user: currentUser.id, url: localPath, }); return progressPic; }); } ), updateProgressPics: combineResolvers( isAuthenticated, async (parent, { pics }, { models, currentUser }) => { return pics.map(async (pic, i) => { const { file, filename, progressPicId } = pic; const { createReadStream } = await file; const stream = createReadStream(); const { Location: url, Key: key, Bucket: bucket } = await S3.upload({ stream, filename, folder: currentUser.id, }); return await models.ProgressPic.findOneAndUpdate( { _id: progressPicId }, { $set: { url, key, bucket } }, { new: true, useFindAndModify: false } ); }); } ), }, };