2
votes

Comment DROP_LATEST avec coroutine Flow <T>?

Veuillez aider. J'essayais de faire:

Event for A
A1
A2
A3

J'ai donc InitailFlow qui émet un court laps de temps (2 secondes) qui est ensuite transformé en AnotherFlow qui prend plus de temps à se terminer (15 secondes au total) ... Je voudrais supprimer les autres éléments entrants émis par l'InitialFlow pendant AnotherFlow n'est pas fini ...

J'ai essayé:

Event for A
A1
A2
A3
Event for B
B1
B2
B3

Mais pour une raison quelconque, voici le résultat:

flow{
    delay(2000)
    emit("A")
    delay(2000)
    emit("B")
    delay(2000)
    emit("C")
}.buffer(0, BufferOverflow.DROP_LATEST)
    .onEach {
       println("Event for $it")
    }
    .flatMapConcat {
       flow {
           delay(5000)
           emit("${it}1")
           delay(5000)
           emit("${it}2")
           delay(5000)
           emit("${it}3")
        }
     }
     .onEach {
         println(it)
     }
     .launchIn(scope)

Il traite toujours l' Event B pour une raison quelconque, même lorsque j'ai un .buffer(0, BufferOverflow.DROP_LATEST) .

Pourquoi traite-t-il encore l'événement B?

Y a-t-il un moyen de faire cela? Je m'attends à ce que la sortie soit uniquement:

    2sec     2sec     2sec
------[A]------[B]------[C]------...----------------> InitailFlow
       \        |        | 
        \      drop      drop
         \
     5sec \    5sec        5sec
----------[1]---------[2]---------[3]-----|> AnotherFlow
result: [A1, A2, A3]

Merci d'avance.


8 commentaires

Quelle version de coroutines utilisez-vous? Je crois que j'utilise la dernière mais je ne trouve pas la fonction .buffer(0, BufferOverflow.DROP_LATEST) ...


@Deadbeef Il a été ajouté dans la version 1.4.0


Je n'obtiens pas vraiment ce que vous attendez. Pouvez-vous donner un exemple de la sortie souhaitée?


@Deadbeef son 1.4.0-M1


@marstran Bien sûr! J'ai ajouté la sortie attendue ... c'est aussi la même sortie que le "graphique" dans ma question .. Donc, juste pour clarifier un peu plus ... j'aimerais cela tant que l'autre flux dans le flatMap n'est pas terminé pourtant ... tous les autres événements entrants doivent être supprimés .. si le flatMap flow termine, il pourrait alors reprendre un autre événement ...


Vous pouvez obtenir genre de près à l' aide du conflate -operator. Mais cela l'amènera à capter le dernier élément émis par le flux externe lorsque le premier flux interne sera terminé. Il vous donnera A1 A2 A3 C1 C2 C3 . C'est juste équivalent à utiliser "DROP_OLDEST" au lieu de "DROP_LATEST" dans votre appel de tampon.


La raison pour laquelle vous obtenez les événements B lors de l'utilisation de DROP_LATEST est que la stratégie décide uniquement de l'élément à conserver lorsqu'il y a déjà une valeur en attente du tampon amalgamé. Donc, avec DROP_LATEST, B est suspendu en attendant que A se termine, alors vous choisissez de conserver B lorsque C arrive parce que C était le plus récent. Avec DROP_OLDEST, vous choisissez de conserver C car B était le plus ancien.


@marstran, je vois ... cela a du sens ... hmm ... mais j'ai l'impression que mon cas d'utilisation est à peu près valide et devrait être pris en charge ... vous ne pensez pas? y a-t-il un autre moyen de contourner cela?


3 Réponses :


-1
votes

Chaque fois emit est appelée, elle est suspendue de sorte que l'opérateur de buffer n'a aucun effet.

Je ne suis pas sûr de la sortie que vous attendez mais je pense que vous recherchez l'opérateur conflate :

flow{
    delay(2000)
    emit("A")
    delay(2000)
    emit("B")
    delay(2000)
    emit("C")
}
.conflate()
.onEach {
    println("Event for $it")
}
.flatMapConcat {
    flow {
        delay(5000)
        emit("${it}1")
        delay(5000)
        emit("${it}2")
        delay(5000)
        emit("${it}3")
    }
}
.onEach {
    println(it)
}
.launchIn(scope)

Les conflate Etats documents officiels de l' opérateur:

Conflit les émissions de flux via un canal conflué et fait fonctionner le collecteur dans une coroutine séparée. L'effet de ceci est que l'émetteur n'est jamais suspendu en raison d'un collecteur lent, mais le collecteur obtient toujours la valeur la plus récente émise.

Vous trouverez la conflate documentation officielle de l' opérateur ici .


2 commentaires

Comme mentionné dans le commentaire ci-dessus, l'utilisation de conflate ne permet pas d'atteindre ce que OP voulait faire.


conflate ne fonctionne pas .. ne supprime pas la DERNIÈRE valeur .. il supprime la PLUS ANCIENNE ... Voici la documentation: "/ ** * Demande un canal conflué dans la fonction d'usine Channel(...) . C'est un raccourci pour créer * un canal avec [ onBufferOverflow = DROP_OLDEST ] [BufferOverflow.DROP_OLDEST]. * / "



2
votes

Cela devrait fonctionner pour vous:

fun <T> Flow<T>.dropIfBusy(): Flow<T> = flow {
    coroutineScope {
        val channel = produce(capacity = Channel.RENDEZVOUS) {
            collect { offer(it) }
        }
        channel.consumeEach { emit(it) }
    }
}

Il s'agit essentiellement de l'implémentation de buffer "naïve" de la documentation kotlin
La seule différence ici est que nous utilisons channel.offer au lieu de channel.send Lorsqu'il est utilisé en conjonction avec un canal RENDEZVOUS, toutes les valeurs qui sont proposées au canal, lorsqu'il est suspendu, sont supprimées, créant le comportement souhaité.


7 commentaires

Vous n'avez pas encore essayé, mais si cela fonctionne, cela ne devrait-il pas être l'implémentation de .buffer(RENDEZVOUS, DROP_LATEST) toute façon? coz RENDEZVOUS == 0.


Je le pensais moi-même au départ, mais la documentation indique "Pour implémenter l'une ou l'autre des stratégies personnalisées [DROP_OLDEST / DROP_LATEST], un tampon d'au moins un élément est utilisé."


Solution intelligente, mais cela ne fonctionne pas vraiment non plus. J'obtiens cette sortie: Event for A, Event for B, Event for C, C1, C2, C3 . Je pense que c'est parce que cette fonction émettra tous les événements du flux amont avant de continuer avec le flatMapConcat .


Avez-vous remplacé .buffer(0, BufferOverflow.DROP_LATEST) dans votre code par dropIfBusy() ? J'obtiens la sortie attendue en utilisant cela


Si cela ne fonctionne toujours pas, cela peut dépendre du contexte. Quel Dispatcher utilise scope ?


Je pourrais confirmer que cette solution fonctionne ... mais je trouve toujours déroutant pourquoi c'est le cas ... dans mon .buffer(RENDEVOUS, DROP_LATEST) doit fonctionner de cette façon. Vous n'êtes pas d'accord? J'ai déposé un problème dans le github coroutine ici: github.com/Kotlin/kotlinx.coroutines/issues/2391 . J'espère qu'ils le verront et y répondront.


@AdrianK Désolé, je viens de réaliser que j'utilisais encore du code d'une tentative précédente. Maintenant je l'ai fait fonctionner :)



1
votes

Après avoir joué un peu avec la solution de @ AdrianK, j'ai trouvé une solution plus simple en utilisant channelFlow . Étant donné que channelFlow actuellement une API expérimentale, vous devez toutefois vous channelFlow pour l'utiliser.

Comme ça:

fun <T> Flow<T>.dropIfBusy(): Flow<T> = channelFlow {
    collect { offer(it) }
}.buffer(0)


0 commentaires