Hi Schwalbe, weijie,

Thanks for your reply.


>    - Each state primitive/per subtask stores state into a separate file
>
>
In this picture You can see Operator Chain
https://nightlies.apache.org/flink/flink-docs-master/fig/tasks_chains.svg

Source and Map are in the same chain. Today Flink creates two files for
that operator chain. When we have OperatorChain, All subtasks are running
in the same machine, same thread for memory optimization.  However Flink
creates separate files per subtasks. Our question is whether there is a way
to have one file not multiple files.

Thanks



On Wed, Feb 1, 2023 at 11:50 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Kishore,
>
>
>
>
>
> Having followed this thread for a while it is still quite a bit of
> confusion of concepts and in order to help resolve your original we would
> need to know,
>
>    - *what makes your observation a problem to be solved?*
>    - You write, you have no shuffling, does that mean you don’t use any
>    keyBy(), or rebalance()?
>    - How do you determine that there are 7 checkpoint, one for each
>    operator?
>    - In general please relate a bit more details about how you configure
>    state primitives: kinds/also operator state?/on all operators/etc.
>
>
>
> In general (as Weijie told) checkpointing works like that (simplified):
>
>    - Jobmanager creates checkpoint mark/barrier in a configured interval
>    - For synchronous checkpointing this flows along with the events
>    through the chain of tasks
>    - For asynchronous checkpointing, the checkpointing marker is directly
>    sent to the subtasks
>    - A single checkpoint looks like that:
>       - Each state primitive/per subtask stores state into a separate file
>       - At the end jobmager writes a “_metadata” file for the checkpoint
>       metadata and for state that is too small to end up in a separate file
>       - i.e. each checkpoint generates only one checkpoint (multiple
>       files) not 7
>
>
>
> Hope we shed a little light on this
>
>
>
> Best regards
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Kishore Pola <kishore.p...@hotmail.com>
> *Sent:* Thursday, February 2, 2023 4:12 AM
> *To:* weijie guo <guoweijieres...@gmail.com>; Talat Uyarer <
> tuya...@paloaltonetworks.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Reducing Checkpoint Count for Chain Operator
>
>
>
> Hi Weijie,
>
>
>
> In our case we do have 7 operators. All the 7 operators are getting
> executed as one chain within a single StreamTask. As checkpoint barrier is
> passing through all the operators, there are 7 checkpoints being stored. So
> our checkpoint size is up by 7 times. We are investigating to see if we can
> checkpoint the start operator (kafka source) or end operator (BQ sink), we
> are good and check point size comes down. Hence the question, when the
> operators are executed in the same StreamTask as one chain, is it possible
> to checkpoint at operator chain or single operator level?
>
>
>
> Thanks,
>
> Kishore
>
>
> ------------------------------
>
> *From:* weijie guo <guoweijieres...@gmail.com>
> *Sent:* Wednesday, February 1, 2023 6:59 PM
> *To:* Talat Uyarer <tuya...@paloaltonetworks.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Reducing Checkpoint Count for Chain Operator
>
>
>
> Hi Talat,
>
>
>
> In Flink, a checkpoint barrier will be injected from source, and then pass
> through all operators in turn. Each stateful operator will do checkpoint in
> this process, the state is managed at operator granularity, not operator
> chain. So what is the significance of checkpoint based on the granularity
> of operator chain?
>
>
>
> Best regards,
>
> Weijie
>
>
>
>
>
> Talat Uyarer <tuya...@paloaltonetworks.com> 于2023年2月2日周四 02:20写道:
>
> Hi Weijie,
>
>
>
> Thanks for replying back.
>
>
>
> Our job is  a streaming job. The OperatorChain contains all operators that
> are executed as one chain within a single StreamTask. But each
> operator creates their own checkpoint at checkpointing time . Rather than
> creating a checkpoint per operator in checkpointing time. Can I have one
> checkpoint per OperatorChain? This is my question.
>
>
>
> Thanks
>
>
>
> On Wed, Feb 1, 2023 at 1:02 AM weijie guo <guoweijieres...@gmail.com>
> wrote:
>
> Hi Talat,
>
>
>
> Can you elaborate on what it means to create one checkpoint object per
> chain operator more than all operators? If you mean to do checkpoint
> independently for each task, this is not supported.
>
>
>
> Best regards,
>
> Weijie
>
>
>
>
>
> Talat Uyarer via user <user@flink.apache.org> 于2023年2月1日周三 15:34写道:
>
> Hi,
>
>
>
> We have a job that is reading from kafka and writing some endpoints. The
> job does not have any shuffling steps.  I implement it with multiple
> steps.  Flink chained those operators in one operator in submission time.
> However I see all operators are doing checkpointing.
>
>
>
> Is there any way to create one checkpoint object per chain operator rather
> than all operators ?
>
>
>
> Thanks
>
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>

Reply via email to