6
votes

Créer un observable à partir d'une demande asynchrone périodique

Je veux un moyen générique de convertir une méthode asynchrone en observable. Dans mon cas, je traite des méthodes qui utilisent HttpClient pour récupérer des données à partir d'une API.

Disons que nous avons la méthode Task<string> GetSomeData() qui doit devenir une seule Observable<string> où les valeurs sont générées comme une combinaison de:

  • Appels périodiques répétés à GetSomeData() (par exemple toutes les x secondes)
  • Appels déclenchés manuellement à GetSomeData() à tout moment (par exemple, lorsque l'utilisateur frappe l'actualisation).

Comme il existe deux façons de déclencher l'exécution de la GetSomeData() concurrence peut être un problème. Pour éviter d'exiger que GetSomeData() soit thread-safe, je souhaite limiter la concurrence afin qu'un seul thread exécute la méthode en même temps. En conséquence, je dois gérer les demandes qui se chevauchent avec une certaine stratégie. J'ai fait une (sorte de) diagramme en marbre essayant de décrire le problème et le résultat souhaité

diagramme de marbre

Mon instinct me dit qu'il existe un moyen simple d'y parvenir, alors donnez-moi quelques idées :)

C'est la solution que j'ai jusqu'à présent. Cela ne résout malheureusement pas le problème de la concurrence.

static async Task Main(string[] args)
{
    //Running this program will yield non deterministic results due to data-race in GetSomeValueAsync
    var someService = new SomeService();
    var stopwatch = Stopwatch.StartNew();
    var observableWrapper = new ObservableCreationWrapper<string>(someService.GetSomeValueAsync, TimeSpan.FromMilliseconds(2000));
    observableWrapper.Stream
        .Take(6)
        .Subscribe(x => 
            {
                Console.WriteLine($"{stopwatch.ElapsedMilliseconds} | Request: {x} fininshed");
            });

    await Task.Delay(4000);
    observableWrapper.TriggerAdditionalCall();
    observableWrapper.TriggerAdditionalCall();
    Console.ReadLine();
}

Méthode d'extension pour répéter avec retard:

class SomeService
{
    private int _ticks = 0;

    public async Task<string> GetSomeValueAsync()
    {
        //Just a hack to dermine if request was triggered manuall or by timer
        var initiatationWay = (new StackTrace()).GetFrame(4).GetMethod().ToString().Contains("System.Threading.CancellationToken") ? "manually" : "by timer";

        //Here we have a data race! We would like to limit access to this method 
        var valueToReturn = $"{_ticks} ({initiatationWay})";

        await Task.Delay(500);
        _ticks += 1; 
        return valueToReturn;
    }
}

Un exemple de service contenant la méthode pour générer l'observable

static class Extensions
{
    public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) => source
        .Concat(
            Observable.Create<T>(async observer =>
            {
                await Task.Delay(delay);
                observer.OnCompleted();
            }))
        .Repeat();
}

Utilisé comme ceci (une course aux données se produira):

    public class ObservableCreationWrapper<T>
    {
        private Subject<Unit> _manualCallsSubject = new Subject<Unit>();
        private Func<Task<T>> _methodToCall;
        private IObservable<T> _manualCalls;

        public IObservable<T> Stream { get; private set; }

        public ObservableCreationWrapper(Func<Task<T>> methodToCall, TimeSpan period)
        {
            _methodToCall = methodToCall;
            _manualCalls = _manualCallsSubject.AsObservable()
                .Select(x => Observable.FromAsync(x => methodToCall()))
                .Merge(1);

            Stream = Observable.FromAsync(() => _methodToCall())
                .DelayRepeat(period)
                .Merge(_manualCalls);
        }

        public void TriggerAdditionalCall()
        {
            _manualCallsSubject.OnNext(Unit.Default);
        }
    }


7 commentaires

Vous avez raison de dire que c'est simple, mais veuillez nous donner un code pour travailler avec. À quoi ressemblent les «points de terminaison asynchrones normaux»? À quoi ressemble un "signal de l'application"? Un clic sur un bouton? Une minuterie?


@Enigmativity J'ai ajouté un exemple de code de ce que j'ai jusqu'à présent. Cela fonctionne, mais ne répond pas entièrement à mes exigences. De plus, je soupçonne qu'il y a des améliorations à apporter.


J'ai du mal à comprendre les exigences d'une solution acceptable. Pourriez-vous concevoir un diagramme en marbre montrant des exemples de données d'entrée et de sortie? Vous n'avez pas besoin de le dessiner dans Photoshop, vous pouvez simplement utiliser des caractères ASCII simples comme ceci: Source: +--1-2-3--4--| , Result: +--ABC--D--| .


@TheodorZoulias Merci pour vos commentaires! J'ai réécrit la question pour la rendre plus claire et j'ai ajouté une (sorte de) diagramme en marbre.


Beau diagramme en marbre! La question est maintenant très claire à mon humble avis. Je ne sais pas pourquoi il a été rejeté.


Que se passe-t-il avec deux appels déclenchés manuellement proches l'un de l'autre, de sorte que le second soit déclenché avant la fin du premier? Annule-t-il la méthode asynchrone en cours d'exécution?


Que doit-il se passer en cas d'échec de methodToCall avec une exception? Le IObservable<T> Stream terminer dans un état d'échec?


3 Réponses :


3
votes

Voici mon point de vue sur ce problème:


Mise à jour: J'ai pu grandement simplifier ma solution proposée en empruntant des idées à la réponse d'Enigmativity. La méthode Observable.StartAsync gère automatiquement le désordre de l'annulation, et l'exigence d'une exécution sans chevauchement peut être appliquée simplement en utilisant un SemaphoreSlim .

19:52:43.684 Begin 1 periodic
19:52:44.208 Received #1 periodic
19:52:44.731 Begin 2 periodic
19:52:45.235 Received #2 periodic
19:52:45.729 Begin 3 periodic
19:52:46.232 Received #3 periodic
19:52:46.720 Begin 4 periodic
19:52:46.993 Begin 5 manual
19:52:47.220 Begin 6 manual
19:52:47.723 Received #6 manual
19:52:48.223 Begin 7 periodic
19:52:48.728 Received #7 periodic
19:52:49.227 Begin 8 periodic
19:52:49.730 Received #8 periodic
19:52:50.226 Begin 9 periodic

L'argument out Action manualInvocation est le mécanisme qui déclenche une invocation manuelle.

Exemple d'utilisation:

int ticks = 0;
var subscription = PeriodicAndManual(async (isManual, token) =>
{
    var id = $"{++ticks} " + (isManual ? "manual" : "periodic");
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Begin {id}");
    await Task.Delay(500, token);
    return id;
}, TimeSpan.FromMilliseconds(1000), out var manualInvocation)
.Do(x => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Received #{x}"))
.Subscribe();

await Task.Delay(3200);
manualInvocation();
await Task.Delay(200);
manualInvocation();
await Task.Delay(3200);

subscription.Dispose();

Production:

/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// function that is invoked periodically and manually. Overlapping invocations
/// are prevented. Timer ticks that would cause overlapping are ignored.
/// Manual invocations cancel previous invocations, and restart the timer.
/// </summary>
public static IObservable<T> PeriodicAndManual<T>(
    Func<bool, CancellationToken, Task<T>> functionAsync,
    TimeSpan period,
    out Action manualInvocation)
{
    // Arguments validation omitted
    var manualSubject = new Subject<bool>();
    manualInvocation = () => manualSubject.OnNext(true);
    var semaphore = new SemaphoreSlim(1);
    return Observable
        .Interval(period)
        .Select(_ => false) // Not manual
        .Merge(manualSubject)
        .TakeUntil(isManual => isManual) // Stop on first manual
        .Repeat() // ... and restart the timer
        .Prepend(false) // Skip the initial interval delay
        .Scan(seed: (
            // Both representations of an operation are needed
            // The Observable provides automatic cancellation on unsubscription
            // The Task maintains the IsCompleted state
            Operation: (IObservable<T>)null,
            AsTask: Task.FromResult(default(T))
        ), accumulator: (previous, isManual) =>
        {
            // Start a new operation only if the previous operation is completed,
            // or if the call is manual. Otherwise return the previous operation.
            if (!previous.AsTask.IsCompleted && !isManual) return previous;
            // Start a new operation as hot observable
            var operation = Observable.StartAsync(async ct =>
            {
                await semaphore.WaitAsync(ct); // Ensure no overlapping
                try { return await functionAsync(isManual, ct); }
                finally { semaphore.Release(); }
            }, Scheduler.Immediate); // Propagate the task status synchronously
            return (operation, operation.ToTask());
        })
        .Select(entry => entry.Operation) // Discard the AsTask representation
        .DistinctUntilChanged() // Ignore duplicate operations
        .Switch(); // Cancel pending operations and ignore them
}

La technique d'utilisation des opérateurs Scan et DistinctUntilChanged pour supprimer des éléments pendant l'exécution de l'opération asynchrone précédente est empruntée à cette question.


3 commentaires

L'implémentation pourrait devenir encore plus simple si Observable.StartAsync un type qui est à la fois une Task et un IObservable . Mais hélas non. Créer un tel type est possible, mais délicat .


Réponse vraiment impressionnante! J'ai testé et cela fonctionne et résout mon problème. Je marquerai très probablement comme bonne réponse, mais je veux d'abord comprendre entièrement toutes les réponses suggérées. @TheodorZoulias est le CancellationToken nécessaire pour éviter le chevauchement dans cette version?


@figursagsmats merci mon pote! Non, l'observation du CancellationToken est facultative et n'affectera pas l'application de la restriction d'exécution simultanée unique. Mais il est préférable que la functionAsync Async réagisse rapidement à un signal d'annulation. Sinon, une opération périodique abandonnée peut continuer à s'exécuter en arrière-plan, empêchant une opération demandée manuellement de démarrer en temps opportun.



0
votes

Voici la requête dont vous avez besoin:

var subject = new Subject<Unit>();
var delay = TimeSpan.FromSeconds(5.0);

var source = Observable.FromAsync(() => GetSomeData());

IObservable<string> query =
    subject
        .StartWith(Unit.Default)
        .Select(x => source.Expand(n => Observable.Timer(delay).SelectMany(y => source)))
        .Switch();

Si chaque fois que vous appelez subject.OnNext(Unit.Default) déclenche immédiatement un appel à GetSomeData et quand puis répétez l'appel en fonction du TimeSpan ensemble en delay .

L'utilisation de .StartWith(Unit.Default) définit la requête dès qu'il y a un abonné.

L'utilisation de .Switch() annule toutes les opérations en attente basées sur un nouveau subject.OnNext(Unit.Default) appelé.

Cela devrait correspondre à votre diagramme en marbre.


La version ci-dessus n'a pas introduit de délai entre les valeurs.

La version 2 devrait.

var subject = new Subject<Unit>();
var delay = TimeSpan.FromSeconds(1.0);

IObservable<string> query =
    subject
        .StartWith(Unit.Default)
        .Select(x => Observable.Timer(TimeSpan.Zero, delay))
        .Switch()
        .SelectMany(x => Observable.FromAsync(() => GetSomeData()));

J'ai utilisé l'opérateur Expand pour introduire un délai entre les valeurs. Tant que la source ne produit qu'une seule valeur (ce FromAsync fait FromAsync ), cela devrait fonctionner FromAsync .


6 commentaires

Il y a des astuces vraiment chouettes et intelligentes dans cette implémentation! Mais il semble que cela n'empêche pas les exécutions qui se chevauchent (causées par des tâches dont la durée est supérieure au delay ), et aussi que cela n'annule pas les opérations en cours lorsqu'un appel manuel est déclenché. Ainsi, les résultats de toutes les invocations apparaissent finalement dans le flux résultant.


@TheodorZoulias - Cela annule certainement avec le .Switch() , mais avec la signature de Task<string> GetSomeData() il n'y a aucun moyen d'annuler cela à moins qu'il ne soit changé en Task<string> GetSomeData(CancellationToken ct) - alors le la dernière ligne de la requête devient .SelectMany(x => Observable.FromAsync(ct => GetSomeData(ct))); .


@TheodorZoulias - J'ai mis à jour ma réponse avec une requête qui devrait ajouter le délai requis. Cela a fonctionné dans mes tests.


J'ai expérimenté le Switch et la version annulable d' Observable.FromAsync . Très intéressant! Il faut cependant que le Switch affecte les observables FromAsync , ce qui n'est pas le cas dans votre exemple de version 1. Au lieu de cela, il affecte les observables Timer . Les FromAsync sont créés après le Switch et leur annulation se produit immédiatement après leur achèvement normal (ce qui est trop tard et inefficace). L'annulation est également «au mieux», ce qui signifie qu'elle est coopérative. Les anciennes tâches ne sont pas attendues avant de démarrer les nouvelles tâches, le chevauchement est donc toujours possible.


La version 2 est également intéressante. Du côté positif, l'annulation est fonctionnelle et le chevauchement est évité (à condition que le GetSomeData honore le jeton et se termine de manière synchrone lorsqu'il reçoit la notification). Les inconvénients sont qu'il utilise un opérateur Experimental et que l'invocation n'est pas périodique. Il y a un délai constant entre la fin d'une action et le début de la suivante, au lieu d'un délai constant entre les points de départ des actions suivantes (ce qui est ce que le diagramme en marbre de l'OP indique).


@TheodorZoulias - Ce n'est pas récursif donc je ne pense pas que ça marche. Comment le voyez-vous fonctionner?



0
votes

Je suggère de ne pas essayer d'annuler un appel déjà commencé. Les choses deviendront trop compliquées. Si la logique de GetSomeValueAsync implique un appel de base de données et / ou un appel d'API Web, vous ne pouvez tout simplement pas vraiment annuler l'appel.

Je pense que la clé ici est de s'assurer que tous les appels à GetSomeValueAsync sont sérialisés.

J'ai créé la solution suivante basée sur la version 1 d'Enigmativity. Elle est testée sur une page blazor de webassembly sur asp.net core 3.1, fonctionne très bien.

private int _ticks = 0; //simulate a resource you want serialized access

//for manual event, trigger will be 0; for Timer event, trigger will be 1,2,3...
protected async Task<string> GetSomeValueAsync(string trigger)
{
    var valueToReturn = $"{DateTime.Now.Ticks.ToString()}: {_ticks.ToString()} | ({trigger})";

    await Task.Delay(1000);
    _ticks += 1;
    return valueToReturn;
}

//define two subjects
private Subject<string> _testSubject = new Subject<string>();
private Subject<string> _getDataSubject = new Subject<string>();

//driving observable, based on Enigmativity's Version 1
var delay = TimeSpan.FromSeconds(3.0);
IObservable<string> getDataObservable =
    _testSubject
   .StartWith("Init")
   .Select(x => Observable.Timer(TimeSpan.Zero, delay).Select(i => i.ToString()))
   .Switch()
   .WithLatestFrom(_getDataSubject.AsObservable().StartWith("IDLE"))
   .Where(a => a.Second == "IDLE")
   .Select(a => a.First);

//_disposables is CompositeDisposable defined in the page
_disposables.Add(getDataObservable.Subscribe(async t =>
{
     _getDataSubject.OnNext("WORKING");
     //_service.LogToConsole is my helper function to log data to console
     await _service.LogToConsole(await GetSomeValueAsync(t)); 
     _getDataSubject.OnNext("IDLE");
}));

C'est ça. J'ai utilisé un bouton pour déclencher des événements manuels. Les _ticks en sortie sont toujours dans l'ordre, c'est-à-dire qu'aucun chevauchement ne s'est produit.


1 commentaires

Ajoutez simplement un bouton sur la page et cliquez dessus - <button @onclick = "_ => _testSubject.OnNext (string.Empty)"> Test d'événement </button>