Hi,

Assuming you are looking at a streaming use case, I think this is a better
approach than going through KafkaJsonTableSource (sketches below):

   1. Send Avro from Logstash, for better performance.
   2. Deserialize it into a POJO.
   3. Do your logic on the typed stream.
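
For steps 1 and 2, here is a minimal sketch of what the consumer side could
look like. It assumes a recent Flink with flink-avro on the classpath and a
hypothetical Avro-generated SpecificRecord class named FirewallEvent; the
topic, broker, and class names are placeholders, not something from your
setup:

import java.util.Properties;

import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class AvroFirewallJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "firewall-parser");

        // Deserialize the Avro bytes directly into the generated POJO;
        // no JSON / ObjectNode step in between.
        FlinkKafkaConsumer011<FirewallEvent> consumer =
                new FlinkKafkaConsumer011<>(
                        "Firewall",
                        AvroDeserializationSchema.forSpecific(FirewallEvent.class),
                        properties);

        DataStream<FirewallEvent> events = env.addSource(consumer);

        // Step 3: your logic works on typed fields instead of raw strings.
        events.map(e -> e.getMessage().toString()).print();

        env.execute("firewall-avro");
    }
}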

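If you stay on JSON for the moment, the rename + enrichment you describe
below (message -> rawMessage plus the regex match groups) can also be done
directly on the ObjectNode in a MapFunction. A rough sketch; the pattern
here is deliberately simplified, not the real firewall grammar:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.api.common.functions.MapFunction;

import com.fasterxml.jackson.databind.node.ObjectNode;

public class EnrichFirewallEvent implements MapFunction<ObjectNode, ObjectNode> {

    // Simplified pattern just to show the idea; adapt it to the real format.
    private static final Pattern PATTERN = Pattern.compile(
            "(\\w+) service_id: ([\\w-]+); src: ([\\d.]+); dst: ([\\d.]+);.*");

    @Override
    public ObjectNode map(ObjectNode value) throws Exception {
        String message = value.get("message").asText();

        // Replace the "message" key with "rawMessage".
        value.remove("message");
        value.put("rawMessage", message);

        // Put each match group into the JSON object.
        Matcher m = PATTERN.matcher(message);
        if (m.matches()) {
            value.put("action", m.group(1));
            value.put("service_id", m.group(2));
            value.put("src_ip", m.group(3));
            value.put("dst_ip", m.group(4));
        }
        return value;
    }
}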

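And for the last part of the question, yes: you can write the enriched
events to another Kafka topic with the matching producer. A sketch,
serializing the ObjectNode back to a JSON string first (broker list and
topic name are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import com.fasterxml.jackson.databind.node.ObjectNode;

public class EnrichedEventsSink {
    // Writes the enriched events to another topic as JSON strings.
    public static void writeBack(DataStream<ObjectNode> enriched) {
        enriched
                .map(node -> node.toString())   // ObjectNode -> JSON string
                .addSink(new FlinkKafkaProducer011<>(
                        "localhost:9092",       // broker list (placeholder)
                        "Firewall-Enriched",    // target topic (placeholder)
                        new SimpleStringSchema()));
    }
}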


On Mon, Apr 23, 2018 at 4:03 PM, Lehuede sebastien <lehued...@gmail.com>
wrote:

> Hi Guys,
>
> I'm currently trying to understand the purpose of the Table API, and in
> particular KafkaJsonTableSource, to see whether it can be useful for my
> use case.
>
> Here is my context:
>
> I send logs to Logstash, where I add some information (type, tags);
> Logstash then sends the logs to Kafka in JSON format, and finally I use
> Flink-Connector-Kafka to read from Kafka and parse the logs.
>
>
> Before any processing, events coming from Kafka into Flink look like this:
>
> *{"message":"accept service_id: domain-udp; src: 1.1.1.1; dst: 2.2.2.2;
> proto: udp; product: VPN-1 & FireWall-1; service: 53; s_port:
> 32769","@timestamp":"2018-04-20T14:47:35.285Z","host":"FW"}*
>
> Then I use JSONDeserializationSchema to deserialize events:
>
> FlinkKafkaConsumer011<ObjectNode> kafkaConsumer = new FlinkKafkaConsumer011<>(
>         "Firewall", new JSONDeserializationSchema(), properties);
>
> I take the value of the key "message":
>
> public String map(ObjectNode value) throws Exception {
>     String message = value.get("message").asText();
>
> Then I parse it with a Java regex and put each match group into a
> String/int/...:
>
> action : accept
> service_id : domain-udp
> src_ip : 1.1.1.1
> dst_ip : 2.2.2.2
> .....
>
> Now I want to replace the "message" key with "rawMessage" and put each
> match group into the JSON object to obtain the final result:
>
> *{"rawMessage":"accept service_id: domain-udp; src: 1.1.1.1; dst: 2.2.2.2;
> proto: udp; product: VPN-1 & FireWall-1; service: 53; s_port: 32769",*
> *"@timestamp":"2018-04-20T14:47:35.285Z",*
> *"host":"FW",*
> *"type":"firewall",*
> *"tags":["Checkpoint"],*
> *"action":"accept",*
> *"service_id":"domain-udp",*
> *"src_ip":"1.1.1.1",*
> *"dst_ip":"2.2.2.2",*
> *...}*
>
> I'm a newbie with streaming technologies and with Flink, and I am still
> discovering how it works and what the different functionalities are. But
> while looking for a solution to obtain my final result, I came across
> KafkaJsonTableSource.
>
> Does anyone think this can be a good solution for my use case?
>
> I think I would be able to read the JSON from Kafka, process the data,
> then modify the table and send the data to another Kafka topic. Is that
> correct?
>
> Regards,
> Sebastien
>
