J'ai une fonction générant une très longue séquence d'éléments de travail. La génération de ces éléments est rapide, mais il y en a trop au total pour en stocker une liste en mémoire. Le traitement des éléments ne produit aucun résultat, juste des effets secondaires.
Je voudrais traiter ces éléments sur plusieurs threads. Une solution est d'avoir un thread lu depuis le générateur et d'écrire dans une file d'attente limitée simultanée, et un certain nombre de threads exécuteurs interrogeant le travail de la file d'attente limitée, mais c'est beaucoup de choses à configurer.
Y a-t-il quelque chose dans la bibliothèque standard qui pourrait m'aider à faire cela?
J'avais d'abord essayé
items.map { async(executor) process(it) }.forEach { it.await() }
Mais, comme indiqué dans comment implémenter le mappage parallèle pour les séquences dans kotlin , cela ne fonctionne pas t fonctionne pour des raisons qui sont évidentes rétrospectivement.
Y a-t-il un moyen rapide de le faire (éventuellement avec une bibliothèque externe), ou est-ce que la configuration manuelle d'une file d'attente limitée au milieu est ma meilleure option? p>
3 Réponses :
Vous pouvez consulter les coroutines combinés avec canaux .
Si tous les éléments de travail peuvent être envoyés à la demande avec le canal producteur. Ensuite, il est possible d'attendre chaque élément et de le traiter avec un pool de threads.
Un exemple:
sealed class Stream {
object End: Stream()
class Item(val data: Long): Stream()
}
val produceCtx = newSingleThreadContext("producer")
// A dummy producer that send one million Longs on its own thread
val producer = CoroutineScope(produceCtx).produce {
for (i in (0 until 1000000L)) send(Stream.Item(i))
send(Stream.End)
}
val workCtx = newFixedThreadPoolContext(4, "work")
val workers = Channel<Unit>(4)
repeat(4) { workers.offer(Unit) }
for(_nothing in workers) { // launch 4 times then wait for a task to finish
launch(workCtx) {
when (val item = producer.receive()) {
Stream.End -> workers.close()
is Stream.Item -> {
workFunction(item.data) // Actual work here
workers.offer(Unit) // Notify to launch a new task
}
}
}
}
Votre mot magique serait .asSequence():
items
.asSequence() // Creates lazy executable sequence
.forEach { launch { executor.process(it) } } // If you don't need the value aftrwards, use 'launch', a.k.a. "fire and forget"
mais il y en a trop au total pour en stocker une liste en mémoire
Alors ne mappez pas à la liste et ne collectez pas les valeurs, peu importe si vous travaillez avec Kotlin ou Java.
Tant que vous êtes sur la JVM, vous pouvez écrire vous-même une fonction d'extension, qui fonctionne la séquence par blocs et génère des futures pour toutes les entrées d'un bloc. Quelque chose comme ceci:
items
.mapParallel { /* process an item */ }
.forEach { /* handle the result */ }
Vous pouvez alors l'utiliser simplement comme:
@Suppress("UNCHECKED_CAST")
fun <T, R> Sequence<T>.mapParallel(action: (value: T) -> R?): Sequence<R?> {
val numThreads = Runtime.getRuntime().availableProcessors() - 1
return this
.chunked(numThreads)
.map { chunk ->
val threadPool = Executors.newFixedThreadPool(numThreads)
try {
return@map chunk
.map {
// CAUTION -> needs to be written like this
// otherwise the submit(Runnable) overload is called
// which always returns an empty Future!!!
val callable: () -> R? = { action(it) }
threadPool.submit(callable)
}
} finally {
threadPool.shutdown()
}
}
.flatten()
.map { future -> future.get() }
}
Tant que la charge de travail par élément est similaire, cela donne un bon traitement parallèle.
Jetez un œil à ceci: kotlinlang.org/docs/reference/coroutines/flow.html < / a>
Autant que je sache, les exécutions parallèles sont l'un des (rares) avantages que Java Stream a encore sur la séquence Kotlin. Alors peut-être que vous pourriez utiliser des flux à la place.
@findusl - J'ai tenté quelque chose avec un flux parallèle, mais cela ne semble pas bien fonctionner. Je n'ai pas de bonne source, mais par exemple stackoverflow.com/a/30826175 "Il existe des problèmes connus concernant le traitement de flux infinis en parallèle. En particulier, il est impossible de diviser tâche à parts égales efficacement. "