en utilisant RX , i Desire Pause et reprendre la fonctionnalité dans le code suivant:
Je crois que je pourrais le faire fonctionner avec une sorte de gatinage de champ booléen combiné avec verrouillage de fil ( mais y a-t-il un opérateur RX ou un autre raccourci réactif pour atteindre le même objectif? P> LI>
ul> p> moniteur.wait code> et
moniteur.pulse code>) p > li>
4 Réponses :
Cela fonctionne simplement:
class SimpleWaitPulse { static readonly object _locker = new object(); static bool _go; static void Main() { // The new thread will block new Thread (Work).Start(); // because _go==false. Console.ReadLine(); // Wait for user to hit Enter lock (_locker) // Let's now wake up the thread by { // setting _go=true and pulsing. _go = true; Monitor.Pulse (_locker); } } static void Work() { lock (_locker) while (!_go) Monitor.Wait (_locker); // Lock is released while weâre waiting Console.WriteLine ("Woken!!!"); } }
Je posais une question à propos d'une méthode RX pour atteindre cet objectif, mais comme il n'y a pas d'autres réponses, j'accepterai votre code> explication code> comme réponse :) J'ai trouvé Ce pausable observable mais malheureusement je ne comprends pas ça et Visual Studio ne peut pas résoudre créatewithDisposable code> pour moi / je ne sais pas ce que cela devrait faire.
@Cel: J'ai modifié le code que vous parlez de. Ils ont effondré créatewithdisposable code> dans une surcharge de
Créer code>.
Ici, c'est une application d'iconnectableOvrable que j'ai légèrement corrigé pour la nouvelle API (original ici ):
J'ai déjà employé la solution non RX, j'espère qu'une uppote pour une bonne réponse n'est pas déplaire.
Voici une façon raisonnablement simple RX de faire ce que vous voulez. J'ai créé une méthode d'extension appelée pausable code> qui prend une source observable et une seconde observable de booléen qui fait une pause ou reprend l'observable.
var xs = Observable.Generate(
0,
x => x < 100,
x => x + 1,
x => x,
x => TimeSpan.FromSeconds(0.1));
var bs = new Subject<bool>();
var pxs = xs.Pausable(bs);
pxs.Subscribe(x => { /* Do stuff */ });
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
J'ai déjà employé la solution non RX, j'espère qu'une uppote pour une bonne réponse n'est pas déplaire.
@Enigmativity pourquoi il perd certaines valeurs? Il devrait les préserver tous, non?
@ J.Lennon - Comment savez-vous que cela perd ses valeurs? Avez-vous du code de test qui le montre?
@Engmativitity oui. J'ai couru votre exemple dans Linqpad et le premier élément reçu dans l'abonnement était "4" et non "0". Corrigez-moi si je me trompe mais j'ai remarqué ici: "Func concat code> est un peu tour (il s'abonnera dans l'abonnement publié juste la ReplaySubject ont été complétés) qui provoquent la perte de valeurs.
Avez-vous corrigé le problème avec les valeurs perdues? J'ai le même problème dans une application de console.
@Superjmn - Si vous modifiez le code retour pauser.startwith (false) .Distinctntrontilchanged () code> à
retour pauser.startwith (true) .Distinctsntilhanged () code> Il commence par l'hypothèse que l'observable n'est pas en pause et que vous obtenez toutes les valeurs.
@Enigmativity peut-être que je ne comprends pas bien. J'ai essayé votre approche ici ( gist.github.com/superjmn/8e5cdac40117A4BCec4C ), mais rien n'apparaît dans la console. Je m'attendais à ce que les articles soient imprimés jusqu'à ce que B.onnext (True) soit atteint. Je ne sais pas ce que je fais mal, merci d'avance. J'espère que vous pourrez expliquer son utilisation avec un peu plus de perspicacité! :RÉ
@Superjmn - Il y a un problème dans mon code. Si vous ajoutez un b.onnext (FALSE); code> avant votre premier
Dormez code> Vous obtenez des résultats. Je pense avoir besoin d'un petit rejig.
@Enigmativité Ce serait génial si vous résolvez cela. En fait, je me considère que plusieurs niveaux sous vous en termes de Rx. Dire la vérité, je ne comprends pas ce qui se passe sous la cagoule, mais après ça va bien, je pourrais faire un aspect plus profond pour essayer d'avoir le sens de tout. Merci d'avance. J'ai vraiment hâte de voir votre version fixe!
Voici ma réponse. Je pense qu'il peut y avoir une condition de race autour de la résolution de pause, mais cela peut être atténué en sérialisant toute activité sur un planificateur. (favoriser sérieusement la synchronisation).
using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; using Microsoft.Reactive.Testing; using NUnit.Framework; namespace StackOverflow.Tests.Q7620182_PauseResume { [TestFixture] public class PauseAndResumeTests { [Test] public void Should_pause_and_resume() { //Arrange var scheduler = new TestScheduler(); var isRunningTrigger = new BehaviorSubject<bool>(true); Action pause = () => isRunningTrigger.OnNext(false); Action resume = () => isRunningTrigger.OnNext(true); var source = scheduler.CreateHotObservable( ReactiveTest.OnNext(0.1.Seconds(), 1), ReactiveTest.OnNext(2.0.Seconds(), 2), ReactiveTest.OnNext(4.0.Seconds(), 3), ReactiveTest.OnNext(6.0.Seconds(), 4), ReactiveTest.OnNext(8.0.Seconds(), 5)); scheduler.Schedule(TimeSpan.FromSeconds(0.5), () => { pause(); }); scheduler.Schedule(TimeSpan.FromSeconds(5.0), () => { resume(); }); //Act var sut = Observable.Create<IObservable<int>>(o => { var current = source.Replay(); var connection = new SerialDisposable(); connection.Disposable = current.Connect(); return isRunningTrigger .DistinctUntilChanged() .Select(isRunning => { if (isRunning) { //Return the current replayed values. return current; } else { //Disconnect and replace current. current = source.Replay(); connection.Disposable = current.Connect(); //yield silence until the next time we resume. return Observable.Never<int>(); } }) .Subscribe(o); }).Switch(); var observer = scheduler.CreateObserver<int>(); using (sut.Subscribe(observer)) { scheduler.Start(); } //Assert var expected = new[] { ReactiveTest.OnNext(0.1.Seconds(), 1), ReactiveTest.OnNext(5.0.Seconds(), 2), ReactiveTest.OnNext(5.0.Seconds(), 3), ReactiveTest.OnNext(6.0.Seconds(), 4), ReactiveTest.OnNext(8.0.Seconds(), 5) }; CollectionAssert.AreEqual(expected, observer.Messages); } } }
N'y a-t-il pas une fuite de séquence source originale? Au lieu de retourner Isrunningtrigger, je l'étais dans une variable et je dispose d'une composition composable associée à une variable «connexion».