3
votes

Calculer les valeurs de deux dataframes dans PySpark

>

J'essaye de calculer la formule X = false / (false + true) pour chaque card_scheme et j'obtiens toujours un dataframe à la fin.

Je suis attend quelque chose comme:

| card_scheme | X |
|-------------|---|
| jcb         | 1 |
| ....        | . |
| visa        | 0.9846| (which is 126372 / (126372 + 1975)        
| ...         | . |


0 commentaires

5 Réponses :


3
votes

Création de l'ensemble de données

from pyspark.sql.window import Window
df = df.groupBy("card_scheme", "failed").agg(sum("count"))\
  .withColumn("X", col("sum(count)")/sum("sum(count)").over(Window.partitionBy(col('card_scheme'))))\
  .where(col('failed')== False).drop('failed','sum(count)')
df.show()

+----------------+------------------+
|     card_scheme|                 X|
+----------------+------------------+
|            VISA|               1.0|
|             jcb|               1.0|
|      MASTERCARD|               1.0|
|         maestro|               1.0|
|            AMEX|               1.0|
|      mastercard|0.9732717137548239|
|american express|               1.0|
|              CB|               1.0|
|        discover|               1.0|
|            visa|0.9846120283294506|
+----------------+------------------+

Méthode 1: Cette méthode sera plus lente, car elle implique une transposition via pivot.

df=df.groupBy("card_Scheme").pivot("failed").sum("count")
df=df.withColumn('X',when((col('True').isNotNull()),(col('false')/(col('false')+col('true')))).otherwise(1))
df=df.select('card_Scheme','X')
df.show()
+----------------+------------------+
|     card_Scheme|                 X|
+----------------+------------------+
|            VISA|               1.0|
|             jcb|               1.0|
|      MASTERCARD|               1.0|
|         maestro|               1.0|
|            AMEX|               1.0|
|      mastercard|0.9732717137548239|
|american express|               1.0|
|              CB|               1.0|
|        discover|               1.0|
|            visa|0.9846120283294506|
+----------------+------------------+

Méthode 2: Utilisez SQL - vous pouvez le faire via les fenêtres fonction. Ce sera beaucoup plus rapide.

myValues = [('jcb',False,4),('american express', False, 22084),('AMEX',False,4),('mastercard',True,1122),('visa',True,1975),('visa',False,126372),('CB',False,6),('discover',False,2219),('maestro',False,2),('VISA',False,13),('mastercard',False,40856),('MASTERCARD',False,9)]
df = sqlContext.createDataFrame(myValues,['card_Scheme','failed','count'])
df.show()
+----------------+------+------+
|     card_Scheme|failed| count|
+----------------+------+------+
|             jcb| false|     4|
|american express| false| 22084|
|            AMEX| false|     4|
|      mastercard|  true|  1122|
|            visa|  true|  1975|
|            visa| false|126372|
|              CB| false|     6|
|        discover| false|  2219|
|         maestro| false|     2|
|            VISA| false|    13|
|      mastercard| false| 40856|
|      MASTERCARD| false|     9|
+----------------+------+------+


1 commentaires

Très utile ! Merci beaucoup :)



1
votes

data.groupBy ("card_scheme"). pivot ("failed"). agg (count ("card_scheme")) devrait fonctionner. Je ne suis pas sûr de la agg (count (any_column)) , mais l'indice est la fonction pivot . En conséquence, vous obtiendrez deux nouvelles colonnes: false et true . Ensuite, vous pouvez facilement calculer le x = false / (false + true) .


1 commentaires

Eh bien, c'est vraiment proche de ce que je veux mais cela me donne parfois des valeurs «nulles»



1
votes

Une solution simple serait de faire un deuxième groupby:

val grouped_df = data.groupBy("card_scheme", "failed").count() // your dataframe
val with_countFalse = grouped_df.withColumn("countfalse", when($"failed" === "false", $"count").otherwise(lit(0)))
with_countFalse.groupBy("card_scheme").agg(when($"failed" === "false", $"count").otherwise(lit(0)))) / sum($"count")).show()

L'idée est que vous pouvez créer une deuxième colonne dont l'échec a échoué = false et 0 sinon. Cela signifie que la somme de la colonne count vous donne false + true tandis que la somme du countfalse donne juste le false. Ensuite, faites simplement un deuxième groupe par

Remarque: Certaines des autres réponses utilisent pivot. Je pense que la solution de pivot serait plus lente (elle fait plus), cependant, si vous choisissez de l'utiliser, ajoutez les valeurs spécifiques à l'appel de pivot, c'est-à-dire pivot ("failed", ["true", "false"]) pour améliorer les performances, sinon spark devrait faire deux chemins (le premier pour trouver les valeurs)


0 commentaires

1
votes
c.withColumn('X', c.num/c.den)

1 commentaires

Marche parfaitement ! N'a pas besoin de changer quoi que ce soit :)



2
votes

Commencez par diviser le dataframe racine en deux dataframes:

from pyspark.sql.functions import col,when
df_result = df_true.join(df_false,df_true.card_scheme == df_false.card_scheme, "outer") \
    .select(when(col("df1.card_scheme").isNotNull(), col("df1.card_scheme")).otherwise(col("df2.card_scheme")).alias("card_scheme") \
            , when(col("df1.failed").isNotNull(), (col("df2.count")/(col("df1.count") + col("df2.count")))).otherwise(1).alias("X"))

Ensuite, en effectuant une jointure externe complète, nous pouvons obtenir le résultat final:

df_true = data.filter(data.failed == True).alias("df1")
df_false =data.filter(data.failed == False).alias("df2")

Pas besoin pour faire groupby , juste deux dataframes supplémentaires et rejoindre.


2 commentaires

Votre code fonctionnera parfaitement et résoudra le problème actuel. J'ai juste une petite remarque à faire - les jointures sont une opération assez coûteuse si les données sont stockées sur plusieurs partitions, car cela impliquera un mélange. Où en tant que groupBy peut d'abord faire le regroupement sur les partitions locales, puis faire le mélange. Ainsi, le nombre de lignes mélangées sera beaucoup moins élevé, ce qui rend le travail plus efficace. Cordialement,


Oui. Je suis d'accord :)