J'essaie de faire une capture de données de modification à partir d'Oracle DB en utilisant le flux de données Spring Cloud avec Kafka comme courtier. J'utilise un mécanisme de sondage pour cela. J'interroge la base de données avec une requête de sélection de base à intervalles réguliers pour capturer toutes les données mises à jour. Pour un meilleur système anti-échec, j'ai conservé l'heure de mon dernier sondage dans oracle DB et je l'ai utilisé pour obtenir les données qui sont mises à jour après le dernier sondage.
public MessageSource<Object> jdbcMessageSource() { String query = "select * from kafka_test where LAST_UPDATE_TIME > '"+<Last poll time value read from kafka comes here>+"'"; JdbcPollingChannelAdapter jdbcPollingChannelAdapter = new JdbcPollingChannelAdapter(this.dataSource, query); return jdbcPollingChannelAdapter; }
Mes requêtes dans les propriétés de l'application sont les suivantes :
query: select * from kafka_test where LAST_UPDATE_TIME >(select LAST_POLL_TIME from poll_time) update : UPDATE poll_time SET LAST_POLL_TIME = CURRENT_TIMESTAMP
Cela fonctionne parfaitement pour moi. Je suis capable d'obtenir le CDC de la base de données avec cette approche.
Le problème que je cherche maintenant est ci-dessous:
Créer une table juste pour maintenir l'heure d'interrogation est une surcharge . Je cherche à conserver cette dernière heure de sondage dans un sujet kafka et à récupérer cette heure à partir du sujet kafka lorsque je fais le prochain sondage.
J'ai modifié la méthode jdbcMessageSource
comme ci-dessous pour essayer ça:
public MessageSource<Object> jdbcMessageSource() { JdbcPollingChannelAdapter jdbcPollingChannelAdapter = new JdbcPollingChannelAdapter(this.dataSource, this.properties.getQuery()); jdbcPollingChannelAdapter.setUpdateSql(this.properties.getUpdate()); return jdbcPollingChannelAdapter; } @Bean public IntegrationFlow pollingFlow() { IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(),spec -> spec.poller(Pollers.fixedDelay(3000))); flowBuilder.channel(this.source.output()); flowBuilder.transform(trans,"transform"); return flowBuilder.get(); }
Mais le Spring Data Flow instancie le bean pollingFlow () (voir le code ci-dessus) une seule fois. Par conséquent, quelle que soit la requête exécutée en premier, elle restera la même. Je souhaite mettre à jour la requête avec une nouvelle heure de sondage pour chaque sondage.
Existe-t-il un moyen d'écrire un Integrationflow
personnalisé pour que cette requête soit mise à jour chaque fois que je fais un sondage?
J'ai essayé IntegrationFlowContext
pour cela mais je n'ai pas réussi.
Merci d'avance !!!
3 Réponses :
Voir la réponse d'Artem pour le mécanisme d'une requête dynamique dans l'adaptateur standard; une alternative, cependant, serait simplement d'encapsuler un JdbcTemplate
dans un Bean et de l'invoquer avec
.from(() -> jdbcTemplate...)
ou même un simple lambda
IntegrationFlows.from(myPojo(), "runQuery", e -> ...) ...
Voir ma réponse, s'il vous plaît,
@Gary Russell Merci beaucoup pour votre réponse. Cela nous a vraiment aidés à résoudre le problème.
Nous avons cette configuration de test (désolé, c'est un XML):
<inbound-channel-adapter query="select * from item where status=:status" channel="target" data-source="dataSource" select-sql-parameter-source="parameterSource" update="delete from item"/> <beans:bean id="parameterSource" factory-bean="parameterSourceFactory" factory-method="createParameterSourceNoCache"> <beans:constructor-arg value=""/> </beans:bean> <beans:bean id="parameterSourceFactory" class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory"> <beans:property name="parameterExpressions"> <beans:map> <beans:entry key="status" value="@statusBean.which()"/> </beans:map> </beans:property> <beans:property name="sqlParameterTypes"> <beans:map> <beans:entry key="status" value="#{ T(java.sql.Types).INTEGER}"/> </beans:map> </beans:property> </beans:bean> <beans:bean id="statusBean" class="org.springframework.integration.jdbc.config.JdbcPollingChannelAdapterParserTests$Status"/>
Faites attention à ExpressionEvaluatingSqlParameterSourceFactory
et à son createParameterSourceNoCache () code> usine. Ce résultat peut être utilisé pour la
select-sql-parameter-source
.
Le JdbcPollingChannelAdapter
a un setSelectSqlParameterSource
sur la question.
Donc, vous configurez un ExpressionEvaluatingSqlParameterSourceFactory
pour pouvoir résoudre un paramètre de requête en tant qu'expression pour un appel de méthode bean afin d'obtenir une valeur souhaitée de Kafka. Ensuite, createParameterSourceNoCache ()
vous aidera à obtenir une SqlParameterSource
attendue .
Il y a aussi quelques informations dans la documentation: https://docs.spring.io/spring-integration/ docs / current / reference / html / # jdbc-inbound-channel-adapter
Avec l'aide des deux réponses ci-dessus, j'ai pu comprendre l'approche.
Écrivez un modèle jdbc
et enveloppez-le comme un bean et utilisez-le pour le Flux d'intégration
.
@EnableBinding(Source.class) @AllArgsConstructor public class StockSource { private DataSource dataSource; @Autowired private JdbcTemplate jdbcTemplate; private MessageChannelFactory messageChannelFactory; // You can use normal message channel which is available in spring cloud data flow as well. private List<String> findAll() { jdbcTemplate = new JdbcTemplate(dataSource); String time = "10/24/60" . (this means 10 seconds for oracle DB) String query = << your query here like.. select * from test where (last_updated_time > time) >>; return jdbcTemplate.query(query, new RowMapper<String>() { @Override public String mapRow(ResultSet rs, int rowNum) throws SQLException { ... ... any row mapper operations that you want to do with you result after the poll. ... ... ... // Change the time here for the next poll to the DB. return result; } }); } @Bean public IntegrationFlow supplyPollingFlow() { IntegrationFlowBuilder flowBuilder = IntegrationFlows .from(this::findAll, spec -> { spec.poller(Pollers.fixedDelay(5000)); }); flowBuilder.channel(<<Your message channel>>); return flowBuilder.get(); } }
Dans notre cas d'utilisation, nous étions persistance de l'heure du dernier sondage dans un sujet kafka. C'était pour réduire l'état de l'application. Chaque nouveau sondage dans la base de données aura maintenant une nouvelle heure dans la condition where
.
PS: votre courtier de messagerie (kafka / rabbit mq) devrait fonctionner dans votre local ou connectez-vous à eux s'ils sont hébergés sur une plate-forme différente.
God Speed !!!