3
votes

Exemple de jointure de flux avec Apache Kafka?

Je cherchais un exemple utilisant Kafka Streams sur la façon de faire ce genre de chose, c'est-à-dire joindre une table clients avec une table d'adresses et transmettre les données à ES: -

Clients

"hits": [
  {
    "_index": "customers_with_addresses",
    "_type": "_doc",
    "_id": "1",
    "_score": 1.3278645,
    "_source": {
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com",
      "addresses": [{
        "street": "3183 Moore Avenue",
        "city": "Euless",
        "state": "Texas",
        "zip": "76036",
        "type": "SHIPPING"
      }, {
        "street": "2389 Hidden Valley Road",
        "city": "Harrisburg",
        "state": "Pennsylvania",
        "zip": "17116",
        "type": "BILLING"
      }],
    }
  }, ….

Addresses

+----+-------------+---------------------------+------------+--------------+-------+----------+
| id | customer_id | street                    | city       | state        | zip   | type     |
+----+-------------+---------------------------+------------+--------------+-------+----------+
| 10 |        1001 | 3183 Moore Avenue         | Euless     | Texas        | 76036 | SHIPPING |
| 11 |        1001 | 2389 Hidden Valley Road   | Harrisburg | Pennsylvania | 17116 | BILLING  |
| 12 |        1002 | 281 Riverside Drive       | Augusta    | Georgia      | 30901 | BILLING  |
| 13 |        1003 | 3787 Brownton Road        | Columbus   | Mississippi  | 39701 | SHIPPING |
| 14 |        1003 | 2458 Lost Creek Road      | Bethlehem  | Pennsylvania | 18018 | SHIPPING |
| 15 |        1003 | 4800 Simpson Square       | Hillsdale  | Oklahoma     | 73743 | BILLING  |
| 16 |        1004 | 1289 University Hill Road | Canehill   | Arkansas     | 72717 | LIVING   |
+----+-------------+---------------------------+------------+--------------+-------+----------+

Index Elasticsearch de sortie

+------+------------+----------------+-----------------------+
| id   | first_name | last_name      | email                 |
+------+------------+----------------+-----------------------+
| 1001 | Sally      | Thomas         | sally.thomas@acme.com |
| 1002 | George     | Bailey         | gbailey@foobar.com    |
| 1003 | Edward     | Davidson       | ed@walker.com         |
| 1004 | Anne       | Kim            | annek@noanswer.org    |
+------+------------+----------------+-----------------------+

Les données de table proviennent de sujets Debezium, am Je corrige en pensant que j'ai besoin d'un peu de Java au milieu pour rejoindre les flux, le publier dans un nouveau sujet qui le transforme ensuite en ES?

Quelqu'un aurait-il un exemple de code de cela?

Merci.


2 commentaires

Doit-il être en Java? KSQL fonctionnerait ici aussi, je peux poster un exemple si cela vous intéresse.


Salut Robin - Je serais très intéressé par KSQL aussi, j'ai juste pensé que Java était la façon dont je devrais le faire! :)


3 Réponses :


3
votes

Oui, vous pouvez implémenter la solution à l'aide de l'API Kafka streams en java de la manière suivante.

  1. Consommez les sujets sous forme de flux.
  2. Regroupez le flux d'adresses dans une liste à l'aide de l'ID client et convertissez le flux en table.
  3. Rejoignez le flux client avec la table d'adresses

Voici l'exemple ci-dessous (considérant que les données sont consommées au format json):

KStream<String,JsonNode> customers = builder.stream("customer", Consumed.with(stringSerde, jsonNodeSerde));
KStream<String,JsonNode> addresses = builder.stream("address", Consumed.with(stringSerde, jsonNodeSerde));

// Select the customer ID as key in order to join with address. 
KStream<String,JsonNode> customerRekeyed = customers.selectKey(value-> value.get("id").asText());

ObjectMapper mapper = new ObjectMapper();    
// Select Customer_id as key to aggregate the addresses and join with customer
KTable<String,JsonNode> addressTable = addresses
        .selectKey(value-> value.get("customer_id").asText())
        .groupByKey()
        .aggregate(() ->mapper::createObjectNode,  //initializer
                   (key,value,aggregate) -> aggregate.add(value),
                 Materialized.with(stringSerde, jsonNodeSerde)
         );  //adder

// Join Customer Stream with Address Table
KStream<String,JsonNode> customerAddressStream = customerRekeyed.leftJoin(addressTable,
               (left,right) -> {
                      ObjectNode finalNode = mapper.createObjectNode();
                      ArrayList addressList = new ArrayList<JsonNode>();
                      // Considering the address is arrayNode
                      ((ArrayNode)right).elements().forEachRemaining(addressList ::add);
                      left.putArray("addresses").allAll(addressList);              
                      return left;
               },Joined.keySerde(stringSerde).withValueSerde(jsonNodeSerde));

Vous pouvez consulter les détails sur tous les types de jointures ici: p>

https: //docs.confluent .io / current / streams / developer-guide / dsl-api.html # rejoindre


2 commentaires

Merci Nishu! Cela clarifie les choses pour moi ... alors je suppose que je diffuserais simplement customerAddressStream vers un autre sujet pour sombrer dans ES via Kafka connect?


Oui exactement. :) '



5
votes

En fonction de la rigueur de votre exigence pour imbriquer plusieurs adresses dans un nœud client, vous pouvez le faire dans KSQL (qui est construit sur Kafka Streams).

Remplissez certaines données de test dans Kafka (ce qui dans votre cas est déjà fait via Debezium):

$ curl -s http://localhost:9200/customers_with_addresses/_search | jq '.hits.hits[0]'
{
  "_index": "customers_with_addresses",
  "_type": "type.name=kafkaconnect",
  "_id": "CUSTOMERS_WITH_ADDRESSES+0+2",
  "_score": 1,
  "_source": {
    "ZIP": "76505",
    "CITY": "Temple",
    "ADDRESS_TYPE": "LIVING",
    "CUSTOMER_ID": 1002,
    "FULL_NAME": "Rebeca Kerrod",
    "STATE": "Texas",
    "STREET": "55795 Derek Avenue",
    "LAST_NAME": "Kerrod",
    "FIRST_NAME": "Rebeca"
  }
}

Lancez KSQL et pour commencer, inspectez simplement les données:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d '{
      "name": "sink-elastic-customers_with_addresses-00",
      "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "CUSTOMERS_WITH_ADDRESSES",
        "connection.url": "http://elasticsearch:9200",
        "type.name": "type.name=kafkaconnect",
        "key.ignore": "true",
        "schema.ignore": "true",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
      }
    }'

Maintenant nous déclarons un STREAM (sujet Kafka + schéma) sur les données afin de pouvoir les manipuler davantage:

ksql> SELECT * FROM CUSTOMERS_WITH_ADDRESSES WHERE CUSTOMER_ID=1002;
1558519823351 | 1002 | 1002 | Rebeca Kerrod | Rebeca | LIVING | Kerrod | 55795 Derek Avenue | Temple | Texas | 76505
1558519823351 | 1002 | 1002 | Rebeca Kerrod | Rebeca | SHIPPING | Kerrod | 164 Continental Plaza | Modesto | California | 95354

Nous allons modéliser le clients en tant que TABLE , et pour ce faire, les messages Kafka doivent être saisis correctement (et au moment où ils ont des clés nulles, comme le montre le " ROWKEY ":" null " dans la sortie PRINT ci-dessus). Vous pouvez configurer Debezium pour définir la clé de message afin que cette étape ne soit pas nécessaire pour vous dans KSQL:

ksql> DESCRIBE CUSTOMERS_WITH_ADDRESSES;

Name                 : CUSTOMERS_WITH_ADDRESSES
 Field        | Type
------------------------------------------
 ROWTIME      | BIGINT           (system)
 ROWKEY       | VARCHAR(STRING)  (system)
 CUSTOMER_ID  | INTEGER          (key)
 FULL_NAME    | VARCHAR(STRING)
 FIRST_NAME   | VARCHAR(STRING)
 ADDRESS_TYPE | VARCHAR(STRING)
 LAST_NAME    | VARCHAR(STRING)
 STREET       | VARCHAR(STRING)
 CITY         | VARCHAR(STRING)
 STATE        | VARCHAR(STRING)
 ZIP          | VARCHAR(STRING)
------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

Nous déclarons maintenant un TABLE ( state pour une clé donnée, instanciée à partir d'un sujet Kafka + schéma):

ksql> SHOW STREAMS;

 Stream Name                              | Kafka Topic                          | Format
------------------------------------------------------------------------------------------
 CUSTOMERS_KEYED                          | CUSTOMERS_KEYED                      | JSON
 ADDRESSES_RAW                            | addresses                            | JSON
 CUSTOMERS_RAW                            | customers                            | JSON
 CUSTOMERS_WITH_ADDRESSES                 | CUSTOMERS_WITH_ADDRESSES             | JSON

Nous pouvons maintenant joindre les données:

ksql> CREATE STREAM customers_with_addresses AS 
      SELECT CUSTOMER_ID, 
             FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME, 
             FIRST_NAME, 
             LAST_NAME, 
             TYPE AS ADDRESS_TYPE, 
             STREET, 
             CITY, 
             STATE, 
             ZIP 
        FROM ADDRESSES_RAW A 
             INNER JOIN CUSTOMER C 
             ON A.CUSTOMER_ID = C.ID;

 Message
----------------------------
 Stream created and running
----------------------------

Ceci crée un nouveau KSQL STREAM qui à son tour alimente un nouveau sujet Kafka.

ksql> CREATE TABLE CUSTOMER (ID INT, FIRST_NAME VARCHAR, LAST_NAME VARCHAR, EMAIL VARCHAR) WITH (KAFKA_TOPIC='CUSTOMERS_KEYED', VALUE_FORMAT='JSON', KEY='ID');

 Message
---------------
 Table created
---------------

Le flux a un schéma:

ksql> CREATE STREAM CUSTOMERS_KEYED WITH (PARTITIONS=1) AS SELECT * FROM CUSTOMERS_RAW PARTITION BY ID;

 Message
----------------------------
 Stream created and running
----------------------------

Nous pouvons interroger le flux:

ksql> CREATE STREAM addresses_RAW (ID INT, CUSTOMER_ID INT, STREET VARCHAR, CITY VARCHAR, STATE VARCHAR, ZIP VARCHAR, TYPE VARCHAR) WITH (KAFKA_TOPIC='addresses', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

ksql> CREATE STREAM customers_RAW (ID INT, FIRST_NAME VARCHAR, LAST_NAME VARCHAR, EMAIL VARCHAR) WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

Nous pouvons également le diffuser sur Elasticsearch en utilisant Kafka Connect:

ksql> PRINT 'addresses' FROM BEGINNING ;
Format:JSON
{"ROWTIME":1558519823351,"ROWKEY":"null","id":1,"customer_id":1004,"street":"8 Moulton Center","city":"Bronx","state":"New York","zip":"10474","type":"BILLING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":2,"customer_id":1001,"street":"5 Hollow Ridge Alley","city":"Washington","state":"District of Columbia","zip":"20016","type":"LIVING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":3,"customer_id":1000,"street":"58 Maryland Point","city":"Greensboro","state":"North Carolina","zip":"27404","type":"LIVING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":4,"customer_id":1002,"street":"55795 Derek Avenue","city":"Temple","state":"Texas","zip":"76505","type":"LIVING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":5,"customer_id":1002,"street":"164 Continental Plaza","city":"Modesto","state":"California","zip":"95354","type":"SHIPPING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":6,"customer_id":1004,"street":"6 Miller Road","city":"Louisville","state":"Kentucky","zip":"40205","type":"BILLING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":7,"customer_id":1003,"street":"97 Shasta Place","city":"Pittsburgh","state":"Pennsylvania","zip":"15286","type":"BILLING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":8,"customer_id":1000,"street":"36 Warbler Circle","city":"Memphis","state":"Tennessee","zip":"38109","type":"SHIPPING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":9,"customer_id":1001,"street":"890 Eagan Circle","city":"Saint Paul","state":"Minnesota","zip":"55103","type":"SHIPPING"}
{"ROWTIME":1558519823354,"ROWKEY":"null","id":10,"customer_id":1000,"street":"8 Judy Terrace","city":"Washington","state":"District of Columbia","zip":"20456","type":"SHIPPING"}
^C
Topic printing ceased

ksql>
ksql> PRINT 'customers' FROM BEGINNING;
Format:JSON
{"ROWTIME":1558519852363,"ROWKEY":"null","id":1001,"first_name":"Jolee","last_name":"Handasyde","email":"jhandasyde0@nhs.uk"}
{"ROWTIME":1558519852363,"ROWKEY":"null","id":1002,"first_name":"Rebeca","last_name":"Kerrod","email":"rkerrod1@sourceforge.net"}
{"ROWTIME":1558519852363,"ROWKEY":"null","id":1003,"first_name":"Bobette","last_name":"Brumble","email":"bbrumble2@cdc.gov"}
{"ROWTIME":1558519852368,"ROWKEY":"null","id":1004,"first_name":"Royal","last_name":"De Biaggi","email":"rdebiaggi3@opera.com"}

Résultat:

$ curl -s "https://api.mockaroo.com/api/ffa9ff20?count=10&key=ff7856d0" | kafkacat -b localhost:9092 -t addresses -P

$ curl -s "https://api.mockaroo.com/api/9b868890?count=4&key=ff7856d0" | kafkacat -b localhost:9092 -t customers -P

p >


4 commentaires

Concernant "Vous pouvez configurer Debezium pour définir la clé de message": c'est en fait toujours le cas. La clé de message représentera la ou les colonnes de clé primaire de la table capturée; ou, s'il n'y en a pas, selon le connecteur, la première clé unique de la table.


@Robin, à quoi cela ressemblerait avec KSQL si nous voulions créer un seul document contenant un client et toutes ses adresses (c'est-à-dire un tableau intégré). Je pense que c'est la représentation préférée lors de la transmission de telles données à ES. C'est ce que nous avons essayé avec le billet de blog que j'ai lié et c'est là que les choses se compliquent.


@Gunnar oui, ce n'est pas possible dans KSQL… pour le moment :) Voir github.com/confluentinc/ksql/issues / 2147


Ai-je bien compris pour rejoindre deux flux, nous devons rejoindre les flux par clé commune? Quel flux ID un est le retard?



1
votes

Nous avons créé une démo et article de blog sur ce cas d'utilisation (diffusion d'agrégats vers Elasticsearch) il y a quelque temps sur le blog Debezium.

Un problème à garder à l'esprit est que cette solution (basée sur Kafka Streams, mais je pense que c'est la même chose pour KSQL) est susceptible d'exposer les résultats de jointure intermédiaire. Par exemple. Supposons que vous insérez un client et 10 adresses en une seule transaction. L'approche de jonction de flux peut d'abord produire un agrégat du client et de ses cinq premières adresses et peu de temps après l'agrégat complet avec les 10 adresses. Cela peut être souhaitable ou non pour votre cas d'utilisation spécifique. Je me souviens également que la gestion des suppressions n'est pas triviale (par exemple, si vous supprimez l'une des 10 adresses, vous devrez donc produire à nouveau l'agrégat avec les 9 adresses restantes qui auraient pu être intactes, cependant).

Une alternative à considérer peut être le modèle de boîte d'envoi où vous produisiez essentiellement un événement explicite avec le précalcul agrégé à partir de votre application elle-même. C'est à dire. cela nécessite un peu d'aide de l'application, mais cela évite les subtilités de produire ce résultat de jointure après coup.


0 commentaires