Est-ce que quelqu'un sait comment j'écrirais un traitement de séquence à la manière d'une API de flux java en Python? L'idée est d'écrire les opérations dans l'ordre dans lequel elles se dérouleront:
[action2(action1(item)) for item in my_list if condition(item)]
Maintenant en python, je pourrais faire
myList.stream() .filter(condition) .map(action1) .map(action2) .collect(Collectors.toList());
Mais c'est le ordre inverse.
Comment pourrais-je avoir quelque chose dans le bon ordre? Évidemment, je pourrais utiliser des variables, mais je devrais alors trouver un nom pour chaque résultat partiel.
3 Réponses :
Vous pouvez écrire ceci vous-même:
>>> JavaLike(range(10)).stream().filter(lambda x: x % 2 == 0).map(str).collect(tuple) ('0', '2', '4', '6', '8')
Une syntaxe similaire est alors possible:
from collections import UserList class JavaLike(UserList): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.iter = None def stream(self): self.iter = None return self def filter(self, function): self.iter = filter(function, self if self.iter is None else self.iter) return self def map(self, function): self.iter = map(function, self if self.iter is None else self.iter) return self def collect(self, collection_class=None): if collection_class is None: if self.iter is not None: ret = JavaLike(self.iter) self.iter = None return ret return JavaLike(self) return collection_class(self if self.iter is None else self.iter)
Je dois dire que j'espérais que quelque chose de construit existait mais c'est ce dont j'avais besoin.
Il existe au moins deux modules sur PyPI: lazy-streams et pystreams
Je suis peut-être pointilleux mais j'évite d'utiliser tout ce qui n'est pas mature, sauf si je suis désespéré. En attendant j'ai trouvé pypi.org/project/Rx qui semble répondre à mes besoins. Je ne l'ai pas encore testé.
Il existe une bibliothèque qui fait déjà exactement ce que vous recherchez, c'est-à-dire que l'évaluation paresseuse et l'ordre des opérations est le même avec la façon dont il est écrit, ainsi que beaucoup d'autres bonnes choses comme le multiprocessus ou le multithreading Map / Reduce.
Il s'appelle pyxtension
et il est prêt et maintenu sur PyPi .
Votre code serait réécrit sous cette forme:
stream(myList) .filter(condition) .mpmap(action1) # this is for multi-processed map .fastmap(action2) # this is multi-threaded map .toList()
et
from pyxtension.strams import stream stream(myList) .filter(condition) .map(action1) .map(action2) .toList()
Notez que la dernière instruction toList () code > fait exactement ce que vous attendez - il collecte les données comme cela se produirait dans un RDD Spark.