3
votes

Réduisez le temps d'exécution, la lecture de fichiers, la manipulation de chaînes de chaque ligne et l'écriture de fichiers

J'écris sur un script qui lit toutes les lignes de plusieurs fichiers, lit un nombre au début de chaque bloc et place ce numéro devant chaque ligne du bloc jusqu'au numéro suivant et ainsi de suite. Ensuite, il écrit toutes les lignes de lecture dans un seul fichier .csv.

Les fichiers que je lis ressemblent à ceci:

timecmd "type combined_data_1.txt > NUL & type combined_data_2.txt > NUL & type combined_data_3.txt >NUL & type combined_data_4.txt  > NUL"
command took 0:1:25.51 (85.51s total)

Et le fichier de sortie devrait ressembler à ceci:

timecmd "type combined_data_1.txt combined_data_2.txt combined_data_3.txt combined_data_4.txt  > test.csv"

combined_data_1.txt
combined_data_2.txt
combined_data_3.txt
combined_data_4.txt

command took 0:2:42.93 (162.93s total)

Actuellement, mon script est le suivant:

timecmd "type combined_data_1.txt combined_data_2.txt combined_data_3.txt combined_data_4.txt  > NUL"

combined_data_1.txt


combined_data_2.txt


combined_data_3.txt


combined_data_4.txt

command took 0:1:25.87 (85.87s total)

Il a un temps d'exécution d'environ 135 secondes (les 4 fichiers qui sont lus font chacun 500 Mo et le fichier de sortie a 2,3 Go). L'exécution du script prend environ 10 Go de RAM. Je pense que cela pourrait être un problème. La plus grande quantité de temps est nécessaire pour créer la liste de toutes les lignes, je pense. Je voudrais réduire le temps d'exécution de ce programme, mais je suis nouveau sur python et je ne sais pas comment faire cela. Pouvez-vous me donner quelques conseils?

Merci

Edit:

J'ai mesuré les temps pour les commandes suivantes en cmd (je n'ai installé que Windows sur mon ordinateur , j'ai donc utilisé, espérons-le, des commandes cmd équivalentes):

écriture séquentielle vers NUL

from asyncio import Semaphore, ensure_future, gather, run
import time

limit = 8

async def read(file_list):
    tasks = list()
    result = None

    sem = Semaphore(limit)

    for file in file_list:
        task = ensure_future(read_bounded(file,sem))
        tasks.append(task)

        result = await gather(*tasks)

    return result

async def read_bounded(file,sem):
    async with sem:
        return await read_one(file)

async def read_one(filename):
    result = list()
    with open(filename) as file:
        dataList=[]
        content = file.read().split(":")
        file.close()
        j=1
        filmid=content[0]
        append=result.append
        while j<len(content):
            for entry in content[j].split("\n"):
                if len(entry)>10:
                    append("%s%s%s%s" % (filmid,",",entry,"\n"))
                else:
                    if len(entry)>0:
                        filmid=entry
            j+=1
    return result

if __name__ == '__main__':
    start=time.time()
    write_append="w"
    files = ['combined_data_1.txt', 'combined_data_2.txt', 'combined_data_3.txt', 'combined_data_4.txt']

    res = run(read(files))

    with open("output.csv",write_append) as outputFile:
        for result in res:
            outputFile.write(''.join(result))
            outputFile.flush()
    outputFile.close()
    end=time.time()
    print(end-start)

écriture séquentielle vers fichier

13368,2385003,4,2004-07-08
13368,659432,3,2005-03-16
13369,751812,2,2002-12-16
13369,2625420,2,2004-05-25

parallèle

13368:
2385003,4,2004-07-08
659432,3,2005-03-16
13369:
751812,2,2002-12-16
2625420,2,2004-05-25


0 commentaires

3 Réponses :


1
votes

Vous voulez lire 2 Gio et écrire 2 Gio avec un temps écoulé faible et une faible consommation de mémoire. Le parallélisme, pour le noyau et pour la broche, compte. Idéalement, vous auriez tendance à les occuper tous. Je suppose que vous avez au moins quatre cœurs disponibles. La segmentation de vos E / S est importante, pour éviter une mallocation excessive.

Commencez par la chose la plus simple possible. Veuillez effectuer quelques mesures et mettre à jour votre question pour les inclure.

séquentielle

Veuillez effectuer des mesures de synchronisation séquentielles de

        content = file.read().split(":")

et

cat combined_data_1.csv > /dev/null &
cat combined_data_2.csv > /dev/null &
cat combined_data_3.csv > /dev/null &
cat combined_data_4.csv > /dev/null &
wait

Je suppose que vous aurez une faible utilisation du processeur et que vous mesurerez donc les taux d'E / S en lecture et en écriture.

parallèle

Veuillez effectuer des mesures d'E / S parallèles:

$ cat combined_data_[1234].csv > big.csv

Cela vous permettra de savoir si le chevauchement des lectures offre une possibilité d'accélération. Par exemple, placer les fichiers sur quatre systèmes de fichiers physiques différents pourrait permettre cela - vous garderiez quatre broches occupées.

asynchrone

Sur la base de ces horaires, vous pouvez choisir d'abandonner async I / O, et à la place, utilisez quatre interpréteurs python distincts.

logique

$ cat combined_data_[1234].csv > /dev/null

C'est de là que vient une grande partie de votre grande empreinte mémoire. Plutôt que de glisser dans tout le fichier à la fois, envisagez de lire par lignes ou par blocs. Un générateur pourrait vous offrir une API pratique pour cela.

EDIT:

compression

Il semble que vous soyez lié aux E / S - vous avez des cycles d'inactivité en attendant sur le disque. Si le consommateur final de votre fichier de sortie est prêt à effectuer une décompression, alors envisagez d'utiliser gzip , xz / lzma , ou snappy . L'idée est que la majeure partie du temps écoulé est consacrée aux E / S, vous voulez donc manipuler des fichiers plus petits pour faire moins d'E / S. Cela profite à votre script lors de l'écriture de 2 Gio de sortie, et peut également bénéficier du code qui consomme cette sortie.

En tant qu'élément distinct, vous pouvez éventuellement organiser le code qui produit les quatre fichiers d'entrée pour en produire des versions compressées.

p >


2 commentaires

Merci pour votre réponse! J'ai inclus les mesures du temps dans la question. Je pense que cela signifie que l'exécution du script avec des E / S asynchrones n'offre aucun avantage pour l'exécution globale, n'est-ce pas?


Oui, il semble que vous soyez lié aux E / S, donc il y a des économies limitées à réaliser en peaufinant le code. Je recommande de faire moins d'E / S, peut-être via la compression. Les mises à jour incrémentielles, où vous ignorez les anciens enregistrements inchangés, constituent une autre piste à explorer. Cela peut cependant nécessiter une nouvelle logique pour le code consommateur.



0
votes

J'ai essayé de résoudre votre problème. Je pense que c'est un moyen très facile et simple si vous n'avez aucune connaissance préalable d'une bibliothèque spéciale.

Je viens de prendre 2 fichiers d'entrée nommés input.txt & input2.txt avec le contenu suivant.

Remarque: Tous les fichiers sont dans le même répertoire.

input.txt

13368,2385003,4,2004-07-08
13368,659432,3,2005-03-16
13369,751812,2,2002-12-16
13369,2625420,2,2004-05-25
13364,2385001,5,2004-06-08
13364,659435,1,2005-03-16
13370,751811,2,2023-12-16
13370,2625220,2,2015-05-26

input2.txt

# https://stackoverflow.com/questions/55226823/reduce-runtime-file-reading-string-manipulation-of-every-line-and-file-writing
import re

def read_file_and_get_output_lines(file_names):
    output_lines = []

    for file_name in file_names:
        with open(file_name) as f:
            lines = f.readlines()
            for new_line in lines:
                new_line = new_line.strip()

                if not re.match(r'^\d+:$', new_line):
                    output_line = [old_line]
                    output_line.extend(new_line.split(","))
                    output_lines.append(output_line)
                else:
                    old_line = new_line.rstrip(":")

    return output_lines

def write_lines_to_csv(output_lines, file_name):
    with open(file_name, "w+") as f:
        for arr in output_lines:
            line = ",".join(arr)
            f.write(line + '\n')

if __name__ == "__main__":
    file_names = [
        "input.txt",
        "input2.txt"
    ]

    output_lines = read_file_and_get_output_lines(file_names)
    print(output_lines)
    # [['13368', '2385003', '4', '2004-07-08'], ['13368', '659432', '3', '2005-03-16'], ['13369', '751812', '2', '2002-12-16'], ['13369', '2625420', '2', '2004-05-25'], ['13364', '2385001', '5', '2004-06-08'], ['13364', '659435', '1', '2005-03-16'], ['13370', '751811', '2', '2023-12-16'], ['13370', '2625220', '2', '2015-05-26']]

    write_lines_to_csv(output_lines, "output.csv")

J'ai écrit le code de manière modulaire afin que vous puissiez facilement l'importer et l'utiliser dans votre projet. Une fois que vous exécutez le code ci-dessous à partir du terminal en utilisant python3 csv_writer.py , il lira tous les fichiers fournis dans la liste file_names et générera output.csv sera le résultat que vous recherchez.

csv_writer.py

13364:
2385001,5,2004-06-08
659435,1,2005-03-16
13370:
751811,2,2023-12-16
2625220,2,2015-05-26

output.csv

13368:
2385003,4,2004-07-08
659432,3,2005-03-16
13369:
751812,2,2002-12-16
2625420,2,2004-05-25


3 commentaires

Merci pour votre réponse! Malheureusement, cela n'a pas réduit la durée d'exécution du script. Cela a pris plus de temps à exécuter.


D'accord, quelle est la durée actuelle?


Le temps d'exécution actuel de mon script était d'environ 130 secondes, votre script a pris environ 280 secondes.



2
votes

Dans ce cas, vous ne gagnez rien en utilisant asyncio pour deux raisons:

  • asyncio est monothread et ne parallélise pas le traitement (et, en Python, ne peut pas non plus les threads )
  • les appels IO accèdent au système de fichiers, ce qu'asyncio ne couvre pas - il s'agit principalement d'E / S réseau

Le cadeau que vous n'utilisez pas correctement asyncio est le fait que votre coroutine read_one ne contient pas un seul wait . Cela signifie qu'il ne suspend jamais l'exécution et qu'il se terminera avant de céder la place à une autre coroutine. En faire une fonction ordinaire (et supprimer complètement asyncio) aurait exactement le même résultat.

Voici une version réécrite du script avec les modifications suivantes:

  • octet IO partout, pour plus d'efficacité
  • parcourt le fichier au lieu de tout charger en même temps
  • code séquentiel
import sys

def process(in_filename, outfile):
    with open(in_filename, 'rb') as r:
        for line in r:
            if line.endswith(b':\n'):
                prefix = line[:-2]
                continue
            outfile.write(b'%s,%s' % (prefix, line))

def main():
    in_files = sys.argv[1:-1]
    out_file = sys.argv[-1]
    with open(out_file, 'wb') as out:
        for fn in in_files:
            process(fn, out)

if __name__ == '__main__':
    main()

Sur ma machine et Python 3.7, cette version fonctionne à environ 22 s / Gio, testée sur quatre fichiers générés aléatoirement, de 550 Mio chacun. Il a une empreinte mémoire négligeable car il ne charge jamais le fichier entier en mémoire.

Le script s'exécute sur Python 2.7 sans changement, où il horloge à 27 s / Gio. Pypy (6.0.0) l'exécute beaucoup plus rapidement, ne prenant que 11 s / Gio.

L'utilisation de concurrent.futures en théorie devrait permettre le traitement dans un thread tandis qu'un autre attend IO, mais le résultat finit par être nettement plus lent que l’approche séquentielle la plus simple.


2 commentaires

Je vous remercie! C'est presque deux fois plus rapide. La réduction du temps d'exécution est-elle principalement due à la lecture en octets ou à l'itération? J'ai pensé qu'il serait plus rapide de charger le fichier entier à la fois pour réduire le nombre d'accès sur le fichier


@ Alex4224 Je ne sais pas lequel des deux aide le plus; J'ai essayé de rendre le code rapide en le faisant le moins possible. Le chargement du fichier en une seule fois n'accélère pas les choses car le nombre d'accès est de toute façon amorti par io buffering.