Re: How can we read checkpoint data for debugging state
Hi,

FYI, hope this helps: FLIP-496: SQL connector for keyed savepoint data [1]

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data

--
Best!
Xuyang

At 2025-02-21 12:11:02, "Sachin Mittal" wrote:

Hi,
So I have a Flink application which stores state (RocksDB state backend) in S3 with the following directory structure:

s3://{bucket}/flink-checkpoints/{job-id}
    |
    + --shared/
    + --taskowned/
    + --chk-<id>/

I have my job pipeline defined like:

final DataStream events = env.fromSource(..., "Src");

SingleOutputStreamOperator<StatsData> statsData =
    events
        .keyBy(new MyKeySelector())
        .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
        .name("Proc");

statsData
    .addSink(new MySink<>(...))
    .name("Sink");

env.execute("Exec");

The MyStatsProcessor has keyed states defined as:

state1 =
    getRuntimeContext().getState(new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
state2 =
    getRuntimeContext().getState(new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));

So my question is how can I read any checkpoint state. I see this API flink-state-processor-api. Can I use the same here, and if so how do I instantiate it:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SavepointReader savepoint = SavepointReader.read(env, "s3://{bucket}/flink-checkpoints/{job-id}", ...);

Is this the correct way to use this API for reading a checkpoint?

Please note I have also enabled:

state.backend.incremental: 'true'
state.backend.local-recovery: 'true'
state.backend.changelog.enabled: 'true'
state.backend.changelog.storage: filesystem
dstl.dfs.base-path: s3://{bucket}/changelog
dstl.dfs.compression.enabled: 'true'

Now, say I am able to create the reader, how do I inspect a particular keyed state?
I see a function called readKeyedState but I am unsure as to what uid I need to pass to read a particular state.
Would something like this work:

DataStream<KeyedState> keyedState = savepoint.readKeyedState(
    OperatorIdentifier.forUid("Proc"), new ReaderFunction());

And now in my KeyedState class, I can access state1 and state2.

Would this work?

Please let me know if I am on the right track, or whether it is not possible to read checkpointed states from an external application for debugging.

Thanks
Sachin
Re: FlinkSQL LAG, strange behavior?
Another option would be to add an extra field, like:

ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY *ts, row_number*), ARRAY[-1]), -1) AS prev_zoneIds,

But it isn't possible either.

On Thu, Feb 20, 2025 at 11:48, Guillermo Ortiz Fernández (< guillermo.ortiz.f...@gmail.com>) wrote:

> I tried to create an additional field based on ROW_NUMBER, but it can't be used in the LAG function.
>
> The idea was:
>
> WITH ranked AS (
>     SELECT
>         msisdn,
>         eventTimestamp,
>         zoneIds,
>         ts,
>         TO_TIMESTAMP_LTZ(ROW_NUMBER() OVER (PARTITION BY msisdn ORDER BY ts), 3) AS ts_ranked
>     FROM example_2
> )
> SELECT
>     msisdn,
>     eventTimestamp,
>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts_ranked), ARRAY[-1]), -1) AS prev_zoneIds,
>     ts
> FROM ranked;
>
> RANKED subselect:
>
> msisdn     eventTimestamp  zoneIds  ts                       row_num  ts_ranked
> 673944959  173999638       [1]      2025-02-19 21:19:40.000  1        1970-01-01 01:00:00.001
> 673944959  173999638       [1]      2025-02-19 21:19:40.000  2        1970-01-01 01:00:00.002
>
> *But [ERROR] Could not execute SQL statement. Reason:*
> *org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.*
>
> I guess I can only use the "ts" field, because it is the time attribute?
>
> On Thu, Feb 20, 2025 at 10:41, Alessio Bernesco Làvore (< alessio.berne...@gmail.com>) wrote:
>
>> Hello,
>> I don't know if it's strictly related, but using a TIMESTAMP_LTZ vs. TIMESTAMP ts I ran into some behaviors I could not explain.
>>
>> Changing the ts to TIMESTAMP (not LTZ), the identical query works as expected.
>> So, first of all, we changed all the parsing to TIMESTAMP to rule out this kind of problem.
>>
>> If someone is interested I can provide the exact query to reproduce the problem we figured out.
>>
>> Greetings,
>> Alessio
>>
>> On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández < guillermo.ortiz.f...@gmail.com> wrote:
>>
>>> I have created a table that reads from a Kafka topic. What I want to do is order the data by eventTime and add a new field that represents the previous value using the LAG function.
>>>
>>> The problem arises when two records have exactly the same eventTime, which produces a "strange" behavior.
>>> >>> >>> CREATE TABLE example ( >>> eventTimestamp BIGINT NOT NULL, >>> msisdn INT NOT NULL, >>> zoneIds ARRAY NOT NULL, >>> ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3), >>> `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL, >>> WATERMARK FOR ts AS ts >>> ) WITH ( >>> 'connector' = 'kafka', >>> 'topic' = 'example-offset', >>> 'properties.bootstrap.servers' = '', >>> 'properties.auto.offset.reset' = 'latest', >>> 'scan.startup.mode' = 'latest-offset', >>> 'key.format' = 'raw', >>> 'key.fields' = 'msisdn', >>> 'value.format' = 'avro', >>> 'value.fields-include' = 'ALL', >>> 'scan.watermark.idle-timeout' = '1000', >>> >>> ); >>> >>> >>> INSERT INTO example (eventTimestamp, msisdn, zoneIds)VALUES >>> (173999638, 673944959, ARRAY[1]), >>> (173999638, 673944959, ARRAY[1]); >>> >>> >>> SELECT >>> msisdn, >>> eventTimestamp, >>> ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds, >>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY >>> ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds, >>> tsFROM example; >>> >>> *Actual Result:* >>> msisdneventTimestampzoneIdsprev_zoneIdsts >>> 673944959 173999638 [1] [1] 2025-02-19 21:19:40.000 >>> 673944959 173999638 [1] [1] 2025-02-19 21:19:40.000*Expected >>> Result:* >>> msisdneventTimestampzoneIdsprev_zoneIdsts >>> 673944959 173999638 [1] [ ] 2025-02-19 21:19:40.000 >>> 673944959 173999638 [1] [1] 2025-02-19 21:19:40.000 >>> -- >>> *Is this behavior normal?* >>> >>> I am trying to achieve the expected behavior by including the metadata >>> of the offset in the example table and adding it to the OVER clause in >>> the LAG function. However, it seems that Flink does not allow ordering >>> by more than one column: >>> >>> >>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, >>> kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds, >>> >>> Results in: >>> >>> [ERROR] Could not execute SQL statement. Reason: >>> org.apache.flink.table.api.TableException: The window can only be ordered >>> by a single time column. >>> >>> -- >>> >>> Would you happen to know how to achieve the expected result? >>> >>
Question about large windows and RocksDB state backend
Dear Flink Community,

I am working with an application developed using the DataStream API. The application has a window operator with a large temporal window. Due to the processing done, we use the ProcessWindowFunction implementation, as we need to explicitly buffer all the tuples falling within the boundaries of each triggered window.

We are interested in running the application with the embedded RocksDB state backend to reduce the memory footprint. We would like to know if there is information available about how windows are stored and represented in RocksDB: whether windows are separate objects in RocksDB, whether tuples are grouped by key, whether they are replicated in case of overlapping windows, and so forth.

We understand that this question can be answered by manually inspecting the source code. However, we would be very grateful if someone could share their knowledge on this topic and suggest any relevant documents available online. Unfortunately, the documentation on Flink's website does not provide such low-level details.

Thanks a lot,
Gabriele
Re: FlinkSQL LAG, strange behavior?
Hello,
I don't know if it's strictly related, but using a TIMESTAMP_LTZ vs. TIMESTAMP ts I ran into some behaviors I could not explain.

Changing the ts to TIMESTAMP (not LTZ), the identical query works as expected.
So, first of all, we changed all the parsing to TIMESTAMP to rule out this kind of problem.

If someone is interested I can provide the exact query to reproduce the problem we figured out.

Greetings,
Alessio

On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández < guillermo.ortiz.f...@gmail.com> wrote:

> I have created a table that reads from a Kafka topic. What I want to do is order the data by eventTime and add a new field that represents the previous value using the LAG function.
>
> The problem arises when two records have exactly the same eventTime, which produces a "strange" behavior.
>
> CREATE TABLE example (
>     eventTimestamp BIGINT NOT NULL,
>     msisdn INT NOT NULL,
>     zoneIds ARRAY NOT NULL,
>     ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
>     `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
>     WATERMARK FOR ts AS ts
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'example-offset',
>     'properties.bootstrap.servers' = '',
>     'properties.auto.offset.reset' = 'latest',
>     'scan.startup.mode' = 'latest-offset',
>     'key.format' = 'raw',
>     'key.fields' = 'msisdn',
>     'value.format' = 'avro',
>     'value.fields-include' = 'ALL',
>     'scan.watermark.idle-timeout' = '1000',
> );
>
> INSERT INTO example (eventTimestamp, msisdn, zoneIds) VALUES
>     (173999638, 673944959, ARRAY[1]),
>     (173999638, 673944959, ARRAY[1]);
>
> SELECT
>     msisdn,
>     eventTimestamp,
>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,
>     ts
> FROM example;
>
> *Actual Result:*
> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
> 673944959  173999638       [1]      [1]           2025-02-19 21:19:40.000
> 673944959  173999638       [1]      [1]           2025-02-19 21:19:40.000
>
> *Expected Result:*
> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
> 673944959  173999638       [1]      [ ]           2025-02-19 21:19:40.000
> 673944959  173999638       [1]      [1]           2025-02-19 21:19:40.000
>
> --
> *Is this behavior normal?*
>
> I am trying to achieve the expected behavior by including the metadata of the offset in the example table and adding it to the OVER clause in the LAG function. However, it seems that Flink does not allow ordering by more than one column:
>
> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,
>
> Results in:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: The window can only be ordered by a single time column.
>
> --
>
> Would you happen to know how to achieve the expected result?
Re: FlinkSQL LAG, strange behavior?
I tried to create an additional field based on ROW_NUMBER, but it can't be used in the LAG function.

The idea was:

WITH ranked AS (
    SELECT
        msisdn,
        eventTimestamp,
        zoneIds,
        ts,
        TO_TIMESTAMP_LTZ(ROW_NUMBER() OVER (PARTITION BY msisdn ORDER BY ts), 3) AS ts_ranked
    FROM example_2
)
SELECT
    msisdn,
    eventTimestamp,
    ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts_ranked), ARRAY[-1]), -1) AS prev_zoneIds,
    ts
FROM ranked;

RANKED subselect:

msisdn     eventTimestamp  zoneIds  ts                       row_num  ts_ranked
673944959  173999638       [1]      2025-02-19 21:19:40.000  1        1970-01-01 01:00:00.001
673944959  173999638       [1]      2025-02-19 21:19:40.000  2        1970-01-01 01:00:00.002

*But [ERROR] Could not execute SQL statement. Reason:*
*org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.*

I guess I can only use the "ts" field, because it is the time attribute?

On Thu, Feb 20, 2025 at 10:41, Alessio Bernesco Làvore (< alessio.berne...@gmail.com>) wrote:

> Hello,
> I don't know if it's strictly related, but using a TIMESTAMP_LTZ vs. TIMESTAMP ts I ran into some behaviors I could not explain.
>
> Changing the ts to TIMESTAMP (not LTZ), the identical query works as expected.
> So, first of all, we changed all the parsing to TIMESTAMP to rule out this kind of problem.
>
> If someone is interested I can provide the exact query to reproduce the problem we figured out.
>
> Greetings,
> Alessio
>
> On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández < guillermo.ortiz.f...@gmail.com> wrote:
>
>> I have created a table that reads from a Kafka topic. What I want to do is order the data by eventTime and add a new field that represents the previous value using the LAG function.
>>
>> The problem arises when two records have exactly the same eventTime, which produces a "strange" behavior.
>>
>> CREATE TABLE example (
>>     eventTimestamp BIGINT NOT NULL,
>>     msisdn INT NOT NULL,
>>     zoneIds ARRAY NOT NULL,
>>     ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
>>     `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
>>     WATERMARK FOR ts AS ts
>> ) WITH (
>>     'connector' = 'kafka',
>>     'topic' = 'example-offset',
>>     'properties.bootstrap.servers' = '',
>>     'properties.auto.offset.reset' = 'latest',
>>     'scan.startup.mode' = 'latest-offset',
>>     'key.format' = 'raw',
>>     'key.fields' = 'msisdn',
>>     'value.format' = 'avro',
>>     'value.fields-include' = 'ALL',
>>     'scan.watermark.idle-timeout' = '1000',
>> );
>>
>> INSERT INTO example (eventTimestamp, msisdn, zoneIds) VALUES
>>     (173999638, 673944959, ARRAY[1]),
>>     (173999638, 673944959, ARRAY[1]);
>>
>> SELECT
>>     msisdn,
>>     eventTimestamp,
>>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,
>>     ts
>> FROM example;
>>
>> *Actual Result:*
>> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
>> 673944959  173999638       [1]      [1]           2025-02-19 21:19:40.000
>> 673944959  173999638       [1]      [1]           2025-02-19 21:19:40.000
>>
>> *Expected Result:*
>> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
>> 673944959  173999638       [1]      [ ]           2025-02-19 21:19:40.000
>> 673944959  173999638       [1]      [1]           2025-02-19 21:19:40.000
>>
>> --
>> *Is this behavior normal?*
>>
>> I am trying to achieve the expected behavior by including the metadata of the offset in the example table and adding it to the OVER clause in the LAG function.
However, it seems that Flink does not allow ordering by >> more than one column: >> >> >> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, >> kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds, >> >> Results in: >> >> [ERROR] Could not execute SQL statement. Reason: >> org.apache.flink.table.api.TableException: The window can only be ordered by >> a single time column. >> >> -- >> >> Would you happen to know how to achieve the expected result? >> >
FlinkSQL LAG, strange behavior?
I have created a table that reads from a Kafka topic. What I want to do is order the data by eventTime and add a new field that represents the previous value using the LAG function.

The problem arises when two records have exactly the same eventTime, which produces a "strange" behavior.

CREATE TABLE example (
    eventTimestamp BIGINT NOT NULL,
    msisdn INT NOT NULL,
    zoneIds ARRAY NOT NULL,
    ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
    `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
    WATERMARK FOR ts AS ts
) WITH (
    'connector' = 'kafka',
    'topic' = 'example-offset',
    'properties.bootstrap.servers' = '',
    'properties.auto.offset.reset' = 'latest',
    'scan.startup.mode' = 'latest-offset',
    'key.format' = 'raw',
    'key.fields' = 'msisdn',
    'value.format' = 'avro',
    'value.fields-include' = 'ALL',
    'scan.watermark.idle-timeout' = '1000',
);

INSERT INTO example (eventTimestamp, msisdn, zoneIds) VALUES
    (173999638, 673944959, ARRAY[1]),
    (173999638, 673944959, ARRAY[1]);

SELECT
    msisdn,
    eventTimestamp,
    ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,
    ts
FROM example;

*Actual Result:*

msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
673944959  173999638       [1]      [1]           2025-02-19 21:19:40.000
673944959  173999638       [1]      [1]           2025-02-19 21:19:40.000

*Expected Result:*

msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
673944959  173999638       [1]      [ ]           2025-02-19 21:19:40.000
673944959  173999638       [1]      [1]           2025-02-19 21:19:40.000

--

*Is this behavior normal?*

I am trying to achieve the expected behavior by including the metadata of the offset in the example table and adding it to the OVER clause in the LAG function. However, it seems that Flink does not allow ordering by more than one column:

ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,

Results in:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: The window can only be ordered by a single time column.

--

Would you happen to know how to achieve the expected result?
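One direction that may be worth experimenting with (an untested sketch, not a confirmed solution): since the OVER window may only be ordered by a single time attribute, the tie could be broken inside the time attribute itself, for example by folding the Kafka offset into the millisecond part of `ts`. Caveats: this shifts event time by up to one second (which affects watermarks and any windowing downstream), it assumes a computed column may reference the `offset` metadata column (not verified), and the element type of `zoneIds` is assumed to be INT. The table name `example_tiebreak` and the Java harness are purely illustrative.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LagTieBreakSketch {
    public static void main(String[] args) {
        // Plain Table API harness just to hold the SQL; the same statements
        // could be run from the SQL client instead.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical variant of the 'example' table: the event-time column folds
        // MOD(kafka_offset, 1000) milliseconds into the timestamp so that two records
        // sharing the same eventTimestamp still get distinct ts values per partition.
        tEnv.executeSql(
                "CREATE TABLE example_tiebreak (\n"
              + "  eventTimestamp BIGINT NOT NULL,\n"
              + "  msisdn INT NOT NULL,\n"
              + "  zoneIds ARRAY<INT> NOT NULL,\n"
              + "  `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,\n"
              + "  ts AS TO_TIMESTAMP_LTZ(eventTimestamp + MOD(kafka_offset, 1000), 3),\n"
              + "  WATERMARK FOR ts AS ts\n"
              + ") WITH (\n"
              + "  'connector' = 'kafka',\n"
              + "  'topic' = 'example-offset',\n"
              + "  'properties.bootstrap.servers' = '',\n" // same connector options as the original table
              + "  'scan.startup.mode' = 'latest-offset',\n"
              + "  'key.format' = 'raw',\n"
              + "  'key.fields' = 'msisdn',\n"
              + "  'value.format' = 'avro',\n"
              + "  'value.fields-include' = 'ALL'\n"
              + ")");

        // With a (hopefully) unique time attribute, LAG only needs ORDER BY ts.
        tEnv.executeSql(
                "SELECT\n"
              + "  msisdn,\n"
              + "  eventTimestamp,\n"
              + "  ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,\n"
              + "  ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts),\n"
              + "               ARRAY[-1]), -1) AS prev_zoneIds,\n"
              + "  ts\n"
              + "FROM example_tiebreak").print();
    }
}
```

Whether the shifted timestamps are acceptable depends on how `ts` is used downstream; if exact event time matters, deduplicating upstream or handling ties in a DataStream process function may be the safer route.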
Re: Re: Performance problem with FlinkSQL
Hi,

There can be various reasons for performance issues, such as:
1. Bottlenecks in the Kafka connector when reading data
2. Time-consuming filter conditions
3. Bottlenecks in the Kafka connector when writing data

To further diagnose the problem, you can try:
1. Set the Flink configuration `pipeline.operator-chaining.enabled` to false. This will allow you to see the individual vertices (source -> calc -> sink) on the Flink UI. Check the entire topology to see if any vertex is experiencing backpressure. (See the configuration sketch after the quoted messages below.)
2. If there is no backpressure, it is likely that the bottleneck lies in data reading. Otherwise, the operator following the last one experiencing backpressure is likely the bottleneck (for example, if the source is under backpressure, the calc operator could be the bottleneck, indicating that the filtering logic is CPU-intensive).
3. Regarding:
```
and the process has been running for several days without failures, although with high CPU and memory usage.
```
If CPU utilization is high, you can also analyze a CPU flame graph to see what tasks are being processed by that node and share it here.

Hope this is helpful!

--
Best!
Xuyang

At 2025-02-20 22:03:19, "Guillermo Ortiz Fernández" wrote:

I have been configuring the Flink cluster (application mode) to process the Kafka data volume. The current configuration consists of 16 pods, each with 12GB of memory and 2 CPUs. Each TaskManager has 4 slots. All processing is done using Flink SQL, and since Kafka topics may contain out-of-order data, a 5-minute watermark is applied. In previous configurations, we allocated between 4-8GB per pod, but this caused memory errors that led to process crashes.

Currently, this process is a migration from Kafka Streams, which is running in parallel, consuming exactly the same data on the same infrastructure, but using significantly fewer resources than Flink. Approximately, Flink is consuming 10x more memory and 8x more CPU than Kafka Streams. I am unsure whether this is due to Flink inherently consuming more resources, whether using SQL results in poorly optimized auto-generated code, or if there is a configuration issue.

The metrics do not show any backpressure or errors, and the process has been running for several days without failures, although with high CPU and memory usage.

Any ideas on how to diagnose or optimize this to reduce resource consumption?

On Thu, Jan 30, 2025 at 10:02, Pedro Mázala () wrote:

> The average output rate needed to avoid lag after filtering messages should be around 60K messages per second. I’ve been testing different configurations of parallelism, slots and pods (everything runs on Kubernetes), but I’m far from achieving those numbers.

How are you partitioning your query? Do you see any backpressure happening on the Flink UI?

> In the latest configuration, I used 20 pods, a parallelism of 120, with 4 slots per taskmanager.

Were all tasks working properly or did you have idleness?

> Additionally, setting parallelism to 120 creates hundreds of subtasks for the smaller topics, which don’t do much but still consume minimal resources even if idle.

On the Table API I'm not sure if you can choose parallelism per "task"; with DataStream I'm sure you can do it.

Att,
Pedro Mázala
+31 (06) 3819 3814
Be awesome

On Wed, 29 Jan 2025 at 22:06, Guillermo Ortiz Fernández wrote:

After the last check it uses about 200-400 millicores and 2.2Gb each pod.

On Wed, Jan 29, 2025 at 21:41, Guillermo Ortiz Fernández () wrote:

I have a job entirely written in Flink SQL. The first part of the program processes 10 input topics and generates one output topic with normalized messages and some filtering applied (really easy, some where by fields and substring). Nine of the topics produce between hundreds and thousands of messages per second, with an average of 4–10 partitions each. The other topic produces 150K messages per second and has 500 partitions. They are unioned to the output topic.

The average output rate needed to avoid lag after filtering messages should be around 60K messages per second. I’ve been testing different configurations of parallelism, slots and pods (everything runs on Kubernetes), but I’m far from achieving those numbers. In the latest configuration, I used 20 pods, a parallelism of 120, with 4 slots per taskmanager. With this setup, I achieve approximately 20K messages per second, but I’m unable to consume the largest topic at the rate messages are being produced. Additionally, setting parallelism to 120 creates hundreds of subtasks for the smaller topics, which don’t do much but still consume minimal resources even if idle.

I started trying with parallelism 12 and got about 1000-3000 messages per second.

When I check the use of cpu and memory of the pods I don't see any problem and they are far from the limit; each taskmanager has 4gb and 2cpus and they are never close to using the cpu.
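For reference, a minimal sketch of how the `pipeline.operator-chaining.enabled` suggestion from step 1 above could be applied to a pure-SQL job (assuming a TableEnvironment-based program; the same key can also be set in the cluster's Flink configuration file instead):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DisableChainingSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Disable operator chaining so that source, calc and sink show up as
        // separate vertices in the Flink UI and can be checked for backpressure
        // individually. Expect some extra serialization overhead, so this is
        // usually only enabled temporarily while diagnosing.
        tEnv.getConfig().set("pipeline.operator-chaining.enabled", "false");

        // ... register the tables and run the existing INSERT INTO statements ...
    }
}
```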
How can we read checkpoint data for debugging state
Hi,
So I have a Flink application which stores state (RocksDB state backend) in S3 with the following directory structure:

s3://{bucket}/flink-checkpoints/{job-id}
    |
    + --shared/
    + --taskowned/
    + --chk-<id>/

I have my job pipeline defined like:

final DataStream events = env.fromSource(..., "Src");

SingleOutputStreamOperator<StatsData> statsData =
    events
        .keyBy(new MyKeySelector())
        .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
        .name("Proc");

statsData
    .addSink(new MySink<>(...))
    .name("Sink");

env.execute("Exec");

The MyStatsProcessor has keyed states defined as:

state1 =
    getRuntimeContext().getState(new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
state2 =
    getRuntimeContext().getState(new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));

So my question is how can I read any checkpoint state. I see this API flink-state-processor-api. Can I use the same here, and if so how do I instantiate it:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SavepointReader savepoint = SavepointReader.read(env, "s3://{bucket}/flink-checkpoints/{job-id}", ...);

Is this the correct way to use this API for reading a checkpoint?

Please note I have also enabled:

state.backend.incremental: 'true'
state.backend.local-recovery: 'true'
state.backend.changelog.enabled: 'true'
state.backend.changelog.storage: filesystem
dstl.dfs.base-path: s3://{bucket}/changelog
dstl.dfs.compression.enabled: 'true'

Now, say I am able to create the reader, how do I inspect a particular keyed state?
I see a function called readKeyedState but I am unsure as to what uid I need to pass to read a particular state.
Would something like this work:

DataStream<KeyedState> keyedState = savepoint.readKeyedState(
    OperatorIdentifier.forUid("Proc"), new ReaderFunction());

And now in my KeyedState class, I can access state1 and state2.

Would this work?

Please let me know if I am on the right track, or whether it is not possible to read checkpointed states from an external application for debugging.

Thanks
Sachin
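For what it's worth, below is a minimal sketch of how the State Processor API is typically wired up for this kind of inspection. It is untested and makes several assumptions: the key type produced by MyKeySelector is taken to be String, the `KeyedState` holder class and the `chk-100` checkpoint number are hypothetical, and the user's StateOne/StateTwo/MyStatsProcessor classes are assumed to be on the classpath. The path should point at one concrete `chk-<n>` directory (the one containing the `_metadata` file), not at the `{job-id}` root. Whether a checkpoint taken with the changelog state backend enabled can be read this way is something I am not sure about; a native savepoint may be the easier target.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class CheckpointInspector {

    // Hypothetical holder for one key's state, used only for printing/debugging.
    public static class KeyedState {
        public String key;      // assumes MyKeySelector produces a String key
        public StateOne state1;
        public StateTwo state2;
    }

    public static class ReaderFunction extends KeyedStateReaderFunction<String, KeyedState> {
        private ValueState<StateOne> state1;
        private ValueState<StateTwo> state2;

        @Override
        public void open(Configuration parameters) {
            // Descriptors must match the names and types used in MyStatsProcessor.
            state1 = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
            state2 = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<KeyedState> out) throws Exception {
            KeyedState holder = new KeyedState();
            holder.key = key;
            holder.state1 = state1.value();
            holder.state2 = state2.value();
            out.collect(holder);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Point at one concrete checkpoint directory (the one containing _metadata).
        SavepointReader reader = SavepointReader.read(
                env, "s3://{bucket}/flink-checkpoints/{job-id}/chk-100");

        // "Proc" must be the operator uid assigned with .uid("Proc");
        // a .name(...) alone is not enough (see the follow-up message in this thread).
        DataStream<KeyedState> keyedState = reader.readKeyedState(
                OperatorIdentifier.forUid("Proc"), new ReaderFunction());

        keyedState.print();
        env.execute("inspect-checkpoint");
    }
}
```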
Re: How can we read checkpoint data for debugging state
Hi,

I am working on Flink 1.19.1, so I guess I cannot use the SQL connector, as that is to be released with 2.1.0.

If I directly try to use the flink-state-processor-api, my very first question is how do I know the uid for each keyed state? Is it the step name? As in:

events
    .keyBy(new MyKeySelector())
    .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
    .name("Proc");

If I now want to access keyed states for MyStatsProcessor, then I need to access them using:

savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new ReaderFunction());

Is this the right way to access it?

Thanks
Sachin

On Fri, Feb 21, 2025 at 11:09 AM Xuyang wrote:

> Hi,
>
> FYI, hope this helps: FLIP-496: SQL connector for keyed savepoint data [1]
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
>
> --
> Best!
> Xuyang
>
> At 2025-02-21 12:11:02, "Sachin Mittal" wrote:
>
> Hi,
> So I have a Flink application which stores state (RocksDB state backend) in S3 with the following directory structure:
>
> s3://{bucket}/flink-checkpoints/{job-id}
>     |
>     + --shared/
>     + --taskowned/
>     + --chk-<id>/
>
> I have my job pipeline defined like:
>
> final DataStream events = env.fromSource(..., "Src");
>
> SingleOutputStreamOperator<StatsData> statsData =
>     events
>         .keyBy(new MyKeySelector())
>         .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>         .name("Proc");
>
> statsData
>     .addSink(new MySink<>(...))
>     .name("Sink");
>
> env.execute("Exec");
>
> The MyStatsProcessor has keyed states defined as:
>
> state1 =
>     getRuntimeContext().getState(new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
> state2 =
>     getRuntimeContext().getState(new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
>
> So my question is how can I read any checkpoint state. I see this API flink-state-processor-api. Can I use the same here, and if so how do I instantiate it:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> SavepointReader savepoint = SavepointReader.read(env, "s3://{bucket}/flink-checkpoints/{job-id}", ...);
>
> Is this the correct way to use this API for reading a checkpoint?
>
> Please note I have also enabled:
>
> state.backend.incremental: 'true'
> state.backend.local-recovery: 'true'
> state.backend.changelog.enabled: 'true'
> state.backend.changelog.storage: filesystem
> dstl.dfs.base-path: s3://{bucket}/changelog
> dstl.dfs.compression.enabled: 'true'
>
> Now, say I am able to create the reader, how do I inspect a particular keyed state?
> I see a function called readKeyedState but I am unsure as to what uid I need to pass to read a particular state.
> Would something like this work:
>
> DataStream<KeyedState> keyedState = savepoint.readKeyedState(
>     OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>
> And now in my KeyedState class, I can access state1 and state2.
>
> Would this work?
>
> Please let me know if I am on the right track, or whether it is not possible to read checkpointed states from an external application for debugging.
>
> Thanks
> Sachin
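On the uid question: as far as I understand, `OperatorIdentifier.forUid(...)` expects the uid assigned with `.uid(...)` on the operator, not the display name set with `.name(...)`. A small sketch of the distinction, reusing the class names from this thread plus a placeholder `Event` type and the hypothetical `KeyedState`/`ReaderFunction` from the sketch above (the uid string "proc-operator" is just an example):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

public class UidVsNameSketch {

    // Job side: give the stateful operator an explicit, stable uid in addition
    // to its display name. Caveat: adding a uid to an operator that previously
    // had none changes its operator ID, so existing checkpoints taken without
    // the uid will not map to it automatically on restore.
    static SingleOutputStreamOperator<StatsData> defineProcessor(DataStream<Event> events) {
        return events
                .keyBy(new MyKeySelector())
                .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
                .uid("proc-operator")   // what the State Processor API refers to
                .name("Proc");          // only the display name shown in the UI
    }

    // Reader side: reference the same uid string.
    static DataStream<KeyedState> readProcessorState(SavepointReader savepoint) {
        // If the running job never set a uid, OperatorIdentifier.forUidHash(...)
        // with the auto-generated operator hash is the fallback, but assigning
        // an explicit uid going forward is the simpler option.
        return savepoint.readKeyedState(
                OperatorIdentifier.forUid("proc-operator"), new ReaderFunction());
    }
}
```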
SASL_SSL Kafka sink
Hello,

We are encountering a connection issue with our Kafka sink when using AWS MSK IAM authentication. While our Kafka source connects successfully, the sink fails to establish a connection.

Here's how we're building the sink:

```java
KafkaSinkBuilder builder = KafkaSink.builder()
    .setBootstrapServers(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG))
    .setRecordSerializer(new FlinkKafkaRecordSerializationSchema<>(outputConfig))
    .setKafkaProducerConfig(producerProps)
    .setDeliveryGuarantee(deliveryGuarantee)
    .setTransactionalIdPrefix(getTransactionalIdPrefix(outputConfig, jobName));
```

The `producerProps` include the following configuration for IAM authentication:

```
{
  security.protocol=SASL_SSL,
  sasl.mechanism=AWS_MSK_IAM,
  sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;,
  sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
}
```

The sink connection fails with the following error:

```
2025-02-20 13:57:35,299 WARN org.apache.kafka.clients.NetworkClient ... - [AdminClient clientId=pdfOrchestrator-enumerator-admin-client] Error connecting to node kafka-central-1.amazonaws.com:9098 (id: -1 rack: null)
java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed]
...
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
...
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism AWS_MSK_IAM
```

This error suggests a problem with the SASL/IAM authentication configuration for the sink. We have already verified the following:

* **Endpoint:** We are using port 9098, which is correct for MSK with IAM authentication. We have also confirmed that the source connection uses the same port.
* **Dependencies:** The `aws-msk-iam-auth` dependency is included in our project.

We suspect the issue might be related to IAM role configuration or missing AWS region information. Could you please provide guidance on how to further troubleshoot this? Specifically, we would appreciate information on:

* Best practices for configuring IAM roles for Flink jobs connecting to MSK with IAM authentication.
* How to explicitly set the AWS region for the Kafka client in this context.
* Any other common pitfalls related to MSK IAM authentication with Flink.

Thank you for your assistance.
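Not an authoritative answer, but a sketch of the producer properties that could be tried, based on the options documented by the aws-msk-iam-auth library (the role ARN, session name and region values are placeholders; whether a role needs to be assumed at all depends on how the TaskManager pods obtain credentials):

```java
import java.util.Properties;

public class MskIamProducerPropsSketch {

    static Properties buildProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // port 9098 for IAM auth
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "AWS_MSK_IAM");
        props.setProperty(
                "sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");

        // Variant 1: rely on the default credential chain (pod role / IRSA / env vars).
        // props.setProperty("sasl.jaas.config",
        //         "software.amazon.msk.auth.iam.IAMLoginModule required;");

        // Variant 2: explicitly assume a role and pin the STS region, using options
        // documented by aws-msk-iam-auth (placeholder values below).
        props.setProperty("sasl.jaas.config",
                "software.amazon.msk.auth.iam.IAMLoginModule required "
              + "awsRoleArn=\"arn:aws:iam::123456789012:role/my-msk-producer-role\" "
              + "awsRoleSessionName=\"flink-sink\" "
              + "awsStsRegion=\"eu-central-1\" "
              + "awsDebugCreds=true;"); // logs which identity is actually being used

        return props;
    }
}
```

Separately, "Failed to create SaslClient with mechanism AWS_MSK_IAM" is a commonly reported symptom of the IAM auth classes not being visible to the classloader that creates the client (for example, the library not being shaded into the job jar, or child-first classloading hiding it), so that is worth ruling out before changing roles or regions.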
Re: FlinkSQL LAG, strange behavior?
And another option I tried it's: WITH ranked AS ( select * FROM ( SELECT msisdn, eventTimestamp, zoneIds, ts, ROW_NUMBER() OVER (PARTITION BY msisdn,ts ORDER BY ts) AS rownum FROM example) WHERE rownum = 1 ) SELECT msisdn, eventTimestamp, ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds, ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds, ts FROM ranked; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.TableException: StreamPhysicalOverAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[FirstRow], key=[msisdn, ts], order=[ROWTIME]) El jue, 20 feb 2025 a las 12:00, Guillermo Ortiz Fernández (< guillermo.ortiz.f...@gmail.com>) escribió: > Another option would be add an extra field like: > > ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY > *ts, > row_number*), ARRAY[-1]), -1) AS prev_zoneIds, > > But it isn't possible either. > > El jue, 20 feb 2025 a las 11:48, Guillermo Ortiz Fernández (< > guillermo.ortiz.f...@gmail.com>) escribió: > >> I tried to create a additional field based on ROW_NUMBER but it can't be >> used in LAG function. >> >> The idea was: >> WITH ranked AS ( >> SELECT >> msisdn, >> eventTimestamp, >> zoneIds, >> ts, >> TO_TIMESTAMP_LTZ(ROW_NUMBER() OVER (PARTITION BY msisdn ORDER BY >> ts), 3) AS ts_ranked >> FROM example_2 >> ) >> SELECT >> msisdn, >> eventTimestamp, >> ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds, >> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER >> BY ts_ranked), ARRAY[-1]), -1) AS prev_zoneIds, >> ts >> FROM ranked; >> >> RANKED subselect: >> msisdn eventTimestampzoneIds >> ts row_num ts_ranked >>673944959173999638[1] >> 2025-02-19 21:19:40.0001 1970-01-01 01:00:00.001 >>673944959173999638[1] >> 2025-02-19 21:19:40.0002 1970-01-01 01:00:00.002 >> >> *But [ERROR] Could not execute SQL statement. Reason:* >> *org.apache.flink.table.api.TableException: OVER windows' ordering in >> stream mode must be defined on a time attribute. * >> >> I guess just could use "ts" field because if it's the temporal field? >> >> >> El jue, 20 feb 2025 a las 10:41, Alessio Bernesco Làvore (< >> alessio.berne...@gmail.com>) escribió: >> >>> Hello, >>> don't know if it's strictly related, but using a TIMESTAMP_LTZ vs. >>> TIMESTAMP ts i had some not understandable behaviors. >>> >>> Changing the ts to TIMESTAMP (not LTZ) the identical query works as >>> expected. >>> So, first all, we changed all the parsing to TIMESTAMP to exclude this >>> kind of problems. >>> >>> If someone is interested i can provide the exact query to reproduce the >>> problem we figured out. >>> >>> Greetings, >>> Alessio >>> >>> On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández < >>> guillermo.ortiz.f...@gmail.com> wrote: >>> I have created a table that reads from a Kafka topic. What I want to do is order the data by eventTime and add a new field that represents the previous value using the LAG function. The problem arises when two records have exactly the same eventTime, which produces a "strange" behavior. 
CREATE TABLE example ( eventTimestamp BIGINT NOT NULL, msisdn INT NOT NULL, zoneIds ARRAY NOT NULL, ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3), `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL, WATERMARK FOR ts AS ts ) WITH ( 'connector' = 'kafka', 'topic' = 'example-offset', 'properties.bootstrap.servers' = '', 'properties.auto.offset.reset' = 'latest', 'scan.startup.mode' = 'latest-offset', 'key.format' = 'raw', 'key.fields' = 'msisdn', 'value.format' = 'avro', 'value.fields-include' = 'ALL', 'scan.watermark.idle-timeout' = '1000', ); INSERT INTO example (eventTimestamp, msisdn, zoneIds)VALUES (173999638, 673944959, ARRAY[1]), (173999638, 673944959, ARRAY[1]); SELECT msisdn, eventTimestamp, ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds, ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds, tsFROM example; *Actual Result:* msisdneventTimestampzoneIdsprev_zoneIdsts 673944959 173999638 [1] [1] 2025-02-19 21:19:40.000 673944959 173999638 [1] [1] 2025-02-19 21:19:40.000*Expected Result:* msisdneventTimestampzoneIdsprev_zoneIdsts 67394495
Re: Performance problem with FlinkSQL
I have been configuring the Flink cluster (application mode) to process the Kafka data volume. The current configuration consists of 16 pods, each with *12GB of memory and 2 CPUs*. Each TaskManager has *4 slots*. All processing is done using *Flink SQL*, and since Kafka topics may contain out-of-order data, a *5-minute watermark* is applied. In previous configurations, we allocated between *4-8GB per pod*, but this caused memory errors that led to process crashes. Currently, this process is a *migration from Kafka Streams*, which is running in parallel, consuming exactly the same data on the same infrastructure, but using *significantly fewer resources than Flink*. Approximately, Flink is consuming *10x more memory* and *8x more CPU* than Kafka Streams. I am unsure whether this is due to Flink inherently consuming more resources, whether using SQL results in poorly optimized auto-generated code, or if there is a configuration issue. The *metrics do not show any backpressure or errors*, and the process has been running for several days without failures, although with *high CPU and memory usage*. Any ideas on how to diagnose or optimize this to reduce resource consumption? El jue, 30 ene 2025 a las 10:02, Pedro Mázala () escribió: > > The average output rate needed to avoid lag after filtering messages > should be around 60K messages per second. I’ve been testing different > configurations of parallelism, slots and pods (everything runs on > Kubernetes), but I’m far from achieving those numbers. > > How are you partitioning your query? Do you see any backpressure happening > on the Flink UI? > > > In the latest configuration, I used 20 pods, a parallelism of 120, with > 4 slots per taskmanager. > > Were all tasks working properly or you had idleness? > > > Additionally, setting parallelism to 120 creates hundreds of subtasks > for the smaller topics, which don’t do much but still consume minimal > resources even if idle. > > On the table API I'm not sure if you can choose parallelism per "task" > DataStream I'm sure you can do it. > > > > Att, > Pedro Mázala > +31 (06) 3819 3814 > Be awesome > > > On Wed, 29 Jan 2025 at 22:06, Guillermo Ortiz Fernández < > guillermo.ortiz.f...@gmail.com> wrote: > >> After last checking it uses about 200-400 millicores each pod and 2.2Gb. >> >> El mié, 29 ene 2025 a las 21:41, Guillermo Ortiz Fernández (< >> guillermo.ortiz.f...@gmail.com>) escribió: >> >>> I have a job entirely written in Flink SQL. The first part of the >>> program processes 10 input topics and generates one output topic with >>> normalized messages and some filtering applied (really easy, some where by >>> fields and substring). Nine of the topics produce between hundreds and >>> thousands of messages per second, with an average of 4–10 partitions each. >>> The other topic produces 150K messages per second and has 500 partitions. >>> They are unioned to the output topic. >>> >>> The average output rate needed to avoid lag after filtering messages >>> should be around 60K messages per second. I’ve been testing different >>> configurations of parallelism, slots and pods (everything runs on >>> Kubernetes), but I’m far from achieving those numbers. >>> >>> In the latest configuration, I used 20 pods, a parallelism of 120, with >>> 4 slots per taskmanager. With this setup, I achieve approximately 20K >>> messages per second, but I’m unable to consume the largest topic at the >>> rate messages are being produced. 
Additionally, setting parallelism to 120 >>> creates hundreds of subtasks for the smaller topics, which don’t do much >>> but still consume minimal resources even if idle. >>> >>> I started trying with parallelism 12 and got about 1000-3000 messages >>> per second. >>> >>> When I check the use of cpu and memory to the pods and don't see any >>> problem and they are far from the limit, each taskmanager has 4gb and >>> 2cpus and they are never close to using the cpu. >>> >>> It’s a requirement to do everything 100% in SQL. How can I improve the >>> throughput rate? Should I be concerned about the hundreds of subtasks >>> created for the smaller topics? >>> >>