Hi Talat Uyarer,

  *   There is no way to have only one file unless you lower the parallelism to 
1 (= only one subtask)
  *   So which files do you see: 1 “_metadata” + multiple data files (or just 
  *   The idea of having multiple files is to allow multiple threads to store 
checkpoints at the same time and, when restarting from a checkpoint, to read 
from more files, potentially distributed across multiple physical hard drives 
(more I/O capacity)
  *   So in general it is good to have multiple files
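
To answer the question of which files show up, a small script along these 
lines could list them. It is only a sketch: it assumes the checkpoint 
directory (one "chk-<n>" folder) is reachable as a local path, and the helper 
name summarize_checkpoint is made up for illustration, not part of Flink.

```python
from pathlib import Path


def summarize_checkpoint(chk_dir):
    """Return (has_metadata, data_file_names) for one chk-<n> directory.

    Hypothetical helper: only looks at regular files directly inside
    chk_dir and treats everything except "_metadata" as a data file.
    """
    chk = Path(chk_dir)
    names = sorted(p.name for p in chk.iterdir() if p.is_file())
    has_metadata = "_metadata" in names
    data_files = [n for n in names if n != "_metadata"]
    return has_metadata, data_files


# Example call (the path is a placeholder, not a real location):
# has_meta, data = summarize_checkpoint("/path/to/checkpoints/<job-id>/chk-1")
# print(has_meta, len(data))
```

If this reports only "_metadata" and no data files, the state was probably 
small enough to be stored inside the metadata file.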

Still (out of curiosity) why would you want to have everything in a single file?

Sincere greetings


From: Talat Uyarer <tuya...@paloaltonetworks.com>
Sent: Thursday, February 2, 2023 5:57 PM
To: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Cc: Kishore Pola <kishore.p...@hotmail.com>; weijie guo 
<guoweijieres...@gmail.com>; user@flink.apache.org
Subject: Re: Reducing Checkpoint Count for Chain Operator

Hi Schwalbe, weijie,

Thanks for your reply.

  *   Each state primitive/per subtask stores state into a separate file

In this picture you can see the operator chain.

Source and Map are in the same chain. Today Flink creates two files for that 
operator chain. With an OperatorChain, all subtasks run on the same machine 
and in the same thread, as a memory optimization. However, Flink still creates 
separate files per subtask. Our question is whether there is a way to have one 
file instead of multiple files.


On Wed, Feb 1, 2023 at 11:50 PM Schwalbe Matthias 
<matthias.schwa...@viseca.ch<mailto:matthias.schwa...@viseca.ch>> wrote:
Hi Kishore,

Having followed this thread for a while, there still seems to be quite a bit 
of confusion about concepts. In order to help resolve your original question, 
we would need to know:

  *   what makes your observation a problem to be solved?
  *   You write that you have no shuffling; does that mean you don’t use any 
keyBy() or rebalance()?
  *   How do you determine that there are 7 checkpoints, one for each operator?
  *   In general, please share a few more details about how you configure state 
primitives: which kinds, whether there is also operator state, on all 
operators, etc.

In general (as Weijie said), checkpointing works like this (simplified):

  *   The JobManager creates a checkpoint mark/barrier at a configured interval
  *   For synchronous checkpointing, this flows along with the events through 
the chain of tasks
  *   For asynchronous checkpointing, the checkpointing marker is sent directly 
to the subtasks
  *   A single checkpoint looks like this:

     *   Each state primitive/per subtask stores state into a separate file
     *   At the end, the JobManager writes a “_metadata” file for the checkpoint 
metadata and for state that is too small to end up in a separate file
     *   i.e. each checkpoint barrier generates only one checkpoint (multiple 
files), not 7
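
The layout described above can be mimicked with a toy model. This is a sketch 
under stated assumptions, not Flink code: the Checkpoint class, the 
take_checkpoint function, the file names and the 1024-byte inline threshold 
are all invented for illustration (real data files have generated names).

```python
from dataclasses import dataclass, field

# Hypothetical threshold: state smaller than this is inlined into _metadata.
INLINE_THRESHOLD_BYTES = 1024


@dataclass
class Checkpoint:
    checkpoint_id: int
    data_files: dict = field(default_factory=dict)  # file name -> size in bytes
    inlined: dict = field(default_factory=dict)     # state key -> size in bytes

    def file_names(self):
        # One _metadata file plus one file per state large enough to get its own.
        return ["_metadata"] + sorted(self.data_files)


def take_checkpoint(checkpoint_id, state_sizes):
    """state_sizes maps (operator, subtask, state_name) -> size in bytes."""
    chk = Checkpoint(checkpoint_id)
    for (operator, subtask, state_name), size in state_sizes.items():
        key = f"{operator}-{subtask}-{state_name}"
        if size < INLINE_THRESHOLD_BYTES:
            chk.inlined[key] = size      # ends up inside _metadata
        else:
            chk.data_files[key] = size   # ends up in a separate file
    return chk


# Two chained operators, parallelism 2 (all numbers are made up).
state_sizes = {
    ("source", 0, "offsets"): 200,
    ("source", 1, "offsets"): 200,
    ("map", 0, "buffer"): 50_000,
    ("map", 1, "buffer"): 48_000,
}
chk = take_checkpoint(42, state_sizes)
print(chk.file_names())  # ['_metadata', 'map-0-buffer', 'map-1-buffer']
```

However many operators and subtasks contribute state, the result in this 
model is still a single checkpoint (ID 42) made up of several files.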

Hope we shed a little light on this

Best regards


From: Kishore Pola <kishore.p...@hotmail.com<mailto:kishore.p...@hotmail.com>>
Sent: Thursday, February 2, 2023 4:12 AM
To: weijie guo <guoweijieres...@gmail.com<mailto:guoweijieres...@gmail.com>>; 
Talat Uyarer <tuya...@paloaltonetworks.com<mailto:tuya...@paloaltonetworks.com>>
Cc: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Reducing Checkpoint Count for Chain Operator

Hi Weijie,

In our case we do have 7 operators. All 7 operators are executed as one chain 
within a single StreamTask. As the checkpoint barrier passes through all the 
operators, 7 checkpoints are stored, so our checkpoint size goes up 7 times. 
We are investigating whether we can checkpoint only the start operator (Kafka 
source) or the end operator (BQ sink); that would be enough for us, and the 
checkpoint size would come down. Hence the question: when the operators are 
executed in the same StreamTask as one chain, is it possible to checkpoint at 
the operator-chain or single-operator level?


From: weijie guo <guoweijieres...@gmail.com<mailto:guoweijieres...@gmail.com>>
Sent: Wednesday, February 1, 2023 6:59 PM
To: Talat Uyarer 
Cc: user@flink.apache.org<mailto:user@flink.apache.org> 
Subject: Re: Reducing Checkpoint Count for Chain Operator

Hi Talat,

In Flink, a checkpoint barrier is injected at the source and then passes 
through all operators in turn. Each stateful operator takes its checkpoint in 
this process; state is managed at operator granularity, not operator chain. 
So what is the significance of checkpoint based on the granularity of operator 
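
As a rough illustration of the barrier passing through a chain, here is a toy 
model. It is not how Flink is implemented: the Operator class, the 
run_barrier_through_chain function and the state contents are hypothetical, 
and the model only shows that one barrier triggers one snapshot per stateful 
operator while stateless operators contribute nothing.

```python
class Operator:
    def __init__(self, name, state=None):
        self.name = name
        self.state = state  # None means the operator is stateless

    def on_barrier(self, checkpoint_id, snapshots):
        # Only stateful operators have something to snapshot.
        if self.state is not None:
            snapshots.append((checkpoint_id, self.name, dict(self.state)))


def run_barrier_through_chain(chain, checkpoint_id):
    """Pass one barrier through the operators in order; collect snapshots."""
    snapshots = []
    for op in chain:
        op.on_barrier(checkpoint_id, snapshots)
    return snapshots


chain = [
    Operator("kafka-source", {"offset": 1234}),
    Operator("map"),
    Operator("filter"),
    Operator("bq-sink", {"pending": 3}),
]
snaps = run_barrier_through_chain(chain, checkpoint_id=7)
for checkpoint_id, name, state in snaps:
    print(checkpoint_id, name, state)
```

All snapshots carry the same checkpoint ID, so in this model they belong to 
one checkpoint, not one checkpoint per operator.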

Best regards,


On Thu, Feb 2, 2023 at 02:20, Talat Uyarer wrote:
Hi Weijie,

Thanks for replying back.

Our job is a streaming job. The OperatorChain contains all operators that are 
executed as one chain within a single StreamTask, but each operator creates 
its own checkpoint at checkpointing time. Rather than creating a checkpoint 
per operator, can I have one checkpoint per OperatorChain? This is my 
question.


On Wed, Feb 1, 2023 at 1:02 AM weijie guo 
<guoweijieres...@gmail.com<mailto:guoweijieres...@gmail.com>> wrote:
Hi Talat,

Can you elaborate on what it means to create one checkpoint object per chain 
operator rather than for all operators? If you mean checkpointing each task 
independently, this is not supported.

Best regards,


On Wed, Feb 1, 2023 at 15:34, Talat Uyarer via user 
<user@flink.apache.org<mailto:user@flink.apache.org>> wrote:

We have a job that reads from Kafka and writes to some endpoints. The job 
does not have any shuffling steps. I implemented it with multiple steps. Flink 
chained those operators into one operator at submission time. However, I see 
that all operators are doing checkpointing.

Is there any way to create one checkpoint object per operator chain rather 
than one for each operator?

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.
