6
votes

Multiprocess ou multithread? - parallèle un calcul simple pour des millions d'itérations et stocker le résultat dans une structure de données unique

J'ai un dictionnaire D of {String: liste} entrées, et je calcule une fonction F (D [S1], D [S2]) -> Float Pour une paire de cordes (S1, S2) dans d.

En outre, J'ai créé une classe matricielle personnalisée labelednumericMatrix qui me permet d'effectuer des affectations telles que m [ID1, ID2] = 1,0.

Je dois calculer F (x, y) et stocker le résultat dans m [x, y] pour tous les 2 tuples dans l'ensemble des chaînes S, y compris lorsque S1 = S2. Ceci est facile à coder en tant que boucle, mais l'exécution de ce code prend un certain temps, car la taille de l'ensemble se développe sur de grandes valeurs telles que 10 000 ou plus.

Aucun des résultats que je stocke dans ma matrice étiquetée m dépend de l'autre. Par conséquent, il semble simple de parallementer ce calcul en utilisant des services multithread ou multiprocess de Python. Cependant, étant donné que CPPHON ne permet pas vraiment à mon d'exécuter simultanément le calcul de F (x, y) et de stockage de m [x, y] via le threading, il semble que la multiropess est mon seul choix. Cependant, je ne pense pas que la multirofression est conçue pour passer autour de 1 Go de structures de données entre les processus, tels que ma structure matricielle marquée contenant 10000x10000 éléments.

Quelqu'un peut-il offrir des conseils (a) si je devrais éviter d'essayer de paralléliser mon algorithme, et (b) si je peux faire la parallélisation, comment faire de manière à ce que, de préférence dans CPPHON?


1 commentaires

Avez-vous essayé d'exécuter votre code sous Pypy? J'ai trouvé que c'était environ 20 fois plus rapide que CPPHON à certaines occasions. Cela pourrait vous suffire d'éviter tout la parallélisation.


5 Réponses :


6
votes

première option - un processus serveur

Créer un processus serveur . Cela fait partie de l'emballage multiprocessionnant qui permet un accès parallèle aux structures de données. De cette façon, chaque processus accédera directement à la structure de données, verrouillant d'autres processus.

de La documentation :

processus serveur

Un objet manager renvoyé par gestionnaire () contrôle un processus de serveur qui contient des objets Python et permet aux autres processus de les manipuler en utilisant des proxies.

Un gestionnaire renvoyé par gestionnaire () prendra en charge la liste des types, dict , Espace de noms, LOCK , ROCLO, SEMAPHORE, BOULATEDEMORE, CONDITION, ÉVÉNEMENT, File d'attente, valeur et tableau.

Deuxième option - Piscine de travailleurs

Créer un Piscine de travailleurs , une file d'attente d'entrée et une file d'attente de résultat.


0 commentaires


1
votes

Avez-vous profilé votre code? Est-ce simplement calculer F qui est trop coûteux ou stocke les résultats dans la structure de données (ou peut-être)?

Si F est dominante, vous devez vous assurer que vous ne pouvez pas apporter des améliorations algorithmiques avant de commencer à vous inquiéter de la parallélisation. Vous pourrez peut-être obtenir une grande vitesse en transformant une partie ou la totalité de la fonction dans une extension C, peut-être utiliser Cyron . Si vous y allez avec multiprocession, je ne vois pas pourquoi vous devez passer la structure de données complète entre les processus?

Si le stockage des résultats dans la matrice est trop coûteux, vous pouvez accélérer votre code en utilisant une structure de données plus efficace (comme Array.array ou Numpy. ndarray ). Sauf si vous avez été très prudent de concevoir et de mettre en œuvre votre classe matricielle personnalisée, il sera presque certainement plus lent que ceux.


1 commentaires

La classe matricielle que j'ai créée n'est qu'une petite enveloppe autour de la classe de stockage matricielle de Numpy, offrant un mappage interne d'IDS aux numéros de ligne / colonne et de certaines autres fonctionnalités associées aux colonnes et lignes marquées. J'ai fait du profilage, mais pas assez de manière globale pour fournir un rapport de qualité.



0
votes

Merci à tous pour vos réponses.

J'ai créé une solution (pas "la solution") au problème proposé et que d'autres peuvent le trouver utiles, je pose le code ici. Ma solution est une hybride d'options 1 et 3 suggérées par Adam Matan. Le code contient des numéros de ligne de ma session VI, qui aidera à la discussion ci-dessous. P>

112 def unitTest():
113  import cStringIO, os
114  from fingerprintReader import MismatchKernelReader
115  from fingerprintScorers import FeatureVectorLinearKernel
116  exampleData = cStringIO.StringIO() # 9 examples from GPCR (3,1)-mismatch descriptors, first 10 columns.
117  exampleData.write( ",AAA,AAC,AAD,AAE,AAF,AAG,AAH,AAI,AAK"  + os.linesep )
118  exampleData.write( "TS1R2_HUMAN,5,2,3,6,8,6,6,7,4" + os.linesep )
119  exampleData.write( "SSR1_HUMAN,11,6,5,7,4,7,4,7,9" + os.linesep )
120  exampleData.write( "OXYR_HUMAN,27,13,14,14,15,14,11,16,14" + os.linesep )
121  exampleData.write( "ADA1A_HUMAN,7,3,5,4,5,7,3,8,4" + os.linesep )
122  exampleData.write( "TA2R_HUMAN,16,6,7,8,9,10,6,6,6" + os.linesep )
123  exampleData.write( "OXER1_HUMAN,10,6,5,7,11,9,5,10,6" + os.linesep )
124  exampleData.write( "NPY1R_HUMAN,3,3,0,2,3,1,0,6,2" + os.linesep )
125  exampleData.write( "NPSR1_HUMAN,0,1,1,0,3,0,0,6,2" + os.linesep )
126  exampleData.write( "HRH3_HUMAN,16,9,9,13,14,14,9,11,9" + os.linesep )
127  exampleData.write( "HCAR2_HUMAN,3,1,3,2,5,1,1,6,2" )
128  columnIDs = ( "TS1R2_HUMAN", "SSR1_HUMAN", "OXYR_HUMAN", "ADA1A_HUMAN", "TA2R_HUMAN", "OXER1_HUMAN",
129                "NPY1R_HUMAN", "NPSR1_HUMAN", "HRH3_HUMAN", "HCAR2_HUMAN", )
130  m = createSimilarityMatrix( exampleData, MismatchKernelReader, FeatureVectorLinearKernel, columnIDs,
131                              verbose=True, )
132  m.SetOutputPrecision( 6 )
133  print m
134 
135 ## end of unitTest()


3 commentaires

Vous êtes probablement au courant de cela, mais votre façon de formater votre code est un peu inhabituelle et, à mon avis, il est assez difficile de lire. En particulier, la plupart des gens utilisent quatre espaces pour l'indentation - je ne pense pas qu'un espace est suffisant pour qu'il soit clair en un coup d'œil que vous êtes au niveau de l'indentation. Le conseil de PEP 8 est vraiment très utile, surtout lorsque les gens commencent à partager code les uns avec les autres.


En effet, mon code ne suit pas Pep8 Word-for-Word. Comme je préfère avoir le moins de littéraux possible dans mon code et utiliser des noms de variables décrivant l'intention du code, limitant mes lignes à 79 caractères et utiliser quatre espaces par empreinte entraînerait une croissance considérablement du nombre de lignes. Je suis certainement d'accord avec vous que la logique dans les lignes 91-106 bénéficierait d'un code qui utilise plus d'indentation, mais a choisi de présenter le code de cette façon avec des commentaires pour commencer chaque bloc logique afin de limiter sa longueur. Merci pour différents types de conseils.


Depuis la publication de ce code, j'ai découvert que cela n'éplaît pas à des exemples de 10 000 entrées ou plus, à savoir en raison de problèmes de consommation de mémoire résultant des travaux des coulisses nécessaires au multiprocession de python pour la mise à jour des structures de données. Il semble que le multiprocession + sqlite / mysql soit nécessaire afin d'élaborer le problème sur des jeux de données plus importants.



0
votes

En référence à mon dernier commentaire ci-joint au code Publié le 21 mars, j'ai trouvé MultiProcessing.Pool + SQLite (PYSQLite2) inutilisable pour ma tâche particulière, car deux problèmes ont eu lieu:

(1) à l'aide de la connexion par défaut, À l'exception du premier travailleur, chaque processus de travail effectué une requête d'insertion uniquement exécutée une fois. (2) Lorsque je modifie les mots-clés de connexion en check_same_thread = Faux, le bassin complet des travailleurs est utilisé, mais que seules certaines requêtes réussissent et que certaines requêtes échouent. Lorsque chaque travailleur a également exécuté le temps.sleep (0,01), le nombre de défaillances de requête a été réduit, mais pas entièrement. (3) moins important encore, je pouvais entendre mon disque dur de lire / écrire frénétiquement, même pour une petite liste de travaux de 10 requêtes d'insertion. P>

Je me suis ensuite eu recours à MySQL-Python, et les choses ont bien fonctionné. True, il faut configurer le démon de serveur MySQL, un utilisateur et une base de données pour cet utilisateur, mais ces étapes sont relativement simples. P>

Voici un exemple de code qui a fonctionné pour moi. Évidemment, cela pourrait être optimisé, mais il transmet l'idée de base pour ceux qui recherchent comment commencer à utiliser à l'aide de multiprocession. P>

  1 from multiprocessing import Pool, current_process
  2 import MySQLdb
  3 from numpy import random
  4
  5 
  6 if __name__ == "__main__":
  7  
  8   numValues   = 50000
  9   tableName   = "tempTable"
 10   useHostName = ""
 11   useUserName = ""  # Insert your values here.
 12   usePassword = ""
 13   useDBName   = ""
 14   
 15   # Setup database and table for results.
 16   dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
 17   topCursor = dbConnection.cursor()
 18   # Assuming table does not exist, will be eliminated at the end of the script.
 19   topCursor.execute( 'CREATE TABLE %s (oneText TEXT, oneValue REAL)' % tableName )
 20   topCursor.close() 
 21   dbConnection.close()
 22   
 23   # Define simple function for storing results.
 24   def work( storeValue ):
 25     #print "%s storing value %f" % ( current_process().name, storeValue )
 26     try:
 27       dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
 28       cursor = dbConnection.cursor()
 29       cursor.execute( "SET AUTOCOMMIT=1" )
 30       try:
 31         query = "INSERT INTO %s VALUES ('%s',%f)" % ( tableName, current_process().name, storeValue )
 32         #print query
 33         cursor.execute( query )
 34       except:
 35         print "Query failed."
 36       
 37       cursor.close()
 38       dbConnection.close()
 39     except: 
 40       print "Connection/cursor problem."
 41   
 42   
 43   # Create set of values to assign
 44   values = random.random( numValues )
 45   
 46   # Create pool of workers 
 47   pool = Pool( processes=6 )
 48   # Execute assignments.
 49   for value in values: pool.apply_async( func=work, args=(value,) )
 50   pool.close()
 51   pool.join()
 52 
 53   # Cleanup temporary table.
 54   dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
 55   topCursor = dbConnection.cursor()
 56   topCursor.execute( 'DROP TABLE %s' % tableName )
 57   topCursor.close()
 58   dbConnection.close()


0 commentaires