Hi Neha,

> 1. Is there a plan to extend the filesystem connector to support lookup on
> the file in the way I mentioned above? Or if it already does, I would
> really appreciate it if you could point me to some example or documentation.

To my understanding, there are no plans in the community to support this at the moment. I think you could try to extend it yourself.
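For reference, this is roughly what a lookup join looks like with connectors that do support it (for example JDBC or HBase); the filesystem connector does not offer this today. It is only a sketch: the JDBC URL and table name are placeholders (credentials and driver omitted), and it assumes you add a processing-time column such as proc_time AS PROCTIME() to your input table.

create table context_lookup(
  id string,
  username string,
  blacklisted string
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://dbhost:3306/mydb',  -- placeholder
  'table-name' = 'context'
);

-- lookup join: the dimension table is queried per input row at processing
-- time, so later updates/deletes to it are picked up (subject to any
-- lookup cache TTL you configure)
SELECT i.sessionid, i.usern, i.ipsrc, i.filehash
FROM input AS i
JOIN context_lookup FOR SYSTEM_TIME AS OF i.proc_time AS c
  ON i.usern = c.username;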
Best,
Ron

Neha Rawat <n.ra...@netwitness.com> wrote on Fri, 26 May 2023 at 09:29:

> Hello,
>
> I have a use case where I use Flink to execute a SQL query on data that's
> flowing in from a Kafka topic. The output of this query is written to
> another Kafka topic. The query that I want to execute is supposed to do a
> lookup on a small CSV file (from which entries can be deleted/updated at
> any point in time). I initially used the filesystem connector, which worked
> fine until the CSV file was updated. On reading more about it I found out
> that Flink processes each file just once on job startup, so any updates
> after that aren't taken care of.
>
> 1. Is there a plan to extend the filesystem connector to support lookup on
>    the file in the way I mentioned above? Or if it already does, I would
>    really appreciate it if you could point me to some example or documentation.
> 2. Can you suggest an alternative to this? I did try using a compacted
>    Kafka topic (that always keeps the latest record for a key) to which I
>    pass on the records after reading them from the CSV, but even in that
>    case I found that if I update a record in the compacted Kafka topic so
>    that it no longer matches the query, I still get the matched output. Is
>    it perhaps matching against stale/cached data?
>
> Example of the tables I created and the query:
>
> Input table ->
>
> create table input(
>   sessionid string,
>   usern string,
>   ipsrc string,
>   filehash string,
>   time_sql_timestamp TIMESTAMP(3),
>   WATERMARK FOR time_sql_timestamp AS time_sql_timestamp - INTERVAL '5' SECOND
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'input',
>   'properties.bootstrap.servers' = 'kafka:9092',
>   'json.ignore-parse-errors' = 'true',
>   'json.fail-on-missing-field' = 'false',
>   'properties.group.id' = 'input',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> );
>
> Context table for lookup ->
>
> create table context(
>   id string,
>   username string,
>   blacklisted string,
>   update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'context',
>   'properties.bootstrap.servers' = 'kafka:9092',
>   'json.ignore-parse-errors' = 'true',
>   'json.fail-on-missing-field' = 'false',
>   'properties.group.id' = 'context',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'json'
> );
>
> Output table ->
>
> create table output(
>   sessionid string,
>   usern string,
>   ipsrc string,
>   filehash string,
>   time_sql_timestamp TIMESTAMP(3)
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'output',
>   'properties.bootstrap.servers' = 'kafka:9092',
>   'json.ignore-parse-errors' = 'true',
>   'json.fail-on-missing-field' = 'false',
>   'properties.group.id' = 'input',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> );
>
> SQL ->
>
> INSERT INTO output
> WITH input_cte AS (
>   SELECT * FROM TABLE(
>     TUMBLE(TABLE input, DESCRIPTOR(time_sql_timestamp), INTERVAL '10' SECOND)
>   ), context
>   WHERE (usern = context.username)
>   ORDER BY time_sql_timestamp
> )
> (SELECT i.sessionid, i.usern, i.ipsrc, i.filehash, i.time_sql_timestamp
>  FROM input_cte AS i JOIN context ON i.usern = context.username)
>
> Sample input =
>
> {"sessionid":"101", "usern":"abc", "ipsrc":"2.1.1.1", "filehash":"hash1", "time_sql_timestamp":"2023-04-11 23:08:23.000"}
> {"sessionid":"102", "usern":"def", "ipsrc":"3.1.1.1", "filehash":"hash1", "time_sql_timestamp":"2023-04-11 23:08:24.000"}
> {"sessionid":"100", "usern":"neha", "ipsrc":"1.1.1.1", "filehash":"hash1", "time_sql_timestamp":"2023-04-11 23:08:25.000"}
>
> Sample context =
>
> {"id" : "1", "username" : "neha1", "blacklisted" : "false"}
> {"id" : "2", "username" : "neha2", "blacklisted" : "false"}
> {"id" : "3", "username" : "neha3", "blacklisted" : "false"}
> {"id" : "4", "username" : "neha", "blacklisted" : "false"}
>
> First I get a record in the output, as we do have username=neha in context:
>
> {"sessionid":"100", "usern":"neha", "ipsrc":"1.1.1.1", "filehash":"hash1", "time_sql_timestamp":"2023-04-11 23:08:25.000"}
>
> But when I update {"id" : "4", "username" : "neha", "blacklisted" : "false"}
> to {"id" : "4", "username" : "some-other-value", "blacklisted" : "false"},
> I still get the same result in the output:
>
> {"sessionid":"100", "usern":"neha", "ipsrc":"1.1.1.1", "filehash":"hash1", "time_sql_timestamp":"2023-04-11 23:08:25.000"}
>
> It's only after restarting the job that I no longer get the output for the
> updated record in context.
>
> Thanks,
> Neha
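On question 2: with the plain 'kafka' connector the context topic is read as an append-only stream, so a regular join keeps the old row for username = 'neha' in its state even after the topic is compacted; that would explain the stale match you see. A pattern that is often suggested for this (only a sketch: it assumes the compacted topic is keyed by username, that the Kafka record timestamp is usable as the update time, and that watermarks advance on both sides) is to declare the context topic as an upsert-kafka table and use an event-time temporal join:

create table context_versioned(
  id string,
  username string,
  blacklisted string,
  update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
  PRIMARY KEY (username) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'context',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- event-time temporal join: each input row is matched against the version
-- of the context row that was valid at its time_sql_timestamp; note that
-- the join key must contain the primary key of the versioned table
SELECT i.sessionid, i.usern, i.ipsrc, i.filehash, i.time_sql_timestamp
FROM input AS i
JOIN context_versioned FOR SYSTEM_TIME AS OF i.time_sql_timestamp AS c
  ON i.usern = c.username;

A tombstone (null value) for a key in the compacted topic is then interpreted as a DELETE, so removed or updated entries stop matching new input without a job restart.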