Hi, Neha
> 1. Is there a plan to extend filesystem connector to support lookup on
the file in the way I mentioned above? Or if it already does, I would
really appreciate if you could point me to some example or documentation.
As far as I know, the community has no plans to support this at the
moment. I think you could try extending the filesystem connector yourself.
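
On your second question, one possible explanation (an assumption from
reading the DDL, not something verified against your job): the plain
'kafka' connector reads the context topic as an append-only stream, so the
new record for id 4 is added as another row and the earlier username=neha
row is never retracted from the join state. The toy Python model below is
not Flink code, and its function names are made up for illustration. It
only contrasts the append-only reading with an upsert reading keyed on id,
using the sample context records from your mail.

```python
# Toy model, NOT Flink code: two ways of interpreting the same topic contents.
context_records = [
    {"id": "1", "username": "neha1", "blacklisted": "false"},
    {"id": "2", "username": "neha2", "blacklisted": "false"},
    {"id": "3", "username": "neha3", "blacklisted": "false"},
    {"id": "4", "username": "neha", "blacklisted": "false"},
    # The later "update" for id 4, appended to the topic.
    {"id": "4", "username": "some-other-value", "blacklisted": "false"},
]


def append_only_view(records):
    """Hypothetical helper: every record is an independent row, none replaced."""
    return list(records)


def upsert_view(records):
    """Hypothetical helper: a later record with the same id replaces the earlier one."""
    latest = {}
    for record in records:
        latest[record["id"]] = record
    return list(latest.values())


def matches(usern, rows):
    """True if any context row has the given username (the join condition)."""
    return any(row["username"] == usern for row in rows)


stale_match = matches("neha", append_only_view(context_records))
fresh_match = matches("neha", upsert_view(context_records))
print(stale_match, fresh_match)
```

In the append-only reading the old row still matches usern = 'neha'; in the
upsert reading it does not. Flink's 'upsert-kafka' connector, with a PRIMARY
KEY declared on id, applies the second reading; whether it fits this exact
query and sink would need testing.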

Best,
Ron

Neha Rawat <n.ra...@netwitness.com> wrote on Fri, May 26, 2023 at 09:29:

> Hello,
>
>
>
> I have a use case where I use Flink to execute a SQL query on data that's
> flowing in from a Kafka topic. The output of this query is written to
> another Kafka topic. The query that I want to execute is supposed to do a
> lookup on a small CSV file (from which entries can be deleted/updated at
> any point in time). I initially used the filesystem connector, which worked
> fine until the CSV file was updated. On reading more about it, I found out
> that Flink processes each file just once on job startup, so any updates
> after that aren't picked up.
>
>
>
>    1. Is there a plan to extend filesystem connector to support lookup on
>    the file in the way I mentioned above? Or if it already does, I would
>    really appreciate if you could point me to some example or documentation.
>    2. Can you suggest an alternative to this? I did try using a compacted
>    Kafka topic (that always keeps the latest record for a key) to which I
>    would pass the records after reading them from the CSV, but even in that
>    case I found that if I update a record in the compacted Kafka topic such
>    that it does not match the query, I still get the matched output. Is it
>    perhaps matching against stale/cached data?
>
>
>
> Example of the tables I created and the query:
>
>  Input table ->
>
> "create table input(sessionid string, usern string, ipsrc string, filehash
> string, time_sql_timestamp TIMESTAMP(3), WATERMARK FOR time_sql_timestamp
> AS time_sql_timestamp - INTERVAL '5' SECOND) WITH ( 'connector'\
>
>     \ = 'kafka','topic' = 'input','properties.bootstrap.servers' =
> 'kafka:9092','json.ignore-parse-errors'\
>
>     \ = 'true','json.fail-on-missing-field'= 'false','properties.group.id'
> = 'input','scan.startup.mode'\
>
>     \ = 'latest-offset','format' = 'json');"
>
>
>
> Context Table for lookup ->
>
>    "create table context(id string, username string, blacklisted string,
> update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL
>
>    ) WITH ( 'connector'\
>
>     \ = 'kafka','topic' = 'context', 'properties.bootstrap.servers' =
> 'kafka:9092', 'json.ignore-parse-errors'\
>
>     \ = 'true','json.fail-on-missing-field'= 'false','properties.group.id'
> = 'context','scan.startup.mode'\
>
>     \ = 'earliest-offset', 'format' = 'json');"
>
>
>
> Output Table ->
>
>   "create table output(sessionid string, usern string, ipsrc string,
> filehash string, time_sql_timestamp TIMESTAMP(3)) WITH ( 'connector'\
>
>     \ = 'kafka','topic' = 'output','properties.bootstrap.servers' =
> 'kafka:9092','json.ignore-parse-errors'\
>
>    \ = 'true','json.fail-on-missing-field'= 'false','properties.group.id'
> = 'input','scan.startup.mode'\
>
>     \ = 'latest-offset','format' = 'json');"
>
>
>
>
>
> SQL ->
>
> "INSERT INTO output WITH input_cte AS (SELECT * FROM TABLE (TUMBLE(TABLE
> input, DESCRIPTOR(time_sql_timestamp), INTERVAL '10' SECOND)), context
> where (usern = context.username) ORDER BY time_sql_timestamp) (SELECT
> i.sessionid, i.usern, i.ipsrc, i.filehash, i.time_sql_timestamp  FROM
> input_cte as i  join context on  i.usern = context.username)"
>
>
>
>
>
> Sample input=
>
> {"sessionid":"101", "usern":"abc","ipsrc":"2.1.1.1","filehash":"hash1",
> "time_sql_timestamp":"2023-04-11 23:08:23.000"}
>
> {"sessionid":"102", "usern":"def","ipsrc":"3.1.1.1","filehash":"hash1",
> "time_sql_timestamp":"2023-04-11 23:08:24.000"}
>
> {"sessionid":"100", "usern":"neha","ipsrc":"1.1.1.1","filehash":"hash1",
> "time_sql_timestamp":"2023-04-11 23:08:25.000"}
>
>
>
> Sample context =
>
> {"id" : "1", "username" : "neha1", "blacklisted" : "false"}
>
> {"id" : "2", "username" : "neha2", "blacklisted" : "false"}
>
> {"id" : "3", "username" : "neha3", "blacklisted" : "false"}
>
> {"id" : "4", "username" : "neha", "blacklisted" : "false"}
>
>
>
>
>
> At first I get a record in the output, since we do have username=neha in
> context.
>
> {"sessionid":"100", "usern":"neha","ipsrc":"1.1.1.1","filehash":"hash1",
> "time_sql_timestamp":"2023-04-11 23:08:25.000"}
>
>
>
> But when I update {"id" : "4", "username" : "neha", "blacklisted" :
> "false"} to {"id" : "4", "username" : "some-other-value", "blacklisted" :
> "false"}, I still get the same result in the output:
>
> {"sessionid":"100", "usern":"neha","ipsrc":"1.1.1.1","filehash":"hash1",
> "time_sql_timestamp":"2023-04-11 23:08:25.000"}
>
>
>
>
>
> It's only after restarting the job that the output stops appearing,
> reflecting the updated record in context.
>
>
>
>
>
>
>
> Thanks,
>
> Neha
>
>
>
>
>
>
>
