9
votes

Pyspark: la tâche sérialisée dépasse le maximum autorisé. Envisagez d'augmenter spark.rpc.message.maxSize ou d'utiliser des variables de diffusion pour les grandes valeurs

Je fais des calculs sur un cluster et à la fin quand je demande des statistiques récapitulatives sur mon dataframe Spark avec df.describe (). show () j'obtiens une erreur:

La tâche sérialisée 15: 0 était de 137500581 octets , qui dépasse le maximum autorisé: spark.rpc.message.maxSize (134217728 octets). Envisagez d'augmenter spark.rpc.message.maxSize ou d'utiliser des variables de diffusion pour les grandes valeurs

Dans ma configuration Spark, j'ai déjà essayé d'augmenter le paramètre susmentionné:

dfscoring=dfscoring.repartition(100)

J'ai aussi essayé de repartitionner mon dataframe en utilisant:

spark = (SparkSession
         .builder
         .appName("TV segmentation - dataprep for scoring")
         .config("spark.executor.memory", "25G")
         .config("spark.driver.memory", "40G")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.dynamicAllocation.maxExecutors", "12")
         .config("spark.driver.maxResultSize", "3g")
         .config("spark.kryoserializer.buffer.max.mb", "2047mb")
         .config("spark.rpc.message.maxSize", "1000mb")
         .getOrCreate())

mais j'obtiens toujours la même erreur.

Mon environnement: Python 3.5, Anaconda 5.0 , Spark 2

Comment puis-je éviter cette erreur?


1 commentaires

comment avez-vous créé df? de la lecture d'un fichier ou d'objets python?


3 Réponses :


5
votes

J'ai eu le même problème et j'ai perdu une journée de ma vie que je ne reviens jamais. Je ne sais pas pourquoi cela se produit, mais voici comment je l'ai fait fonctionner pour moi.

Étape 1: Assurez-vous que PYSPARK_PYTHON et PYSPARK_DRIVER_PYTHON sont correctement définis. Il s'est avéré que python dans worker (2.6) avait une version différente de celle du driver (3.6). Vous devriez vérifier si les variables d'environnement PYSPARK_PYTHON et PYSPARK_DRIVER_PYTHON sont correctement définies.

Je l'ai corrigé en changeant simplement mon noyau de Python 3 Spark 2.2.0 à Python Spark 2.3.1 dans Jupyter. Vous devrez peut-être le configurer manuellement. Voici comment vous assurer que votre PySpark est correctement configuré https: //mortada.net/3-easy-steps-to-set-up-pyspark.html

ÉTAPE 2: Si cela ne fonctionne pas, essayez de contourner le problème: Ce commutateur de noyau fonctionnait pour les DF auxquels je n'ai ajouté aucune colonne: spark_df -> panda_df -> back_to_spark_df .... mais cela n'a pas fonctionné sur les DF où j'avais ajouté 5 colonnes supplémentaires. Donc, ce que j'ai essayé et cela a fonctionné était le suivant:

# 1. Select only the new columns: 

    df_write = df[['hotel_id','neg_prob','prob','ipw','auc','brier_score']]


# 2. Convert this DF into Spark DF:



     df_to_spark = spark.createDataFrame(df_write)
     df_to_spark = df_to_spark.repartition(100)
     df_to_spark.registerTempTable('df_to_spark')


# 3. Join it to the rest of your data:

    final = df_to_spark.join(data,'hotel_id')


# 4. Then write the final DF. 

    final.write.saveAsTable('schema_name.table_name',mode='overwrite')

J'espère que cela aide!


0 commentaires

6
votes

J'ai le même problème, alors je le résous. la cause est spark.rpc.message.maxSize si la valeur par défaut est 128M , vous pouvez la changer lors du lancement d'un client Spark, je travaille dans pyspark et définissez la valeur sur 1024, donc j'écris comme ceci:

pyspark --master yarn --conf spark.rpc.message.maxSize=1024

résolvez-le.


0 commentaires

1
votes

J'ai eu le même problème mais en utilisant Watson studio. Ma solution était:

sc.stop()
configura=SparkConf().set('spark.rpc.message.maxSize','256')
sc=SparkContext.getOrCreate(conf=configura)
spark = SparkSession.builder.getOrCreate()

J'espère que cela aidera quelqu'un ...


1 commentaires

Cela m'a beaucoup aidé! Je vous remercie! Cette solution m'a sauvé.