Hello,

I have a use case where I use Flink to execute a SQL query on data flowing in
from a Kafka topic. The output of this query is written to another Kafka
topic. The query needs to do a lookup on a small CSV file (entries in it can
be deleted or updated at any time). I initially used the filesystem connector,
which worked fine until the CSV file was updated. On reading more about it, I
found out that Flink processes each file just once on job startup, so any
later updates are not picked up.


  1.  Is there a plan to extend the filesystem connector to support lookups on
a file in the way I described above? Or, if it already does, I would really
appreciate it if you could point me to an example or documentation.
  2.  Can you suggest an alternative? I tried using a compacted Kafka topic
(which always keeps the latest record for a key), to which I write the records
read from the CSV. Even in that case, if I update a record in the compacted
topic so that it no longer matches the query, I still get the matched output.
Is it perhaps matching against stale/cached data?



Example of the tables I created and the query:

Input table:

create table input (
    sessionid string,
    usern string,
    ipsrc string,
    filehash string,
    time_sql_timestamp TIMESTAMP(3),
    WATERMARK FOR time_sql_timestamp AS time_sql_timestamp - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'input',
    'properties.bootstrap.servers' = 'kafka:9092',
    'json.ignore-parse-errors' = 'true',
    'json.fail-on-missing-field' = 'false',
    'properties.group.id' = 'input',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);



Context table for lookup:

create table context (
    id string,
    username string,
    blacklisted string,
    update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
    'connector' = 'kafka',
    'topic' = 'context',
    'properties.bootstrap.servers' = 'kafka:9092',
    'json.ignore-parse-errors' = 'true',
    'json.fail-on-missing-field' = 'false',
    'properties.group.id' = 'context',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);



Output table:

create table output (
    sessionid string,
    usern string,
    ipsrc string,
    filehash string,
    time_sql_timestamp TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'output',
    'properties.bootstrap.servers' = 'kafka:9092',
    'json.ignore-parse-errors' = 'true',
    'json.fail-on-missing-field' = 'false',
    'properties.group.id' = 'input',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);





Sql:

INSERT INTO output
WITH input_cte AS (
    SELECT *
    FROM TABLE(TUMBLE(TABLE input, DESCRIPTOR(time_sql_timestamp),
                      INTERVAL '10' SECOND)),
         context
    WHERE (usern = context.username)
    ORDER BY time_sql_timestamp
)
(SELECT i.sessionid, i.usern, i.ipsrc, i.filehash, i.time_sql_timestamp
 FROM input_cte AS i
 JOIN context ON i.usern = context.username)





Sample input:

{"sessionid":"101", "usern":"abc","ipsrc":"2.1.1.1","filehash":"hash1", "time_sql_timestamp":"2023-04-11 23:08:23.000"}

{"sessionid":"102", "usern":"def","ipsrc":"3.1.1.1","filehash":"hash1", "time_sql_timestamp":"2023-04-11 23:08:24.000"}

{"sessionid":"100", "usern":"neha","ipsrc":"1.1.1.1","filehash":"hash1", "time_sql_timestamp":"2023-04-11 23:08:25.000"}



Sample context:

{"id" : "1", "username" : "neha1", "blacklisted" : "false"}

{"id" : "2", "username" : "neha2", "blacklisted" : "false"}

{"id" : "3", "username" : "neha3", "blacklisted" : "false"}

{"id" : "4", "username" : "neha", "blacklisted" : "false"}





At first I get a record in the output, since the context does contain
username=neha:

{"sessionid":"100", "usern":"neha","ipsrc":"1.1.1.1","filehash":"hash1", 
"time_sql_timestamp":"2023-04-11 23:08:25.000"}



But when I update {"id" : "4", "username" : "neha", "blacklisted" : "false"} to
{"id" : "4", "username" : "some-other-value", "blacklisted" : "false"}, I
still get the same result in the output:

{"sessionid":"100", "usern":"neha","ipsrc":"1.1.1.1","filehash":"hash1", 
"time_sql_timestamp":"2023-04-11 23:08:25.000"}





It is only after restarting the job that I stop getting this output, as
expected given the updated record in context.
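
To make the behaviour I am seeing concrete, here is a minimal Python sketch (plain Python, not Flink code; the function names are made up for illustration). It contrasts two ways the context topic could be interpreted: keeping every record ever read, versus keeping only the latest record per id. I have not confirmed which of these Flink actually does for my table definition. This is only my assumption about what might explain the output, which looks like the first case.

```python
import json

# Context records in the order they were written to the topic,
# taken from the samples above, followed by the update to id 4.
CONTEXT_LOG = [
    '{"id": "1", "username": "neha1", "blacklisted": "false"}',
    '{"id": "2", "username": "neha2", "blacklisted": "false"}',
    '{"id": "3", "username": "neha3", "blacklisted": "false"}',
    '{"id": "4", "username": "neha", "blacklisted": "false"}',
    '{"id": "4", "username": "some-other-value", "blacklisted": "false"}',
]


def usernames_append_only(log):
    """Keep every record ever read: the old row for id 4 is still present."""
    return {json.loads(line)["username"] for line in log}


def usernames_latest_per_key(log):
    """Keep only the newest record per id: the update replaces the old row."""
    latest = {}
    for line in log:
        record = json.loads(line)
        latest[record["id"]] = record
    return {record["username"] for record in latest.values()}


def matches(usern, usernames):
    """Stand-in for the join condition usern = context.username."""
    return usern in usernames


print(matches("neha", usernames_append_only(CONTEXT_LOG)))     # True
print(matches("neha", usernames_latest_per_key(CONTEXT_LOG)))  # False
```

What I want is the second behaviour. If the first one is what is happening, it might also explain the restart: the job re-reads the context topic from 'earliest-offset', and if compaction has already dropped the old record for id 4 by then, nothing is left to match.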







Thanks,

Neha




