6
votes

Comment rejoindre le stdout de deux sous-processus et pipe à STDIN de nouvelle sous-processus de Python

permet de dire que j'ai eu la commande suivante qui s'exécute à partir de la coquille xxx pré>

pour ceux d'entre vous qui ne connaissent pas la vue SamTools (puisqu'il s'agit d'empilerflow). Ce que cela fait essentiellement, c'est la création d'un nouveau fichier BAM qui a un nouvel en-tête. Les fichiers BAM sont généralement de gros fichiers compressés, de sorte que même le fichier dans certains cas peut consommer du temps. Une approche alternative serait de subir une commande2, puis d'utiliser SamTools Readader pour changer d'en-tête. Cela passe deux fois dans le grand fichier. La commande ci-dessus passe à travers la BAM un seul moment qui est bon pour les fichiers BAM plus importants (ils doivent être plus grands que 20 Go, même lorsqu'ils sont comprimés - WGS). P>

Ma question est de savoir comment implémenter des commandes de ce type Python à l'aide de sous-processus. P>

J'ai ce qui suit: p>

fh_bam = open('output.bam', 'w')
params_0 = [ "samtools", "view", "-HS", "header.sam" ]
params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"]
params_2 = [ "samtools", "view", "-bS", "-" ]
sub_0 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
sub_1 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
### SOMEHOW APPEND sub_1.stdout to sub_0.stdout
sub_2 = subprocess.Popen(params_2, stdin=appended.stdout, stdout=fh_bam)


4 commentaires

Qu'est-ce que fh_bam ? Pourquoi ne pas simplement obtenir la sortie des deux et l'utiliser dans la commande pour votre dernier processus?


Le gestionnaire de fichiers de la sortie BAM. Eh bien, les deux premières commandes lisent essentiellement une partie d'un fichier et la mettant cela dans le stdout. Ainsi, «Obtenir la sortie» est déjà disponible dans les fichiers. La seule différence est que la deuxième commande est en train de saisir une partie du fichier. Et ce fichier particulier est compressé, de sorte que les fichiers ajoutés ne sont pas aussi simples.


Vous voulez la sortie du deuxième appel ou les deux?


Je souhaite essentiellement fusionner la sortie des deux premières commandes (qui convertissent tous deux le fichier compressé en texte) mais, au lieu d'écrire sur le disque, il passe directement à la commande suivante qui comprime le fichier avant d'écrire.


4 Réponses :


0
votes

Je suppose que la concaténation des sorties des deux premières sous-processus de mémoire n'est pas réalisable en raison de la taille des fichiers impliqués. Je suggérerais d'envelopper les sorties des deux premières sous-documentations dans un fichier comme. On dirait que vous n'avez besoin que de la méthode de lecture, car Popen ne lira que de son fichier STDIN, pas de recherche ni d'écrire. Le code ci-dessous suppose que le renvoi d'une chaîne vide de lecture est suffisant pour indiquer que le flux est sur EOF xxx

à clarifier, f1.fin bloquera et ne renvoie que '' lorsque le tuyau est fermé / eof'd. concat.rad ne tente que de lire à partir de F2 Après cela, la sortie de F1 et f2 ne serait pas être entrelacé. Il y a bien sûr une légère surcharge de lecture de la fin de F1 à plusieurs reprises, qui pourrait être évitée en définissant une variable d'indicateur pour indiquer quel fichier à lire. Je doute que cela échappe à la performance de manière significative.


5 commentaires

Pourriez-vous expliquer ce que fait (Self, * args) fait? Je ne comprends pas pourquoi vous vérifiez si RET == ''.


Il n'y a aucune garantie comment Popen appellera appeler la méthode de lecture de l'objet de fichier de tuyau STDIN que vous le donnez. Lire Peut prendre un argument de taille limitant ce qui est lu, ce que vous voudriez exactement, sinon tout serait lu dans la mémoire. Mais cela signifie que le premier processus 'stdout peut ne pas être complètement lu avec un appel à lire. Nous avons donc lu depuis le stdout de F1 jusqu'à ce qu'il soit vide, puis puis passez à Scdout de F2 ...


Downvote. STDIN'S POPEN fait pas Accepter un objet semblable à un fichier. Il a besoin d'un vrai .fileno () (un fichier réel, un tuyau ou une prise de certains systèmes). Il y a aussi d'autres problèmes.


Ouais, @jsebastian a raison :( J'ai fouillé cela sur ma tablette en supposant que cela fonctionnerait et sans tests. Les documents disent que Popen accepte les goûts de fichiers, mais les tests révèlent les méthodes de lecture / écriture ne sont jamais appelées, mais une méthode Fileno est requis. Vraisemblablement Os.Read () et OS.Write () sont utilisés à la place. J'ai posté une autre réponse à l'aide d'un tuyau.


@sirark: OS.Read () n'est pas utilisé par Module de sous-processus . Le tuyau est redirigé à l'aide de quelque chose comme os.dup2 (, 0) . L'enfant peut lire à partir de l'entrée, mais il aime par exemple par exemple, Stdio's getchar () .



1
votes

(Je ne peux pas commenter tristement, mais cette "réponse" est un commentaire à la réponse de CMIDI, si quelqu'un peut le déplacer, il serait apprécié! - PS: cette réponse était maintenant supprimée ...)

Marco a explicitement déclaré que les commandes produisent beaucoup de production, environ 20 Go. Si vous utilisez Communica (), il attendra le processus de résiliation, ce qui signifie que le descripteur «FD» devra tenir cette grande quantité de données. En pratique, le système d'exploitation affleurera les données sur le disque pendant l'entre-temps, à moins que votre ordinateur ait plus de 20 Go de RAM gratuite. Donc, vous finissez par écrire les données intermédiaires sur le disque, que l'auteur d'origine souhaitait éviter. +1 pour la réponse de Sirlark!


2 commentaires

Ouais, bon point, surtout sur le blocage du tuyau. Vous êtes arrivé en premier


@Ariel: fyi, La réponse de Sirlark ne fonctionnera pas



4
votes

Si vous avez déjà la commande shell dans la chaîne, vous pouvez simplement l'exécuter tel quel: XXX PRE>

Pour émuler le pipeline en Python: P>

#!/usr/bin/env python
from subprocess import Popen, PIPE

# start command3 to get stdin pipe, redirect output to the file
with open('output.bam', 'wb', 0) as output_file:
    command3 = Popen("samtools view -bS -".split(), 
                     stdin=PIPE, stdout=output_file)
# start command1 with its stdout redirected to command3 stdin
command1 = Popen('samtools view -HS header.sam'.split(),
                 stdout=command3.stdin)
rc_command1 = command1.wait() #NOTE: command3.stdin is not closed, no SIGPIPE or a write error if command3 dies
# start command2 after command1 finishes
command2 = Popen('samtools view input.bam 1:1-50000000'.split(),
                 stdout=command3.stdin)
command3.stdin.close() # inform command2 if command3 dies (SIGPIPE or a write error)
rc_command2 = command2.wait()
rc_command3 = command3.wait()


0 commentaires

-1
votes

Bien que Popen accepte des objets de type de fichier, il utilise en fait les poignées / descripteurs de fichiers sous-jacents, pas les méthodes de lecture et d'écriture des objets de fichier pour communiquer, comme @ j.f. Sebastian souligne à juste titre. Un meilleur moyen de le faire est d'utiliser un tuyau ( OS.Pipe () code>) qui n'utilise pas le disque. Cela vous permet de connecter le flux de sortie directement au flux d'entrée d'un autre processus, ce qui correspond exactement à ce que vous voulez. Le problème est alors juste une question de sérialisation, pour vous assurer que les deux flux sources ne se mêlent pas.

import os
import subprocess

r, w = os.pipe()

fh_bam = open('output.bam', 'w')
params_0 = [ "samtools", "view", "-HS", "header.sam" ]
params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"]
params_2 = [ "samtools", "view", "-bS", "-" ]
sub_sink = subprocess.Popen(params_2, stdin=r, stdout=fh_bam, bufsize=4096)
sub_src1 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=w, bufsize=4096)
sub_src1.communicate()
sub_src2 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=w, bufsize=4096)
sub_src2.communicate()


1 commentaires

L'explication est trompeuse. (1) bufsize n'a aucun effet sur la performance dans ce cas. En principe, (en raison de starr = tuyau ) bufsize peut affecter comment starr est lu (bien qu'il ne soit pas dans ce cas parce que .Communicate () sur POSIX n'utilise pas starr.read () - il utilise SELECT à la place). BuFSize n'a aucun effet sur STDIN, STDOUT car ils ne sont pas affectés au tuyau. (2) Si vous déposez starr = tuyau alors .Communicat () appel est inutile .wait () peut être utilisé comme dans ma réponse. (3) Vous devriez fermer les boutons de tuyaux inutilisés dans le parent (après les avoir réussi aux sous-processus)