9
votes

TENSORFLOW: Enqueuse et déséquilibrer une file d'attente de plusieurs threads

Le problème que j'essaie de résoudre est le suivant: J'ai une liste formiMGS de noms de fichiers. J'ai défini un

  • tf.randomshaufflequeur avec son capacité = len (formadimgs) et min_after_dequeue = 0 .
  • Ce tf.randomshaufflequeur devrait être rempli par formidimgs pour un epochlimit spécifié Nombre de fois.
  • Un certain nombre de fils devraient fonctionner en parallèle. Chaque fil de thread est un élément du tf.randomshaufflequeur et fait certaines opérations sur elle et en faisait en conclusion à une autre file d'attente. J'ai cette partie droite.
  • Cependant, une fois 1 époch de drainimgs ont été traités et le tf.randomshauffleueue est vide, à condition que l'époque actuelle e , la file d'attente doit à nouveau être remplie et les threads doivent fonctionner à nouveau.

    La bonne nouvelle est la suivante: je l'ai travaillé dans un certain cas (voir PS à la fin !!)

    La mauvaise nouvelle est: i pense qu'il y a une meilleure façon de le faire.

    La méthode que j'utilise pour le faire maintenant est la suivante (j'ai simplifié les fonctions et avoir supprimé le prétraitement basé sur l'image et l'ensemble ultérieur, mais le cœur du traitement reste le même !!): xxx

    La fonction de travail est la suivante: xxx

    donc, bien que Cela fonctionne, j'ai l'impression qu'il existe un moyen meilleur et plus propre d'y parvenir. Donc, en quelques mots, mes questions sont les suivantes:

    1. Y a-t-il un moyen plus simple et plus propre d'atteindre cette tâche dans Tensorflow?
    2. Y a-t-il un problème avec la logique de ce code? Je ne suis pas très expérimenté avec des scénarios multithreading, de sorte que tout défaut évident qui a sauté mon attention serait très utile pour moi.

      P.S: Il semble que ce code ne soit pas parfait après tout. Lorsque j'ai couru avec 1,2 million d'images et 200 threads, il a couru. Cependant, lorsque je l'exécute pour 10 images et 20 threads, il donne l'erreur suivante: xxx

      Je pensais que je l'ai couvert par sauf tf.Errors.cancellederror < / code>. Qu'est-ce qui se passe ici?


0 commentaires

3 Réponses :


1
votes

Je recommande d'avoir un seul fil appelant Enqueue_many Epochs Times Enqueue le nombre d'images correct. Il peut alors fermer la file d'attente. Cela vous permettrait de simplifier votre fonction de travail et d'autres threads.


2 commentaires

Merci mais j'aimerais utiliser plusieurs threads pour accélération car il y a des étapes de prétraitement complexes que je dois faire


Vous pouvez utiliser un thread pour en faisez des noms de fichiers dans la file d'attente principale, puis plusieurs threads pour déroger ces noms de fichiers, préprocessez et en faisez-les dans la file d'attente finale.



3
votes

J'ai enfin découvert la réponse. Le problème était que plusieurs threads s'affrontent ensemble sur divers points de la fonction code>. La fonction Work () suivante () code> fonctionne parfaitement.

def work(coord, val, sess, epoch, maxepochs, incrementepoch, supplyimg, q, lock, close_op):
    print('I am thread number %s'%(threading.current_thread().name))
    print('I can see a queue with size %d'%(sess.run(q)))
    while not coord.should_stop():
        lock.acquire()
        if sess.run(q) > 0:
            filename, currepoch = sess.run([val, epoch])
            filename = filename.decode(encoding='UTF-8')
            tid = threading.current_thread().name
            print(filename + ' ' + str(currepoch) + ' thread ' + str(tid))
        elif sess.run(epoch) < maxepochs:
            print('Thread %s has acquired the lock'%(threading.current_thread().name))
            print("The previous epoch = %d"%(sess.run(epoch)))
            sess.run([incrementepoch, supplyimg])
            sz = sess.run(q)
            print("The new epoch = %d"%(sess.run(epoch)))
            print("The new queue size = %d"%(sz))
    else:
            coord.request_stop()
        lock.release()

    return None


0 commentaires

1
votes

Je pense que le GIL empêchera tout parallélisme réel d'être fait dans ces threads.

Pour obtenir des performances avec TENSORFLOW, vous devez conserver vos données dans TENSORFLOW.

TENSOR Flow's Guide de lecture de la lecture explique comment aborder un problème de problème très similaire.

Plus précisément, vous semblez avoir réécrit une partie importante de string_input_producer .


3 commentaires

J'utilise des données réelles. string_input_producteur () ne dit pas que l'époque de données qu'il extrait à tout moment. Cela fait simplement sue que n'importe quel document est extrait de EPOCH nombre de fois. Donc, ma mise en œuvre n'est pas une réécriture de string_input_producer () . Je comprends que la façon dont je le fais est, c'est peut-être pas le meilleur moyen, mais je dois conserver un contrôle direct et très précis sur l'époque et l'itération particulières. Les données proviennent et je ne semblais pas trouver quoi que ce soit dans le Guide de données. à propos de ça. Je vais le lire à nouveau cependant.


Ah, merci de clarifier. Je suppose que je ne comprends pas complètement le problème. Pourquoi ne pas utiliser (chaîne, époque) paires?


Je l'ai utilisé. J'ai utilisé la variable epoch pour calculer l'époque