Hi Talat Uyarer,
* There is no way to have only one file unless you lower the parallelism to 1 (= only one subtask) * So which files do you see: 1 “_metadata” + multiple data files (or just one)? * The idea of having multiple files is to allow multiple threads to be able to stare checkpoints at the same time, and when restarting from a checkpoint to consume from more files potentially distributed to multiple physical hard driver (more I/O capacity) * So in general it is good to have multiple files Still (out of curiosity) why would you want to have everything in a single file? Sincere greetings Thias From: Talat Uyarer <tuya...@paloaltonetworks.com> Sent: Thursday, February 2, 2023 5:57 PM To: Schwalbe Matthias <matthias.schwa...@viseca.ch> Cc: Kishore Pola <kishore.p...@hotmail.com>; weijie guo <guoweijieres...@gmail.com>; user@flink.apache.org Subject: Re: Reducing Checkpoint Count for Chain Operator ⚠EXTERNAL MESSAGE – CAUTION: Think Before You Click ⚠ Hi Schwalbe, weijie, Thanks for your reply. * Each state primitive/per subtask stores state into a separate file In this picture You can see Operator Chain https://nightlies.apache.org/flink/flink-docs-master/fig/tasks_chains.svg Source and Map are in the same chain. Today Flink creates two files for that operator chain. When we have OperatorChain, All subtasks are running in the same machine, same thread for memory optimization. However Flink creates separate files per subtasks. Our question is whether there is a way to have one file not multiple files. Thanks On Wed, Feb 1, 2023 at 11:50 PM Schwalbe Matthias <matthias.schwa...@viseca.ch<mailto:matthias.schwa...@viseca.ch>> wrote: Hi Kishore, Having followed this thread for a while it is still quite a bit of confusion of concepts and in order to help resolve your original we would need to know, * what makes your observation a problem to be solved? * You write, you have no shuffling, does that mean you don’t use any keyBy(), or rebalance()? * How do you determine that there are 7 checkpoint, one for each operator? * In general please relate a bit more details about how you configure state primitives: kinds/also operator state?/on all operators/etc. In general (as Weijie told) checkpointing works like that (simplified): * Jobmanager creates checkpoint mark/barrier in a configured interval * For synchronous checkpointing this flows along with the events through the chain of tasks * For asynchronous checkpointing, the checkpointing marker is directly sent to the subtasks * A single checkpoint looks like that: * Each state primitive/per subtask stores state into a separate file * At the end jobmager writes a “_metadata” file for the checkpoint metadata and for state that is too small to end up in a separate file * i.e. each checkpoint generates only one checkpoint (multiple files) not 7 Hope we shed a little light on this Best regards Thias From: Kishore Pola <kishore.p...@hotmail.com<mailto:kishore.p...@hotmail.com>> Sent: Thursday, February 2, 2023 4:12 AM To: weijie guo <guoweijieres...@gmail.com<mailto:guoweijieres...@gmail.com>>; Talat Uyarer <tuya...@paloaltonetworks.com<mailto:tuya...@paloaltonetworks.com>> Cc: user@flink.apache.org<mailto:user@flink.apache.org> Subject: Re: Reducing Checkpoint Count for Chain Operator Hi Weijie, In our case we do have 7 operators. All the 7 operators are getting executed as one chain within a single StreamTask. As checkpoint barrier is passing through all the operators, there are 7 checkpoints being stored. So our checkpoint size is up by 7 times. We are investigating to see if we can checkpoint the start operator (kafka source) or end operator (BQ sink), we are good and check point size comes down. Hence the question, when the operators are executed in the same StreamTask as one chain, is it possible to checkpoint at operator chain or single operator level? Thanks, Kishore ________________________________ From: weijie guo <guoweijieres...@gmail.com<mailto:guoweijieres...@gmail.com>> Sent: Wednesday, February 1, 2023 6:59 PM To: Talat Uyarer <tuya...@paloaltonetworks.com<mailto:tuya...@paloaltonetworks.com>> Cc: user@flink.apache.org<mailto:user@flink.apache.org> <user@flink.apache.org<mailto:user@flink.apache.org>> Subject: Re: Reducing Checkpoint Count for Chain Operator Hi Talat, In Flink, a checkpoint barrier will be injected from source, and then pass through all operators in turn. Each stateful operator will do checkpoint in this process, the state is managed at operator granularity, not operator chain. So what is the significance of checkpoint based on the granularity of operator chain? Best regards, Weijie Talat Uyarer <tuya...@paloaltonetworks.com<mailto:tuya...@paloaltonetworks.com>> 于2023年2月2日周四 02:20写道: Hi Weijie, Thanks for replying back. Our job is a streaming job. The OperatorChain contains all operators that are executed as one chain within a single StreamTask. But each operator creates their own checkpoint at checkpointing time . Rather than creating a checkpoint per operator in checkpointing time. Can I have one checkpoint per OperatorChain? This is my question. Thanks On Wed, Feb 1, 2023 at 1:02 AM weijie guo <guoweijieres...@gmail.com<mailto:guoweijieres...@gmail.com>> wrote: Hi Talat, Can you elaborate on what it means to create one checkpoint object per chain operator more than all operators? If you mean to do checkpoint independently for each task, this is not supported. Best regards, Weijie Talat Uyarer via user <user@flink.apache.org<mailto:user@flink.apache.org>> 于2023年2月1日周三 15:34写道: Hi, We have a job that is reading from kafka and writing some endpoints. The job does not have any shuffling steps. I implement it with multiple steps. Flink chained those operators in one operator in submission time. However I see all operators are doing checkpointing. Is there any way to create one checkpoint object per chain operator rather than all operators ? Thanks Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng verboten. This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited. Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng verboten. This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.