[jira] [Created] (FLINK-36286) Set up pull request template
Hong Liang Teoh created FLINK-36286:
---

Summary: Set up pull request template
Key: FLINK-36286
URL: https://issues.apache.org/jira/browse/FLINK-36286
Project: Flink
Issue Type: Sub-task
Reporter: Hong Liang Teoh
[jira] [Created] (FLINK-36287) Sink with topologies should not participate in UC
Arvid Heise created FLINK-36287:
---

Summary: Sink with topologies should not participate in UC
Key: FLINK-36287
URL: https://issues.apache.org/jira/browse/FLINK-36287
Project: Flink
Issue Type: Bug
Reporter: Arvid Heise
Assignee: Arvid Heise

When the sink writer and committer are not chained, committables can become part of the channel state. In that case, they may not have been received before notifyCheckpointComplete. However, the contract of notifyCheckpointComplete dictates that all side effects need to be committed, or we have to fail in notifyCheckpointComplete. This contract is essential for final checkpoints.

We can fix this by disallowing channel state within sinks.
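To make the contract above concrete, here is a minimal, hypothetical sketch (it is not Flink's actual sink implementation; the per-checkpoint bookkeeping and the allCommittablesReceived flag are assumptions made purely for illustration): once notifyCheckpointComplete fires, every committable of that checkpoint must be present and committable, otherwise the only correct reaction is to fail.

{code}
// Hypothetical illustration of the notifyCheckpointComplete contract, not actual Flink sink code.
import java.util.ArrayDeque;
import java.util.Queue;

import org.apache.flink.api.common.state.CheckpointListener;

class ContractSketch implements CheckpointListener {

    // Committables (e.g. transaction handles or file locations) collected for the
    // checkpoint that is currently being completed. In the scenario described above,
    // some of them may still be "in flight" inside unaligned channel state.
    private final Queue<String> committablesOfPendingCheckpoint = new ArrayDeque<>();
    private boolean allCommittablesReceived; // illustrative flag, assumed to be maintained by the writer/committer wiring

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!allCommittablesReceived) {
            // Contract violation: we cannot commit all side effects of the checkpoint,
            // so we must fail here instead of silently dropping committables.
            throw new IllegalStateException(
                    "Not all committables of checkpoint " + checkpointId + " have been received");
        }
        while (!committablesOfPendingCheckpoint.isEmpty()) {
            commit(committablesOfPendingCheckpoint.poll());
        }
    }

    private void commit(String committable) {
        // commit the side effect, e.g. commit a transaction or move a temp file into place
    }
}
{code}

Committables stuck in unaligned-checkpoint channel state violate exactly this assumption, which is why the issue proposes disallowing channel state within sinks.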
[jira] [Created] (FLINK-36288) Set up architectural test
Hong Liang Teoh created FLINK-36288:
---

Summary: Set up architectural test
Key: FLINK-36288
URL: https://issues.apache.org/jira/browse/FLINK-36288
Project: Flink
Issue Type: Sub-task
Reporter: Hong Liang Teoh
[jira] [Created] (FLINK-36289) Create Prometheus Flink connector release
Hong Liang Teoh created FLINK-36289:
---

Summary: Create Prometheus Flink connector release
Key: FLINK-36289
URL: https://issues.apache.org/jira/browse/FLINK-36289
Project: Flink
Issue Type: Sub-task
Reporter: Hong Liang Teoh
[jira] [Created] (FLINK-36290) OutOfMemoryError in connect test run
Matthias Pohl created FLINK-36290:
---

Summary: OutOfMemoryError in connect test run
Key: FLINK-36290
URL: https://issues.apache.org/jira/browse/FLINK-36290
Project: Flink
Issue Type: Bug
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Tests
Affects Versions: 2.0-preview
Reporter: Matthias Pohl

We saw an OOM in the connect stage that caused a fatal error:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=62173&view=logs&j=1c002d28-a73d-5309-26ee-10036d8476b4&t=d1c117a6-8f13-5466-55f0-d48dbb767fcd&l=12182

{code}
03:19:59,975 [ flink-scheduler-1] ERROR org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: Thread 'flink-scheduler-1' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: Java heap space
[...]
03:19:59,981 [jobmanager_62-main-scheduler-thread-1] ERROR org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: Thread 'jobmanager_62-main-scheduler-thread-1' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: Java heap space
[...]
{code}
Re: [DISCUSS] FLIP-XXX Amazon SQS Source Connector
Hi Saurabh,

thank you very much for taking the time to explain SQS features to me. It's a cool concept that I haven't seen before. While your presented approach already works for Flink EOS, I still need to poke a bit because I think the usability may be improved.

Some background information on how sinks usually work:
* On checkpoint, we store the committables (= transactions or file locations).
* On notifyCheckpointComplete, we commit them all or fail on persisting errors.
* On recovery, we retry all stored committables.
* On recovery, we cancel all opened transactions not belonging to the recovered state.

Now back to SQS: can we speed up recovery? Let's go over my thoughts:
* I'm assuming you need to set the timeout to a value that corresponds to the Flink checkpoint duration. I often see a 1 minute checkpointing interval and a checkpoint taking a few seconds, so we would choose an SQS timeout of 2 minutes (we'd probably set it higher in realistic settings).
* Before the failure, Flink tries to read message1 and fails; there are more messages afterwards (message2, ...).
* In standard queues, Flink restarts and will not see message1 until the timeout happens. It can immediately start processing message2 and so on. After 2 minutes, it will finally see message1 and consume it.
* In FIFO queues, Flink will not see any message until the timeout happens. So effectively, Flink will not process anything for 2 minutes.

That means that recovery time is effectively always as high as the SQS timeout. I'm curious whether there is a way to manually time out the messages for faster recovery. I saw that you can decrease the timeout of specific messages, which could be used to set the timeout of message1 to 0 on restart. If that works, we could do similar things as in the sink: on restart, we immediately try to delete the messages that are part of the checkpoint and set the timeout to 0 for messages that were read after the checkpoint. If it's somehow possible to enumerate all messages before and past the respective checkpoint, we would not depend on the SQS timeout of the consumer for correctness and recovery time and could set it to its max of 12h. Otherwise, we would have the users trade off correctness (timeout too low) vs. recovery time (timeout too high).

What are you planning to store as the source state?

> *record2 is still captured as part of the state, so it gets deleted from the queue during the next checkpoint.*

Are you planning to store the message ids of all to-be-deleted messages in the state? That certainly works for low-volume sources, but ideally you would just store the highest message id and enumerate the other message ids from it.

Could you please update the FLIP to include the provided information so that other readers can quickly understand the visibility timeout and the state management? (External links are fine of course, but please provide a brief summary in case the link becomes dead at some point.) You can of course defer it until we are completely aligned.

Best,

Arvid

On Fri, Sep 13, 2024 at 9:32 AM Saurabh Singh wrote:

> Hi Arvid,
>
> Thanks a lot for your review.
>
>    1. Exactly once:
>
> *When using an SQS source, we leverage two key properties of SQS:
> 1. Message Retention: This property ensures that messages are retained in
> the queue for a specified period, allowing consumers enough time to
> process them. Post this, the messages are deleted.
> 2. Visibility Timeout [1]: This property prevents other consumers from
> receiving and processing the same message while it's being processed by
> the current consumer.*
>
> *Regarding the scenario described below:*
>
> * Read record2, emit record2, record2 is written into a Kafka transaction2
> * Checkpoint2 happens, state is checkpointed, sink remembers transaction2
> to be committed
> * NotifyCheckpointCompleted2 is lost for some reason or happens after next
> failure
> * Some failure happens, Flink is restarted with the current system state:
> SQS contains record2, record3, Kafka contains record1 and pending record2.
> On restart, the sink will commit transaction2 recovered from the state. Now
> SQS contains record2, record3 and Kafka contains record1, record2.
> * Read record2, emit record2, record2 is written into a Kafka transaction3
> * Eventually, we end up with two record2 in the sink.
>
> *In this scenario, we set the Message Visibility Timeout to be multiples
> of the checkpoint duration and ensure it is longer than the recovery
> duration. This way, when recovery occurs, record2 remains invisible and is
> not read again, preventing duplication. record2 is still captured as part
> of the state, so it gets deleted from the queue during the next checkpoint.*
>
> *Message Retention Time: We typically set this to a sufficiently long
> duration. This is beneficial for recovering from extended outages and
> backfilling data. If a message is not deleted, it will be read again once
> it becomes visible after the visibility timeout expires.*
>
> *This is our current un
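As a hedged illustration of the visibility timeout described in the quoted explanation (not taken from the FLIP; the queue URL and the concrete numbers are assumptions based on the 1 minute checkpoint interval / 2 minute timeout example from this thread), a reader-side poll with the AWS SDK for Java v2 could look like this:

{code}
// Hedged sketch, not from the FLIP: applying the visibility timeout when polling.
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

class SqsPollingSketch {
    ReceiveMessageResponse poll(SqsClient sqs, String queueUrl) {
        return sqs.receiveMessage(ReceiveMessageRequest.builder()
                .queueUrl(queueUrl)
                .maxNumberOfMessages(10)   // SQS returns at most 10 messages per call
                .waitTimeSeconds(20)       // long polling
                .visibilityTimeout(120)    // messages stay invisible across a restart shorter than 2 minutes
                .build());
    }
}
{code}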
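The faster-recovery idea raised in Arvid's message could look roughly like the following hedged sketch (again not part of the FLIP; the queue URL, the split of receipt handles into checkpointed vs. uncheckpointed, and the assumption that stored receipt handles remain valid after a restart are all illustrative): on restore, messages covered by the checkpoint are deleted immediately, and messages read after it are made visible again by setting their visibility timeout to 0.

{code}
// Hedged sketch of the restart logic discussed above (AWS SDK for Java v2).
// Queue URL, receipt-handle bookkeeping and their validity after restart are assumptions.
import java.util.ArrayList;
import java.util.List;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;

class SqsRecoverySketch {

    void onRestore(
            SqsClient sqs,
            String queueUrl,
            List<String> checkpointedReceiptHandles,    // read before the restored checkpoint -> already emitted downstream
            List<String> uncheckpointedReceiptHandles)  // read after the restored checkpoint -> must be re-read
    {
        // 1. Messages that are part of the restored checkpoint were already emitted,
        //    so delete them right away instead of waiting for the next checkpoint.
        //    SQS batch calls accept at most 10 entries.
        for (int i = 0; i < checkpointedReceiptHandles.size(); i += 10) {
            List<String> chunk = checkpointedReceiptHandles.subList(
                    i, Math.min(i + 10, checkpointedReceiptHandles.size()));
            List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
            for (int j = 0; j < chunk.size(); j++) {
                entries.add(DeleteMessageBatchRequestEntry.builder()
                        .id(Integer.toString(i + j)) // entry ids only need to be unique within one batch
                        .receiptHandle(chunk.get(j))
                        .build());
            }
            sqs.deleteMessageBatch(DeleteMessageBatchRequest.builder()
                    .queueUrl(queueUrl)
                    .entries(entries)
                    .build());
        }

        // 2. Messages read after the checkpoint are not covered by the restored state.
        //    Setting their visibility timeout to 0 makes them readable again immediately,
        //    so recovery does not have to wait for the full visibility timeout.
        for (String receiptHandle : uncheckpointedReceiptHandles) {
            sqs.changeMessageVisibility(ChangeMessageVisibilityRequest.builder()
                    .queueUrl(queueUrl)
                    .receiptHandle(receiptHandle)
                    .visibilityTimeout(0)
                    .build());
        }
    }
}
{code}

Whether this actually removes the dependency on the consumer's visibility timeout hinges on being able to enumerate and re-address those messages after the restart, which is exactly the open question in the thread.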
[jira] [Created] (FLINK-36291) java.lang.IllegalMonitorStateException causing a fatal error on the TaskManager side
Matthias Pohl created FLINK-36291:
---

Summary: java.lang.IllegalMonitorStateException causing a fatal error on the TaskManager side
Key: FLINK-36291
URL: https://issues.apache.org/jira/browse/FLINK-36291
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 2.0-preview
Reporter: Matthias Pohl

HiveDynamicPartitionPruningITCase failed due to the TM timeout. Checking the logs, though, revealed a fatal error on the TaskManager's side:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=62173&view=logs&j=5cae8624-c7eb-5c51-92d3-4d2dacedd221&t=5acec1b4-945b-59ca-34f8-168928ce5199&l=24046

{code}
03:18:32,209 [taskmanager_72-main-scheduler-thread-1] ERROR org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: Thread 'taskmanager_72-main-scheduler-thread-1' produced an uncaught exception. Stopping the process...
java.lang.IllegalMonitorStateException: null
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.signal(AbstractQueuedSynchronizer.java:1939) ~[?:1.8.0_292]
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1103) ~[?:1.8.0_292]
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) ~[?:1.8.0_292]
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) ~[?:1.8.0_292]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) ~[?:1.8.0_292]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_292]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
{code}

But there's also an OutOfMemoryError reported just a line below:

{code}
03:19:01,060 [Source Data Fetcher for Source: part[62] (2/2)#0] ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Received uncaught exception.
java.lang.OutOfMemoryError: Java heap space
{code}

So that might be related to FLINK-36290.