HI , Assuming that your looking for streaming use case , i think this is a better approach
1. Send Avro from logstash ,better performance. 2. Deserialize it to POJO . 3. Do logic... On Mon, Apr 23, 2018 at 4:03 PM, Lehuede sebastien <lehued...@gmail.com> wrote: > Hi Guys, > > I'm actually trying to understand the purpose of Table and in particular > KafkaJsonTableSource. I try to see if for my use case ths can be usefull. > > Here is my context : > > I send logs on logstash, i add some information (Type, Tags), Logstash > send logs to Kafka in JSON format and finally i use Flink-Connector-Kafka > to read from Kafka and parse the logs. > > > Before any processing events from Kafka to Flink look like this : > > *{"message":"accept service_id: domain-udp; src: 1.1.1.1; dst: 2.2.2.2; > proto: udp; product: VPN-1 & FireWall-1; service: 53; s_port: > 32769","@timestamp":"2018-04-20T14:47:35.285Z","host":"FW"}* > > Then i use "JSONDeserializationSchema" to deserialize events : > > *FlinkKafkaConsumer011<ObjectNode> kafkaConsumer = new > FlinkKafkaConsumer011<>("Firewall",new > JSONDeserializationSchema(),properties);* > > I take the value of the key "message" : > > *public String map(ObjectNode value) throws Exception {* > * String message = > value.get("message").asText();* > > Then parse it with Java Regex and put each match group in a String/Int/... > : > > action : accept > service_id : doamin-udp > src_ip : 1.1.1.1 > dst_ip : 2.2.2.2 > ..... > > Now i want to replace "message" key by "rawMessage" and put each match > group in JSON object to obain the final result : > > *{"rawMessage":"accept service_id: domain-udp; src: 1.1.1.1; dst: 2.2.2.2; > proto: udp; product: VPN-1 & FireWall-1; service: 53; s_port: 32769",* > *"@timestamp":"2018-04-20T14:47:35.285Z",* > *"host":"FW",* > *"type":"firewall",* > *"tags":["Checkpoint"],* > *"action":"accept",* > *"service_id":"domain-udp",* > *"src_ip":"1.1.1.1",* > *"dst_ip":"2.2.2.2",* > *...}* > > I'm a newbie with Streaming Application technologies, with Flink, and for > the moment i still discover how it works and what are the different > fonctionnalities. But when i was looking for a solution to obtain my final > result, i came across KafkaJsonTableSource. > > Does anyone think this can be a good solution for my use case ? > > I think i will be able to store JSON from Kafka, process data then modify > the table and send data to another Kafka, is it correct ? > > Regards, > Sebastien > > >