Why does the Kafka source not rely on the
committed offset for recovery, even though the offset stored in the
checkpoint/savepoint is the same as the one committed to Kafka?
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#consumer-offset-committing
Best
> <<*Kafka
> source does not rely on committed offsets for fault tolerance.
> Committing offset is only for exposing the progress of consumer and
> consuming group for monitoring*>>.
>
> Can someone explain please why the Kafka source does not rely on the
> committed offset for recovery?
> Are they two different things?
There are no separate consumer and broker offsets; there are offsets which
belong to a topic + partition pair.
> And which offset is saved in the checkpoint/savepoint?
The offset that Flink thinks is already processed.
Regarding the PROD deploy, now you know the risks, so feel…
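To make that point concrete, here is a minimal, hedged sketch of how a KafkaSource is commonly wired so that recovery comes from the checkpointed offsets while the broker-side commit is only used for monitoring; the broker address, topic, and group id are placeholders, not anything from this thread:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class KafkaOffsetExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With checkpointing enabled, the source snapshots its offsets into the
        // checkpoint; that snapshot (not the broker-side committed offset) is
        // what a restore actually uses.
        env.enableCheckpointing(60_000);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder
                .setTopics("input-topic")             // placeholder
                .setGroupId("my-group")               // placeholder
                // Committed offsets are only consulted when there is no
                // checkpoint/savepoint to restore from (a fresh start).
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                // Commit back to Kafka on checkpoint completion, for monitoring only.
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .print();
        env.execute("kafka-offset-demo");
    }
}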
Hello Gabor,
Thanks for your response.
I just want to clarify one thing: is there any difference between the Kafka
source offset and the Kafka broker offset? Are they two different things?
And which offset is saved in the checkpoint/savepoint?
For our use case, we intend to take a savepoint
Hello,
So if I understand you well, I cannot rely on the kafka broker offset to
achieve at-least-once guarantee. Without checkpoint/savepoint enabled,
that would not be possible.
Best regards
On Wed, Mar 19, 2025 at 12:00, Ahmed Hamdy wrote:
> Hi Mejri,
> Not exactly, you can still rely on savepoint to restart/redeploy the job
> from the latest offset…
Hello everyone,
We have a stateless Flink job that uses a Kafka source with at-least-once
guarantees. We’ve enabled checkpoints so that, in the event of a restart,
Flink can restore from the last committed offset stored in a successful
checkpoint. Now we’re considering enabling savepoints for our deployments.
I’m wondering if this is strictly necessary, since the Kafka broker itself
keeps track of offsets (if I am not mistaken). In other words, if we redeploy
the job, will it automatically resume from the last Kafka offset, or should
we still rely on Flink’s checkpoint/savepoint mechanism to ensure…
>>> …nitializerImpl.java:393)
>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
>>> .attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
>>> .createAndRestore(BackendRestorerProcedure.java:137)
>>
My code to read the state is like:

SavepointReader savepoint =
    SavepointReader.read(env,
        "s3://{bucket}/flink-checkpoints/{jobId}/chk-{num}/_metadata",
        new EmbeddedRocksDBStateBackend(true));

DataStream<State> keyedState =
    savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"),
        new StateReaderFunction(), Types.LONG, Types.POJO(State.class));

keyedState.print();
env.execute("Analysis");

Any idea as to what could be going wrong?
Also note this is my checkpoint configuration: …
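For reference, a hedged sketch of what a StateReaderFunction such as the one used above might look like with the State Processor API; the state name "state1" and the State/StateOne POJOs come from this thread, while the function body and the assumed State(key, value) constructor are purely illustrative:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class StateReaderFunction extends KeyedStateReaderFunction<Long, State> {

    private transient ValueState<StateOne> state1;

    @Override
    public void open(Configuration parameters) {
        // Must match the descriptor used by the operator that wrote the state.
        state1 = getRuntimeContext().getState(
                new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
    }

    @Override
    public void readKey(Long key, Context ctx, Collector<State> out) throws Exception {
        StateOne value = state1.value();
        out.collect(new State(key, value)); // assumed constructor, for illustration only
    }
}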
DataStream events = env.fromSource(..., "Src");

SingleOutputStreamOperator<StatsData> statsData =
    events
        .keyBy(new MyKeySelector())
        .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
        …

statsData
    .addSink(new MySink<>(...))
    .name("Sink");

env.execute("Exec");

The MyStatsProcessor has keyed states defined as:

state1 =
    getRuntimeContext().getState(new ValueStateDescriptor<>("state1",
        Types.POJO(StateOne.class)));
state2 =
    getRuntimeContext().getState(new ValueStateDescriptor<>("state2",
        Types.POJO(StateTwo.class)));

So my question is: how can I read any checkpoint state? I see this API,
flink-state-processor-api. Can I use the same here, and if so how do I
instantiate it:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
.description("This is my first Proc");
Nix,.
From: Sachin Mittal
Date: Friday, February 21, 2025 at 8:40 AM
To: Xuyang
Cc: user
Subject: Re: How can we read checkpoint data for debugging state
Hi,
I am working on Flink 1.19.1, so I guess I cannot use the SQL connector as
that
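To address the instantiation question above, here is a minimal sketch, assuming Flink 1.19+ with the flink-state-processor-api dependency on the classpath; the checkpoint path is a placeholder, and StateReaderFunction/State are the classes referenced earlier in this thread:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadCheckpointState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A retained checkpoint can be read like a savepoint by pointing at its
        // _metadata directory; the state backend should match the one that wrote it.
        SavepointReader reader = SavepointReader.read(
                env,
                "s3://bucket/flink-checkpoints/<job-id>/chk-42/_metadata", // placeholder path
                new EmbeddedRocksDBStateBackend(true));

        DataStream<State> keyedState = reader.readKeyedState(
                OperatorIdentifier.forUid("Proc"),
                new StateReaderFunction(),
                Types.LONG,
                Types.POJO(State.class));

        keyedState.print();
        env.execute("read-checkpoint-state");
    }
}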
Hello,
I am running a job on Apache Flink 1.19, on an AWS EMR (EC2) cluster as a YARN
application.
I have implemented generic log-based incremental checkpointing for faster
checkpoints.
It is described in more detail here:
https://flink.apache.org/2022/05/30/improving-speed-and-stability-of
…HA clusterId to force a resume from a savepoint. This can be the case
when we change something in the application and we cannot then resume from
a checkpoint. So I suspect we will have some housekeeping to do
ourselves in these scenarios. Keep learning, they said...
> That's weird. Are these…
…fault-tolerance/state/#state-time-to-live-ttl
And if it is a SQL job, please set 'table.exec.state.ttl'.
Besides that, I did a test locally; it seems the job will recover from the
latest checkpoint instead of a savepoint (even though the savepoint is the
latest), and all the checkpoints will be properly…
>> …Is the data under s3://flink-application/checkpoints increasing? And have
>> you set the state TTL?
>>
>> Best,
>> Zakelly
>>
>> On Tue, Dec 31, 2024 at 7:58 PM Jean-Marc Paulin wrote:
Hi,
We are on Flink 1.20/Java17 running in a k8s environment, with checkpoints
enabled on S3 and the following checkpoint options:
execution.checkpointing.dir: s3://flink-application/checkpoints
execution.checkpointing.externalized-checkpoint-retention:
DELETE_ON_CANCELLATION
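For readers who prefer the programmatic form, a hedged sketch of roughly the same settings expressed through the CheckpointConfig API; note that newer Flink versions may spell the enum ExternalizedCheckpointRetention instead of ExternalizedCheckpointCleanup:

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointRetentionSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        // Programmatic counterpart of execution.checkpointing.dir above.
        env.getCheckpointConfig().setCheckpointStorage("s3://flink-application/checkpoints");

        // DELETE_ON_CANCELLATION (as configured above) removes externalized
        // checkpoints when the job is cancelled; RETAIN_ON_CANCELLATION keeps
        // them so the job can later be resumed from chk-N/_metadata.
        env.getCheckpointConfig()
                .setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        // ... build and execute the job as usual
    }
}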
…the sink operator. For both cases we changed the UID of the Kafka sink to
make sure that its state resets. However, we did it via a savepoint to keep
the source operator state (no data duplication/loss allowed). The problem is
that in both cases the job couldn’t checkpoint anymore. Each checkpoint failed
aft…
…high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
execution.checkpointing.externalized-checkpoint-retention:
RETAIN_ON_CANCELLATION
In this scenario, we had a job with a retained
checkpoint. Upon resubmitting the job, the job ran into a failure.
Classifying stack trace:
org.apache.flink.table.api.TableException: Failed to ex…
We have also enabled unaligned checkpoints. Could it be because of that? We
were experiencing slowness and intermittent packet loss when this issue
occurred.
Thanks for the clarification, Yanfei. And I will dig into it deeper later.
> -- Original Message --
> From: "Yanfei Lei"
> Sent: Tuesday, July 30, 2024, 5:15 PM
> To: "Enric Ott" <243816...@qq.com>
> Cc: "user"
> Subject: Re: checkpoint upload thread
>
> Hi Enric
Hi Everyone,
We are observing an interesting issue with continuous checkpoint
failures in our job, causing events to not be forwarded through the
pipeline. We saw a spam of the below log in all our task manager instances:
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException…
Hi, Yanfei:
What do you mean by the word "possible" in the statement "it is possible
to use the same connection for an operator chain"? Do you mean it is able
to be done but not applied in fact, or actually applied but with some
probability?
Thanks.
Hi Enric,
If I understand correctly, one subtask would use one
`asyncOperationsThreadPool`[1,2], it is possible to use the same
connection for an operator chain.
[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask
Hi, Community:
Does Flink upload state and in-flight buffers within the same
operator chain using the same connection (instead of one connection per operator)?
Hi again,
I found a Hadoop class that can log latency information [1], but since I
don't see any exceptions in the logs when a checkpoint expires due to
timeout, I'm still wondering if I can change other log levels to get more
insights, maybe somewhere in Flink's file system ab
Hello,
We have a Flink job that uses ABFSS for checkpoints and related state.
Lately we see a lot of exceptions due to expiration of checkpoints, and I'm
guessing that's an issue in the infrastructure or on Azure's side, but I
was wondering if there are Flink/Hadoop Java packages that log potentia
Hi, I have a problem where Flink deletes checkpoint information on a
Kubernetes HA setup even if
execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION
is set.
Config documentation:
"RETAIN_ON_CANCELLATION": Checkpoint state is kept when the owning job is
cancelled…
…external job monitoring
system to manually recover it.
Best,
Zhanghao Chen
From: Jean-Marc Paulin
Sent: Tuesday, June 11, 2024 16:04
To: Zhanghao Chen; user@flink.apache.org
Subject: Re: Failed to resume from HA when the checkpoint has been deleted.
Thanks for your reply,
Yes, this is indeed an option. But I was more after a config option to handle
that scenario. If the HA metadata points to a checkpoint that is obviously not
present (error 404 in the S3 case) there is little value in retrying. The HA
data are obviously worthless in that…
Hi,
We have a 1.19 Flink streaming job, with HA enabled (ZooKeeper) and
checkpoints/savepoints in S3. We had an outage and now the jobmanager keeps
restarting. We think it is because it reads the job id to be restarted from
ZooKeeper, but because we lost our S3 storage as part of the outage it cannot…
Hi Sachin,
Yes, that's correct. To resume from a savepoint, use the command bin/flink
run -s <savepointPath>. You can find more details in the Flink
documentation [1].
Additionally, information on how to trigger a savepoint can be found in the
section for triggering savepoints [2].
[1]
https://nightlies.ap
Hi,
I have a long running yarn cluster and I submit my streaming job using the
following command:
flink run -m yarn-cluster -yid application_1473169569237_0001
/usr/lib/flink/examples/streaming/WordCount.jar --input file:///input.txt
--output file:///output/
Let's say I want to stop this job, mak
…with the flink-s3-fs-presto plugin, as if
I switch to the hadoop plugin, I don't run into 403 errors after the
scale-up events.
3. What is the reason why the presto plugin is recommended over the hadoop
plugin when working with checkpoint files in S3?
Thank you,
Chetas
On Mon, May 13, 20…
> 2. "file:///home/foo/boo"
> 3. "hdfs:///home/foo/boo"
> 4. or Win32 directory form
>
> Best regards,
> Jiadong Lu
Hi Phil,
I think you can use the "-s :checkpointMetaDataPath" arg to resume the job
from a retained checkpoint[1].
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint
Best,
Jinzhong Li
On Mon, May 20, 2024 at 2:
Hi Lu,
Thanks for your reply. In what way are the paths to get passed to the job that
needs to use the checkpoint? Is the standard way using -s :/, or by
passing the path in the module as a Python arg?
Kind regards,
Phil
> On 18 May 2024, at 03:19, jiadong.lu wrote:
>
> Hi Phil,
Hi,
I am trying to test how the checkpoints work for restoring state, but not sure
how to run a new instance of a flink job, after I have cancelled it, using the
checkpoints which I store in the filesystem of the job manager, e.g.
/opt/flink/checkpoints.
I have tried passing the checkpoint as
…the rest of the TM pods are scheduled on these new
nodes.
Issue:
After the scale-up, the TM pods scheduled on the existing nodes with
available resources successfully read the checkpoint from S3; however, the TM
pods scheduled on the new nodes added by the ASG run into 403 (access denied)
errors while reading the same…
Hi there,
Would you mind sharing the whole JM/TM log? It looks like the error log in
the previous email is not the root cause.
Best,
Biao Geng
ou...@139.com wrote on Mon, Apr 29, 2024 at 16:07:
Hi all:
When I ran a Flink SQL datagen source and wrote to JDBC, checkpoints kept
failing with the following error log:
2024-04-29 15:46:25,270 ERROR
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler
[] - Unhandled exception
- Configuration: Preserves job settings (parallelism, state backend) for
consistent restarts.
- Progress Information: Stores offsets (source/sink positions) to resume
processing from the correct point after failures.
- Checkpoint Counters: Provides metrics (ID, timestamp, duration) for
monitoring checkpointing behavior.

*Task Managers:*
While the JobManager handles checkpoint metadata, TaskManagers are the…
Hi Robert,
Your understanding is right!
To add some more information: the JobManager is not only responsible for
cleaning up old checkpoints, but also needs to write the metadata file to
checkpoint storage after all TaskManagers have taken their snapshots.
---
Best,
Feifan Wang
Hi all, I have some questions about checkpoint and savepoint storage.
From what I understand, a distributed, production-quality job with a lot of
state should use durable shared storage for checkpoints and savepoints. All
job managers and task managers should access the same volume. So typica…
Posting this to dev as well...
Thanks Zakelly,
Sounds like a solution could be to add a new, different version of yield
that would actually yield to the checkpoint barrier too. That way operator
implementations could decide whether any state modification may or may not
have happened and can optionally allow a checkpoint to be taken in the
"middle of record processing".
Hi Gyula,
Processing a checkpoint halfway through `processElement` is problematic. The
current element will not be included in the input in-flight data, and we
cannot assume it has taken effect on the state by the user code. So the best
way is to treat `processElement` as an 'atomic' operation…
Thank you for the detailed analysis Zakelly.
I think we should consider whether yield should process checkpoint barriers
because this puts quite a serious limitation on the unaligned checkpoints
in these cases.
Do you know what is the reason behind the current priority setting? Is
there a problem
Hi Gyula,
Well I tried your example in local mini-cluster, and it seems the source
can take checkpoints but it will block in the following AsyncWaitOperator.
IIUC, the unaligned checkpoint barrier should wait until the current
`processElement` finishes its execution. In your example, the element
…benefits of the AsyncIO would be
that we can simply checkpoint the queue and not have to wait for the
completion.
To repro you can simply run:

AsyncDataStream.orderedWait(
    env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
    new AsyncFunction() {
        @Override
        public void …
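A self-contained, hedged reconstruction of that repro follows; only the orderedWait(...) skeleton above comes from the thread, while the checkpoint settings, the sleeping AsyncFunction body, the timeout, and the capacity of 1 are assumptions chosen to make the blocking behavior easy to observe:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

public class AsyncCheckpointRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1_000);
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        AsyncDataStream.orderedWait(
                env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
                new AsyncFunction<Long, Long>() {
                    @Override
                    public void asyncInvoke(Long input, ResultFuture<Long> resultFuture) {
                        // Simulate a slow async call so the operator's queue fills up
                        // and processElement blocks, which is what the thread discusses.
                        CompletableFuture.runAsync(() -> {
                            try {
                                TimeUnit.SECONDS.sleep(10);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            resultFuture.complete(Collections.singleton(input));
                        });
                    }
                },
                60, TimeUnit.SECONDS, 1) // small capacity to trigger back-pressure quickly
            .print();

        env.execute("async-checkpoint-repro");
    }
}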
Hi, Xuyang & Daniel.
I have checked this part of the code. I think it is expected behavior.
As noted in the code comments, this loop makes sure that the transactions
before this checkpoint id are re-created.
The situation Daniel mentioned will happen only when all checkpoints between
1 and 2…
Hi, Danny.
When the problem occurs, can you use a flame graph to confirm whether the loop
in this code is causing the busyness?
Since I'm not particularly familiar with the Kafka connector, I can't give you
an accurate reply. I think Hang Ruan is an expert in this field :).
Hi, Ruan Hang. Can you t…
…[2] may
be helpful for implementing a custom Flink Table API connector.
Specifically, in terms of “Flink Checkpoint & Offset Commit”, the
custom source needs to implement the `SourceReader` interface, and you
can override `snapshotState()` and `notifyCheckpointComplete()` in
your implementation.
[3] i…
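A skeletal, hedged illustration of that pattern (not a working connector): the MySplit type and the idea of a commitOffsetsUpTo helper are hypothetical, and only the SourceReader callbacks themselves come from the Flink source API:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.InputStatus;

public class MySourceReader implements SourceReader<String, MySourceReader.MySplit> {

    // Hypothetical split type; a real connector would carry partition/offset info here.
    public static class MySplit implements SourceSplit {
        @Override
        public String splitId() { return "split-0"; }
    }

    private final List<MySplit> assignedSplits = new ArrayList<>();

    @Override
    public void start() { /* open connections to the external system */ }

    @Override
    public InputStatus pollNext(ReaderOutput<String> output) {
        // A real reader would emit records here and advance its in-memory offsets.
        return InputStatus.NOTHING_AVAILABLE;
    }

    @Override
    public List<MySplit> snapshotState(long checkpointId) {
        // Whatever is returned here ends up in the Flink checkpoint; this is the
        // state that recovery actually restores from.
        return new ArrayList<>(assignedSplits);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Only after the checkpoint completes is it safe to commit offsets back
        // to the external system, keeping committed offsets consistent with the
        // checkpointed state (e.g. a hypothetical commitOffsetsUpTo(checkpointId)).
    }

    @Override
    public CompletableFuture<Void> isAvailable() {
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void addSplits(List<MySplit> splits) {
        assignedSplits.addAll(splits);
    }

    @Override
    public void notifyNoMoreSplits() { }

    @Override
    public void close() { }
}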
Hi Jacob.
Flink uses a "notification" to let an operator know about the completion of a
checkpoint. After gathering all checkpoint-done messages from the TMs, the JM
sends a "notify checkpoint completed" RPC to all TMs. Operators will handle
this notification, where checkpoint success c…
…ensuring the consistency between Flink’s checkpoint
state and the committed offsets on Kafka brokers*.
How is Flink able to control the callbacks from checkpointing? Is there a
way to override this in my implementation? I have multiple upstream
sources to connect to depending on the business model w…
Hello Guys,
Can someone please assist us regarding the following issue?
Good morning,
Any updates/progress on this issue?
BR,
Danny

On Sun, Feb 4, 2024 at 13:20 Daniel Peled <daniel.peled.w...@gmail.com> wrote:
Hello,
We have noticed that when we add a *new Kafka sink* operator to the graph, *and
start from the last savepoint*, the operator is 100% busy for several
minutes and *even 1/2-1 hour*!!!
The problematic code seems to be the following for-loop in the
getTransactionalProducer() method:
*org.apach…
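Not a confirmed fix for the issue above, but as a hedged illustration of the knob involved: the for-loop discussed here recreates transactional producers whose ids are derived from the sink's transactional-id prefix, and a newly added exactly-once Kafka sink is typically given its own fresh prefix to keep its transactional state separate. Broker, topic, and prefix below are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class NewKafkaSinkExample {
    public static KafkaSink<String> buildSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")              // placeholder
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")        // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // A prefix not shared with any previous sink instance.
                .setTransactionalIdPrefix("my-new-sink-prefix")
                .build();
    }
}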
…Best,
Zakelly

On Fri, Jan 19, 2024 at 9:32 PM Evgeniy Lyutikov wrote:
Hi all!
I'm trying to understand the logic of saving checkpoint files, and from the
exchange dump with Ceph I see the following requests:

HEAD /checkpoints/example-job//shared/9701fae2-0de3-4d6c-b08b-0a92fb7285c9 HTTP/1.1
HTTP/1.1 404 Not Found

HEAD /checkp…
Hi, Oscar.
Just to share my thoughts:
Benefits of a more aggressive checkpoint interval:
1. Less recovery time, as you mentioned (which is also related to the data
Flink has to roll back and reprocess).
2. Less end-to-end latency for checkpoint-bounded sinks in exactly-once mode.
Costs of a more aggressive checkpoint interval:
1. …
Hei,
We are tuning some of the Flink jobs we have in production and we would
like to know what the best numbers/considerations for the checkpoint
interval are. We have set a default checkpoint interval of 30 seconds and
the checkpoint operation takes around 2 seconds.
We have also enabled…
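As a point of reference for that discussion, a minimal sketch (values are illustrative, not recommendations) of where the interval-related knobs live in the DataStream API:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointIntervalTuning {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(30_000);                                  // 30 s interval, as above
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000);  // breathing room between checkpoints
        env.getCheckpointConfig().setCheckpointTimeout(120_000);          // fail checkpoints that take too long
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // ... build and execute the job as usual
    }
}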
Hi Oscar,
> but we don't understand why this incremental checkpoint keeps increasing
AFAIK, when performing an incremental checkpoint, the RocksDBStateBackend will
upload the newly created SST files to remote storage. The total size of these
files is the incremental checkpoint size. However,…
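For context, a hedged sketch of how incremental RocksDB checkpoints are typically enabled in the DataStream API; the bucket path and interval are placeholders:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // true = incremental: each checkpoint uploads only newly created SST
        // files, so the reported incremental size is the size of those files.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints"); // placeholder

        // ... build and execute the job as usual
    }
}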
Hi,
We have a long-running job in production and we are trying to understand
the metrics for this job, see the attached screenshot.
We have enabled incremental checkpoints for this job and we use RocksDB as
the state backend.
When deployed from a fresh state, the initial checkpoint size is about *2.41G…
Hi Jinzhong,
Sorry for answering only now. We switched from incremental checkpoints
to non-incremental checkpoints before; I think one of the reasons was the
difficulty of properly handling the cleanup of checkpoints on S3. But with
the Flink operator's periodic savepoints it may change. I…
Hello,
maybe someone can correct me if I'm wrong, but reading through [1], it
seems to me that manually triggered checkpoints were meant for these
scenarios. If the implementation follows the ticket's description, a
user-triggered checkpoint would "break the chain of incremen
Hi Yang,
I think there is no configuration option available that allows users to
disable checkpoint file cleanup at runtime.
Does your Flink application use incremental checkpoints?
1) If yes, I think leveraging S3's lifecycle management to clean checkpoint
files is not safe, because i…
Hi Martijn,
We're currently utilizing flink-s3-fs-presto. After reviewing the
flink-s3-fs-hadoop source code, I believe we would encounter similar issues
with it as well.
When we say, 'The purpose of a checkpoint, in principle, is that Flink
manages its lifecycle,' I think it i
Hi Junrui,
Currently, we have configured our flink cluster with
execution.checkpointing.externalized-checkpoint-retention:
RETAIN_ON_CANCELLATION and state.checkpoints.num-retained: 10. However,
this setup begins to delete the oldest checkpoint once we exceed 10.
Typically, by the time
Ah, I actually misread checkpoint and savepoints, sorry. The purpose
of a checkpoint in principle is that Flink manages its lifecycle.
Which S3 interface are you using for the checkpoint storage?
On Tue, Nov 7, 2023 at 6:39 PM Martijn Visser wrote:
>
> Hi Yang,
>
> If you use the N
-formats/#no_claim-default-mode
On Tue, Nov 7, 2023 at 5:29 PM Junrui Lee wrote:
Hi Yang,
You can try configuring
"execution.checkpointing.externalized-checkpoint-retention:
RETAIN_ON_CANCELLATION"[1] and increasing the value of
"state.checkpoints.num-retained"[2] to retain more checkpoints.
Here are the official documentation links for more
Dear Flink Community,
In our Flink application, we persist checkpoints to AWS S3. Recently,
during periods of high job parallelism and traffic, we've experienced
checkpoint failures. Upon investigating, it appears these may be related to
S3 delete object requests interrupting checkpoi
…Batch mode OR Streaming with Unbounded input, due to the bug [Ref:
https://flink.apache.org/2022/07/11/flip-147-support-checkpoints-after-tasks-finished-part-one/].
Custom File Source Strategy: the custom File source listens to Checkpoint
Start and Complete calls. It waits…
the complete list to be able to avoid
reprocessing existing files and that would be quite a lot of state.
2. I understand from the docs that you can restart Flink using state
from either a savepoint or a checkpoint. I was trying to restart my test
application standalone using the
…encryption key".
I checked that the _metadata files are not encrypted. I don't understand
this behavior because I set encryption in the Flink config and the _metadata
files are written by Flink. Since Flink created them, it should be able to
delete them as well.
It turns out that Flink creates a _metadat…
>>> Hi Feng,
>>>
>>> Thank you for your reply. We observed the GC situation; there is no
>>> change before and after the replacement. Several tasks on our line using
>>> jemalloc have appeared stuck; after removing jemalloc, no stuck situation
>>> has been found.
>> Best,
>> rui
>>
>> Feng Jin wrote on Wed, Sep 27, 2023 at 19:19:
>>
>>> Hi Rui,
>>>
>>> In general, checkpoint timeouts are typically associated with the job's
>>> processing performance. When using jemalloc, performance degradation…