8
votes

Filetage de la piscine avec tâches uniques

J'utilise ThreadPiscoolTaskexecutor (du printemps) afin d'exécuter des tâches asynchroniquement.

La tâche requise chargera un objet à partir d'une DB externe dans la mémoire de mon système. J'utilise la taille de la piscine maximale de la piscine de 10 et la taille de la file d'attente maximale de 100. P>

Supposons que les 10 threads sont occupés à obtenir des objets de ma DB et une tâche est créée, il ira à la file d'attente. Maintenant une autre tâche est créée qui devrait obtenir le même objet (la même clé dans dB) de la DB, elle ira également à la file d'attente (en supposant que les 10 threads sont toujours occupés). p>

Donc, ma file d'attente risque de s'accorde facilement avec des tâches dupliquées qui seront exécutées à leur tour et que je ne veux pas que cela se produise. P>

Je pensais qu'une solution devrait être sous la forme d'une collection unique qui sert de file d'attente de piscine de fil. Sous la hotte ThreadpooltaskeExecutor utilise LinkedBlockingQueue qui ne fournit pas un caractère unique. P>

J'ai pensé à quelques solutions possibles, mais aucune ne me satisfait: p>

  • en utilisant threadpoolexecutor au lieu de Threadpooltaskexecutor. Le threadpoolexecutaire fournit un constructeur qui me permet de déterminer le type de file d'attente de la piscine de thread, mais il doit mettre en œuvre l'interface BlockingQuie. Je ne pouvais pas trouver une implémentation qui préserve l'unicité. Li> ul>

    Cela m'a amené à essayer d'étendre LinkedBlockingQuningQueue et remplacement Ajouter: P>

    public boolean add(E e)
        if(!this.contains(e)) {
            return super.add(e);
        } else {
            return false;
        }
    }
    


0 commentaires

3 Réponses :


6
votes

en utilisant GUAVA et écoutefuture Vous pouvez faire quelque chose comme ça (pas testé) xxx

résultant < / p>

  • Seuls les éléments qui ne sont pas en cours de traitement ou dans la file d'attente seront ajoutés à la file d'attente ( UNQUEQUEUE )
  • Les éléments traités seront supprimés du Uniqueffeur
  • Vous n'aurez qu'un maxium de 100 articles dans la file d'attente

    Cette implémentation ne gère pas


11 commentaires

J'ai déjà pensé à utiliser une solution de contournement similaire plus ou moins à votre suggestion (appliquer l'unicité à l'aide d'un ensemble supplémentaire). Si je ne reviendrai pas une meilleure réponse dans les prochains jours, je vais accepter le vôtre.


S'il vous plaît laissez-moi savoir si vous êtes venu à une meilleure solution dans les prochains jours.


Je viens de trouver une implémentation de blocageQueue soutenue par un likedHashashSet. Peut-être que cela vous aide: grepcode.com/file/repo1.maven.org/maven2/org.apache.marmotta / ...


Je vais bientôt regarder, le nom semble prometteur et oui - cela vous aidera certainement comme vous savez bien :)


Ok, je pense que vous m'avez fourni toutes les solutions possibles (génial!). Le second (en utilisant LinkedHashshsetBlockingQueue avec threadpoolEcutor) est très élégant, mais il pourrait ne pas être complet car encore quelques tâches similaires peuvent être exécutées en même temps (dans le cas de moins de 10 threads en cours d'exécution, pas besoin de la file d'attente dans un tel cas) . Votre première solution est «moins élégante» mais le travail est-il parfaitement. Qu'est-ce que tu penses?


"Moins élégant" signifie toujours "élégant" =) J'aime la couche supplémentaire de l'abstraction, on pourrait facilement encapsuler cette fonctionnalité en mettant en œuvre l'interface exécutorservice fournissant ce nouveau type (ou transfertexecutetorservice de GUAVA). Tant qu'il existe un moyen d'écrire un test unitaire élégant, je considère que cette approche est propre =)


Cette solution pourrait être mise en œuvre de la même manière mais sans guava et lisiblefuture. Vous pouvez simplement remplacer après -execute (runnable R, lancer t) méthode de threadpoolexecutor et faire la même chose.


J'ai fini par prolonger le threadpooltaskexecutor (de printemps) et remplacer la méthode "Exécuter".


@forhas Même si vous avez accepté cette réponse, je pense qu'il serait utile d'ajouter une autre réponse avec des détails sur votre solution finale.


@forhas pouvez-vous partager votre implémentation? J'ai le même problème et je ne veux pas utiliser de goyava.


@ user1480019 Je viens de poster une réponse, cela fait essentiellement la même chose que la réponse acceptée, mais basée sur des bibliothèques de printemps au lieu de Guava.



0
votes

Si vous êtes autorisé à gérer la base de données, je vous suggère d'utiliser la base de données elle-même pour éviter les efforts en double:

  • Ajoutez une colonne LOCKID à votre table
  • Ajoutez une colonne d'état à votre table (peut-être 'nouveau' et 'fait')
  • Assurez-vous que votre niveau d'isolement de DB est au moins read_commité

    puis essayez quelque chose comme ça, dans votre fil principal: xxx

    Les lignes qui retirent de la sélection sont ce que vous ajoutez à la file d'attente.

    Ensuite, vous avez une classe qui implémente en ligne qui traite ces lignes, en les récupérant de la file d'attente. Une fois que cela traite une ligne, vous faites une autre mise à jour SQL (à l'intérieur de l'exécutable) pour régler le retour sur zéro et définir le statut sur "Terminé".

    Cela a l'avantage de travailler même si vous avez plusieurs machines chacune avec leur propre file d'attente.


3 commentaires

Qui a dit que j'utilise un dB relationnel (je ne suis pas)? Et je ne cherche pas une solution d'optimisation de la DB, il pourrait s'agir de tout type de tâche en cours d'exécution (l'accès à une DB n'est qu'un cas privé).


C'est juste; Je faisais juste ce que j'ai vu dans votre question ("chargera un objet d'un dB externe"). Ce que j'ai écrit pourrait également s'appliquer à la non-RDB, en fonction de vos paramètres de cohérence.


Le titre des questions est la "file d'attente de la piscine de thread avec des tâches uniques". Vous avez bien sûr eu de bonnes intentions mais je pense que vous avez manqué un peu. Merci de partager de toute façon :)



1
votes

Une solution similaire à la solution acceptée mais basée sur le ressort (par opposition à GUAVA):

Créer une interface RunnableWithID strong>: p>

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.Set;

/**
 * In addition to all the abilities of ThreadPoolTaskExecutor adds the ability
 * to execute a task only if it is not already running/queued using the
 * executeIfNotQueuedOrRunningAlready method.
 *
 * @see ThreadPoolTaskExecutor
 */
public class UniquTaskExecutor extends ThreadPoolTaskExecutor implements TaskWithIdExecutor {

    private Set<String> queuedTasks;

    public UniquTaskExecutor() {
        queuedTasks = Sets.newConcurrentHashSet();
    }

    @Override
    public void execute(Runnable task) {
        super.execute(task);
    }

    /**
     * @param task The task to execute
     */
    @Override
    public void executeIfNotQueuedOrRunningAlready(RunnableWithId task) {
        if (queuedTasks.add(task.getTaskId())) {
            ListenableFuture<?> res = submitListenable(task);
            res.addCallback(new ListenableFutureCallback<Object>() {
                @Override
                public void onFailure(Throwable throwable) {
                    queuedTasks.remove(task.getTaskId());
                }

                @Override
                public void onSuccess(Object o) {
                    queuedTasks.remove(task.getTaskId());
                }
            });
        }
    }
}


0 commentaires