J'aimerais savoir s'il y a un moyen élégant d'atteindre quelque chose comme ça: en fait, j'ai besoin d'une telle opération sur des flux de parallélisation - pour diviser les données sur plusieurs acteurs sans chargement. Toute la matière dans la mémoire. p> p>
6 Réponses :
La seule chose que je peux penser:
def distribute[T](n: Int)(x: Stream[T]) = (0 until n).map { p => x.zipWithIndex.collect { case (e,i) if i % n == p => e } }
Oui, j'ai pensé à cela. Je dois fusionner les résultats des acteurs et le problème est que les résultats intermédiaires consomment également beaucoup de mémoire et je souhaite qu'il y ait quelques acteurs et le même nombre de tâches / résultats. Mais néanmoins, je pourrais modifier des acteurs pour réutiliser les résultats des tâches précédentes et je vais suivre de cette façon s'il n'y a pas de moyen simple de diviser un flux.
scala> (1 to 30 grouped 3).toList.transpose foreach println List(1, 4, 7, 10, 13, 16, 19, 22, 25, 28) List(2, 5, 8, 11, 14, 17, 20, 23, 26, 29) List(3, 6, 9, 12, 15, 18, 21, 24, 27, 30)
Stream.from (1). Groupé (3)
Euh, ça fait mal. Après avoir examiné la mise en œuvre, j'ai vu que transpose code> vérifie si toutes les collections ont la même taille. Bien que ce soit un chèque valide, cela fait mal la nature paresseuse des flux
code>. Merci d'avoir souligné cela, je vais apporter cela aux listes de diffusion ...
Je viens de poser une question à ce sujet. Stackoverflow.com/questions/17116061/... < / a> Cela semble être une limitation de l'abstraction du constructeur. Le problème est fondamentalement la ligne 170 dans generictraverableTemplate.scala code> github.com/scala/scala/blob/v2.10.2/src/library/scala/...
Donc, ce n'est pas le bon endroit pour vérifier si quelque chose est un bogue ou non, pas pas plus que les bonnes personnes de décider cela. Je garderai cela à l'esprit, si aucune bonne réponse n'apparaît, je vais apporter cela aux listes de diffusion.
Une approche simple consiste à générer une séquence arithmétique pour les indices que vous souhaitez, puis en mappant que dans le flux. La méthode appliquée retirera les valeurs correspondantes:
def comb[A]( s:Stream[A], first:Int, step:Int ):Iterator[A] = new Iterator { var i = first - step def hasNext = true def next = { i += step; s(i) } } def g[A]( s:Stream[A], n:Int ) = 0 until n map ( i => comb(s,i,n) ) g( Stream from 1, 3 ) map ( _ take 4 mkString "," ) // Vector(1,4,7,10, 2,5,8,11, 3,6,9,12)
Créer des itérateurs a l'air bien. La seule chose qui, dans votre implémentation, Hasnext retourne toujours vrai - il ne traite que des collections infinies, car le code sera plus complexe. J'ai utilisé des acteurs de la bibliothèque Standard Scala, mais il semble que Akka vaut la peine d'apprendre, merci.
Voir aussi Stackoverflow.com/Questtions/11132788/... pour le cas fini.
Aie! La fonction "coulissante" avec l'étape a fait le tour. Il convient aussi aux flux. Donc, l'écriture d'un itérateur personnalisé pourrait être évitée.
Je n'ai trouvé aucune fonction de cette fonction dans la bibliothèque Scala, j'ai donc rétrogradé la variante d'itératrice de la réponse d'Amigonico. Le code traite les collections finies et infinies.
def splitRoundRobin[A](s: Iterable[A], n: Int) = { def comb[A](s: Iterable[A], first: Int, step: Int): Iterator[A] = new Iterator[A] { val iter = s.iterator var nextElem: Option[A] = iterToNext(first) def iterToNext(elemsToSkip: Int) = { iterToNextRec(None, elemsToSkip) } def iterToNextRec(next: Option[A], repeat: Int): Option[A] = repeat match { case 0 => next case _ => if (iter.hasNext) iterToNextRec(Some(iter.next()), repeat - 1) else None } def hasNext = nextElem.isDefined || { nextElem = iterToNext(step) nextElem.isDefined } def next = { var result = if (nextElem.isDefined) nextElem.get else throw new IllegalStateException("No next") nextElem = None result } } 0 until n map (i => comb(s, i, n)) } splitRoundRobin(1 to 12 toStream, 3) map (_.toList.mkString(",")) // Vector(3,6,9,12, 1,4,7,10, 2,5,8,11) splitRoundRobin(Stream from 1, 3) map (_.take(4).mkString(",")) //> Vector(3,6,9,12, 1,4,7,10, 2,5,8,11)
scala> roundRobin(3, Stream.from(0).take(10)).map(_.mkString).mkString(" ") res5: String = 0369 147 258
La réponse de diviser une liste de scala en n Les listes d'entrelacement répondent entièrement aux conditions, un peu modifiées en fonction de flux:
def round[A](seq: Iterable[A], n: Int) = { (0 until n).map(i => seq.drop(i).sliding(1, n).flatten) } round(Stream.from(1),3).foreach(i => println(i.take(3).toList)) List(1, 4, 7) List(2, 5, 8) List(3, 6, 9)