J'utilise Scala sur Flink avec l'API DataSet. Je souhaite re-partitionner mes données sur les nœuds. Spark a une fonction qui permet à l'utilisateur de re-partitionner les données avec un paramètre numberOfPartitions donné ( link ) et je crois que Flink ne prend pas en charge une telle fonction. Ainsi, je voulais y parvenir en implémentant une fonction de partitionnement personnalisée.
Mes données sont de type DataSet (Double, SparseVector) Un exemple de ligne à partir des données:
data.partitionCustom(new myPartitioner(),1)
Puisque mon "Double" est binaire (1 ou -1), je souhaite partitionner mes données en fonction de la longueur du SparceVector. Mon partitionneur personnalisé est le suivant:
class myPartitioner extends Partitioner[SparseVector] { override def partition(key: SparseVector, numPartitions: Int): Int = { key.size % numPartitions } }
J'appelle ce partitionneur personnalisé comme suit:
(1.0 SparseVector((2024,1.0), (2025,1.0), (2030,1.0), (2045,1.0), (2046,1.41), (2063,1.0), (2072,1.0), (3031,1.0), (3032,1.0), (4757,1.0), (4790,1.0), (177196,1.0), (177197,0.301), (177199,1.0), (177202,1.0), (1544177,1.0), (1544178,1.0), (1544179,1.0), (1654031,1.0), (1654190,1.0), (1654191,1.0), (1654192,1.0), (1654193,1.0), (1654194,1.0), (1654212,1.0), (1654237,1.0), (1654238,1.0)))
Quelqu'un peut-il m'aider s'il vous plaît pour comprendre comment spécifier le nombre de partitions comme argument "numPartitions" lors de l'appel de la fonction myPartitioner dans Scala.
Merci.
3 Réponses :
Spark utilise la fonction repartition (n: Int) pour redistribuer les données en n partitions, qui seront traitées par n tâches. De mon point de vue, cela inclut deux changements: la redistribution des données et le nombre de tâches en aval.
Par conséquent, dans Apache Flink, je pense que le partitionneur est mappé à la redistribution des données et que le parallélisme est mappé au nombre de tâches en aval, ce qui signifie que vous pouvez utiliser setParallelism pour déterminer les "numPartitions".
Dans Flink, vous pouvez définir setParallelism
pour un seul opérateur ou pour tous les opérateurs en utilisant enviornment.setParallelism
. J'espère que ce lien vous aidera.
Oui, data.partitionCustom (new myPartitioner (), 1) .setParallelism (7)
devrait fonctionner. Et puis vous pouvez utiliser ExecutionEnvironment.getExecutionPlan ()
pour vérifier que le plan ressemble à ce que vous voulez (collez le json dans flink.apache.org/visualizer ).
Je suppose que vous utilisez la longueur du SparseVector
juste pour avoir quelque chose qui vous donne des valeurs relativement aléatoires à utiliser pour le partitionnement. Si c'est vrai, vous pouvez simplement faire un DataSet.rebalance ()
. Si vous suivez cela par n'importe quel opérateur (y compris un Sink
) où vous définissez le parallélisme sur numPartitions
, alors vous devriez obtenir des données bien repartitionnées.
Mais votre description de ... veulent re-partitionner mes données sur les nœuds
me fait penser que vous essayez d'appliquer le concept de Spark de RDD
s à Flink, ce qui n'est pas vraiment valable. Par exemple. en supposant que vous ayez des opérateurs parallèles numPartition
traitant les données (re-partitionnées) dans votre DataSet, alors ces opérateurs seront exécutés dans les emplacements fournis par les TaskManagers disponibles, et ces emplacements peuvent ou non être sur des emplacements physiques différents. serveurs.
Votre hypothèse est juste et je comprends maintenant le concept. Merci pour toutes les informations.