2
votes

Réinitialisation de la connexion lors de l'envoi de données d'AWS Lambda à la file d'attente SQS

J'utilise AWS SDK pour Java dans lequel j'envoie des données d'AWS Lambda à SQS.

Nous obtenons une exception:

 List<SendMessageBatchRequestEntry> sqsList= new LinkedList<SendMessageBatchRequestEntry>();
    int batchId = 0; //To send a unique batchId for each msg in a batch
    for (Metadata metadata: metadataList) {
        String jsonString = new Gson().toJson(metadata);
        sqsList.add(new SendMessageBatchRequestEntry(batchId + "", jsonString));
        batchId++;
    }
    amazonSqs.sendMessageBatch(new SendMessageBatchRequest(queueUrl, sqsList));

Code:

Caused by: java.net.SocketException: Connection reset
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at sun.security.ssl.OutputRecord.writeBuffer(OutputRecord.java:431)
at sun.security.ssl.OutputRecord.write(OutputRecord.java:417)
at sun.security.ssl.SSLSocketImpl.writeRecordInternal(SSLSocketImpl.java:886)
at sun.security.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:857)
at sun.security.ssl.AppOutputStream.write(AppOutputStream.java:123)
at org.apache.http.impl.io.SessionOutputBufferImpl.streamWrite(SessionOutputBufferImpl.java:124)
at org.apache.http.impl.io.SessionOutputBufferImpl.write(SessionOutputBufferImpl.java:160)
at org.apache.http.impl.io.ContentLengthOutputStream.write(ContentLengthOutputStream.java:113)
at org.apache.http.impl.io.ContentLengthOutputStream.write(ContentLengthOutputStream.java:120)
at org.apache.http.entity.StringEntity.writeTo(StringEntity.java:167)
at org.apache.http.impl.DefaultBHttpClientConnection.sendRequestEntity(DefaultBHttpClientConnection.java:156)
at org.apache.http.impl.conn.CPoolProxy.sendRequestEntity(CPoolProxy.java:160)
at org.apache.http.protocol.HttpRequestExecutor.doSendRequest(HttpRequestExecutor.java:238)
at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doSendRequest(SdkHttpRequestExecutor.java:63)
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1236)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)

Contexte ce que nous essayons de faire:

Nous avons une fonction Lambda principale qui crée et initialise une file d'attente SQS, et contient des détails pour chaque enregistrement qui doit être traité. La file d'attente SQS doit maintenant être configurée pour créer des lots de X nombre de messages à partir de la file d'attente et appeler automatiquement une autre fonction SQS Lambda pour chacun des lots.


0 commentaires

3 Réponses :


0
votes

On dirait que votre code va bien, et pour autant que je me souvienne (j'ai vu cette erreur moi-même plusieurs fois), cela se produit de temps en temps lors de l'utilisation du SDK en raison de la façon dont le SDK réutilise les connexions HTTP. Cette erreur vous indique seulement que votre Lambda, eh bien, a réinitialisé la connexion HTTP, mais le SDK a intégré des fonctionnalités pour réessayer les demandes ayant échoué, donc ça devrait aller si vous ne voyez pas cette erreur sur chaque demande.


7 commentaires

Merci Matus ... Même cela ne me dérange pas si le SDK a basculé pour cela. Mais ça m'arrive à chaque fois. Le nombre total d'enregistrements qui tentent d'envoyer est d'environ 20k. Le nombre de données qui pose problème est-il?


@Raushan Utilisez-vous FIFO ou file d'attente standard, et quelle est la taille d'un message que vous envoyez dans la file d'attente?


C'est une file d'attente standard. Et la taille est de 6800 Ko :(. Comment décomposer cela? Je veux dire qu'il doit envoyer des lots à la file d'attente pour que l'autre Lambda le traite par lots (car nous sommes limités à 15 minutes).


@Raushan La taille maximale d'un message que vous pouvez envoyer à la file d'attente est de 256 Ko, tout ce qui est plus grand sera supprimé. Il existe quelques solutions de contournement pour cela, mais j'aurais besoin de connaître le cas d'utilisation et le contenu des messages. Pourquoi le message est-il si grand?


Eh bien ... Nous transférons nos tâches CRON de l'instance EC2 vers Lambda. Un des travaux qui a 20k enregistrements et pour chaque enregistrement doit collecter ses métadonnées à partir du service. Donc, 15 minutes seront courtes. Ensuite, nous avons pensé à pousser ces données dans la file d'attente et à les gérer plus tard à partir d'un autre lambda qui traite les enregistrements X en un seul appel. La file d'attente SQS doit donc être créée par lots de messages X. C'est pourquoi nous faisons cela. J'espère que je vais dans la bonne direction !!!


Je ne sais pas s'il s'agit du meilleur cas d'utilisation de Lambda. Mais voici une chose que vous pouvez faire même sans utiliser SQS. Utilisez une fonction Lambda qui recevra l'ensemble du travail. Utilisez-le uniquement pour diviser le travail en morceaux raisonnables (quelque chose qu'une seule fonction peut traiter) et répartissez ces tâches plus petites en plusieurs Lambdas. Vous pouvez ensuite envoyer ces résultats partiels dans S3 et les recombiner. Notez que cela ne convient pas à tous les scénarios et que vous devez évaluer si cela vous convient ou non.


Pourquoi ne pas essayer les fonctions step?



3
votes

Le nombre maximum de messages par lot est de 10. Vous ne pouvez pas remplir la file d'attente SQS avec 20k à la fois et envoyer cette demande. Essayez de le diviser en 10.

https://docs.aws .amazon.com / AWSSimpleQueueService / latest / SQSDeveloperGuide / sqs-limits.html # limits-queues


1 commentaires

Merci Dejan. Un simple ajustement du code a fonctionné. Ajout de code dans la réponse à cette question.



2
votes

Nous pouvons l'envoyer par lot de 10. Code de travail:

List<SendMessageBatchRequestEntry> sqsList= new LinkedList<SendMessageBatchRequestEntry>();
    int batchId = 1; //To send a unique batchId for each msg in a batch
    for (Metadata metadata: metadataList) {
        String jsonString = new Gson().toJson(metadata);
        if (sqsList.size() == 10) {
            amazonSqs.sendMessageBatch(new SendMessageBatchRequest(queueUrl, sqsList));
            sqsList.clear();
        } 
        sqsList.add(new SendMessageBatchRequestEntry(batchId + "", jsonString)); 
        batchId++;
    }
    if(sqsList.size()>0) {
        amazonSqs.sendMessageBatch(new SendMessageBatchRequest(queueUrl, sqsList));
    }


0 commentaires