7
votes

diviser un flux dans de nombreux

J'aimerais savoir s'il y a un moyen élégant d'atteindre quelque chose comme ça: xxx

en fait, j'ai besoin d'une telle opération sur des flux de parallélisation - pour diviser les données sur plusieurs acteurs sans chargement. Toute la matière dans la mémoire.


0 commentaires

6 Réponses :


2
votes

La seule chose que je peux penser:

def distribute[T](n: Int)(x: Stream[T]) = (0 until n).map { p =>
  x.zipWithIndex.collect {
    case (e,i) if i % n == p => e
  }
}


1 commentaires

Oui, j'ai pensé à cela. Je dois fusionner les résultats des acteurs et le problème est que les résultats intermédiaires consomment également beaucoup de mémoire et je souhaite qu'il y ait quelques acteurs et le même nombre de tâches / résultats. Mais néanmoins, je pourrais modifier des acteurs pour réutiliser les résultats des tâches précédentes et je vais suivre de cette façon s'il n'y a pas de moyen simple de diviser un flux.



0
votes
scala> (1 to 30 grouped 3).toList.transpose foreach println
List(1, 4, 7, 10, 13, 16, 19, 22, 25, 28)
List(2, 5, 8, 11, 14, 17, 20, 23, 26, 29)
List(3, 6, 9, 12, 15, 18, 21, 24, 27, 30)

4 commentaires

Stream.from (1). Groupé (3)


Euh, ça fait mal. Après avoir examiné la mise en œuvre, j'ai vu que transpose vérifie si toutes les collections ont la même taille. Bien que ce soit un chèque valide, cela fait mal la nature paresseuse des flux . Merci d'avoir souligné cela, je vais apporter cela aux listes de diffusion ...


Je viens de poser une question à ce sujet. Stackoverflow.com/questions/17116061/... < / a> Cela semble être une limitation de l'abstraction du constructeur. Le problème est fondamentalement la ligne 170 dans generictraverableTemplate.scala github.com/scala/scala/blob/v2.10.2/src/library/scala/...


Donc, ce n'est pas le bon endroit pour vérifier si quelque chose est un bogue ou non, pas pas plus que les bonnes personnes de décider cela. Je garderai cela à l'esprit, si aucune bonne réponse n'apparaît, je vais apporter cela aux listes de diffusion.



2
votes

Une approche simple consiste à générer une séquence arithmétique pour les indices que vous souhaitez, puis en mappant que dans le flux. La méthode appliquée retirera les valeurs correspondantes:

def comb[A]( s:Stream[A], first:Int, step:Int ):Iterator[A] = new Iterator {
  var i       = first - step
  def hasNext = true
  def next    = { i += step; s(i) }
}
def g[A]( s:Stream[A], n:Int ) =
  0 until n map ( i => comb(s,i,n) )

g( Stream from 1, 3 ) map ( _ take 4 mkString "," )
// Vector(1,4,7,10, 2,5,8,11, 3,6,9,12)


3 commentaires

Créer des itérateurs a l'air bien. La seule chose qui, dans votre implémentation, Hasnext retourne toujours vrai - il ne traite que des collections infinies, car le code sera plus complexe. J'ai utilisé des acteurs de la bibliothèque Standard Scala, mais il semble que Akka vaut la peine d'apprendre, merci.


Voir aussi Stackoverflow.com/Questtions/11132788/... pour le cas fini.


Aie! La fonction "coulissante" avec l'étape a fait le tour. Il convient aussi aux flux. Donc, l'écriture d'un itérateur personnalisé pourrait être évitée.



0
votes

Je n'ai trouvé aucune fonction de cette fonction dans la bibliothèque Scala, j'ai donc rétrogradé la variante d'itératrice de la réponse d'Amigonico. Le code traite les collections finies et infinies.

  def splitRoundRobin[A](s: Iterable[A], n: Int) = {
    def comb[A](s: Iterable[A], first: Int, step: Int): Iterator[A] = new Iterator[A] {
      val iter = s.iterator
      var nextElem: Option[A] = iterToNext(first)
      def iterToNext(elemsToSkip: Int) = {
        iterToNextRec(None, elemsToSkip)
      }
      def iterToNextRec(next: Option[A], repeat: Int): Option[A] = repeat match {
        case 0 => next
        case _ => if (iter.hasNext) iterToNextRec(Some(iter.next()), repeat - 1) else None
      }
      def hasNext = nextElem.isDefined || {
        nextElem = iterToNext(step)
        nextElem.isDefined
      }
      def next = {
        var result = if (nextElem.isDefined) nextElem.get else throw new IllegalStateException("No next")
        nextElem = None
        result
      }
    }
    0 until n map (i => comb(s, i, n))
  }  

  splitRoundRobin(1 to 12 toStream, 3) map (_.toList.mkString(","))
 // Vector(3,6,9,12, 1,4,7,10, 2,5,8,11)

  splitRoundRobin(Stream from 1, 3) map (_.take(4).mkString(","))
//> Vector(3,6,9,12, 1,4,7,10, 2,5,8,11)


0 commentaires

0
votes
scala> roundRobin(3, Stream.from(0).take(10)).map(_.mkString).mkString(" ")
res5: String = 0369 147 258

0 commentaires

5
votes

La réponse de diviser une liste de scala en n Les listes d'entrelacement répondent entièrement aux conditions, un peu modifiées en fonction de flux:

def round[A](seq: Iterable[A], n: Int) = {
  (0 until n).map(i => seq.drop(i).sliding(1, n).flatten)
}
round(Stream.from(1),3).foreach(i => println(i.take(3).toList))
List(1, 4, 7)
List(2, 5, 8)
List(3, 6, 9)


0 commentaires