Hi Guys,

I'm currently trying to understand the purpose of the Table API, and in
particular KafkaJsonTableSource, to see whether it could be useful for my
use case.

Here is my context:

I send logs to Logstash, where I add some information (Type, Tags). Logstash
sends the logs to Kafka in JSON format, and finally I use
Flink-Connector-Kafka to read from Kafka and parse the logs.


Before any processing, the events arriving in Flink from Kafka look like this:

*{"message":"accept service_id: domain-udp; src: 1.1.1.1; dst: 2.2.2.2;
proto: udp; product: VPN-1 & FireWall-1; service: 53; s_port:
32769","@timestamp":"2018-04-20T14:47:35.285Z","host":"FW"}*

Then I use "JSONDeserializationSchema" to deserialize the events:

*FlinkKafkaConsumer011<ObjectNode> kafkaConsumer = new
FlinkKafkaConsumer011<>("Firewall",new
JSONDeserializationSchema(),properties);*

I take the value of the "message" key:

*public String map(ObjectNode value) throws Exception {*
*                                String message =
value.get("message").asText();*

Then I parse it with a Java regex and put each match group into a
String/Int/... variable:

action : accept
service_id : domain-udp
src_ip : 1.1.1.1
dst_ip : 2.2.2.2
.....
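
For illustration, the parsing step described above could be sketched roughly
like this. It is only a minimal sketch under assumptions, not the code
actually used: the class name FirewallMessageParser, the two patterns, and
the renaming of "src"/"dst" to "src_ip"/"dst_ip" are hypothetical and are
derived only from the sample message shown earlier. It assumes the first
word is the action and the rest is a list of "key: value" pairs separated
by ";".

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, based only on the sample firewall message above.
public class FirewallMessageParser {

    // Assumption: the leading word is the action, the remainder holds the pairs.
    private static final Pattern ACTION =
            Pattern.compile("^\\s*(\\w+)\\s+(.*)$", Pattern.DOTALL);

    // Assumption: pairs look like "key: value" and are separated by ';'.
    private static final Pattern PAIR = Pattern.compile("(\\w+):\\s*([^;]*)");

    public static Map<String, String> parse(String message) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher action = ACTION.matcher(message);
        if (!action.matches()) {
            return fields; // unknown layout: return no fields
        }
        fields.put("action", action.group(1));

        Matcher pair = PAIR.matcher(action.group(2));
        while (pair.find()) {
            String key = pair.group(1);
            // Rename to the field names wanted in the final JSON.
            if (key.equals("src")) {
                key = "src_ip";
            } else if (key.equals("dst")) {
                key = "dst_ip";
            }
            fields.put(key, pair.group(2).trim());
        }
        return fields;
    }
}
```

All values are kept as strings here; numeric fields such as "service" or
"s_port" would still need an explicit conversion (for example with
Integer.parseInt) if they should end up as numbers.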

Now I want to replace the "message" key with "rawMessage" and put each match
group into the JSON object to obtain the final result:

*{"rawMessage":"accept service_id: domain-udp; src: 1.1.1.1; dst: 2.2.2.2;
proto: udp; product: VPN-1 & FireWall-1; service: 53; s_port: 32769",*
*"@timestamp":"2018-04-20T14:47:35.285Z",*
*"host":"FW",*
*"type":"firewall",*
*"tags":["Checkpoint"],*
*"action":"accept",*
*"service_id":"domain-udp",*
*"src_ip":"1.1.1.1",*
*"dst_ip":"2.2.2.2",*
*...}*

I'm a newbie with streaming application technologies and with Flink, and for
the moment I'm still discovering how it works and what its different
features are. But while looking for a way to obtain my final result, I came
across KafkaJsonTableSource.

Does anyone think this could be a good solution for my use case?

I think I would be able to load the JSON from Kafka into a table, process
the data, then modify the table and send the data to another Kafka. Is that
correct?

Regards,
Sebastien
