3
votes

Dans rxjs, comment chaîner le mappage à travers des tableaux de données reçues de différentes API?

J'appelle une API et je reçois un tableau de résultats, je vérifie la pagination et s'il existe plus de pages, j'appelle la page suivante, répète jusqu'à ce qu'il n'y ait plus de pages.

Pour chaque tableau de résultats, j'appelle un autre point de terminaison et je fais exactement la même chose: je reçois un tableau de résultats, vérifie une autre page et appelle à nouveau le point de terminaison. Lavez, rincez et répétez.

Par exemple:

Je veux saisir une liste de pays qui pourraient être une réponse paginée, puis pour chaque pays, je veux saisir une liste de villes, qui pourrait également être paginé. Et pour chaque ville, j'exécute un ensemble de transformations puis je stocke dans une base de données.

J'ai déjà essayé ceci, mais je suis resté bloqué:

// grabCountries stays the same as above, but the rest is as follows:

const grabCities = (country) =>
  Observable.create(async (observer) => {
    const url = `http://api.com/${country}/cities`
      let cursor = url
      do {
       const results = fetch(cursor)

       // results = {
       //     data: [
       //         'Montreal', 'Toronto',
       //         'Paris', 'Marseilles',
       //         'Barcelona', 'Madrid'
       //     ],
       //     next: '89ghjg98nd8g8sdfg98gs9h868hfoig'
       // }

       results.data.forEach(city => {
         observer.next(city)
       })

    cursor = results.next ? `${url}/${results.next}` : undefined

    } while (cursor)
})

const multiCaster = new Subject()

grabCountries.subscribe(multiCaster)
multiCaster.pipe(map((country) => {
    grabCities(country).pipe(map(saveCityToDB)).subscribe()
})).subscribe()
multiCaster.pipe(map(saveCountryToDB)).subscribe()

J'ai essayé quelques approches:

Faire un sujet (parfois je 'll faudra faire une base traitée en parallèle sur les résultats de' grabCountries '. Par exemple, je peux vouloir stocker les pays dans une base de données en parallèle avec la saisie des villes.)

const intermediateSubject = new Subject()

intermediateSubject.subscribe(storeCountriesInDatabase)
intermediateSubject.subscribe(getCities)

J'ai aussi essayé le piping et le mapping, mais il semble que ce soit fondamentalement la même chose.

En écrivant ceci, j'ai pensé à cette solution et elle semble fonctionner correctement, j'aimerais juste savoir si Je rend cela trop compliqué. Il peut y avoir des cas où je dois faire plus que quelques appels d'API d'affilée. (Imagine, Countries => States => Cities => Boulangeries => Avis => Commentaires => Réponses) Donc, cette étrange cartographie sur un autre modèle de rappel d'observateur pourrait devenir désagréable.

C'est donc ce que j'ai maintenant en gros:

const grabCountries = Observable.create(async (observer) => {
    const url = 'http://api.com/countries'
    let cursor = url
    do {

        const results = fetch(cursor)

        // results = { 
        //   data: [ 'Canada', 'France', 'Spain' ],
        //   next: '47asd8f76358df8f4058898fd8fab'
        // }

        results.data.forEach(country => { observer.next(country) })

        cursor = results.next ? `${url}/${results.next}` : undefined

    } while(cursor)

})


const getCities = {
    next: (country) => {
        const url = 'http://api.com/cities'
        let cursor = url
        do {

            const results = fetch(cursor)

            // results = {
            //     data: [ 
            //         'Montreal', 'Toronto', 
            //         'Paris', 'Marseilles', 
            //         'Barcelona', 'Madrid' 
            //     ],
            //     next: '89ghjg98nd8g8sdfg98gs9h868hfoig'
            // }

            results.data.forEach(city => { 
                `**** What do I do here?? ****` 
            })

            cursor = results.next ? `${url}/${results.next}` : undefined

        } while(cursor)
    }
}

tl; dr - J'appelle une API qui reçoit un ensemble de résultats paginés dans un tableau et je dois mapper chaque élément et appeler une autre API qui en reçoit une autre ensemble paginé de résultats, chaque ensemble également dans un tableau.

Est-ce que l'imbrication d'une observable dans une autre et la cartographie des résultats via 'callApiForCountries.pipe (map (forEachCountryCallApiForCities))' la meilleure méthode ou en avez-vous d'autres recommandations?


3 commentaires

est-ce l'intégralité de votre cas d'utilisation? J'aime beaucoup RXJS, mais peut-être qu'un Promise.all () plus simple convient mieux ici.


@RandyCasburn Ce n'est pas l'intégralité du cas d'utilisation. En fin de compte, cela effectuera toutes sortes de transformations et / ou d'effets secondaires (journalisation, stockage, mise à jour de bases de données externes, files d'attente, etc.) le tout en parallèle et en séquence et construit de manière dynamique. C'est le début d'un grand projet. J'aborde juste ce morceau à la fois.


Fascinant - cela semble très amusant!


3 Réponses :


0
votes

Vous avez besoin de l'opérateur expand . Il se comporte de manière récursive et correspond donc à l'idée d'avoir des résultats paginés.


1 commentaires

Merci pour la suggestion, cela semble pouvoir fonctionner. Je vais essayer.



1
votes

Voici le code qui devrait fonctionner avec l'exploration séquentielle de l'URL suivante. Vous commencez par une {next: url} jusqu'à ce que res.next ne soit pas disponible.

of({next:http://api.com/cities}).pipe(
    expand(res=>results.next ? `${url}/${results.next}` : undefined
    takeWhile(res=>res.next!==undefined)
).subscribe()


0 commentaires

1
votes

D'accord, j'ai donc dépensé beaucoup de cerveau là-dessus et j'ai trouvé deux solutions qui semblent fonctionner.

const nestedFlow = () => {
	fetchAccountIDs.pipe(map(accountIds => {
		getAccountPostIDs(accountIds) // Has the do loop for paging inside
			.pipe(
				map(fetchPostDetails),
				map(mapToDBFormat),
				map(storeInDB)
			).subscribe()
	})).subscribe()
}


const expandedflow = () => {
	fetchAccountIDs.subscribe((accountId) => {
		// accountId { accountId: '345367geg55sy'}
		getAccountPostIDs(accountId).pipe(
			expand((results) => {
				/*
				results : {
					postIDs: [
						131424234,
						247345345,
					],
					cursor: '374fg8v0ggfgt94',
				}
				*/
				const { postIDs, cursor } = results
				if (cursor) return getAccountPostIDs({...accountId, cursor})
				return { postIDs, cursor }
			}),
			takeWhile(hasCursor, true), // recurs until cursor is undefined
			concatMap(data => data.postIDs), 
			map(data => ({ post_id: data })), 
			map(fetchPostDetails), 
			map(mapToDBFormat), 
			map(storeInDB) 
		).subscribe()
	})
}

Les deux semblent fonctionner avec des performances similaires. J'en ai lu certains où laisser le flux de données est une mauvaise pratique et vous devriez tout diriger, mais je ne sais pas comment éliminer la première sortie dans le 'ExpandFlow' car le 'expand' doit rappeler un observable, mais peut-être cela peut être fait.

Il ne me reste plus qu'à résoudre les problèmes de condition de concurrence à partir du moment où le 'complete' est appelé dans getAccountPostIDs le dernier enregistrement est stocké dans la base de données. Actuellement dans mon test, observer.complete se termine avant 3 des actions upsert.

Tous les commentaires sont appréciés et j'espère que cela aidera quelqu'un à l'avenir. P >


0 commentaires