5
votes

Fonction de partition personnalisée Flink

J'utilise Scala sur Flink avec l'API DataSet. Je souhaite re-partitionner mes données sur les nœuds. Spark a une fonction qui permet à l'utilisateur de re-partitionner les données avec un paramètre numberOfPartitions donné ( link ) et je crois que Flink ne prend pas en charge une telle fonction. Ainsi, je voulais y parvenir en implémentant une fonction de partitionnement personnalisée.

Mes données sont de type DataSet (Double, SparseVector) Un exemple de ligne à partir des données:

data.partitionCustom(new myPartitioner(),1)

Puisque mon "Double" est binaire (1 ou -1), je souhaite partitionner mes données en fonction de la longueur du SparceVector. Mon partitionneur personnalisé est le suivant:

class myPartitioner extends Partitioner[SparseVector]
{ 
    override def partition(key: SparseVector, numPartitions: Int): Int = {
         key.size % numPartitions
    } 
}

J'appelle ce partitionneur personnalisé comme suit:

(1.0 SparseVector((2024,1.0), (2025,1.0), (2030,1.0), (2045,1.0), (2046,1.41), (2063,1.0), (2072,1.0), (3031,1.0), (3032,1.0), (4757,1.0), (4790,1.0), (177196,1.0), (177197,0.301), (177199,1.0), (177202,1.0), (1544177,1.0), (1544178,1.0), (1544179,1.0), (1654031,1.0), (1654190,1.0), (1654191,1.0), (1654192,1.0), (1654193,1.0), (1654194,1.0), (1654212,1.0), (1654237,1.0), (1654238,1.0)))

Quelqu'un peut-il m'aider s'il vous plaît pour comprendre comment spécifier le nombre de partitions comme argument "numPartitions" lors de l'appel de la fonction myPartitioner dans Scala.

Merci.


0 commentaires

3 Réponses :


2
votes

Spark utilise la fonction repartition (n: Int) pour redistribuer les données en n partitions, qui seront traitées par n tâches. De mon point de vue, cela inclut deux changements: la redistribution des données et le nombre de tâches en aval.

Par conséquent, dans Apache Flink, je pense que le partitionneur est mappé à la redistribution des données et que le parallélisme est mappé au nombre de tâches en aval, ce qui signifie que vous pouvez utiliser setParallelism pour déterminer les "numPartitions".


0 commentaires

3
votes

Dans Flink, vous pouvez définir setParallelism pour un seul opérateur ou pour tous les opérateurs en utilisant enviornment.setParallelism . J'espère que ce lien vous aidera.


1 commentaires

Oui, data.partitionCustom (new myPartitioner (), 1) .setParallelism (7) devrait fonctionner. Et puis vous pouvez utiliser ExecutionEnvironment.getExecutionPlan () pour vérifier que le plan ressemble à ce que vous voulez (collez le json dans flink.apache.org/visualizer ).



2
votes

Je suppose que vous utilisez la longueur du SparseVector juste pour avoir quelque chose qui vous donne des valeurs relativement aléatoires à utiliser pour le partitionnement. Si c'est vrai, vous pouvez simplement faire un DataSet.rebalance () . Si vous suivez cela par n'importe quel opérateur (y compris un Sink ) où vous définissez le parallélisme sur numPartitions , alors vous devriez obtenir des données bien repartitionnées.

Mais votre description de ... veulent re-partitionner mes données sur les nœuds me fait penser que vous essayez d'appliquer le concept de Spark de RDD s à Flink, ce qui n'est pas vraiment valable. Par exemple. en supposant que vous ayez des opérateurs parallèles numPartition traitant les données (re-partitionnées) dans votre DataSet, alors ces opérateurs seront exécutés dans les emplacements fournis par les TaskManagers disponibles, et ces emplacements peuvent ou non être sur des emplacements physiques différents. serveurs.


1 commentaires

Votre hypothèse est juste et je comprends maintenant le concept. Merci pour toutes les informations.