[jira] [Created] (FLINK-36286) Set up pull request template

2024-09-16 Thread Hong Liang Teoh (Jira)
Hong Liang Teoh created FLINK-36286:
---

 Summary: Set up pull request template
 Key: FLINK-36286
 URL: https://issues.apache.org/jira/browse/FLINK-36286
 Project: Flink
  Issue Type: Sub-task
Reporter: Hong Liang Teoh








[jira] [Created] (FLINK-36287) Sink with topologies should not participate in UC

2024-09-16 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-36287:
---

 Summary: Sink with topologies should not participate in UC
 Key: FLINK-36287
 URL: https://issues.apache.org/jira/browse/FLINK-36287
 Project: Flink
  Issue Type: Bug
Reporter: Arvid Heise
Assignee: Arvid Heise


When the sink writer and committer are not chained, it's possible that 
committables become part of the channel state. However, it's then also possible 
that they are not received before notifyCheckpointComplete. Further, the 
contract of notifyCheckpointComplete dictates that all side effects need to be 
committed, or we fail on notifyCheckpointComplete. This contract is essential 
for final checkpoints.

We can fix this by disallowing channel state within sinks.
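
A rough job-level mitigation sketch (only a workaround using the public 
CheckpointConfig API, not the per-sink fix this issue proposes): disabling 
unaligned checkpoints keeps in-flight records, and hence committables, out of 
channel state for the whole job.

{code}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableUnalignedCheckpoints {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 60s; the interval is illustrative.
        env.enableCheckpointing(60_000L);
        // With unaligned checkpoints disabled, no in-flight records (and
        // hence no committables) end up persisted as channel state.
        env.getCheckpointConfig().enableUnalignedCheckpoints(false);
    }
}
{code}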





[jira] [Created] (FLINK-36288) Set up architectural test

2024-09-16 Thread Hong Liang Teoh (Jira)
Hong Liang Teoh created FLINK-36288:
---

 Summary: Set up architectural test
 Key: FLINK-36288
 URL: https://issues.apache.org/jira/browse/FLINK-36288
 Project: Flink
  Issue Type: Sub-task
Reporter: Hong Liang Teoh








[jira] [Created] (FLINK-36289) Create Prometheus Flink connector release

2024-09-16 Thread Hong Liang Teoh (Jira)
Hong Liang Teoh created FLINK-36289:
---

 Summary: Create Prometheus Flink connector release
 Key: FLINK-36289
 URL: https://issues.apache.org/jira/browse/FLINK-36289
 Project: Flink
  Issue Type: Sub-task
Reporter: Hong Liang Teoh








[jira] [Created] (FLINK-36290) OutOfMemoryError in connect test run

2024-09-16 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-36290:
-

 Summary: OutOfMemoryError in connect test run
 Key: FLINK-36290
 URL: https://issues.apache.org/jira/browse/FLINK-36290
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Tests
Affects Versions: 2.0-preview
Reporter: Matthias Pohl


We saw an OOM in the connect stage that caused a fatal error:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=62173&view=logs&j=1c002d28-a73d-5309-26ee-10036d8476b4&t=d1c117a6-8f13-5466-55f0-d48dbb767fcd&l=12182

{code}
03:19:59,975 [   flink-scheduler-1] ERROR 
org.apache.flink.util.FatalExitExceptionHandler  [] - FATAL: Thread 
'flink-scheduler-1' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: Java heap space
[...]
03:19:59,981 [jobmanager_62-main-scheduler-thread-1] ERROR 
org.apache.flink.util.FatalExitExceptionHandler  [] - FATAL: Thread 
'jobmanager_62-main-scheduler-thread-1' produced an uncaught exception. 
Stopping the process...
java.lang.OutOfMemoryError: Java heap space
[...]
{code}





Re: [DISCUSS] FLIP-XXX Amazon SQS Source Connector

2024-09-16 Thread Arvid Heise
Hi Saurabh,

thank you very much for taking the time to explain SQS features to me. It's
a cool concept that I haven't seen before.

While the approach you presented already works for Flink EOS, I still need to
poke a bit because I think the usability can be improved.

Some background information on how sinks usually work:
* On checkpoint, we store the committables (=transactions or file
locations).
* On notifyCheckpointComplete, we commit them all or fail on persisting
errors.
* On recovery, we retry all stored committables.
* On recovery, we cancel all opened transactions not belonging to the
recovered state.
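
To make the protocol above concrete, here is a minimal sketch. The class and
method names are illustrative only, not Flink's actual Sink V2 interfaces;
it just mirrors the four bullets.

{code}
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative committer skeleton; names do not correspond to Flink's APIs.
class ExactlyOnceCommitter<TXN> {

    // Committables (e.g. transactions or file locations) per pending checkpoint.
    private final NavigableMap<Long, List<TXN>> pending = new TreeMap<>();

    // On checkpoint: store the committables as part of the state.
    void snapshotState(long checkpointId, List<TXN> committables) {
        pending.put(checkpointId, committables);
    }

    // On notifyCheckpointComplete: commit everything up to that checkpoint,
    // or fail on persisting errors.
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<TXN>> done = pending.headMap(checkpointId, true);
        done.values().forEach(txns -> txns.forEach(this::commitOrFail));
        done.clear();
    }

    // On recovery: retry all stored committables and cancel all opened
    // transactions that do not belong to the recovered state.
    void initializeState(List<TXN> restored, List<TXN> orphaned) {
        restored.forEach(this::commitOrFail);
        orphaned.forEach(this::abort);
    }

    private void commitOrFail(TXN txn) { /* commit side effect; rethrow on persistent error */ }

    private void abort(TXN txn) { /* cancel the open transaction */ }
}
{code}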

Now back to SQS: can we speed up recovery? Let's go over my thoughts:
* I'm assuming you need to set the visibility timeout to a value that
corresponds to the Flink checkpoint duration. I often see a 1-minute
checkpointing interval and checkpoints taking a few seconds, so we would
choose an SQS timeout of 2 minutes (we'd probably set it higher in realistic
settings).
* Before the failure, Flink tries to read message1 and fails; there are more
messages afterwards (message2, ...).
* In standard queues, Flink restarts and will not see message1 until the
timeout happens. It can immediately start processing message2 and so on.
After 2 minutes, it will finally see message1 and consume it.
* In FIFO queues, Flink will not see any message until the timeout happens.
So effectively, Flink will not process anything for 2 minutes. That means
that recovery time is effectively always as long as the SQS timeout.
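
A rough sizing sketch to make the numbers above explicit (all values are
illustrative, not a recommendation):

{code}
// The visibility timeout must cover at least the checkpointing interval plus
// the checkpoint duration, with some head room for restarts. In FIFO queues
// the worst-case recovery stall equals this timeout.
long checkpointIntervalMs = 60_000;  // 1 minute checkpointing interval
long checkpointDurationMs = 10_000;  // checkpoints take a few seconds
long recoveryHeadroomMs   = 50_000;  // margin for restart/recovery
int visibilityTimeoutSec  = (int)
        ((checkpointIntervalMs + checkpointDurationMs + recoveryHeadroomMs) / 1000); // = 120 s
{code}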

I'm curious whether there is a way to manually time out the messages for
faster recovery. I saw that you can decrease the timeout of specific
messages, which could be used to set the timeout to 0 on restart for
message1. If that works, we could do similar things for the sink. On
restart, we immediately try to delete the messages that are part of the
checkpoint and set the timeout to 0 for messages that were read after the
checkpoint. If it's somehow possible to enumerate all messages before and
past the respective checkpoint, we would not depend on the SQS timeout of
the consumer for correctness and recovery time and could set it to its
maximum of 12h. Otherwise, users would have to trade off correctness
(timeout too low) against recovery time (timeout too high).
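
For reference, a sketch of what that recovery step could look like with the
AWS SDK v2 (assuming the receipt handles were stored in the checkpoint state
and are still valid after restart, which would need to be verified):

{code}
import java.util.List;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;

// Illustrative recovery routine, not part of the proposed connector API.
class SqsRecovery {
    static void onRestart(SqsClient sqs, String queueUrl,
                          List<String> committedReceiptHandles,
                          List<String> uncommittedReceiptHandles) {
        // Messages that are part of the recovered checkpoint: delete them,
        // since their effects are already committed downstream.
        for (String handle : committedReceiptHandles) {
            sqs.deleteMessage(DeleteMessageRequest.builder()
                    .queueUrl(queueUrl)
                    .receiptHandle(handle)
                    .build());
        }
        // Messages read after the checkpoint: make them visible again
        // immediately instead of waiting for the visibility timeout.
        for (String handle : uncommittedReceiptHandles) {
            sqs.changeMessageVisibility(ChangeMessageVisibilityRequest.builder()
                    .queueUrl(queueUrl)
                    .receiptHandle(handle)
                    .visibilityTimeout(0)
                    .build());
        }
    }
}
{code}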

What are you planning to store as the source state?
> *record2 is still captured as part of the state, so it gets deleted from
> the queue during the next checkpoint.*
Are you planning to store the message ids of all to-be-deleted messages in
the state? That certainly works for low-volume sources, but ideally you
would just store the highest message id and enumerate the other message ids
from it.

Could you please update the FLIP to include the provided information so
that other readers can quickly understand the visibility timeout and the
state management? (External links are fine, of course, but please provide a
brief summary in case the link becomes dead at some point.) You can of
course defer this until we are completely aligned.

Best,

Arvid

On Fri, Sep 13, 2024 at 9:32 AM Saurabh Singh 
wrote:

> Hi Arvid,
>
> Thanks a lot for your review.
>
>1. Exactly once:
>
>
>
> *When using an SQS source, we leverage two key properties of SQS:
> 1. Message Retention: This property ensures that messages are retained in
> the queue for a specified period, allowing consumers enough time to process
> them. After this period, the messages are deleted.
> 2. Visibility Timeout [1]: This property prevents other consumers from
> receiving and processing the same message while it's being processed by the
> current consumer.*
>
> *Regarding the scenario described below:*
>
> * Read record2, emit record2, record2 is written into a Kafka transaction2
> * Checkpoint2 happens, state is checkpointed, sink remembers transaction2
> to be committed
> * NotifyCheckpointCompleted2 is lost for some reason or happens after next
> failure
> * Some failure happens, Flink is restarted with the current system state:
> SQS contains record2, record3, Kafka contains record1 and pending record2.
> On restart, the sink will commit transaction2 recovered from the state. Now
> SQS contains record2, record3 and Kafka contains record1, record2.
> * Read record2, emit record2, record2 is written into a Kafka transaction3
> * Eventually, we end up with two record2 in the sink.
>
> *In this scenario, we set the Message Visibility Timeout to be multiples
> of the checkpoint duration and ensure it is longer than the recovery
> duration. This way, when recovery occurs, record2 remains invisible and is
> not read again, preventing duplication. record2 is still captured as part
> of the state, so it gets deleted from the queue during the next checkpoint.*
>
> *Message Retention Time: We typically set this to a sufficiently long
> duration. This is beneficial for recovering from extended outages and
> backfilling data. If a message is not deleted, it will be read again once
> it becomes visible after the visibility timeout expires.*
>
> *This is our current un

[jira] [Created] (FLINK-36291) java.lang.IllegalMonitorStateException causing a fatal error on the TaskManager side

2024-09-16 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-36291:
-

 Summary: java.lang.IllegalMonitorStateException causing a fatal 
error on the TaskManager side
 Key: FLINK-36291
 URL: https://issues.apache.org/jira/browse/FLINK-36291
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 2.0-preview
Reporter: Matthias Pohl


HiveDynamicPartitionPruningITCase failed due to a TM timeout. Checking the 
logs, though, revealed a fatal error on the TaskManager side:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=62173&view=logs&j=5cae8624-c7eb-5c51-92d3-4d2dacedd221&t=5acec1b4-945b-59ca-34f8-168928ce5199&l=24046

{code}
03:18:32,209 [taskmanager_72-main-scheduler-thread-1] ERROR 
org.apache.flink.util.FatalExitExceptionHandler  [] - FATAL: Thread 
'taskmanager_72-main-scheduler-thread-1' produced an uncaught exception. 
Stopping the process...
java.lang.IllegalMonitorStateException: null
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.signal(AbstractQueuedSynchronizer.java:1939)
 ~[?:1.8.0_292]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1103)
 ~[?:1.8.0_292]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
 ~[?:1.8.0_292]
at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) 
~[?:1.8.0_292]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) 
~[?:1.8.0_292]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_292]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
{code}

But there's also an OutOfMemoryError reported just a line below:
{code}
03:19:01,060 [Source Data Fetcher for Source: part[62] (2/2)#0] ERROR 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - 
Received uncaught exception.
java.lang.OutOfMemoryError: Java heap space
{code}

So that might be related to FLINK-36290.


