2
votes

Séquence observable complète après un «long» intervalle

La séquence observable suivante ajoute chaque élément à un ReplaySubject afin que je puisse accéder à tous les éléments plus tard et même attendre l'achèvement du ReplaySubject. Il termine le ReaplySubject après un laps de temps.

                characteristic.WhenNotificationReceived()
                .Timeout(TimeSpan.FromSeconds(1))
                .Subscribe(
                    onNext: result =>
                    {
                        string nextTag = BitConverter.ToString(result.Data);
                        nextTag = nextTag.Replace("-", "");

                        rfidPlayer.OnNext(nextTag);
                    },
                    onError: error =>
                    {
                        rfidPlayer.OnCompleted();
                    });

Je voudrais que la séquence s'exécute jusqu'à ce qu'il y ait un temps donné depuis le dernier appel "OnNext", puis se termine. Cela sera très utile dans une variété de scénarios de communication Bluetooth où le périphérique Bluetooth me donnera une séquence de données, puis s'arrêtera sans aucun type de message ou d'événement d'achèvement. Dans ces scénarios, je dois déterminer de manière heuristique quand la séquence est terminée, puis la terminer moi-même. Donc, si cela fait "trop longtemps" depuis la dernière notification Bluetooth, je voudrais terminer le ReplaySubject.

Je pourrais le faire en créant une minuterie, en la réinitialisant lorsque chaque élément est reçu, puis en complétant le ReplaySubject lorsque la minuterie atteint «trop longtemps», mais j'ai entendu dire que créer un objet et le manipuler à partir d'un abonnement observable n'est pas thread safe.

Une suggestion sur la façon dont je peux terminer une séquence après un intervalle "trop long"?

Voici une version qui n'est pas thread-safe d'après ce que j'ai entendu mais qui devrait fonctionner comme prévu:

bool reading = true;
System.Timers.Timer timer = new System.Timers.Timer(1000);
timer.Elapsed += (sender, e) =>
{
    reading = false;
};

ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
    .TakeWhile(x => reading)
    .Subscribe(
        onNext: result =>
        {
            string nextTag = BitConverter.ToString(result.Data);
            nextTag = nextTag.Replace("-", "");

            timer.Stop();
            timer.Start();

            rfidPlayer.OnNext(nextTag);
        },
        onCompleted: () =>
        {
            rfidPlayer.OnCompleted();
        });

cela semble satisfaisant d'après la première réponse de Simonare:

ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
    .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)))
    .Subscribe(
        onNext: result =>
        {
            string nextTag = BitConverter.ToString(result.Data);
            nextTag = nextTag.Replace("-", "");

            rfidPlayer.OnNext(nextTag);
        },
        onCompleted: () =>
        {
            rfidPlayer.OnCompleted();
        });


5 commentaires

Veuillez ne pas utiliser d'état externe (ie bool reading = true; ) et n'utilisez pas de minuterie. Vous travaillez avec un outil extrêmement puissant qui n'en a pas besoin. Lorsque vous utilisez Rx correctement, cela fonctionne à merveille.


Essayez également de ne pas travailler dans votre .Subscribe . Essayez ceci à la place: characteristic.WhenNotificationReceived().TakeUntil(Observab‌​le.Timer(TimeSpan.Fr‌​omSeconds(1))).Selec‌​t(result => BitConverter.ToString(result.Data).Replace("-", "")).Subscribe(rfidPlayer); .


TakeUntil définit-il un minuteur global ou un minuteur d'intervalle?


@Enigmativity Et merci pour vos commentaires, mon Rx a un long chemin à parcourir mais il s'améliore à pas de géant en grande partie grâce à vos retours.


Pas de soucis. J'ai ajouté une réponse dont je pense que vous pourriez tirer beaucoup. Il utilise deux des opérateurs qui, selon moi, sont parmi les plus importants mais sont souvent sous-utilisés - Generate et Switch . Faites-moi savoir si vous le trouvez utile.


3 Réponses :


1
votes

vous pouvez envisager d'utiliser Timeout Operator . Le seul inconvénient est qu'il se termine par un signal d'erreur. Vous devrez peut-être gérer une fausse erreur

L'opérateur Timeout vous permet d'abandonner un Observable avec une terminaison onError si cet Observable ne parvient pas à émettre des éléments pendant un laps de temps spécifié.

Si vous utilisez l'approche ci-dessous, vous pouvez surpasser l'erreur

characteristic.WhenNotificationReceived()
    .Timeout(TimeSpan.FromSeconds(1))
    .Subscribe(
        onNext: result =>
        {
            ....
            rfidPlayer.OnNext(....);
        },
        onError: error =>
        {
            rfidPlayer.OnCompleted();
        });

Une autre variante vous permet d'indiquer à timeout de basculer vers une sauvegarde Observable que vous spécifiez, plutôt que de se terminer avec une erreur, si la condition de timeout est déclenchée.


 .Timeout(200, Promise.resolve(42));


8 commentaires

L'opérateur timeout fonctionnerait de la même manière que l'opérateur TakeUntil sauf qu'il lèverait une exception au lieu de terminer la séquence.


vous pouvez vous en tenir à TakeUntil, je ne pense pas que vous rencontrerez un problème avec celui-ci.


TakeUntil se termine après un temps global écoulé. Je cherche une solution qui se termine lorsqu'un intervalle donné depuis l'arrivée du dernier élément passe. J'ai publié un exemple de code qui se comporte comme je le souhaite mais qui n'est pas thread-safe.


Timeout fait ça


c'est ce qu'il fait. Bien que ce ne soit pas très satisfaisant d'utiliser intentionnellement une prise pour travailler pour moi.


Vérifiez ma réponse modifiée, vous n'avez pas besoin de lever une exception, vous pouvez lui transmettre une autre promesse. dans votre cas, vous pouvez passer un observable vide afin qu'aucune erreur ne se produise.


Je n'ai pas accès aux promesses, donc cela ne fonctionnera pas tout à fait pour moi, ce deuxième argument devrait être IScheduler ou Observable et je dois être en mesure de compléter le rfidPlayer ReplaySubject.


jetez un œil à ma modification ci-dessus. J'ai marqué la vôtre comme réponse mais ce serait bien si vous mettiez à jour votre réponse pour inclure le bloc onError.



0
votes

Je trouve désagréable d'utiliser Timeout cause des exceptions.

Je préfère injecter une valeur dans la séquence que je peux utiliser pour terminer la séquence. Si ma séquence, par exemple, produit des nombres non négatifs, alors si j'injecte un -1 je sais pour terminer la séquence.

Voici un exemple:

Commencez par cette observable qui génère des puissances de 2 commençant par 1 et elle retarde également la production de chaque valeur du nombre de millisecondes de la valeur en attente.

characteristic
    .WhenNotificationReceived()
    .Select(result => BitConverter.ToString(result.Data).Replace("-", ""))
    .Select(x => Observable.Timer(TimeSpan.FromSeconds(1.0)).Select(y => (string)null).StartWith(x))
    .Switch()
    .TakeWhile(x => x != null)
    .Subscribe(rfidPlayer);

Donc 1, 2, 4, 8, etc., de plus en plus lentement.

Maintenant, je veux arrêter cette séquence s'il n'y a pas de valeurs pendant 3.0 secondes, je peux le faire:

1 
2 
4 
8 
16 
32 
64 
128 
256 
512 
1024 
2048 

Si j'exécute cette séquence, j'obtiens cette sortie:

    .Select(x => Observable.Timer(TimeSpan.FromSeconds(3.0)).Select(y => -1).StartWith(x))
    .Switch()
    .TakeWhile(x => x >= 0)

La séquence est sur le point de produire 4096 mais elle attend d'abord 4096 millisecondes pour produire cette valeur - en attendant, Observable.Timer(TimeSpan.FromSeconds(3.0)) déclenche et génère un -1 , arrêtant ainsi la séquence.

L'élément clé de cette requête est l'utilisation de Switch . Il prend un IObservable<IObservable<T>> et produit un IObservable<T> en s'abonnant uniquement à la dernière observable externe et en se désabonnant de la précédente.

Ainsi, dans ma requête, chaque nouvelle valeur produite par la séquence s'arrête et redémarre le Timer .

Dans votre cas, votre observable ressemblerait à ceci:

Observable
    .Generate(1, x => true, x => 2 * x, x => x, x => TimeSpan.FromMilliseconds(x))


0 commentaires

0
votes

Voici un opérateur personnalisé TakeUntilTimeout vous pouvez utiliser, qui est une fine couche au-dessus de l'opérateur Timeout intégré.

/// <summary>
/// Applies a timeout policy for each element in the observable sequence.
/// If the next element isn't received within the specified timeout duration
/// starting from its predecessor, the sequence terminates.
/// </summary>
public static IObservable<T> TakeUntilTimeout<T>(
    this IObservable<T> source,
    TimeSpan timeout)
{
    return source.Timeout(timeout, Observable.Empty<T>());
}


0 commentaires