6
votes

Kotlin Coroutines - Comment bloquer pour attendre / rejoindre tous les jobs?

Je suis nouveau sur Kotlin / Coroutines, donc j'espère qu'il me manque quelque chose / je ne comprends pas complètement comment structurer mon code pour le problème que j'essaie de résoudre.

Essentiellement, je prends une liste de chaînes, et pour chaque élément de la liste je veux l'envoyer à une autre méthode pour faire le travail (faire un appel réseau et renvoyer des données basées sur la réponse). ( Modifier :) Je veux que tous les appels soient lancés simultanément, et bloqués jusqu'à ce que tous les appels soient terminés / que la réponse soit traitée, puis renvoyer une nouvelle liste avec les informations de chaque réponse.

Je ne comprends probablement pas encore complètement quand utiliser launch / async, mais j'ai essayé de suivre à la fois avec launch (avec joinAll ) et async (avec await code>).

fun processData(lstInputs: List<String>): List<response> {

    val lstOfReturnData = mutableListOf<response>()

    runBlocking {
        withContext(Dispatchers.IO) {
            val jobs = List(lstInputs.size) {
                launch {
                    lstOfReturnData.add(networkCallToGetData(lstInputs[it]))
                }
            }
            jobs.joinAll()
        }
    }

    return lstofReturnData

Ce à quoi je m'attends, c'est si mes lstInputs ont une taille de 120, lorsque toutes les tâches sont jointes, mon lstOfReturnData doit également avoir une taille de 120.

Ce qui se passe réellement, ce sont des résultats incohérents. Je vais l'exécuter une fois, et j'obtiens 118 dans ma liste finale, je l'exécute à nouveau, c'est 120, je l'exécute à nouveau, c'est 117, etc. Dans la méthode networkCallToGetData () , je gère tout exceptions, pour au moins renvoyer quelque chose pour chaque demande, indépendamment de l'échec de l'appel réseau.

Quelqu'un peut-il m'aider à expliquer pourquoi j'obtiens des résultats incohérents et ce que je dois faire pour m'assurer que je bloque correctement et que tous les emplois sont rejoints avant de continuer?


0 commentaires

3 Réponses :


1
votes

Le blocage d'exécution devrait signifier que vous n'avez pas besoin d'appeler join. Le lancement d'une coroutine à partir d'une étendue de blocage d'exécution devrait le faire pour vous. Avez-vous essayé juste:

fun processData(lstInputs: List<String>): List<response> {

val lstOfReturnData = mutableListOf<response>()

runBlocking {
    lstInputs.forEach {
            launch(Dispatchers.IO) {
                lstOfReturnData.add(networkCallToGetData(it))
            }
   } 
}

return lstofReturnData


0 commentaires

11
votes

mutableListOf () crée une ArrayList , qui n'est pas thread-safe.
Essayez plutôt d'utiliser ConcurrentLinkedQueue .

De plus, utilisez-vous la version stable de Kotlin / Kotlinx.coroutine (pas l'ancienne version expérimentale)? Dans la version stable, avec l'introduction de la concurrence structurée, il n'est plus nécessaire d'écrire jobs.joinAll . launch est une fonction d'extension de runBlocking qui lancera de nouvelles coroutines dans le cadre des domaines runBlocking et runBlocking attendra automatiquement la fin de tous les jobs lancés. Ainsi, le code ci-dessus peut être raccourci en

val lstOfReturnData = ConcurrentLinkedQueue<response>()
runBlocking {
        lstInputs.forEach {
            launch(Dispatches.IO) {
                lstOfReturnData.add(networkCallToGetData(it))
            }
        }
}
return lstOfReturnData


1 commentaires

Merci beaucoup pour la réponse et le pointeur sur l'utilisation de ConcurrentLinkedQueue, et les mises à jour sur l'API stable. Oui, j'utilise la nouvelle version, mais une grande partie du code de mon équipe est sur l'ancienne, donc je vois que j'ai un peu mélangé les choses. Je viens de faire quelques tests initiaux où j'introduis un delay (2000) et cela semble fonctionner comme je le souhaite. Tous les éléments se déclenchent (et n'attendez pas que chacun ait dépassé le délai), tous se rejoignent une fois terminé, et la taille de la liste par la suite est ce que j'attends. Je ferai des tests plus approfondis ce week-end et accepterai votre réponse si elle semble toujours bonne.



3
votes

runBlocking bloque le thread actuel de manière interrompue jusqu'à son achèvement. Je suppose que ce n'est pas ce que tu veux. Si je pense mal et que vous voulez bloquer le thread actuel, vous pouvez vous débarrasser de la coroutine et simplement faire un appel réseau dans le thread actuel:

class Presenter(private val uiContext: CoroutineContext = Dispatchers.Main) 
    : CoroutineScope {

    // creating local scope for coroutines
    private var job: Job = Job()
    override val coroutineContext: CoroutineContext
        get() = uiContext + job

    // call this to cancel job when you don't need it anymore
    fun detach() {
        job.cancel()
    }

    fun processData(lstInputs: List<String>) {

        launch {
            val deferredList = lstInputs.map { 
                async(Dispatchers.IO) { networkCallToGetData(it) } // runs in parallel in background thread
            }
            val lstOfReturnData = deferredList.awaitAll() // waiting while all requests are finished without blocking the current thread

            // use lstOfReturnData in Main Thread, e.g. update UI
        }
    }
}

Mais si ce n'est pas votre intention vous pouvez faire ce qui suit:

val lstOfReturnData = mutableListOf<response>()
lstInputs.forEach {
    lstOfReturnData.add(networkCallToGetData(it))
} 


1 commentaires

Cette approche semble plus efficace, vous pouvez me corriger si je me trompe mais comme tous les jobs seront exécutés simultanément, le temps d'exécution sera moindre par rapport à la réponse acceptée.