Je cherchais un exemple utilisant Kafka Streams sur la façon de faire ce genre de chose, c'est-à-dire joindre une table clients avec une table d'adresses et transmettre les données à ES: -
"hits": [ { "_index": "customers_with_addresses", "_type": "_doc", "_id": "1", "_score": 1.3278645, "_source": { "first_name": "Sally", "last_name": "Thomas", "email": "sally.thomas@acme.com", "addresses": [{ "street": "3183 Moore Avenue", "city": "Euless", "state": "Texas", "zip": "76036", "type": "SHIPPING" }, { "street": "2389 Hidden Valley Road", "city": "Harrisburg", "state": "Pennsylvania", "zip": "17116", "type": "BILLING" }], } }, â¦.
+----+-------------+---------------------------+------------+--------------+-------+----------+ | id | customer_id | street | city | state | zip | type | +----+-------------+---------------------------+------------+--------------+-------+----------+ | 10 | 1001 | 3183 Moore Avenue | Euless | Texas | 76036 | SHIPPING | | 11 | 1001 | 2389 Hidden Valley Road | Harrisburg | Pennsylvania | 17116 | BILLING | | 12 | 1002 | 281 Riverside Drive | Augusta | Georgia | 30901 | BILLING | | 13 | 1003 | 3787 Brownton Road | Columbus | Mississippi | 39701 | SHIPPING | | 14 | 1003 | 2458 Lost Creek Road | Bethlehem | Pennsylvania | 18018 | SHIPPING | | 15 | 1003 | 4800 Simpson Square | Hillsdale | Oklahoma | 73743 | BILLING | | 16 | 1004 | 1289 University Hill Road | Canehill | Arkansas | 72717 | LIVING | +----+-------------+---------------------------+------------+--------------+-------+----------+
+------+------------+----------------+-----------------------+ | id | first_name | last_name | email | +------+------------+----------------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Davidson | ed@walker.com | | 1004 | Anne | Kim | annek@noanswer.org | +------+------------+----------------+-----------------------+
Les données de table proviennent de sujets Debezium, am Je corrige en pensant que j'ai besoin d'un peu de Java au milieu pour rejoindre les flux, le publier dans un nouveau sujet qui le transforme ensuite en ES?
Quelqu'un aurait-il un exemple de code de cela?
Merci.
3 Réponses :
Oui, vous pouvez implémenter la solution à l'aide de l'API Kafka streams en java de la manière suivante.
Voici l'exemple ci-dessous (considérant que les données sont consommées au format json):
KStream<String,JsonNode> customers = builder.stream("customer", Consumed.with(stringSerde, jsonNodeSerde)); KStream<String,JsonNode> addresses = builder.stream("address", Consumed.with(stringSerde, jsonNodeSerde)); // Select the customer ID as key in order to join with address. KStream<String,JsonNode> customerRekeyed = customers.selectKey(value-> value.get("id").asText()); ObjectMapper mapper = new ObjectMapper(); // Select Customer_id as key to aggregate the addresses and join with customer KTable<String,JsonNode> addressTable = addresses .selectKey(value-> value.get("customer_id").asText()) .groupByKey() .aggregate(() ->mapper::createObjectNode, //initializer (key,value,aggregate) -> aggregate.add(value), Materialized.with(stringSerde, jsonNodeSerde) ); //adder // Join Customer Stream with Address Table KStream<String,JsonNode> customerAddressStream = customerRekeyed.leftJoin(addressTable, (left,right) -> { ObjectNode finalNode = mapper.createObjectNode(); ArrayList addressList = new ArrayList<JsonNode>(); // Considering the address is arrayNode ((ArrayNode)right).elements().forEachRemaining(addressList ::add); left.putArray("addresses").allAll(addressList); return left; },Joined.keySerde(stringSerde).withValueSerde(jsonNodeSerde));
Vous pouvez consulter les détails sur tous les types de jointures ici: p>
https: //docs.confluent .io / current / streams / developer-guide / dsl-api.html # rejoindre
Merci Nishu! Cela clarifie les choses pour moi ... alors je suppose que je diffuserais simplement customerAddressStream vers un autre sujet pour sombrer dans ES via Kafka connect?
Oui exactement. :) '
En fonction de la rigueur de votre exigence pour imbriquer plusieurs adresses dans un nœud client, vous pouvez le faire dans KSQL (qui est construit sur Kafka Streams).
Remplissez certaines données de test dans Kafka (ce qui dans votre cas est déjà fait via Debezium):
$ curl -s http://localhost:9200/customers_with_addresses/_search | jq '.hits.hits[0]' { "_index": "customers_with_addresses", "_type": "type.name=kafkaconnect", "_id": "CUSTOMERS_WITH_ADDRESSES+0+2", "_score": 1, "_source": { "ZIP": "76505", "CITY": "Temple", "ADDRESS_TYPE": "LIVING", "CUSTOMER_ID": 1002, "FULL_NAME": "Rebeca Kerrod", "STATE": "Texas", "STREET": "55795 Derek Avenue", "LAST_NAME": "Kerrod", "FIRST_NAME": "Rebeca" } }
Lancez KSQL et pour commencer, inspectez simplement les données:
curl -i -X POST -H "Accept:application/json" \ -H "Content-Type:application/json" http://localhost:8083/connectors/ \ -d '{ "name": "sink-elastic-customers_with_addresses-00", "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "topics": "CUSTOMERS_WITH_ADDRESSES", "connection.url": "http://elasticsearch:9200", "type.name": "type.name=kafkaconnect", "key.ignore": "true", "schema.ignore": "true", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false" } }'
Maintenant nous déclarons un STREAM
(sujet Kafka + schéma) sur les données afin de pouvoir les manipuler davantage:
ksql> SELECT * FROM CUSTOMERS_WITH_ADDRESSES WHERE CUSTOMER_ID=1002; 1558519823351 | 1002 | 1002 | Rebeca Kerrod | Rebeca | LIVING | Kerrod | 55795 Derek Avenue | Temple | Texas | 76505 1558519823351 | 1002 | 1002 | Rebeca Kerrod | Rebeca | SHIPPING | Kerrod | 164 Continental Plaza | Modesto | California | 95354
Nous allons modéliser le clients
en tant que TABLE
, et pour ce faire, les messages Kafka doivent être saisis correctement (et au moment où ils ont des clés nulles, comme le montre le " ROWKEY ":" null "
dans la sortie PRINT
ci-dessus). Vous pouvez configurer Debezium pour définir la clé de message afin que cette étape ne soit pas nécessaire pour vous dans KSQL:
ksql> DESCRIBE CUSTOMERS_WITH_ADDRESSES; Name : CUSTOMERS_WITH_ADDRESSES Field | Type ------------------------------------------ ROWTIME | BIGINT (system) ROWKEY | VARCHAR(STRING) (system) CUSTOMER_ID | INTEGER (key) FULL_NAME | VARCHAR(STRING) FIRST_NAME | VARCHAR(STRING) ADDRESS_TYPE | VARCHAR(STRING) LAST_NAME | VARCHAR(STRING) STREET | VARCHAR(STRING) CITY | VARCHAR(STRING) STATE | VARCHAR(STRING) ZIP | VARCHAR(STRING) ------------------------------------------ For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
Nous déclarons maintenant un TABLE
( state pour une clé donnée, instanciée à partir d'un sujet Kafka + schéma):
ksql> SHOW STREAMS; Stream Name | Kafka Topic | Format ------------------------------------------------------------------------------------------ CUSTOMERS_KEYED | CUSTOMERS_KEYED | JSON ADDRESSES_RAW | addresses | JSON CUSTOMERS_RAW | customers | JSON CUSTOMERS_WITH_ADDRESSES | CUSTOMERS_WITH_ADDRESSES | JSON
Nous pouvons maintenant joindre les données:
ksql> CREATE STREAM customers_with_addresses AS SELECT CUSTOMER_ID, FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME, FIRST_NAME, LAST_NAME, TYPE AS ADDRESS_TYPE, STREET, CITY, STATE, ZIP FROM ADDRESSES_RAW A INNER JOIN CUSTOMER C ON A.CUSTOMER_ID = C.ID; Message ---------------------------- Stream created and running ----------------------------
Ceci crée un nouveau KSQL STREAM qui à son tour alimente un nouveau sujet Kafka.
ksql> CREATE TABLE CUSTOMER (ID INT, FIRST_NAME VARCHAR, LAST_NAME VARCHAR, EMAIL VARCHAR) WITH (KAFKA_TOPIC='CUSTOMERS_KEYED', VALUE_FORMAT='JSON', KEY='ID'); Message --------------- Table created ---------------
Le flux a un schéma:
ksql> CREATE STREAM CUSTOMERS_KEYED WITH (PARTITIONS=1) AS SELECT * FROM CUSTOMERS_RAW PARTITION BY ID; Message ---------------------------- Stream created and running ----------------------------
Nous pouvons interroger le flux:
ksql> CREATE STREAM addresses_RAW (ID INT, CUSTOMER_ID INT, STREET VARCHAR, CITY VARCHAR, STATE VARCHAR, ZIP VARCHAR, TYPE VARCHAR) WITH (KAFKA_TOPIC='addresses', VALUE_FORMAT='JSON'); Message ---------------- Stream created ---------------- ksql> CREATE STREAM customers_RAW (ID INT, FIRST_NAME VARCHAR, LAST_NAME VARCHAR, EMAIL VARCHAR) WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='JSON'); Message ---------------- Stream created ----------------
Nous pouvons également le diffuser sur Elasticsearch en utilisant Kafka Connect:
ksql> PRINT 'addresses' FROM BEGINNING ; Format:JSON {"ROWTIME":1558519823351,"ROWKEY":"null","id":1,"customer_id":1004,"street":"8 Moulton Center","city":"Bronx","state":"New York","zip":"10474","type":"BILLING"} {"ROWTIME":1558519823351,"ROWKEY":"null","id":2,"customer_id":1001,"street":"5 Hollow Ridge Alley","city":"Washington","state":"District of Columbia","zip":"20016","type":"LIVING"} {"ROWTIME":1558519823351,"ROWKEY":"null","id":3,"customer_id":1000,"street":"58 Maryland Point","city":"Greensboro","state":"North Carolina","zip":"27404","type":"LIVING"} {"ROWTIME":1558519823351,"ROWKEY":"null","id":4,"customer_id":1002,"street":"55795 Derek Avenue","city":"Temple","state":"Texas","zip":"76505","type":"LIVING"} {"ROWTIME":1558519823351,"ROWKEY":"null","id":5,"customer_id":1002,"street":"164 Continental Plaza","city":"Modesto","state":"California","zip":"95354","type":"SHIPPING"} {"ROWTIME":1558519823351,"ROWKEY":"null","id":6,"customer_id":1004,"street":"6 Miller Road","city":"Louisville","state":"Kentucky","zip":"40205","type":"BILLING"} {"ROWTIME":1558519823351,"ROWKEY":"null","id":7,"customer_id":1003,"street":"97 Shasta Place","city":"Pittsburgh","state":"Pennsylvania","zip":"15286","type":"BILLING"} {"ROWTIME":1558519823351,"ROWKEY":"null","id":8,"customer_id":1000,"street":"36 Warbler Circle","city":"Memphis","state":"Tennessee","zip":"38109","type":"SHIPPING"} {"ROWTIME":1558519823351,"ROWKEY":"null","id":9,"customer_id":1001,"street":"890 Eagan Circle","city":"Saint Paul","state":"Minnesota","zip":"55103","type":"SHIPPING"} {"ROWTIME":1558519823354,"ROWKEY":"null","id":10,"customer_id":1000,"street":"8 Judy Terrace","city":"Washington","state":"District of Columbia","zip":"20456","type":"SHIPPING"} ^C Topic printing ceased ksql> ksql> PRINT 'customers' FROM BEGINNING; Format:JSON {"ROWTIME":1558519852363,"ROWKEY":"null","id":1001,"first_name":"Jolee","last_name":"Handasyde","email":"jhandasyde0@nhs.uk"} {"ROWTIME":1558519852363,"ROWKEY":"null","id":1002,"first_name":"Rebeca","last_name":"Kerrod","email":"rkerrod1@sourceforge.net"} {"ROWTIME":1558519852363,"ROWKEY":"null","id":1003,"first_name":"Bobette","last_name":"Brumble","email":"bbrumble2@cdc.gov"} {"ROWTIME":1558519852368,"ROWKEY":"null","id":1004,"first_name":"Royal","last_name":"De Biaggi","email":"rdebiaggi3@opera.com"}
Résultat:
$ curl -s "https://api.mockaroo.com/api/ffa9ff20?count=10&key=ff7856d0" | kafkacat -b localhost:9092 -t addresses -P $ curl -s "https://api.mockaroo.com/api/9b868890?count=4&key=ff7856d0" | kafkacat -b localhost:9092 -t customers -P
p >
Concernant "Vous pouvez configurer Debezium pour définir la clé de message": c'est en fait toujours le cas. La clé de message représentera la ou les colonnes de clé primaire de la table capturée; ou, s'il n'y en a pas, selon le connecteur, la première clé unique de la table.
@Robin, à quoi cela ressemblerait avec KSQL si nous voulions créer un seul document contenant un client et toutes ses adresses (c'est-à-dire un tableau intégré). Je pense que c'est la représentation préférée lors de la transmission de telles données à ES. C'est ce que nous avons essayé avec le billet de blog que j'ai lié et c'est là que les choses se compliquent.
@Gunnar oui, ce n'est pas possible dans KSQL… pour le moment :) Voir github.com/confluentinc/ksql/issues / 2147
Ai-je bien compris pour rejoindre deux flux, nous devons rejoindre les flux par clé commune? Quel flux ID un est le retard?
Nous avons créé une démo et article de blog sur ce cas d'utilisation (diffusion d'agrégats vers Elasticsearch) il y a quelque temps sur le blog Debezium.
Un problème à garder à l'esprit est que cette solution (basée sur Kafka Streams, mais je pense que c'est la même chose pour KSQL) est susceptible d'exposer les résultats de jointure intermédiaire. Par exemple. Supposons que vous insérez un client et 10 adresses en une seule transaction. L'approche de jonction de flux peut d'abord produire un agrégat du client et de ses cinq premières adresses et peu de temps après l'agrégat complet avec les 10 adresses. Cela peut être souhaitable ou non pour votre cas d'utilisation spécifique. Je me souviens également que la gestion des suppressions n'est pas triviale (par exemple, si vous supprimez l'une des 10 adresses, vous devrez donc produire à nouveau l'agrégat avec les 9 adresses restantes qui auraient pu être intactes, cependant).
Une alternative à considérer peut être le modèle de boîte d'envoi où vous produisiez essentiellement un événement explicite avec le précalcul agrégé à partir de votre application elle-même. C'est à dire. cela nécessite un peu d'aide de l'application, mais cela évite les subtilités de produire ce résultat de jointure après coup.
Doit-il être en Java? KSQL fonctionnerait ici aussi, je peux poster un exemple si cela vous intéresse.
Salut Robin - Je serais très intéressé par KSQL aussi, j'ai juste pensé que Java était la façon dont je devrais le faire! :)