1
votes

Comment éviter la double insertion dans Oracle avec le multithreading

Dans mon application, pendant que je fais INSERT dans Oracle, j'ai eu beaucoup d'exceptions à propos de la double insertion.

Mon code de test ressemble à ça

240981  JpaPersistenceUnit  TRACE  [pool-25-thread-3] openjpa.jdbc.SQL - <t 1427395137, conn 1023570122> executing prepstmnt 743213969 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]
240983  JpaPersistenceUnit  TRACE  [pool-25-thread-9] openjpa.jdbc.SQL - <t 1116539025, conn 246735198> executing prepstmnt 468904024 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]
240986  JpaPersistenceUnit  TRACE  [pool-25-thread-5] openjpa.jdbc.SQL - <t 2107513837, conn 1168031715> executing prepstmnt 1872262728 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]
240986  JpaPersistenceUnit  TRACE  [pool-25-thread-1] openjpa.jdbc.SQL - <t 1881630463, conn 2024928498> executing prepstmnt 1258578230 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]
240986  JpaPersistenceUnit  TRACE  [pool-25-thread-7] openjpa.jdbc.SQL - <t 1202968848, conn 1876787130> executing prepstmnt 1733696457 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]

240998  JpaPersistenceUnit  TRACE  [pool-25-thread-9] openjpa.jdbc.SQL - <t 1116539025, conn 246735198> executing prepstmnt 752805342 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params= (String) a,   (String) b]
240999  JpaPersistenceUnit  TRACE  [pool-25-thread-3] openjpa.jdbc.SQL - <t 1427395137, conn 1023570122> executing prepstmnt 1035550395 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params= (String) a,   (String) b]
240999  JpaPersistenceUnit  TRACE  [pool-25-thread-5] openjpa.jdbc.SQL - <t 2107513837, conn 1168031715> executing prepstmnt 1439514282 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params=  (String) a,   (String) b]
241000  JpaPersistenceUnit  TRACE  [pool-25-thread-1] openjpa.jdbc.SQL - <t 1881630463, conn 2024928498> executing prepstmnt 1158780577 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params=  (String) a,   (String) b]
241000  JpaPersistenceUnit  TRACE  [pool-25-thread-7] openjpa.jdbc.SQL - <t 1202968848, conn 1876787130> executing prepstmnt 1082517334 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params=  (String) a,   (String) b]



41018  JpaPersistenceUnit  TRACE  [pool-25-thread-4] openjpa.Runtime - An exception occurred while ending the transaction.  This exception will be re-thrown.<openjpa-2.4.0-r422266:1674604 fatal store error> org.apache.openjpa.util.StoreException: The transaction has been rolled back.  See the nested exceptions for details on the errors that occurred.
FailedObject: com.test.SomeClass@19df04ab
	at org.apache.openjpa.kernel.BrokerImpl.newFlushException(BrokerImpl.java:2368)
	at org.apache.openjpa.kernel.BrokerImpl.flush(BrokerImpl.java:2205)
	at org.apache.openjpa.kernel.BrokerImpl.flushSafe(BrokerImpl.java:2103)
	at org.apache.openjpa.kernel.BrokerImpl.beforeCompletion(BrokerImpl.java:2021)
	at org.apache.openjpa.kernel.LocalManagedRuntime.commit(LocalManagedRuntime.java:81)
	at org.apache.openjpa.kernel.BrokerImpl.commit(BrokerImpl.java:1526)
	at org.apache.openjpa.kernel.DelegatingBroker.commit(DelegatingBroker.java:932)
	at org.apache.openjpa.persistence.EntityManagerImpl.commit(EntityManagerImpl.java:569)
	at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:514)
	at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:755)
	at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:724)
	at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:475)
	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:270)
	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:94)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:633)
	at com.test.Dao$$EnhancerBySpringCGLIB$$c4aa5f08.insertObject(<generated>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
	at org.springframework.osgi.service.importer.support.internal.aop.ServiceInvoker.doInvoke(ServiceInvoker.java:58)
	at org.springframework.osgi.service.importer.support.internal.aop.ServiceInvoker.invoke(ServiceInvoker.java:62)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
	at org.springframework.osgi.service.util.internal.aop.ServiceTCCLInterceptor.invokeUnprivileged(ServiceTCCLInterceptor.java:56)
	at org.springframework.osgi.service.util.internal.aop.ServiceTCCLInterceptor.invoke(ServiceTCCLInterceptor.java:39)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
	at org.springframework.osgi.service.importer.support.LocalBundleContextAdvice.invoke(LocalBundleContextAdvice.java:59)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
	at com.sun.proxy.$Proxy255.insertObject(Unknown Source)
	at com.test.OtherClass.lambda$method$5(OtherClass.java:146)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: <openjpa-2.4.0-r422266:1674604 fatal store error> org.apache.openjpa.util.ObjectExistsException: ORA-00001: unique constraint (TABLE_PK) violated
 {prepstmnt 1119780936 INSERT INTO TABLE (COLUMN1, COLUMN2) VALUES (?, ?) [params=(String) a, (String) b]} [code=1, state=23000]
FailedObject: com.test.entities.Table@19df04ab
	at org.apache.openjpa.jdbc.sql.DBDictionary.narrow(DBDictionary.java:4986)
	at org.apache.openjpa.jdbc.sql.DBDictionary.newStoreException(DBDictionary.java:4961)
	at org.apache.openjpa.jdbc.sql.SQLExceptions.getStore(SQLExceptions.java:133)
	at org.apache.openjpa.jdbc.sql.SQLExceptions.getStore(SQLExceptions.java:75)
	at org.apache.openjpa.jdbc.kernel.BatchingPreparedStatementManagerImpl.flushBatch(BatchingPreparedStatementManagerImpl.java:225)
	at org.apache.openjpa.jdbc.kernel.BatchingConstraintUpdateManager.flush(BatchingConstraintUpdateManager.java:63)
	at org.apache.openjpa.jdbc.kernel.AbstractUpdateManager.flush(AbstractUpdateManager.java:104)
	at org.apache.openjpa.jdbc.kernel.AbstractUpdateManager.flush(AbstractUpdateManager.java:77)
	at org.apache.openjpa.jdbc.kernel.JDBCStoreManager.flush(JDBCStoreManager.java:731)
	at org.apache.openjpa.kernel.DelegatingStoreManager.flush(DelegatingStoreManager.java:131)
	... 43 more
Caused by: org.apache.openjpa.lib.jdbc.ReportingSQLException: ORA-00001: unique constraint (TABLE_PK) violated
 {prepstmnt 1119780936 INSERT INTO TABLE (COLUMN1, COLUMN2) VALUES (?, ?) [params=(String) a, (String) b]} [code=1, state=23000]
	at org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator.wrap(LoggingConnectionDecorator.java:218)
	at org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator.wrap(LoggingConnectionDecorator.java:194)
	at org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator.access$1000(LoggingConnectionDecorator.java:58)
	at org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator$LoggingConnection$LoggingPreparedStatement.executeUpdate(LoggingConnectionDecorator.java:1133)
	at org.apache.openjpa.lib.jdbc.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:275)
	at org.apache.openjpa.jdbc.kernel.JDBCStoreManager$CancelPreparedStatement.executeUpdate(JDBCStoreManager.java:1791)
	at org.apache.openjpa.jdbc.kernel.PreparedStatementManagerImpl.executeUpdate(PreparedStatementManagerImpl.java:268)
	at org.apache.openjpa.jdbc.kernel.BatchingPreparedStatementManagerImpl.flushSingleRow(BatchingPreparedStatementManagerImpl.java:254)
	at org.apache.openjpa.jdbc.kernel.BatchingPreparedStatementManagerImpl.flushBatch(BatchingPreparedStatementManagerImpl.java:157)
	... 48 more

Dans openJpa log Je vois quelque chose comme ça

class SomeClass{
EntityManager em;
Dao dao;

@Override
void insert(String a, String b){
    MyObject object =new MyObject(a,b);
    dao.insertObject(object);
    }
}

class OtherClass{
    private final ExecutorService completableFutureExecutor =
            new ThreadPoolExecutor(10, 11, 30L, TimeUnit.SECONDS, new SynchronousQueue<>());

    public void method() {

        Runnable task1 = () -> dao.insert("a","b");
        for (int i = 0; i < 5; i++) {
            completableFutureExecutor.submit(task1);
        }
    }
}

Comment puis-je éviter cela? Parce que j'ai beaucoup d'erreurs comme celle-là en production.

UPD a ajouté un nouveau journal dans l'extrait de code.

Mon application localise sur les deux serveurs (nœuds). Chaque serveur connecté à DB. Donc, mon test, nous pouvons multiplier par 2.


14 commentaires

"Voir les exceptions imbriquées pour plus de détails sur les erreurs qui se sont produites." - Quelles étaient ces exceptions imbriquées?


L'exception imbriquée a indiqué que l'objet déjà dans la base de données


Veuillez nous montrer l'exception + stacktrace. Mettez-le dans la question.


vous pouvez centraliser l'insertion dans un thread unique à l'aide d'un Executors. newSingleThreadExecutor () . Cette "file d'attente" est remplie par plusieurs threads (en supposant que les threads auraient besoin d'un certain temps pour créer l'insertion). Ensuite, en fonction du nombre d'insertions, vous pouvez utiliser un lot qui est envoyé toutes les # secondes. Il s'agit de la gestion la plus simple de l'insertion simultanée -> suppression de la concurrence!


Vous devez faire un "insérer s'il n'existe pas". Il existe au moins 3 façons de procéder. Consultez les questions suivantes: stackoverflow.com/questions/1702832 , stackoverflow.com/questions/3147874 , stackoverflow.com/questions/10824764 ... et probablement autres. (Choisissez une description que vous pouvez comprendre.)


@StephenC, j'ai ajouté un journal. Et maintenant je lis vos exemples


Notez que les liens ci-dessus sont pour vanilla SQL. Avec JPA ... il peut y avoir une autre approche.


Dans la base de données, définissez un champ unique et la base de données devrait renvoyer une erreur si vous essayez de double insertion


@Starmixcraft, la clé primaire est constituée des deux colonnes, c'est donc une clé primaire complexe


Peut-être qu'il y a quelque chose de comparable dans Oracle DB


@AxelH, votre variante n'a pas reproduit l'exception, mais elle reproduit plusieurs insertions dans DB. Et je ne comprends pas, comment puis-je créer un lot avec seulement 1 question Thread -1 et 2 questions - cela semble être synchronisé à partir de la réponse, n'est-ce pas?


Que souhaitez-vous souhaiter lors de l’insertion de données déjà présentes? Pourquoi essayez-vous d'insérer les mêmes données simultanément en premier lieu?


Je le veux INSERT juste une fois et s'il est déjà dans DB, sélectionnez-le. Mais j'ai environ 5 méthodes dans différentes classes qui peuvent appeler cela INSERT à temps, à cause de cela, cette exception devrait être gérée


Pour éviter la duplication, vous pouvez créer une contrainte unique et intercepter l'exception sur votre DAO. Je ne vois pas pourquoi le backend devrait filtrer ces doublons, cette logique est meilleure dans la base de données. Concurrence = / = Duplication


4 Réponses :


1
votes

Vous pourriez éventuellement synchroniser votre objet doa, ce qui signifie qu'il ne peut être exécuté que par un thread à la fois.

  @Override
  void insert(String a, String b) {
    MyObject object = new MyObject(a, b);
    synchronized (dao) {
      dao.insertObject(object);
    }
  }

Quelque chose comme ce qui précède

Le principal avantage du mot-clé synchronisé est que nous pouvons résoudre les données problèmes d'incohérence. Mais le principal inconvénient de synchronisé Le mot-clé est-il augmente le temps d'attente des threads et affecte les performances du système. Par conséquent, s'il n'y a pas d'exigence spécifique, il n'est jamais recommandé d'utiliser un mot-clé synchronisé.


5 commentaires

Merci pour votre réponse. J'y ai pensé, mais j'ai environ 10 à 50 appels par seconde avec des requêtes comme celle-là. Cela ralentira l'échange de bases de données, n'est-ce pas?


Mais si vous pouvez créer deux instances de votre dao , le verrou ne fonctionnera pas


Je ne peux pas encore commenter le message principal des OP, mais vous attendez-vous à insérer des données si elles existent déjà @Dred? Sinon, votre clé composite fonctionne comme prévu, A + B ne peut être inséré qu'une seule fois


@forzend, je sais qu'il ne peut être inséré qu'une seule fois, à cause de cela je fais merge . openJpa se fait sélectionner et s'il retourne NULL, il fait INSERT


ce fil n'a peut-être pas la réponse, mais explique également son utilisation. Nous avons en fait eu un problème très similaire avec notre application avec des données transactionnelles. Notre travail est actuellement synchronisé . Bien que cela ralentisse, c'était le meilleur moyen de garantir des données cohérentes



0
votes

L'utilisation d'un seul EntityManager à partir de threads simultanés est probablement une mauvaise idée, comme vous l'avez peut-être vu dans les journaux: le meilleur des cas est qu'une exception dans un thread annule la transaction unique de l'EM que tous les autres threads essaient également d'utiliser. Le pire des cas est celui de toutes sortes de problèmes de concurrence, de conditions de course, etc.

Je le veux INSÉRER juste une fois et s'il est déjà dans DB, sélectionnez-le. Mais je ont environ 5 méthodes dans différentes classes qui peuvent appeler cela INSERT à temps, à cause de cela, cette exception devrait être traitée

Dans ce cas, vous n'aurez probablement pas d'autre choix que de synchroniser vos appels DAO via d'autres moyens; pourrait être fait en verrouillant un enregistrement existant dans la base de données, ou de toute autre manière. Vous pouvez également essayer EntityManager.merge () , mais je ne pense pas que cela résoudrait votre problème d'écriture simultanée de deux machines séparées.


7 commentaires

À propos de EntityManager.merge () Je l'utilise maintenant. Qu'en est-il de Memcached et etc.?


Et si EntityManager est une mauvaise idée pour la concurrence, lequel est mieux? Je suis le plus récent dans ce domaine


Chaque thread doit avoir sa propre instance de EntityManager ; et avec lui sa propre connexion à la base de données et à la transaction.


«Qu'en est-il de Memcached et etc. - Que veux-tu dire?


Donc, si j'ai environ 100 threads sur l'application, ce serait environ 2 * 100 connexion dans DB intime? Et même cela, je pourrai résoudre mon problème alors que je dois insérer dans un intervalle de 0,001 seconde. Memcache est une bibliothèque qui permet de sauvegarder des informations en mémoire.


Oui, si vous souhaitez que 100 accès à la base de données se produisent simultanément, vous aurez certainement besoin de 100 connexions à la base de données. - Je ne comprends pas comment un cache mémoire résoudrait votre problème d'obtention de données cohérentes dans une base de données.


Notez que vous pouvez bien sûr utiliser une sorte de regroupement de connexions pour limiter le nombre de connexions. Néanmoins, chaque thread doit avoir sa ou ses propres transaction (s) .



0
votes

J'utilise maintenant memcache, mais ce n'est pas la solution finale

 private int expire = 2;

 public <T> void insert(Supplier<T> supplier, String... keyLine) throws InterruptedException, MemcachedException, TimeoutException {
        String key = ParseUtils.collectToKeyWithDot(keyLine);
        T value = getCache(key);
        if (value == null) {
            value = supplier.get();
            setCache(key, value);
        } 
    }

 private <T> Boolean setCache(String key, T value) throws InterruptedException, MemcachedException, TimeoutException {
        return memcacheClient.set(key, expire, value);
    }

    private <T> T getCache(String key) throws InterruptedException, MemcachedException, TimeoutException {
        return memcacheClient.get(key);
    }

Ici, je stocke mon premier INSERT dans DB et aussi dans le cache pendant 2 secondes. Dans ces 2 secondes, si un thread tente d'insérer les mêmes valeurs dans DB et cela ne se produira pas. Tout d'abord, il serait vérifié dans le cache. Pour moi, 2 secondes suffisent pour éviter les exceptions.

P.S. Je recherche toujours une solution plus élégante.


0 commentaires

0
votes

Vous voudrez peut-être envisager une écriture différée à partir de l'objet d'accès aux données, quelque chose comme ceci -

class SomeClass {
    private EntityManager em;
    private Dao dao;
    private Set<MyObject> writableObjects = new HashSet<>();

    @Override
    public void insert(String a, String b) {
        MyObject object = new MyObject(a, b);
        writableObjects.add(object);
    }

    @override
    public void commit() {
        writableObjects.forEach(object -> dao.insertObject(object));
    }
}

class OtherClass {
    private final ExecutorService completableFutureExecutor = new ThreadPoolExecutor(10, 11, 30L, TimeUnit.SECONDS,
            new SynchronousQueue<>());

    public void method() {

        Runnable task1 = () -> dao.insert("a", "b");
        for (int i = 0; i < 5; i++) {
            completableFutureExecutor.submit(task1);
        }

        dao.commit();
    }
}


5 commentaires

C'est une bonne idée d'utiliser Set , j'y ai pensé, mais incluez dans SET pas Object mais une insertion complète comme une String comme String insert = "INSERT INTO TABLE ..."; Définissez inserts.add (insert) pour éviter les doubles insertions égales. Cela ressemble à son propre lot. Set pourrait avoir une taille comme 5-10 et il devrait être atomique ou simultané.


Eh bien, de toute façon, ce serait bien. Tant que vous disposez d'un moyen fiable pour identifier les instances uniques. Idéalement, si vous souhaitez ajouter un objet personnalisé comme celui-ci à un Set, vous pouvez remplacer les méthodes hashCode () et equals (...). De nos jours, tout bon IDE ferait un assez bon travail, en les générant pour vous.


comment le remplacement de Hash peut-il m'aider ici?


Un HashSet en java est sauvegardé en interne par une structure de données HashMap, qui utilise le hashcode pour déterminer l'index de stockage principal de l'objet; par conséquent, implémenter un moyen fiable de calculer un hashcode (c'est-à-dire remplacer la méthode hashCode ()) vous aiderait à obtenir des lectures efficaces.


En attendant, pour aborder davantage votre point d'avoir un ensemble atomique ou concurrent; vous avez tout à fait raison à ce sujet, car votre code modifierait le même ensemble, avec plusieurs threads. Voici quelques-unes des façons dont vous pouvez créer un ensemble simultané - [1] Set myConcurrentSet = ConcurrentHashMap.newKeySet (); [2] Définissez myConcurrentSet = new ConcurrentSkipListSet ();