11
votes

Pause et reprendre l'abonnement sur le froid iobservable

en utilisant RX , i Desire Pause et reprendre la fonctionnalité dans le code suivant:

Comment mettre en place une pause () et reprendre ()? xxx

solution RX?
  • Je crois que je pourrais le faire fonctionner avec une sorte de gatinage de champ booléen combiné avec verrouillage de fil ( moniteur.wait et moniteur.pulse )

  • mais y a-t-il un opérateur RX ou un autre raccourci réactif pour atteindre le même objectif?


0 commentaires

4 Réponses :


-1
votes

Cela fonctionne simplement:

    class SimpleWaitPulse
    {
      static readonly object _locker = new object();
      static bool _go;

      static void Main()
      {                                // The new thread will block
        new Thread (Work).Start();     // because _go==false.

        Console.ReadLine();            // Wait for user to hit Enter

        lock (_locker)                 // Let's now wake up the thread by
        {                              // setting _go=true and pulsing.
          _go = true;
          Monitor.Pulse (_locker);
        }
      }

      static void Work()
      {
        lock (_locker)
          while (!_go)
            Monitor.Wait (_locker);    // Lock is released while we’re waiting

        Console.WriteLine ("Woken!!!");
      }
    }


2 commentaires

Je posais une question à propos d'une méthode RX pour atteindre cet objectif, mais comme il n'y a pas d'autres réponses, j'accepterai votre explication comme réponse :) J'ai trouvé Ce pausable observable mais malheureusement je ne comprends pas ça et Visual Studio ne peut pas résoudre créatewithDisposable pour moi / je ne sais pas ce que cela devrait faire.


@Cel: J'ai modifié le code que vous parlez de. Ils ont effondré créatewithdisposable dans une surcharge de Créer .



3
votes

Ici, c'est une application d'iconnectableOvrable que j'ai légèrement corrigé pour la nouvelle API (original ici ): xxx


1 commentaires

J'ai déjà employé la solution non RX, j'espère qu'une uppote pour une bonne réponse n'est pas déplaire.



13
votes

Voici une façon raisonnablement simple RX de faire ce que vous voulez. J'ai créé une méthode d'extension appelée pausable code> qui prend une source observable et une seconde observable de booléen qui fait une pause ou reprend l'observable.

var xs = Observable.Generate(
    0,
    x => x < 100,
    x => x + 1,
    x => x,
    x => TimeSpan.FromSeconds(0.1));

var bs = new Subject<bool>();

var pxs = xs.Pausable(bs);

pxs.Subscribe(x => { /* Do stuff */ });

Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);


9 commentaires

J'ai déjà employé la solution non RX, j'espère qu'une uppote pour une bonne réponse n'est pas déplaire.


@Enigmativity pourquoi il perd certaines valeurs? Il devrait les préserver tous, non?


@ J.Lennon - Comment savez-vous que cela perd ses valeurs? Avez-vous du code de test qui le montre?


@Engmativitity oui. J'ai couru votre exemple dans Linqpad et le premier élément reçu dans l'abonnement était "4" et non "0". Corrigez-moi si je me trompe mais j'ai remarqué ici: "Func > Switcher = B => {...};" Basé dans la variable 'B', True est de faire une pause et de FALSE est de continuer, le code commence par FAUX et l'opérateur concat est un peu tour (il s'abonnera dans l'abonnement publié juste la ReplaySubject ont été complétés) qui provoquent la perte de valeurs.


Avez-vous corrigé le problème avec les valeurs perdues? J'ai le même problème dans une application de console.


@Superjmn - Si vous modifiez le code retour pauser.startwith (false) .Distinctntrontilchanged () à retour pauser.startwith (true) .Distinctsntilhanged () Il commence par l'hypothèse que l'observable n'est pas en pause et que vous obtenez toutes les valeurs.


@Enigmativity peut-être que je ne comprends pas bien. J'ai essayé votre approche ici ( gist.github.com/superjmn/8e5cdac40117A4BCec4C ), mais rien n'apparaît dans la console. Je m'attendais à ce que les articles soient imprimés jusqu'à ce que B.onnext (True) soit atteint. Je ne sais pas ce que je fais mal, merci d'avance. J'espère que vous pourrez expliquer son utilisation avec un peu plus de perspicacité! :RÉ


@Superjmn - Il y a un problème dans mon code. Si vous ajoutez un b.onnext (FALSE); avant votre premier Dormez Vous obtenez des résultats. Je pense avoir besoin d'un petit rejig.


@Enigmativité Ce serait génial si vous résolvez cela. En fait, je me considère que plusieurs niveaux sous vous en termes de Rx. Dire la vérité, je ne comprends pas ce qui se passe sous la cagoule, mais après ça va bien, je pourrais faire un aspect plus profond pour essayer d'avoir le sens de tout. Merci d'avance. J'ai vraiment hâte de voir votre version fixe!



3
votes

Voici ma réponse. Je pense qu'il peut y avoir une condition de race autour de la résolution de pause, mais cela peut être atténué en sérialisant toute activité sur un planificateur. (favoriser sérieusement la synchronisation).

using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

namespace StackOverflow.Tests.Q7620182_PauseResume
{
    [TestFixture]
    public class PauseAndResumeTests
    {
        [Test]
        public void Should_pause_and_resume()
        {
            //Arrange
            var scheduler = new TestScheduler();

            var isRunningTrigger = new BehaviorSubject<bool>(true);
            Action pause = () => isRunningTrigger.OnNext(false);
            Action resume = () => isRunningTrigger.OnNext(true);

            var source = scheduler.CreateHotObservable(
                ReactiveTest.OnNext(0.1.Seconds(), 1),
                ReactiveTest.OnNext(2.0.Seconds(), 2),
                ReactiveTest.OnNext(4.0.Seconds(), 3),
                ReactiveTest.OnNext(6.0.Seconds(), 4),
                ReactiveTest.OnNext(8.0.Seconds(), 5));

            scheduler.Schedule(TimeSpan.FromSeconds(0.5), () => { pause(); });
            scheduler.Schedule(TimeSpan.FromSeconds(5.0), () => { resume(); });



            //Act
            var sut = Observable.Create<IObservable<int>>(o =>
            {
                var current = source.Replay();
                var connection = new SerialDisposable();
                connection.Disposable = current.Connect();

                return isRunningTrigger
                    .DistinctUntilChanged()
                    .Select(isRunning =>
                    {
                        if (isRunning)
                        {
                                //Return the current replayed values.
                                return current;
                        }
                        else
                        {
                                //Disconnect and replace current.
                                current = source.Replay();
                                connection.Disposable = current.Connect();
                                //yield silence until the next time we resume.
                                return Observable.Never<int>();
                        }

                    })
                    .Subscribe(o);
            }).Switch();

            var observer = scheduler.CreateObserver<int>();
            using (sut.Subscribe(observer))
            {
                scheduler.Start();
            }

            //Assert
            var expected = new[]
            {
                    ReactiveTest.OnNext(0.1.Seconds(), 1),
                    ReactiveTest.OnNext(5.0.Seconds(), 2),
                    ReactiveTest.OnNext(5.0.Seconds(), 3),
                    ReactiveTest.OnNext(6.0.Seconds(), 4),
                    ReactiveTest.OnNext(8.0.Seconds(), 5)
                };
            CollectionAssert.AreEqual(expected, observer.Messages);
        }
    }
}


1 commentaires

N'y a-t-il pas une fuite de séquence source originale? Au lieu de retourner Isrunningtrigger, je l'étais dans une variable et je dispose d'une composition composable associée à une variable «connexion».