Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-04-05 Thread mejri houssem
Why does the kafka source not rely on the committed offset for recovery, even though the offset stored in the checkpoint/savepoint is the same as the one committed to Kafka? [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#consumer-offset-committing Best

Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-27 Thread Andrew Otto
<<*Kafka source does not rely on committed offsets for fault tolerance. Committing offset is only for exposing the progress of consumer and consuming group for monitoring*>>. Can someone please explain why the kafka source does not rely on the committed offse

Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-26 Thread Gabor Somogyi
> Are they two different things? There are no consumer and broker offsets, there are offsets which belong to a topic + partition pair. > And which offset is saved in the checkpoint/savepoint? Which Flink thinks is processed already. Regarding the PROD deploy now you know the risks so fee
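For reference, a minimal sketch of how such a Kafka source is typically wired up with checkpointing enabled, so that recovery uses the offsets snapshotted into the checkpoint/savepoint rather than the offsets committed to the broker (broker address, topic and group id below are illustrative placeholders):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // topic+partition offsets are snapshotted into every checkpoint

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")   // placeholder
            .setTopics("my-topic")                // placeholder
            .setGroupId("my-group")               // placeholder
            // committed offsets are only consulted as the starting position on a fresh start;
            // on recovery Flink always restores from the checkpointed offsets
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");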

Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-26 Thread mejri houssem
Hello Gabor, Thanks for your response. I just want to clarify one thing: is there any difference between the Kafka source offset and the Kafka broker offset? Are they two different things? And which offset is saved in the checkpoint/savepoint? For our use case, we intend to take a savepoint

Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-25 Thread Gabor Somogyi
on regarding the subject, > please? > > Best regards. > > On Wed, Mar 19, 2025 at 15:02, mejri houssem > wrote: > >> Hello, >> >> So if I understand you well, I cannot rely on the kafka broker offset to >> achieve an at-least-once guarantee. Without checkpoint

Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-25 Thread mejri houssem
tee. Without checkpoint/savepoint enabled, > that would not be possible. > > Best regards > > On Wed, Mar 19, 2025 at 12:00, Ahmed Hamdy wrote: > >> Hi Mejri, >> Not exactly, you can still rely on a savepoint to restart/redeploy the job >> from the latest offse

Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-19 Thread mejri houssem
Hello, So if I understand you well, I cannot rely on the kafka broker offset to achieve an at-least-once guarantee. Without checkpoint/savepoint enabled, that would not be possible. Best regards On Wed, Mar 19, 2025 at 12:00, Ahmed Hamdy wrote: > Hi Mejri, > Not exactly, you can still r

Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-19 Thread Ahmed Hamdy
ondering if this is strictly necessary, since the Kafka broker >> itself keeps track of offsets (if I am not mistaken). In other words, if we >> redeploy the job, will it automatically resume from the last Kafka offset, >> or should we still rely on Flink’s checkpoint/savepoint mechanism to

Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-18 Thread mejri houssem
’m wondering if this is strictly necessary, since the Kafka broker > itself keeps track of offsets (if I am not mistaken). In other words, if we > redeploy the job, will it automatically resume from the last Kafka offset, > or should we still rely on Flink’s checkpoint/savepoint mechanism to en

Re: Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-18 Thread Ahmed Hamdy
Hi Mejri > I’m wondering if this is strictly necessary, since the Kafka broker itself keeps track of offsets (if I am not mistaken). In other words, if we redeploy the job, will it automatically resume from the last Kafka offset, or should we still rely on Flink’s checkpoint/savepoint mechanism

Clarification on checkpoint/savepoint usage with Kafka (and RabbitMQ)

2025-03-18 Thread mejri houssem
Hello everyone, We have a stateless Flink job that uses a Kafka source with at-least-once guarantees. We’ve enabled checkpoints so that, in the event of a restart, Flink can restore from the last committed offset stored in a successful checkpoint. Now we’re considering enabling savepoints for our

Re: How can we read checkpoint data for debugging state

2025-02-26 Thread Gabor Somogyi
nitializerImpl.java:393) >>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure >>> .attemptCreateAndRestore(BackendRestorerProcedure.java:173) >>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure >>> .createAndRestore(Back

Re: How can we read checkpoint data for debugging state

2025-02-26 Thread Sachin Mittal
estorerProcedure.java:137) >> >> My code to read the state is like: >> >> SavepointReader savepoint = >> SavepointReader.read(env, >> "s3://{bucket}/flink-checkpoints/{jobId}/chk-{num}/_metadata", new >> EmbeddedRocksDBStateBackend(true)); >&

Re: How can we read checkpoint data for debugging state

2025-02-26 Thread Gabor Somogyi
adata", new > EmbeddedRocksDBStateBackend(true)); > > DataStream keyedState = > savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new > StateReaderFunction(), Types.LONG, Types.POJO(State.class)); > > keyedState.print(); > > env.execu

Re: How can we read checkpoint data for debugging state

2025-02-26 Thread Sachin Mittal
ckend(true)); DataStream keyedState = savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new StateReaderFunction(), Types.LONG, Types.POJO(State.class)); keyedState.print(); env.execute("Analysis"); Any idea as to what could be going wrong? Also note this is my checkpo

Re: How can we read checkpoint data for debugging state

2025-02-21 Thread Gabor Somogyi
vents = env.fromSource(..., "Src"); >>>> >>>> SingleOutputStreamOperator statsData = >>>> events >>>> .keyBy(new MyKeySelector()) >>>> .process(new MyStatsProcessor(), Types.POJO(StatsDa

Re: How can we read checkpoint data for debugging state

2025-02-21 Thread Sachin Mittal
DataStream events = env.fromSource(..., "Src"); >>> >>> SingleOutputStreamOperator statsData = >>> events >>> .keyBy(new MyKeySelector()) >>> .process(new MyStatsProcessor(), Types.POJO(StatsData.class)) &

Re: How can we read checkpoint data for debugging state

2025-02-21 Thread Gabor Somogyi
tsData >> .addSink(new MySink<>(...)) >> .name("Sink"); >> >> env.execute("Exec"); >> >> >> The MyStatsProcessor has keyed states defined as: >> >> state1 = >> getRuntimeContext().g

Re: How can we read checkpoint data for debugging state

2025-02-21 Thread Gabor Somogyi
state2 = > getRuntimeContext().getState(new ValueStateDescriptor<>("state2", > Types.POJO(StateTwo.class))); > > > So my question is how can I read any checkpoint state. I see this API > flink-state-processor-api. > Can I use the same here, if

Re: How can we read checkpoint data for debugging state

2025-02-21 Thread Nikola Milutinovic
.description("This is my first Proc"); Nix,. From: Sachin Mittal Date: Friday, February 21, 2025 at 8:40 AM To: Xuyang Cc: user Subject: Re: How can we read checkpoint data for debugging state Hi, I am working on Flink 1.19.1, so I guess I cannot use the SQL connector as that&#

Re: How can we read checkpoint data for debugging state

2025-02-20 Thread Sachin Mittal
env.execute("Exec"); > > > The MyStatsProcessor has keyed states defined as: > > state1 = > getRuntimeContext().getState(new ValueStateDescriptor<>("state1", > Types.POJO(StateOne.class))); > state2 = > getRuntimeContext().

Re:How can we read checkpoint data for debugging state

2025-02-20 Thread Xuyang
criptor<>("state2", Types.POJO(StateTwo.class))); So my question is how can I read any checkpoint state. I see this API flink-state-processor-api. Can I use the same here, if so how do I instantiate it: StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecu

How can we read checkpoint data for debugging state

2025-02-20 Thread Sachin Mittal
etRuntimeContext().getState(new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class))); So my question is how can I read any checkpoint state. I see this API flink-state-processor-api. Can I use the same here, if so how do I instantiate it: StreamExecutionEnvironment env =
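For context, a minimal sketch of what reading keyed state from a retained checkpoint looks like with flink-state-processor-api; the uid "Proc", the state name "state1" and the path placeholders mirror the snippets above, while the class names ReadCheckpointState, Reader and StateOne are illustrative:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.state.api.OperatorIdentifier;
    import org.apache.flink.state.api.SavepointReader;
    import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class ReadCheckpointState {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // _metadata of a retained checkpoint (or a savepoint); path placeholders as in the thread
            SavepointReader reader = SavepointReader.read(
                    env, "s3://{bucket}/flink-checkpoints/{jobId}/chk-{num}/_metadata",
                    new EmbeddedRocksDBStateBackend(true));

            DataStream<StateOne> state = reader.readKeyedState(
                    OperatorIdentifier.forUid("Proc"),   // uid of the stateful operator
                    new Reader(), Types.LONG, Types.POJO(StateOne.class));

            state.print();
            env.execute("read-checkpoint-state");
        }

        /** Emits the value of "state1" for every key found in the checkpoint. */
        public static class Reader extends KeyedStateReaderFunction<Long, StateOne> {
            private transient ValueState<StateOne> state1;

            @Override
            public void open(Configuration parameters) {
                state1 = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
            }

            @Override
            public void readKey(Long key, Context ctx, Collector<StateOne> out) throws Exception {
                out.collect(state1.value());
            }
        }

        public static class StateOne { /* POJO fields omitted */ }
    }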

Generic log-based incremental checkpoint seems to be not working

2025-01-06 Thread Sachin Mittal
Hello, I am running a job on apache Flink 1.19, on AWS EMR (EC2) cluster as a YARN application. I have implemented a generic log-based incremental checkpointing for faster checkpoint. It is more described in here: https://flink.apache.org/2022/05/30/improving-speed-and-stability-of
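For reference, the feature described in that post is switched on through the changelog state backend options, roughly as follows (keys per the Flink docs; the base path is an illustrative placeholder):

    state.backend.changelog.enabled: true
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://<bucket>/changelog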

Re: Q: How to best configure checkpoint to ensure they do not fill-up the storage?

2025-01-03 Thread Jean-Marc Paulin
HA clusterId to force a resume from a savepoints, This can be the case when we change something in the application and we cannot then resume from a checkpoint. So I suspect we will have some housekeeping to do ourselves in these scenarios. Keep learning, they said... > That's weird. Are these

Re: Q: How to best configure checkpoint to ensure they do not fill-up the storage?

2025-01-02 Thread Zakelly Lan
lerance/state/#state-time-to-live-ttl . And if it is a SQL job, please set 'table.exec.state.ttl'. Besides that, I did a test locally, it seems the job will recover from the latest checkpoint instead of a savepoint (even though it is the latest), and all the checkpoints will be properl
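For reference, a minimal sketch of the state TTL setting referred to above for a DataStream job (the 24-hour TTL and the descriptor name are illustrative); SQL jobs would instead set 'table.exec.state.ttl':

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.hours(24))   // illustrative retention
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

    ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("my-state", String.class);
    descriptor.enableTimeToLive(ttl); // expired entries are cleaned up and stop growing the checkpoints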

Re: Q: How to best configure checkpoint to ensure they do not fill-up the storage?

2025-01-01 Thread Jean-Marc Paulin
-application/checkpoints increasing? And have you set the state >> TTL? >> >> >> Best, >> Zakelly >> >> On Tue, Dec 31, 2024 at 7:58 PM Jean-Marc Paulin >> wrote: >> >>> Hi, >>> >>> We are on Flink 1.20/Java17 running in a

Re: Q: How to best configure checkpoint to ensure they do not fill-up the storage?

2025-01-01 Thread Zakelly Lan
, > Zakelly > > On Tue, Dec 31, 2024 at 7:58 PM Jean-Marc Paulin > wrote: > >> Hi, >> >> We are on Flink 1.20/Java17 running in a k8s environment, with >> checkpoints enabled on S3 and the following checkpoint options: >>

Q: How to best configure checkpoint to ensure they do not fill-up the storage?

2024-12-31 Thread Jean-Marc Paulin
Hi, We are on Flink 1.20/Java17 running in a k8s environment, with checkpoints enabled on S3 and the following checkpoint options: execution.checkpointing.dir: s3://flink-application/checkpoints execution.checkpointing.externalized-checkpoint-retention: DELETE_ON_CANCELLATION

Flink job unable to checkpoint (timeout) after restart with savepoint with KafkaSink

2024-09-03 Thread vararu.va...@gmail.com
e sink operator. For both cases we changed the UID of the kafka sink to make sure that its state resets. However, we did it via savepoint to keep the source operator state (no data duplication/loss allowed). The problem is that for both cases the job couldn’t checkpoint anymore. Each checkpoint failed aft

Re: Troubleshooting checkpoint expiration

2024-08-31 Thread Alexis Sarda-Espinosa
I found a Hadoop class that can log latency information [1], but since I >> don't see any exceptions in the logs when a checkpoint expires due to >> timeout, I'm still wondering if I can change other log levels to get more >> insights, maybe somewhere in Flink's f

Losing externalized checkpoint reference in certain failure modes

2024-08-19 Thread Max Feng
ility: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION. In this scenario, we had a job with a retained checkpoint. Upon resubmitting the job, the job ran into a failure: Classifying stack trace: org.apache.flink.table.api.TableException: Failed to ex

Re: Troubleshooting checkpoint expiration

2024-08-07 Thread Alexis Sarda-Espinosa
inosa < sarda.espin...@gmail.com>: > Hi again, > > I found a Hadoop class that can log latency information [1], but since I > don't see any exceptions in the logs when a checkpoint expires due to > timeout, I'm still wondering if I can change other log levels to get

Re: Checkpoint failures due to other subtasks sharing the ChannelState file (Caused the Job to Stall)

2024-08-02 Thread Dhruv Patel
We have also enabled unaligned checkpoints. Could it be because of that? We were experiencing slowness and intermittent packet loss when this issue occurred. On Wed, Jul 31, 2024 at 7:43 PM Dhruv Patel wrote: > Hi Everyone, > > We are observing an interesting issue with continuous c

Re: checkpoint upload thread

2024-08-01 Thread Enric Ott
Thanks for the clarification, Yanfei. And I will dig into it deeper later. ------ Original Message ------ From: "Yanfei Lei"

Re: checkpoint upload thread

2024-08-01 Thread Yanfei Lei
d but with some probability? > > Thanks. > > > -- Original Message -- > From: "Yanfei Lei" ; > Sent: Tuesday, July 30, 2024, 5:15 PM > To: "Enric Ott"<243816...@qq.com>; > Cc: "user"; > Subject: Re: checkpoint upload thread > > Hi Enric

Checkpoint failures due to other subtasks sharing the ChannelState file (Caused the Job to Stall)

2024-07-31 Thread Dhruv Patel
Hi Everyone, We are observing an interesting issue with continuous checkpoint failures in our job causing the event to not be forwarded through the pipeline. We saw a spam of the below log in all our task manager instances. Caused by: org.apache.flink.runtime.checkpoint.CheckpointException

Re: checkpoint upload thread

2024-07-31 Thread Enric Ott
Hi, Yanfei: What do you mean by the word "possible" in the statement "it is possible to use the same connection for an operator chain"? Meaning it can be done but is not done in fact? Or actually done, but with some probability? Thanks.

Re: checkpoint upload thread

2024-07-30 Thread Yanfei Lei
Hi Enric, If I understand correctly, one subtask would use one `asyncOperationsThreadPool`[1,2], so it is possible to use the same connection for an operator chain. [1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask

checkpoint upload thread

2024-07-29 Thread Enric Ott
Hi, Community: Does Flink upload states and in-flight buffers within the same operator chain using the same connection (instead of one connection per operator)?

Re: Troubleshooting checkpoint expiration

2024-07-23 Thread Alexis Sarda-Espinosa
Hi again, I found a Hadoop class that can log latency information [1], but since I don't see any exceptions in the logs when a checkpoint expires due to timeout, I'm still wondering if I can change other log levels to get more insights, maybe somewhere in Flink's file system ab

Troubleshooting checkpoint expiration

2024-07-19 Thread Alexis Sarda-Espinosa
Hello, We have a Flink job that uses ABFSS for checkpoints and related state. Lately we see a lot of exceptions due to expiration of checkpoints, and I'm guessing that's an issue in the infrastructure or on Azure's side, but I was wondering if there are Flink/Hadoop Java packages that log potentia

Kubernetes HA checkpoint not retained on termination

2024-07-11 Thread Clemens Valiente
hi, I have a problem that Flink deletes checkpoint information on kubernetes HA setup even if execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION is set. config documentation: "RETAIN_ON_CANCELLATION": Checkpoint state is kept when the owning job is cancelle

flinkcdc Postgres source checkpoint question

2024-07-06 Thread Eleven
PostgresSourceBuilder

Re: Failed to resume from HA when the checkpoint has been deleted.

2024-06-11 Thread Zhanghao Chen
ernal job monitoring system to manually recover it. Best, Zhanghao Chen From: Jean-Marc Paulin Sent: Tuesday, June 11, 2024 16:04 To: Zhanghao Chen ; user@flink.apache.org Subject: Re: Failed to resume from HA when the checkpoint has been deleted. Thanks for you

Re: Failed to resume from HA when the checkpoint has been deleted.

2024-06-11 Thread Jean-Marc Paulin
Thanks for your reply. Yes, this is indeed an option. But I was more after a config option to handle that scenario. If the HA metadata points to a checkpoint that is obviously not present (error 404 in the S3 case), there is little value in retrying. The HA data are obviously worthless in that

Re: Failed to resume from HA when the checkpoint has been deleted.

2024-06-10 Thread Zhanghao Chen
resume from HA when the checkpoint has been deleted. Hi, We have a 1.19 Flink streaming job, with HA enabled (ZooKeeper), checkpoint/savepoint in S3. We had an outage and now the jobmanager keeps restarting. We think it is because it reads the job id to be restarted from ZooKeeper, but because we lost

Failed to resume from HA when the checkpoint has been deleted.

2024-06-10 Thread Jean-Marc Paulin
Hi, We have a 1.19 Flink streaming job, with HA enabled (ZooKeeper), checkpoint/savepoint in S3. We had an outage and now the jobmanager keeps restarting. We think it is because it reads the job id to be restarted from ZooKeeper, but because we lost our S3 storage as part of the outage it cannot

Re: How to start a flink job on a long running yarn cluster from a checkpoint (with arguments)

2024-05-25 Thread Junrui Lee
Hi Sachin, Yes, that's correct. To resume from a savepoint, use the command bin/flink run -s . You can find more details in the Flink documentation on [1]. Additionally, information on how to trigger a savepoint can be found in the section for triggering savepoints [2]. [1] https://nightlies.ap
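Putting that together with the YARN command from the original question, the sequence would look roughly like the following (the job id and savepoint paths are placeholders, not values from the thread):

    # trigger a savepoint for the running job
    flink savepoint <jobId> hdfs:///flink/savepoints -yid application_1473169569237_0001
    # resume from the savepoint that was written
    flink run -s hdfs:///flink/savepoints/savepoint-<id> -m yarn-cluster -yid application_1473169569237_0001 \
        /usr/lib/flink/examples/streaming/WordCount.jar --input file:///input.txt --output file:///output/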

How to start a flink job on a long running yarn cluster from a checkpoint (with arguments)

2024-05-25 Thread Sachin Mittal
Hi, I have a long running yarn cluster and I submit my streaming job using the following command: flink run -m yarn-cluster -yid application_1473169569237_0001 /usr/lib/flink/examples/streaming/WordCount.jar --input file:///input.txt --output file:///output/ Let's say I want to stop this job, mak

Re: Flink autoscaler with AWS ASG: checkpoint access issue

2024-05-20 Thread Chetas Joshi
with the flink-s3-fs-presto plugin as if I switch to the hadoop plugin, I don't run into 403 errors after the scale-up events. 3. What is the reason why the presto plugin is recommended over the hadoop plugin while working with the checkpoint files in S3? Thank you Chetas On Mon, May 13, 20

Re: Restore from checkpoint

2024-05-19 Thread archzi lu
> 2. "file:///home/foo/boo" > 3. "hdfs:///home/foo/boo" > 4. or Win32 directory form > > Best regards, > Jiadong Lu > > On 2024/5/20 02:28, Phil Stavridis wrote: > > Hi Lu, > > > > Thanks for your reply. In what way are the paths to get pa

Re: Restore from checkpoint

2024-05-19 Thread Jiadong Lu
s:///home/foo/boo" 4. or Win32 directory form Best regards, Jiadong Lu On 2024/5/20 02:28, Phil Stavridis wrote: Hi Lu, Thanks for your reply. In what way are the paths to get passed to the job that needs to use the checkpoint? Is the standard way, using -s :/ or by passing the path in the mo

Re: Restore from checkpoint

2024-05-19 Thread Jinzhong Li
Hi Phil, I think you can use the "-s :checkpointMetaDataPath" arg to resume the job from a retained checkpoint[1]. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint Best, Jinzhong Li On Mon, May 20, 2024 at 2:
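With the checkpoint directory mentioned elsewhere in this thread, that would look roughly like the following (job id, checkpoint number and jar path are placeholders):

    flink run -s /opt/flink/checkpoints/<jobId>/chk-<n>/_metadata /path/to/job.jar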

Re: Restore from checkpoint

2024-05-19 Thread Phil Stavridis
Hi Lu, Thanks for your reply. In what way should the paths be passed to the job that needs to use the checkpoint? Is the standard way using -s :/, or passing the path in the module as a Python arg? Kind regards Phil > On 18 May 2024, at 03:19, jiadong.lu wrote: > > Hi Phil,

Re: Restore from checkpoint

2024-05-17 Thread jiadong.lu
m trying to test how the checkpoints work for restoring state, but not sure how to run a new instance of a flink job, after I have cancelled it, using the checkpoints which I store in the filesystem of the job manager, e.g. /opt/flink/checkpoints. I have tried passing the checkpoint as an argu

Restore from checkpoint

2024-05-17 Thread Phil Stavridis
Hi, I am trying to test how the checkpoints work for restoring state, but not sure how to run a new instance of a flink job, after I have cancelled it, using the checkpoints which I store in the filesystem of the job manager, e.g. /opt/flink/checkpoints. I have tried passing the checkpoint as

Flink autoscaler with AWS ASG: checkpoint access issue

2024-05-13 Thread Chetas Joshi
rest of the TM pods are scheduled on these new nodes. Issue After the scale-up, the TM pods scheduled on the existing nodes with available resources successfully read the checkpoint from S3 however the TM pods scheduled on the new nodes added by ASG run into 403 (access denied) while reading the same

Re: Flink SQL checkpoint failed when running on yarn

2024-04-29 Thread Biao Geng
Hi there, Would you mind sharing the whole JM/TM log? It looks like the error log in the previous email is not the root cause. Best, Biao Geng On Mon, Apr 29, 2024 at 16:07, ou...@139.com wrote: > Hi all: > When I ran a Flink SQL datagen source and wrote to JDBC, the checkpoint kept > failing

Flink SQL checkpoint failed when running on yarn

2024-04-29 Thread ou...@139.com
Hi all: When I ran a Flink SQL datagen source and wrote to JDBC, the checkpoint kept failing with the following error log. 2024-04-29 15:46:25,270 ERROR org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler [] - Unhandled exception

Re: Understanding checkpoint/savepoint storage requirements

2024-04-02 Thread Robert Young
ns) to >resume processing from the correct point after failures. >- Checkpoint Counters: Provides metrics (ID, timestamp, duration) for >monitoring checkpointing behavior. > > > *Task Managers:* > While the JobManager handles checkpoint metadata, TaskManagers are the

Re: Understanding checkpoint/savepoint storage requirements

2024-03-27 Thread Asimansu Bera
ation: Preserves job settings (parallelism, state backend) for consistent restarts. - Progress Information: Stores offsets (source/sink positions) to resume processing from the correct point after failures. - Checkpoint Counters: Provides metrics (ID, timestamp, duration) for monit

Re:Understanding checkpoint/savepoint storage requirements

2024-03-27 Thread Feifan Wang
Hi Robert: Your understanding is right! To add some more information: the JobManager is not only responsible for cleaning up old checkpoints, but also needs to write a metadata file to checkpoint storage after all taskmanagers have taken their snapshots. --- Best Feifan Wang

Understanding checkpoint/savepoint storage requirements

2024-03-27 Thread Robert Young
Hi all, I have some questions about checkpoint and savepoint storage. From what I understand, a distributed, production-quality job with a lot of state should use durable shared storage for checkpoints and savepoints. All job managers and task managers should access the same volume. So typica

Re: Unaligned checkpoint blocked by long Async operation

2024-03-17 Thread Zakelly Lan
t version of yield > that would actually yield to the checkpoint barrier too. That way operator > implementations could decide whether any state modification may or may not > have happened and can optionally allow checkpoint to be taken in the > "middle of record processing".

Re: Unaligned checkpoint blocked by long Async operation

2024-03-15 Thread Gyula Fóra
Posting this to dev as well... Thanks Zakelly, Sounds like a solution could be to add a new different version of yield that would actually yield to the checkpoint barrier too. That way operator implementations could decide whether any state modification may or may not have happened and can

Re: Unaligned checkpoint blocked by long Async operation

2024-03-14 Thread Zakelly Lan
Hi Gyula, Processing checkpoint halfway through `processElement` is problematic. The current element will not be included in the input in-flight data, and we cannot assume it has taken effect on the state by user code. So the best way is to treat `processElement` as an 'atomic' operatio

Re: Unaligned checkpoint blocked by long Async operation

2024-03-14 Thread Gyula Fóra
Thank you for the detailed analysis Zakelly. I think we should consider whether yield should process checkpoint barriers because this puts quite a serious limitation on the unaligned checkpoints in these cases. Do you know what is the reason behind the current priority setting? Is there a problem

Re: Unaligned checkpoint blocked by long Async operation

2024-03-14 Thread Zakelly Lan
Hi Gyula, Well I tried your example in local mini-cluster, and it seems the source can take checkpoints but it will block in the following AsyncWaitOperator. IIUC, the unaligned checkpoint barrier should wait until the current `processElement` finishes its execution. In your example, the element

Unaligned checkpoint blocked by long Async operation

2024-03-14 Thread Gyula Fóra
benefits of the AsyncIO would be that we can simply checkpoint the queue and not have to wait for the completion. To repro you can simply run: AsyncDataStream.orderedWait( env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(), new AsyncFunction() { @Override public void
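The repro above is cut off by the archive; a minimal self-contained version of the pattern it describes could look like this (the 60-second sleep and the timeout/capacity values are assumptions for illustration, not the exact original code):

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.async.AsyncFunction;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;

    public class AsyncCheckpointRepro {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(10_000);
            env.getCheckpointConfig().enableUnalignedCheckpoints();

            AsyncDataStream.orderedWait(
                    env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
                    new AsyncFunction<Long, Long>() {
                        @Override
                        public void asyncInvoke(Long input, ResultFuture<Long> resultFuture) {
                            // deliberately slow async call: while the operator's processElement is
                            // blocked waiting to enqueue new elements, the checkpoint barrier cannot
                            // be processed, which is the behaviour discussed in this thread
                            CompletableFuture.runAsync(() -> {
                                try {
                                    Thread.sleep(60_000);
                                } catch (InterruptedException ignored) {
                                }
                                resultFuture.complete(Collections.singletonList(input));
                            });
                        }
                    },
                    1, TimeUnit.HOURS, 10)
                .print();

            env.execute("async-unaligned-checkpoint-repro");
        }
    }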

Re: Entering a busy loop when adding a new sink to the graph + checkpoint enabled

2024-03-10 Thread Hang Ruan
Hi, Xuyang & Daniel. I have checked this part of the code. I think it is expected behavior. As noted in the code comments, this loop makes sure that the transactions before this checkpoint id are re-created. The situation Daniel mentioned will happen only when all checkpoints between 1 and 2

Re:Entering a busy loop when adding a new sink to the graph + checkpoint enabled

2024-03-10 Thread Xuyang
Hi, Danny. When the problem occurs, can you use a flame graph to confirm whether the loop in this code is causing the busyness? Since I'm not particularly familiar with the kafka connector, I can't give you an accurate reply. I think Hang Ruan is an expert in this field :). Hi, Ruan Hang. Can you t

Re: Flink Checkpoint & Offset Commit

2024-03-07 Thread Yanfei Lei
2] may be helpful to implement a custom Flink’s Table API connector. Specifically in terms of “Flink Checkpoint & Offset Commit”, the custom source needs to inherit the `SourceReader` interfaces, and you can override `snapshotState()` and `notifyCheckpointComplete()` into your implementations. [3] i

Re: Flink Checkpoint & Offset Commit

2024-03-07 Thread xia rui
Hi Jacob. Flink uses a "notification" to give an operator a callback on the completion of a checkpoint. After gathering all checkpoint-done messages from the TMs, the JM sends a "notify checkpoint completed" RPC to all TMs. Operators then handle this notification, where checkpoint success c
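For illustration, a hypothetical user function that hooks into exactly this notification flow (this is not the Kafka connector's actual code, just a sketch of where the callbacks land):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.CheckpointListener;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    /** Hypothetical sink showing where the two checkpoint hooks fire. */
    public class OffsetTrackingSink<T> extends RichSinkFunction<T>
            implements CheckpointedFunction, CheckpointListener {

        private final List<Long> pendingOffsets = new ArrayList<>();

        @Override
        public void invoke(T value, Context context) {
            // write the record and remember the offset it corresponds to (details omitted)
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) {
            // part of the checkpoint: persist pendingOffsets into Flink state here
        }

        @Override
        public void initializeState(FunctionInitializationContext context) {
            // on recovery: restore pendingOffsets from the last completed checkpoint
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            // invoked only after the JM has received "checkpoint done" from all tasks and sent
            // the "notify checkpoint completed" RPC -- commit to the external system here,
            // analogous to how the Kafka source commits its offsets
            pendingOffsets.clear();
        }
    }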

Fwd: Flink Checkpoint & Offset Commit

2024-03-07 Thread Jacob Rollings
ensuring the consistency between Flink’s checkpoint state and committed offsets on Kafka brokers*. How is Flink able to control the callbacks from checkpointing? Is there a way to override this into my implementations. I have multiple upstream sources to connect to depending on the business model w

Entering a busy loop when adding a new sink to the graph + checkpoint enabled

2024-02-20 Thread Daniel Peled
Hello Guys, Can someone please assist us regarding the following issue ? We have noticed that when we add a *new kafka sink* operator to the graph, *and start from the last save point*, the operator is 100% busy for several minutes and *even 1/2-1 hour* !!! The problematic code seems to be the f

Re: Entering a busy loop when adding a new sink to the graph + checkpoint enabled

2024-02-07 Thread Daniel Peled
Good morning, Any updates/progress on this issue? BR, Danny On Sun, Feb 4, 2024 at 13:20, Daniel Peled <daniel.peled.w...@gmail.com> wrote: > Hello, > > We have noticed that when we add a *new kafka sink* operator to the > graph, *and start from the last save point*, the operator

Entering a busy loop when adding a new sink to the graph + checkpoint enabled

2024-02-04 Thread Daniel Peled
Hello, We have noticed that when we add a *new kafka sink* operator to the graph, *and start from the last save point*, the operator is 100% busy for several minutes and *even 1/2-1 hour* !!! The problematic code seems to be the following for-loop in getTransactionalProducer() method: *org.apach

Re: Why calling ListBucket for each file in a checkpoint

2024-01-21 Thread Zakelly Lan
, Zakelly On Fri, Jan 19, 2024 at 9:32 PM Evgeniy Lyutikov wrote: > Hi all! > I'm trying to understand the logic of saving checkpoint files and from the > exchange dump with ceph I see the following requests > > HEAD > /checkpoints/example-job/00

Why calling ListBucket for each file in a checkpoint

2024-01-18 Thread Evgeniy Lyutikov
Hi all! I'm trying to understand the logic of saving checkpoint files and from the exchange dump with ceph I see the following requests HEAD /checkpoints/example-job//shared/9701fae2-0de3-4d6c-b08b-0a92fb7285c9 HTTP/1.1 HTTP/1.1 404 Not Found HEAD /checkp

Re: Advice on checkpoint interval best practices

2023-12-05 Thread Hangxiang Yu
Hi, Oscar. Just share my thoughts: Benefits of more aggressive checkpoint: 1. less recovery time as you mentioned (which is also related to data flink has to rollback to process) 2. less end-to-end latency for checkpoint-bounded sink in exactly-once mode Costs of more aggressive checkpoint: 1

Advice on checkpoint interval best practices

2023-12-05 Thread Oscar Perez via user
Hei, We are tuning some of the flink jobs we have in production and we would like to know what are the best numbers/considerations for checkpoint interval. We have set a default of 30 seconds for checkpoint interval and the checkpoint operation takes around 2 seconds. We have also enabled
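For reference, these are the knobs usually weighed in this trade-off; the 30-second interval is the one from the message above, the other values are only illustrative:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(30_000);                                  // 30 s interval, as above
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5_000);   // illustrative
    env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);  // illustrative
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);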

Re: Checkpoint RMM

2023-11-27 Thread xiangyu feng
Hi Oscar, > but we don't understand why this incremental checkpoint keeps increasing. AFAIK, when performing an incremental checkpoint, the RocksDBStateBackend will upload the newly created SST files to remote storage. The total size of these files is the incremental checkpoint size. However,
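For reference, incremental checkpoints correspond to the incremental flag on the RocksDB backend (the same constructor that appears elsewhere in this archive); the storage path is an illustrative placeholder:

    env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental: only newly created SST files are uploaded per checkpoint
    env.getCheckpointConfig().setCheckpointStorage("s3://<bucket>/checkpoints"); // placeholder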

Checkpoint RMM

2023-11-27 Thread Oscar Perez via user
Hi, We have a long running job in production and we are trying to understand the metrics for this job, see attached screenshot. We have enabled incremental checkpoint for this job and we use RocksDB as a state backend. When deployed from fresh state, the initial checkpoint size is about* 2.41G

Re: Disable flink old checkpoint clean

2023-11-15 Thread Yang LI
Hi Jinzhong, Sorry for the late reply. We switched from incremental checkpoints to non-incremental checkpoints before; I think one of the reasons was the difficulty of properly handling the cleanup of checkpoints on S3. But with the flink operator's periodic savepoints it may change. I

Re: Disable flink old checkpoint clean

2023-11-08 Thread Alexis Sarda-Espinosa
Hello, maybe someone can correct me if I'm wrong, but reading through [1], it seems to me that manually triggered checkpoints were meant for these scenarios. If the implementation follows the ticket's description, a user-triggered checkpoint would "break the chain of incremen

Re: Disable flink old checkpoint clean

2023-11-07 Thread Jinzhong Li
Hi Yang, I think there is no configuration option available that allow users to disable checkpoint file cleanup at runtime. Does your flink application use incremental checkpoint? 1) If yes, i think leveraging S3's lifecycle management to clean checkpoint files is not safe, because i

Re: Disable flink old checkpoint clean

2023-11-07 Thread Yang LI
Hi Martijn, We're currently utilizing flink-s3-fs-presto. After reviewing the flink-s3-fs-hadoop source code, I believe we would encounter similar issues with it as well. When we say, 'The purpose of a checkpoint, in principle, is that Flink manages its lifecycle,' I think it i

Re: Disable flink old checkpoint clean

2023-11-07 Thread Yang LI
Hi Junrui, Currently, we have configured our flink cluster with execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION and state.checkpoints.num-retained: 10. However, this setup begins to delete the oldest checkpoint once we exceed 10. Typically, by the time

Re: Disable flink old checkpoint clean

2023-11-07 Thread Martijn Visser
Ah, I actually misread checkpoint and savepoints, sorry. The purpose of a checkpoint in principle is that Flink manages its lifecycle. Which S3 interface are you using for the checkpoint storage? On Tue, Nov 7, 2023 at 6:39 PM Martijn Visser wrote: > > Hi Yang, > > If you use the N

Re: Disable flink old checkpoint clean

2023-11-07 Thread Martijn Visser
-formats/#no_claim-default-mode On Tue, Nov 7, 2023 at 5:29 PM Junrui Lee wrote: > > Hi Yang, > > > You can try configuring > "execution.checkpointing.externalized-checkpoint-retention: > RETAIN_ON_CANCELLATION"[1] and increasing the value of > "state.chec

Re: Disable flink old checkpoint clean

2023-11-07 Thread Junrui Lee
Hi Yang, You can try configuring "execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION"[1] and increasing the value of "state.checkpoints.num-retained"[2] to retain more checkpoints. Here are the official documentation links for more

Disable flink old checkpoint clean

2023-11-07 Thread Yang LI
Dear Flink Community, In our Flink application, we persist checkpoints to AWS S3. Recently, during periods of high job parallelism and traffic, we've experienced checkpoint failures. Upon investigating, it appears these may be related to S3 delete object requests interrupting checkpoi

Flink Checkpoint Behavior/Missing Records

2023-10-12 Thread Abhishek SP
Batch mode OR Streaming with Unbounded input due to the bug[Ref <https://flink.apache.org/2022/07/11/flip-147-support-checkpoints-after-tasks-finished-part-one/> ]. Custom File Source Strategy: The custom File source listens to Checkpoint Start and Complete calls. It waits

Checkpoint/Savepoint restore of S3 file reads using continuous read mode

2023-10-06 Thread Mark Petronic
the complete list to be able to avoid reprocessing existing files and that would be quite a lot of state. 2. I understand from the docs that you can restart Flink using state from either a savepoint or a checkpoint. I was trying to restart my test application standalone using the

Flink - problem with removing checkpoint _metadata from google cloud storage when encrypting

2023-10-03 Thread Simon
cryption key". I checked that _metadata files are not encrypted. I don't understand this behavior because I set encryption in flink config and the _metadata are done by flink. Since Flink created them, it should be able to delete them as well. It turns out that flink creates a _metadat

Re: After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck

2023-09-27 Thread Feng Jin
for your reply,We observed the GC situation, there is no >>>>> change before and after replacement, several tasks on our line using >>>>> jemalloc have appeared stuck, after removing jemalloc, no stuck situation >>>>> has been found. >>>>> >>&g

Re: After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck

2023-09-27 Thread rui chen
served the GC situation, there is no >>>> change before and after replacement, several tasks on our line using >>>> jemalloc have appeared stuck, after removing jemalloc, no stuck situation >>>> has been found. >>>> >>>> Best, >>&

Re: After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck

2023-09-27 Thread rui chen
Feng, >>> >>> Thank you for your reply,We observed the GC situation, there is no >>> change before and after replacement, several tasks on our line using >>> jemalloc have appeared stuck, after removing jemalloc, no stuck situation >>> has been found. >

Re: After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck

2023-09-27 Thread Feng Jin
> Best, >> rui >> >> On Wed, Sep 27, 2023 at 19:19, Feng Jin wrote: >> >>> >>> hi rui, >>> >>> In general, checkpoint timeouts are typically associated with the job's >>> processing performance. When using jemalloc, performance degradation
