en utilisant debezium-mongodb-connector j'ai réussi à pousser mes collections vers kafka, le seul problème auquel je suis confronté est que le champ date
dans l'une de mes collections avec ce format 2019-05-14T23 : 25: 34.703 + 00: 00, ce n'est pas poussé vers le sujet avec le même format mais j'obtiens plutôt quelque chose comme ça 1560708085175.
c'est ma commande de connecteur debezium connect-standalone / etc / kafka / connect -standalone.properties /etc/kafka/connect-mongodb-source.properties
ceci est mon exemple de collection mongodb.
name=mongodb-source-connector connector.class=io.debezium.connector.mongodb.MongoDbConnector mongodb.hosts=repracli/**.**.**.***27017 mongodb.name=mongo_conn initial.sync.max.threads=1 tasks.max=1 transforms=unwrap transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongo$ transforms.unwrap.operation.header=true
et ceci est mon exemple de sujet kafka.
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"sender"},{"type":"string","optional":true,"field":"receiver"},{"type":"string","optional":true,"field":"receiverWalletId"},{"type":"string","optional":true,"field":"status"},{"type":"string","optional":true,"field":"type"},{"type":"int32","optional":true,"field":"amount"},{"type":"int32","optional":true,"field":"totalFee"},{"type":"int64","optional":true,"field":"createdAt"},{"type":"int64","optional":true,"field":"updatedAt"},{"type":"int32","optional":true,"field":"__v"},{"type":"string","optional":true,"field":"from"},{"type":"string","optional":true,"field":"orderId"},{"type":"string","optional":true,"field":"id"}],"optional":false,"name":"mongo_conn.digi.transactions"},"payload":{"sender":"5cef970ca2e9c273c655483","receiver":"5cef970ca2e9c27355c483","receiverWalletId":"5cef970ca2e9c27556c484","status":"pending","type":"topup","amount":6000,"totalFee":0,"createdAt":1560708024322,"updatedAt":1560708024753,"__v":0,"from":"smt","orderId":"d7a97581-9d18-79cd-8b09-16e400a43714","id":"5d0683b8be4af834abe3cf58"}}
et voici ma connexion -mongodb-source.properties
{"_id":"5cdb4e6ed767ba70593e2aa8","sender":"5cdb43db4505956efc70ba03","receiver":"5cdb43db4505956efc70ba03","receiverWalletId":"5cdb43db4505956efc70ba04","status":"succes","type":"topup","amount":200000,"totalFee":0,"createdAt":"2019-05-14T23:25:34.703Z","updatedAt":"2019-05-14T23:25:35.132Z","__v":0,"details":"none."}
3 Réponses :
Debezium diffuse les données au format tel qu'elles sont stockées dans oplog. La date ressemble à un horodatage Unix en millisecondes depuis l'époque.
Vous pouvez écrire un SMT ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect ) qui manipulera le message et le convertira les champs demandés dans votre représentation sous forme de chaîne préférée.
Si vous regardez dans org.bson.BsonDateTime
, vous verrez qu'il s'agit vraiment d'une valeur longue
.
J'ai ajouté ces lignes à mon connect-mongodb-source.properties. "transforms": "TimestampConverter", "transforms.TimestampConverter.field": "createdAt" "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter $ Valu e", "transforms.TimestampConverter. format ":" aaaa-MM-jj HH: mm: ss ZZZ "" transforms.TimestampConverter.target.type ":" string "mais cela ne fonctionne pas
Résolu
name=mongodb-source-connector connector.class=io.debezium.connector.mongodb.MongoDbConnector mongodb.hosts=repracli/**.**.**.***:27017 mongodb.name=mongo_conn initial.sync.max.threads=1 tasks.max=1 transforms=unwrap,convert,convert2,convert3,convert4 transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope transforms.unwrap.operation.header=true transforms.convert.type=org.apache.kafka.connect.transforms.TimestampConverter$Value transforms.convert.target.type=string transforms.convert.field=createdAt transforms.convert.format=yyyy-MM-dd HH:mm:ss ZZZ transforms.convert2.type=org.apache.kafka.connect.transforms.TimestampConverter$Value transforms.convert2.target.type=string transforms.convert2.field=updatedAt transforms.convert2.format=yyyy-MM-dd HH:mm:ss ZZZ transforms.convert3.type=org.apache.kafka.connect.transforms.TimestampConverter$Value transforms.convert3.target.type=string transforms.convert3.field=created_at transforms.convert3.format=yyyy-MM-dd HH:mm:ss ZZZ transforms.convert4.type=org.apache.kafka.connect.transforms.TimestampConverter$Value transforms.convert4.target.type=string transforms.convert4.field=updated_at transforms.convert4.format=yyyy-MM-dd HH:mm:ss ZZZ
Pour plusieurs transformations, vous aurez besoin de quelque chose comme:
transforms=unwrap,convert1,convert2 transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope transforms.unwrap.operation.header=true transforms.convert1.type=org.apache.kafka.connect.transforms.TimestampConverter$Value transforms.convert1.target.type=string transforms.convert1.field=createdAt transforms.convert1.format=yyyy-MM-dd HH:mm:ss ZZZ transforms.convert2.type=org.apache.kafka.connect.transforms.TimestampConverter$Value transforms.convert2.target.type=string transforms.convert2.field= *updatedAt* transforms.convert2.format=yyyy-MM-dd HH:mm:ss ZZZ