static void Test() { var a = Observable.Range(0, 10); var b = Observable.Range(5, 10); //var zip = a.Zip(b, (x, y) => x + "-" + y); //zip.Subscribe(Console.WriteLine); var joined = MergeJoin(a,b, (x,y) => x + "-" + y); joined.Subscribe(Console.WriteLine); } static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector) { return Observable.CreateWithDisposable<string>(o => { Queue<int> a = new Queue<int>(); Queue<int> b = new Queue<int>(); object gate = new object(); left.Subscribe(x => { lock (gate) { if (a.Count == 0 || a.Peek() < x) a.Enqueue(x); while (a.Count != 0 && b.Count != 0) { if (a.Peek() == b.Peek()) { o.OnNext(selector(a.Dequeue(), b.Dequeue())); } else if (a.Peek() < b.Peek()) { a.Dequeue(); } else { b.Dequeue(); } } } }); right.Subscribe(x => { lock (gate) { if (b.Count == 0 || b.Peek() < x) b.Enqueue(x); while (a.Count != 0 && b.Count != 0) { if (a.Peek() == b.Peek()) { o.OnNext(selector(a.Dequeue(), b.Dequeue())); } else if (a.Peek() < b.Peek()) { a.Dequeue(); } else { b.Dequeue(); } } } }); return Disposable.Empty; });
4 Réponses :
groupby code> peut faire ce dont vous avez besoin. Il semble que vous n'ayez pas de contraintes de temps sur lorsque les articles sont "rejoints", vous avez juste besoin d'articles similaires pour être ensemble de la mode.
Merge<TSource>(params IObservable<TSource>[] sources);
Merge<TSource>(this IEnumerable<IObservable<TSource>> sources);
Merge<TSource>(this IObservable<IObservable<TSource>> source);
Le seul problème est que j'ai besoin de pouvoir rejoindre des valeurs dans l'ordre. Cependant, il peut être résolu si au lieu d'INT I Passez des tuples de valeur index.
Que voulez-vous dire par "dans l'ordre"?
Gardez à l'esprit que, en utilisant fusionnez code> +
compte code>, vous n'obtiendrez pas de correspondances tant que les deux séquences source finissent. C'est bon pour l'exemple
plage code>, mais si vos sources sont chaudes / sans fin, la sortie peut ne pas être ce que vous attendez.
L'ordre si quelles entités avec des touches égales sont réunies: rejoindre (A, B, C) === Joindre (joindre (A, B), c)
Que diriez-vous d'utiliser le nouvel opérateur de jointure au V.2838.
var a = Observable.Range(1, 10); var b = Observable.Range(5, 10); var joinedStream = a.Join(b, _ => Observable.Never<Unit>(), _ => Observable.Never<Unit>(), (aOutput, bOutput) => new Tuple<int, int>(aOutput, bOutput)) .Where(tupple => tupple.Item1 == tupple.Item2); joinedStream.Subscribe(output => Trace.WriteLine(output));
+1 pour la solution avec une jointure. J'ai passé une heure hier et je ne pouvais pas le faire travailler. Je partage vos préoccupations concernant la performance. En outre, le code résultant est beaucoup plus cryptique et difficile à suivre la comparaison à la simple jointure LINQ. Je commence à penser que Rx n'est tout simplement pas une bonne solution pour ce type de problèmes.
@Serger - Je suis sûr que cela pourrait être rendu plus efficace en émissions de valeurs de durée car les matchs sont fabriqués (c'est-à-dire remplacer l'observable.Never quelque chose d'un peu plus intelligent). Cela dépendrait tous de ce que vous êtes des règles pour quand il est prudent de terminer la durée.
Cette réponse est copiée depuis le RX Forums , juste pour qu'il soit archivé ici aussi: ou sans utiliser des expressions de requête: p>
Le seul problème avec cette solution est qu'il faut que YS code> est chaud (ou
MultiCast code>) et ne prend pas en charge le scénario où la valeur
ys code> s'allume avant la valeur
xs code>.
Honnêtement, je ne peux pas penser à une solution basée sur les opérateurs existants qui fonctionne pour des sources chaudes d'ordre inconnu (c'est-à-dire Le code ci-dessous a été testé avec votre entrée à double plage, les mêmes entrées renversées, ainsi qu'avec avec xs avant YS code> vs
ys avant xs code>) . Votre solution semble bien (hé, si cela fonctionne), mais je ferais quelques modifications si c'était mon code:
MutAflisposable CODE> et
COMPOSITEDISPABLE CODE> LI>
onerror code> pour des exceptions jetées dans le sélecteur (le rendant plus cohérent avec d'autres opérateurs) li>
vide
A posé la même question sur le RX Forum