1
votes

Le multitraitement de gros CSV ne renvoie pas la quantité attendue de lignes

J'essaie d'aider quelqu'un avec quelque chose. Je ne suis en aucun cas un programmeur expert, mais ce que j'essaie de faire est de calculer une valeur à partir d'un CSV en fonction de l'année et de l'ID d'un autre CSV. Le programme fonctionne comme je l'avais prévu si je mets statiquement une taille d'échantillon plus petite à des fins de temps et de test (amount_of_reviews fonctionne avec un CSV 180 Mo). Mais quand je veux que cela fonctionne TOUTES les données, il me semble qu'il me manque environ 2000 des résultats attendus de 20245 (l'un des threads échoue peut-être?). J'utilise le multitraitement pour réduire le temps d'exécution du programme. Je vais juste continuer et publier tout mon code ici et j'espère que quelqu'un avec de l'expérience pourra repérer mes erreurs.

import csv
import os
from multiprocessing import Process, Lock, Array, Value
import datetime
from ctypes import c_char_p

print (datetime.datetime.now())

with open('D:/temp/listings.csv', encoding="utf8") as f:
    reader = csv.reader(f)
    f.seek(0)

    idSet = set()
    for row in reader:
        idSet.add(row[0])

idList = list(idSet)
idList = sorted(idList)
listings = []

def amount_of_reviews_2019(id):
    total = 0
    with open('D:/temp/reviews.csv', encoding="utf8") as f:
        reader = csv.reader(f)
        f.seek(0)
        next(reader)

        for row in reader:
            if int(row[2][:4]) >= 2019 and row[0] == id:
                total = total + 1
        return total


def calc(id):
    with open('D:/temp/listings.csv', encoding="utf8") as f:
        reader = csv.reader(f)
        f.seek(1)
        listing = []
        for row in reader:
            if row[0] == id: 
                listing.append(row[0])
                listing.append(row[48])
                listing.append(row[49])
                listing.append(amount_of_reviews_2019(id))
        listings.append(listing)
        print(len(listings))

def format_csv(data, lock):
    with lock:
        with open('D:/temp/multiprocessing.csv', 'a+', newline='', encoding="utf8") as csvfile:
            filewriter = csv.writer(csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
            print(data)
            filewriter.writerows(data)
            #for y in data:
                #filewriter.writerow([y[0], y[1], y[2], y[3]])


def do(counter, lock):
    for id in idList:
        if counter.value < len(idList): #len(idList) = 20245 #When i put lets say 15 here I get all 15 expected results
            with counter.get_lock():
                counter.value += 1 #I am aware I skip the 0 index here
                print(counter.value)
            calc(idList[counter.value])
        else:
            format_csv(listings, lock)
            break

if __name__ == '__main__':
    lock = Lock()

    print(len(idList))
    sharedCounter = Value('i', 0)

    processes = []
    for i in range(os.cpu_count()):
        print('registering process %d' % i)
        processes.append(Process(target=do, args=(sharedCounter, lock)))

    for process in processes:
        process.start()

    for process in processes:
        process.join()

    print (datetime.datetime.now())


17 commentaires

20K lignes est un très petit nombre de lignes. Il peut même ne pas remplir le cache du processeur. Le seul coût de la communication inter-processus devrait annuler tout avantage. Même avec le multithreading, le coût peut être plus élevé que tous les avantages


amount_of_reviews fonctionne avec un CSV de 180 Mo qu'il doit exécuter plusieurs fois. len (idList) = 20245. Actuellement, il faut environ 4h pour s'exécuter. 20+ sans multitraitement


Dans ce cas, mettez à jour la question et ajoutez toutes les informations pertinentes. Ne forcez pas les gens à lire l'intégralité du code pour comprendre ce qui se passe. Dans ce cas cependant, vous pouvez simplement charger les données dans une base de données, l'indexer et commencer à analyser.


En général, le parallélisme des données fonctionne en fractionnant les données en partitions et en demandant à un travailleur de traiter une seule partition à chaque fois. Pas besoin de verrouillage ou de synchronisation dans ce cas. Les CSV peuvent être facilement partitionnés le long des lignes. Lorsque tous les travailleurs ont terminé, un seul processus peut collecter leurs résultats.


BTW, c'est ainsi que fonctionne la carte / réduction: une étape divise les entrées et les mappe aux travailleurs, une autre collecte les résultats et produit la sortie finale.


Merci de m'avoir informé à ce sujet. Pourriez-vous jeter un coup d'œil à: machinelearningplus.com/python/parallel-processing- python Je voudrais être sûr que c'est ce que vous me dites


J'utilise le multitraitement pour réduire le temps d'exécution du programme Voici le problème. En mettant de côté les détails de cette implémentation particulière pendant une seconde, savez-vous avec certitude que le multitraitement est une nécessité et qu'il améliorera les performances? Avez-vous effectué une analyse comparative / profilage? Dans l'ensemble, je pense que @PanagiotisKanavos fait d'excellents points.


Ce serait formidable si vous pouviez partager les données avec nous. Je pourrais faire une analyse comparative et essayer d'écrire une solution demain. Je n'ai évidemment pas encore examiné votre code en profondeur, mais je serais malheureux si je ne pouvais pas le réduire à moins de 5 secondes.


@Alexander Cécile oui ça fait toute la différence à cause de la taille des fichiers et de la méthode amount_of_reviews. Je vais examiner le parallélisme des données et le traitement parallèle et essayer!


@Alexander Cécile insideairbnb.com/get-the-data.html est l'endroit où vous pouvez télécharger les données. Les CSV que j'utilise sont lists.csv et reviews.csv. Toute aide est grandement appréciée.


@ imagine93 D'accord, mais qu'est-ce qui vous amène à dire ça?


@Alexander Cécile J'ai essayé de l'exécuter avec et sans et d'écrire un texte sur le terminal sur la progression. L'utilisation du processeur est également à 100% en multitraitement et + -10 sans. Sans multitraitement, 20 heures n'étaient pas assez de temps pour terminer sur mon système (Ryzen 5 2600) cela se termine en 4/5


@ imagine93 20 heures !? Quelle est la taille des fichiers d'entrée? Pouvez-vous expliquer le fonctionnement du programme?


Calculez le total des avis pour chaque avis à partir de 2019 (à partir de reviews.csv, 180 Mo) pour chaque identifiant d'annonce unique (listings.csv, + - 20 Mo)


Ne serait-il pas beaucoup plus rapide si vous utilisiez numpy par exemple pour changer les csv en tableaux et faire votre comptage avec ceux-ci?


@ imagine93 Utilisez-vous les versions gzippées des fichiers ou les versions non compressées? Seulement les données d'Amsterdam?


De plus, si un identifiant n'a pas d'avis, il doit toujours apparaître dans le résultat, n'est-ce pas? Le nombre peut être NaN ou 0.


3 Réponses :


2
votes

Ce code semble avoir une condition de concurrence:

import csv
import os
from multiprocessing import Process, Lock, Array, Value
import datetime

print (datetime.datetime.now())

idSet = set(range(20245))
idList = list(idSet)
idList = sorted(idList)
listings = []

totalCounter = Value('i', 0)

def calc(id):
    listing = []
    listings.append(listing)

def format_csv(data, lock):
    with lock:
        totalCounter.value += len(data)

def do(counter, lock):
    for id in idList:
        value = None
        with counter.get_lock():
            if counter.value < len(idList):
                value = counter.value
                counter.value += 1
        if value is not None:
            calc(idList[value])
        else:
            format_csv(listings, lock)
            break

if __name__ == '__main__':
    lock = Lock()

    sharedCounter = Value('i', 0)

    processes = []
    for i in range(os.cpu_count()):
        processes.append(Process(target=do, args=(sharedCounter, lock)))

    for process in processes:
        process.start()

    for process in processes:
        process.join()

    print (datetime.datetime.now())
    print('len(idList): %d, total: %d' % (len(idList), totalCounter.value))

Vous incrémentez le compteur tout en maintenant un verrou dessus, très bien. Cependant, dans idList [counter.value] vous interrogez la valeur du compteur en dehors du verrou. Donc, un autre thread / processus peut avoir changé le compteur entre-temps. Dans ce cas, vous lirez une valeur inattendue du compteur. Une manière sûre d'écrire votre code serait la suivante:

    value = None
    with counter.get_lock():
        counter.value += 1 #I am aware I skip the 0 index here
        value = counter
    print(value)
    calc(idList[value])

MODIFIER Voici une version de votre code qui a toutes les conditions de course supprimées (je crois ) et supprime également les E / S de fichier. Cela fonctionne correctement pour moi. Peut-être que vous pouvez rajouter le fichier E / S morceau par morceau et voir où les choses vont mal

    with counter.get_lock():
        counter.value += 1 #I am aware I skip the 0 index here
        print(counter.value)
    calc(idList[counter.value])


4 commentaires

Cela a beaucoup de sens. Je vais faire ce changement et le laisser fonctionner pendant la nuit pour tester si cela a résolu mon problème initial. Merci pour votre temps


Ce changement avait du sens mais n'a pas aidé à résoudre le problème. J'ai toujours moins de (18589) lignes renvoyées au lieu du 20245 attendu (len (idList))


Deux choses: 1. Pour simplifier le débogage, vous pouvez vérifier si calc est appelé pour chaque id . Vous n'êtes pas obligé d'exécuter le code dans calc . Pour autant que je sache, la fonction ajoutera toujours exactement un élément aux listes . Si la fonction n'est pas appelée pour chaque id ou s'il y a une invocation pour laquelle elle n'ajoute pas d'élément, vous pouvez revenir en arrière à partir de là. 2. Le test counter.value doit également être à l'intérieur du verrou. De plus, ce pour id dans idList dans la fonction do () me semble un peu louche. Vous pouvez essayer while True et casser si counter.value == len .


J'ai modifié ma réponse pour montrer ce que je pense être un code correct qui est aussi proche que possible de votre code d'origine.



1
votes

Je suggérerais d'utiliser des pandas pour lire les fichiers (merci Alexander). puis parcourez les listes et additionnez tous les avis qui ont cet identifiant spécifique et qui sont postérieurs à 2019:

import numpy as np
import pandas
import datetime
import time

listing_csv_filename = r'listings.csv'
reviews_csv_filename = r'reviews.csv'
start = time.time()
df_listing = pandas.read_csv(listing_csv_filename, delimiter=',', quotechar='"')
df_reviews = pandas.read_csv(reviews_csv_filename, delimiter=',', parse_dates=[1])
values = list()
valid_year = df_reviews['date'] > datetime.datetime(2019, 1, 1, 0, 0, 0)
for id_num in df_listing['id']:
    valid = (df_reviews['listing_id'] == id_num) & valid_year
    values.append((id_num, np.sum(valid)))

print(values)
print(time.time() - start)


2 commentaires

Quelques conseils: vous pouvez avoir ces deux open () en une seule instruction with. Au lieu d'appeler readlines () et l'interaction, parcourez simplement l'objet fichier lui-même. Cela devrait faire gagner du temps et de la mémoire. Il n'y a probablement aucun inconvénient, et même un avantage, à faire tous ces .replace () sans vérifier si la sous-chaîne est contenue dans la chaîne. Pourquoi ne pas simplement utiliser Pandas pour cela? Je n’ai pas encore vu le CSV, mais à moins qu’il ne soit extrêmement foutu, read_csv () devrait être bien, non?


Merci pour cela, pandas read_csv semble être beaucoup plus robuste.



1
votes

Diagnostic

Sans l'avoir approfondi, je dirais qu'il y a deux principaux coupables ici, et ils vont tous les deux de pair:

Premièrement, il y a le fichier répété analyse et itération. Vous effectuez une itération sur chaque ID de la "boucle principale", soit 20 025 fois. Pour chaque ID, vous lisez et parcourez ensuite le fichier de listes entier (20 051 lignes) et le fichier de commentaires entier (493 816 lignes). Cela équivaut à lire 10 milliards 290 millions 186 000 675 lignes de CSV.

Deuxièmement, il y a le multitraitement lui-même. Je ne l'ai pas examiné en profondeur, mais je pense qu'il est juste de dire que nous pouvons avoir une bonne idée du problème uniquement à partir du code. Comme nous l'avons vu ci-dessus, pour chaque ID, votre programme ouvre les deux fichiers CSV. Avoir un tas de processus qui doivent tous écrire dans les deux mêmes fichiers, 20 000 fois au total, ne peut pas être bon pour les performances. Je ne serais pas entièrement surpris si le code fonctionnait plus rapidement sans le multiprocesseur qu'avec lui. Il y a aussi la condition de course potentielle mentionnée par Daniel Junglas.


Solutions

1.

D'accord, c'est toujours un gâchis, mais je voulais juste obtenir quelque chose là-bas avant le tournant du siècle. Je continuerai à chercher une meilleure solution. Sur la base du nombre d'annonces qui apparaissent dans les avis mais pas dans listings.csv , entre autres, la solution idéale pourrait être légèrement différente.

import collections as colls
import csv
import datetime

cutoff_date = datetime.date(2019, 1, 1)

with open('../resources/reviews.csv') as reviews_file:
    reader = csv.DictReader(reviews_file)
    review_listing_counts = colls.Counter(
        (row['listing_id'] for row in reader if datetime.datetime.fromisoformat(row['date']).date() >= cutoff_date))

with open('../resources/listings.csv') as listings_file, open('../out/listing_review_counts_3.csv', 'w',
                                                              newline='') as out_file:
    reader = csv.DictReader(listings_file)
    listings_ids = (row['id'] for row in reader)

    writer = csv.writer(out_file)
    writer.writerow(('id', 'review_count'))
    writer.writerows(((curr_id, review_listing_counts[curr_id]) for curr_id in listings_ids))

Le temps d'exécution est d'environ 1 s, ce qui signifie que j'ai battu mon objectif de 5 secondes ou moins. Yay :)

2.

Cette méthode utilise un dictionnaire pour compter les avis, et le module csv standard. Gardez à l'esprit que cela générera une erreur si un avis concerne une liste qui ne figure pas dans listings.csv.

import csv
import datetime

with open('../resources/listings.csv') as listings_file:
    reader = csv.DictReader(listings_file)
    listing_review_counts = dict.fromkeys((row['id'] for row in reader), 0)

cutoff_date = datetime.date(2019, 1, 1)

with open('../resources/reviews.csv') as reviews_file:
    reader = csv.DictReader(reviews_file)
    for row in reader:
        rev_date = datetime.datetime.fromisoformat(row['date']).date()
        if rev_date >= cutoff_date:
            listing_review_counts[row['listing_id']] += 1

with open('../out/listing_review_counts_2.csv', 'w', newline='') as out_file:
    writer = csv.writer(out_file)
    writer.writerow(('id', 'review_count'))
    writer.writerows(listing_review_counts.items())

3. h3>

Cette méthode utilise collections.Counter code> et le module csv standard.

import numpy as np
import pandas as pd

listings_df = pd.read_csv('../resources/listings.csv', header=0, usecols=['id'], dtype={'id': str})

reviews_df = pd.read_csv('../resources/reviews.csv', header=0, parse_dates=['date'], dtype={'listing_id': str})

valid_reviews = reviews_df[reviews_df['date'] >= pd.Timestamp(year=2019, month=1, day=1)]

review_id_counts = valid_reviews['listing_id'].value_counts()

counts_res: pd.DataFrame = pd.merge(listings_df, review_id_counts, left_on='id', right_index=True, how='left').rename(columns={'listing_id': 'review_count'})
counts_res['review_count'] = counts_res['review_count'].fillna(0).astype(np.int64)

counts_res.to_csv(path_or_buf='../out/listing_review_counts.csv', index=False)

Faites-moi savoir si vous avez des questions, si je dois inclure des explications, etc. :)


2 commentaires

Soory pour la réponse tardive. Merci beaucoup. Cela fait toute la différence. Quelle méthode préférez-vous normalement utiliser lorsque vous faites quelque chose comme ça? Je savais que mon programme était inefficace, mais vous m'avez vraiment montré à quel point il était mauvais et pourquoi. J'essaierai d'utiliser davantage numpy et pandas à l'avenir. Merci encore!


@ imagine93 Cela dépend vraiment du reste du programme. Pandas est idéal pour un certain nombre d'opérations complexes, et n'a pas d'inconvénients majeurs par rapport aux autres, donc j'irais probablement pour cela.