J'ai un travail Flink qui a un grand état dans un opérateur Map. Nous prenons savepoint qui a environ 80 Go de stockage sur AWS S3. Nous avons environ 100 parallélismes pour cet opérateur. Cependant, lorsque nous récupérons à partir du point de sauvegarde, il y a toujours une exception comme
Causé par: java.io.InterruptedIOException: Impossible d'ouvrir s3a: // xxxx / UbfK / flink / savepoints / logjoin / savepoint-a9600e-39fd2cc07076 / f9a31490-461a-42f3-88be-ec169145c35f à 0 sur s3a: / adshonor-data-cube-test-apse1 / UbfK / flink / savepoints / logjoin / savepoint-a9600e-39fd2cc07076 / f9a31490-461a-42f3-88be-ec169145c35f: org.apache.flink.fs.s3base.shaded.com.amazonaws .SdkClientException: impossible d'exécuter la requête HTTP: délai d'attente de connexion depuis le pool.
Existe-t-il un paramètre de configuration pour augmenter les paramètres de délai d'attente pour AWS S3 dans Flink ou un autre moyen d'éviter cette erreur?
3 Réponses :
Essayez de définir fs.s3a.connection.maximum
sur quelque chose comme 50 ou 100
pour élaborer un peu sur ce que Steve a dit ... il est probable que le problème soit que le client HTTP utilisé ne dispose pas d'un "pool de connexions" assez grand.
Chaque client S3A interagissant avec un seul compartiment, en tant qu'utilisateur unique, possède son propre pool dédié de connexions HTTP 1.1 ouvertes aux côtés d'un pool de threads utilisé pour les opérations de téléchargement et de copie. Le pool par défaut les tailles visent à trouver un équilibre entre les performances et utilisation de la mémoire / thread.
Pour un bon aperçu des choses que vous pouvez régler (qui inclut fs.s3a.connection.max
voir la section "Options pour régler" de cette page Hadoop .
Étant donné que Flink utilise le code de connexion AWS, le paramètre à bump est fs.s3.maxConnections
, ce qui n'est pas la même chose qu'une configuration Hadoop pure.
Lors de l'exécution sur AWS EMR, vous pouvez vous référer à ce document: https://aws.amazon.com/cn/premiumsupport/knowledge-center/emr-timeout-connection-wait/
Nous avons essayé de modifier le fs.s3a.connection.maximum mais cela ne fonctionne pas. Une erreur de temporisation se produit toujours. Nous résolvons enfin le problème temporellement en réduisant le numéro de slot de chaque taskmanager. Auparavant, nous avons quelques grands gestionnaires de tâches qui ont 30 à 40 emplacements. En rétrogradant la taille de la machine et en réduisant le nombre d'emplacements à 8 et en augmentant le nombre de gestionnaires de tâches, nous éliminons enfin les erreurs de temporisation.