Hello, I have a use case where I use Flink to execute a SQL query on data flowing in from a Kafka topic. The output of this query is written to another Kafka topic. The query is supposed to do a lookup on a small CSV file, from which entries can be deleted or updated at any point in time. I initially used the filesystem connector, which worked fine until the CSV file was updated. On reading more about it, I found that Flink processes each file just once, on job startup, so any updates after that are not picked up.
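For reference, the filesystem-backed context table I started with looked roughly like this (a reconstruction from memory; the file path is a placeholder and the column names match the 'context' table I define below):

create table context (
  id string,
  username string,
  blacklisted string
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/context.csv',  -- placeholder path
  'format' = 'csv'
);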
1. Is there a plan to extend the filesystem connector to support lookups on a file in the way described above? Or, if it already does, I would really appreciate a pointer to an example or documentation.

2. Can you suggest an alternative? I did try using a compacted Kafka topic (which always keeps the latest record for a key), into which I pass the records after reading them from the CSV. But even in that case, I found that if I update a record in the compacted topic so that it no longer matches the query, I still get the matched output. Is it perhaps matching against stale/cached data?

Here are the tables I created and the query.

Input table:

create table input (
  sessionid string,
  usern string,
  ipsrc string,
  filehash string,
  time_sql_timestamp TIMESTAMP(3),
  WATERMARK FOR time_sql_timestamp AS time_sql_timestamp - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'input',
  'properties.bootstrap.servers' = 'kafka:9092',
  'json.ignore-parse-errors' = 'true',
  'json.fail-on-missing-field' = 'false',
  'properties.group.id' = 'input',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

Context table (for the lookup):

create table context (
  id string,
  username string,
  blacklisted string,
  update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'context',
  'properties.bootstrap.servers' = 'kafka:9092',
  'json.ignore-parse-errors' = 'true',
  'json.fail-on-missing-field' = 'false',
  'properties.group.id' = 'context',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

Output table:

create table output (
  sessionid string,
  usern string,
  ipsrc string,
  filehash string,
  time_sql_timestamp TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'output',
  'properties.bootstrap.servers' = 'kafka:9092',
  'json.ignore-parse-errors' = 'true',
  'json.fail-on-missing-field' = 'false',
  'properties.group.id' = 'input',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

SQL:

INSERT INTO output
WITH input_cte AS (
  SELECT *
  FROM TABLE(TUMBLE(TABLE input, DESCRIPTOR(time_sql_timestamp), INTERVAL '10' SECOND)), context
  WHERE usern = context.username
  ORDER BY time_sql_timestamp
)
SELECT i.sessionid, i.usern, i.ipsrc, i.filehash, i.time_sql_timestamp
FROM input_cte AS i
JOIN context ON i.usern = context.username;

Sample input:

{"sessionid":"101", "usern":"abc","ipsrc":"2.1.1.1","filehash":"hash1", "time_sql_timestamp":"2023-04-11 23:08:23.000"}
{"sessionid":"102", "usern":"def","ipsrc":"3.1.1.1","filehash":"hash1", "time_sql_timestamp":"2023-04-11 23:08:24.000"}
{"sessionid":"100", "usern":"neha","ipsrc":"1.1.1.1","filehash":"hash1", "time_sql_timestamp":"2023-04-11 23:08:25.000"}

Sample context:

{"id" : "1", "username" : "neha1", "blacklisted" : "false"}
{"id" : "2", "username" : "neha2", "blacklisted" : "false"}
{"id" : "3", "username" : "neha3", "blacklisted" : "false"}
{"id" : "4", "username" : "neha", "blacklisted" : "false"}

At first I get this record in the output, as expected, since username=neha exists in context:
{"sessionid":"100", "usern":"neha","ipsrc":"1.1.1.1","filehash":"hash1", "time_sql_timestamp":"2023-04-11 23:08:25.000"} But when I update {"id" : "4", "username" : "neha", "blacklisted" : "false"} to {"id" : "4", "username" : "some-other-value", "blacklisted" : "false"}, even then I get same result in output - {"sessionid":"100", "usern":"neha","ipsrc":"1.1.1.1","filehash":"hash1", "time_sql_timestamp":"2023-04-11 23:08:25.000"} Its only after restarting the job that I do not get the output due to the updated record in context. Thanks, Neha Caution: External email. Do not click or open attachments unless you know and trust the sender.