8
votes

Comment rejoindre plusieurs séquences iobervables?

static void Test()
    {
        var a = Observable.Range(0, 10);
        var b = Observable.Range(5, 10);
        //var zip = a.Zip(b, (x, y) => x + "-" + y);
        //zip.Subscribe(Console.WriteLine);

        var joined = MergeJoin(a,b, (x,y) => x + "-" + y);
        joined.Subscribe(Console.WriteLine);
    }

static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
    {
        return Observable.CreateWithDisposable<string>(o =>
            {
                Queue<int> a = new Queue<int>();
                Queue<int> b = new Queue<int>();
                object gate = new object();

                left.Subscribe(x =>
                    {
                        lock (gate)
                        {
                            if (a.Count == 0 || a.Peek() < x)
                                a.Enqueue(x);

                            while (a.Count != 0 && b.Count != 0)
                            {
                                if (a.Peek() == b.Peek())
                                {
                                    o.OnNext(selector(a.Dequeue(), b.Dequeue()));
                                }
                                else if (a.Peek() < b.Peek())
                                {
                                    a.Dequeue();
                                }
                                else
                                {
                                    b.Dequeue();
                                }
                            }
                        }
                    });

                right.Subscribe(x =>
                {
                    lock (gate)
                    {
                        if (b.Count == 0 || b.Peek() < x)
                            b.Enqueue(x);

                        while (a.Count != 0 && b.Count != 0)
                        {
                            if (a.Peek() == b.Peek())
                            {
                                o.OnNext(selector(a.Dequeue(), b.Dequeue()));
                            }
                            else if (a.Peek() < b.Peek())
                            {
                                a.Dequeue();
                            }
                            else
                            {
                                b.Dequeue();
                            }
                        }
                    }
                });

                return Disposable.Empty;
            });

1 commentaires

A posé la même question sur le RX Forum


4 Réponses :


3
votes

groupby code> peut faire ce dont vous avez besoin. Il semble que vous n'ayez pas de contraintes de temps sur lorsque les articles sont "rejoints", vous avez juste besoin d'articles similaires pour être ensemble de la mode.

Merge<TSource>(params IObservable<TSource>[] sources);
Merge<TSource>(this IEnumerable<IObservable<TSource>> sources);
Merge<TSource>(this IObservable<IObservable<TSource>> source);


4 commentaires

Le seul problème est que j'ai besoin de pouvoir rejoindre des valeurs dans l'ordre. Cependant, il peut être résolu si au lieu d'INT I Passez des tuples de valeur index.


Que voulez-vous dire par "dans l'ordre"?


Gardez à l'esprit que, en utilisant fusionnez + compte , vous n'obtiendrez pas de correspondances tant que les deux séquences source finissent. C'est bon pour l'exemple plage , mais si vos sources sont chaudes / sans fin, la sortie peut ne pas être ce que vous attendez.


L'ordre si quelles entités avec des touches égales sont réunies: rejoindre (A, B, C) === Joindre (joindre (A, B), c)



1
votes

Que diriez-vous d'utiliser le nouvel opérateur de jointure au V.2838.

var a = Observable.Range(1, 10);
var b = Observable.Range(5, 10);

var joinedStream = a.Join(b, _ => Observable.Never<Unit>(), _ => Observable.Never<Unit>(), 
    (aOutput, bOutput) => new Tuple<int, int>(aOutput,  bOutput))
    .Where(tupple => tupple.Item1 == tupple.Item2);

joinedStream.Subscribe(output => Trace.WriteLine(output));


2 commentaires

+1 pour la solution avec une jointure. J'ai passé une heure hier et je ne pouvais pas le faire travailler. Je partage vos préoccupations concernant la performance. En outre, le code résultant est beaucoup plus cryptique et difficile à suivre la comparaison à la simple jointure LINQ. Je commence à penser que Rx n'est tout simplement pas une bonne solution pour ce type de problèmes.


@Serger - Je suis sûr que cela pourrait être rendu plus efficace en émissions de valeurs de durée car les matchs sont fabriqués (c'est-à-dire remplacer l'observable.Never quelque chose d'un peu plus intelligent). Cela dépendrait tous de ce que vous êtes des règles pour quand il est prudent de terminer la durée.



2
votes

Cette réponse est copiée depuis le RX Forums , juste pour qu'il soit archivé ici aussi: xxx

ou sans utiliser des expressions de requête: xxx


1 commentaires

Le seul problème avec cette solution est qu'il faut que YS est chaud (ou MultiCast ) et ne prend pas en charge le scénario où la valeur ys s'allume avant la valeur xs .



2
votes

Honnêtement, je ne peux pas penser à une solution basée sur les opérateurs existants qui fonctionne pour des sources chaudes d'ordre inconnu (c'est-à-dire xs avant YS vs ys avant xs ) . Votre solution semble bien (hé, si cela fonctionne), mais je ferais quelques modifications si c'était mon code:

  • ANNULATION DE SUPPORTAIRE CABROISION correctement en utilisant MutAflisposable et COMPOSITEDISPABLE
  • appel onerror pour des exceptions jetées dans le sélecteur (le rendant plus cohérent avec d'autres opérateurs)
  • envisager de prendre en charge l'achèvement s'il est possible que une source puisse terminer avant l'autre

    Le code ci-dessous a été testé avec votre entrée à double plage, les mêmes entrées renversées, ainsi qu'avec avec vide + jamais : xxx


0 commentaires