Hi Max,

Thanks a lot for the clarification!

Best
Eleanore

On Wed, Mar 11, 2020 at 11:32 AM Maximilian Michels <[email protected]> wrote:

> Please see my answers inline.
>
> -Max
>
> On 10.03.20 05:02, Eleanore Jin wrote:
> > Hi Max,
> >
> > Thanks for the response! The reason for setting up the state backend is to
> > experiment with Kafka EOS (exactly-once semantics) in Beam running on
> > Flink. Having read through the code and this PR
> > <https://github.com/apache/beam/pull/7991/files>, could you please help me
> > check my understanding?
> >
> > 1. Beam uses KafkaExactlyOnceSink to publish messages with EOS. The
> > processElement method of ExactlyOnceWriter is annotated with
> > @RequiresStableInput, so all messages are buffered by
> > KeyedBufferingElementsHandler. Is it correct that only after a checkpoint
> > succeeds are those messages processed by ExactlyOnceWriter?
>
> That's correct.
>
> >
> > 2. When a checkpoint is taken, will the messages buffered by
> > KeyedBufferingElementsHandler also be checkpointed?
>
> Yes, the buffered elements will be checkpointed.
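The following plain-Java model makes answers 1 and 2 concrete. It is deliberately simplified and is not Beam's KeyedBufferingElementsHandler. The class StableInputBuffer and its method names are hypothetical, chosen for illustration.

The model works in three steps:
1. Elements are held back on arrival.
2. When a checkpoint is taken, they become part of the snapshot.
3. They are handed to the downstream writer only once that checkpoint is reported complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical, simplified model of stable-input buffering (not Beam's implementation).
class StableInputBuffer<T> {
    // Elements received since the last checkpoint barrier.
    private List<T> pending = new ArrayList<>();
    // Elements captured by each checkpoint, waiting for that checkpoint to complete.
    private final TreeMap<Long, List<T>> snapshotted = new TreeMap<>();
    private final Consumer<T> downstream;

    StableInputBuffer(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // Instead of processing the element right away, buffer it.
    void processElement(T element) {
        pending.add(element);
    }

    // On a checkpoint barrier: assign the pending elements to this checkpoint and
    // return everything still buffered, i.e. what would be written into the snapshot.
    List<T> snapshotState(long checkpointId) {
        snapshotted.put(checkpointId, pending);
        pending = new ArrayList<>();
        List<T> persisted = new ArrayList<>();
        snapshotted.values().forEach(persisted::addAll);
        return persisted;
    }

    // Only once a checkpoint is confirmed are its elements released to the writer.
    void notifyCheckpointComplete(long checkpointId) {
        // Includes all checkpoints up to and including checkpointId.
        NavigableMap<Long, List<T>> ready = snapshotted.headMap(checkpointId, true);
        ready.values().forEach(batch -> batch.forEach(downstream));
        ready.clear(); // removes the released batches from the backing map
    }
}
```

Restoring the buffer after a failure is left out of this model.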
>
> > 3. It seems that the way Beam provides Kafka EOS introduces delays in
> > stream processing, and that the delay depends on the checkpoint interval?
> > How can we reduce the latency while still keeping the EOS guarantee?
>
> Indeed, the checkpoint interval and the checkpoint duration limit the
> latency. Given the current design and its guarantees, there is no other
> way to influence the latency.
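The bound in answer 3 can be estimated with a back-of-the-envelope model. It rests on three hypothetical assumptions:
- Checkpoints are triggered every intervalMs, starting at time 0.
- Each checkpoint takes durationMs to complete.
- durationMs is shorter than intervalMs, so checkpoints never overlap.

Real checkpoint durations vary. Under these assumptions, an element waits for the next checkpoint to be triggered and then for that checkpoint to finish.

```java
// Back-of-the-envelope latency model for stable-input buffering.
// Assumptions (hypothetical): checkpoints start at t = 0, are triggered every intervalMs,
// each takes durationMs to complete, and durationMs < intervalMs.
final class EosLatencyModel {
    private EosLatencyModel() {}

    // Time at which an element arriving at arrivalMs (>= 0) is released to the writer.
    static long releaseTime(long arrivalMs, long intervalMs, long durationMs) {
        // The element is covered by the first checkpoint triggered at or after its arrival
        // (ceiling division for non-negative values).
        long nextTrigger = ((arrivalMs + intervalMs - 1) / intervalMs) * intervalMs;
        return nextTrigger + durationMs;
    }

    // Extra latency the element sees because of buffering.
    static long addedLatency(long arrivalMs, long intervalMs, long durationMs) {
        return releaseTime(arrivalMs, intervalMs, durationMs) - arrivalMs;
    }

    // Exclusive upper bound on the added latency: one full interval plus one checkpoint.
    static long worstCase(long intervalMs, long durationMs) {
        return intervalMs + durationMs;
    }
}
```

For example, take a 10 s interval and 2 s checkpoints. In this model, an element arriving just after a checkpoint was triggered waits almost 12 s, while one arriving exactly at a trigger waits about 2 s. The interval is the knob you control; for the Flink runner, that is the checkpointingInterval pipeline option. Shorter intervals reduce the buffering delay at the cost of more frequent checkpoints.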
>
> > 4. commitOffsetsInFinalize is also enabled. Does this mean that, once a
> > checkpoint succeeds, the checkpointed offset is committed back to Kafka?
> > And if that commit does not finish successfully, and the job is then
> > cancelled/stopped and re-submitted (with the same consumer group for the
> > source topics, but a different job ID), is duplicate processing still
> > possible because the consumed offset was not committed back to Kafka?
>
> This option is for the Kafka consumer. AFAIK it is just a convenience
> method to commit the latest checkpointed offset to Kafka. This offset is
> not used when restoring from a checkpoint. However, if you don't restore
> from a checkpoint, you can resume from that offset, which may or may not
> be convenient, depending on your use case.
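The distinction in answer 4 can be sketched as a tiny decision function. This is a hypothetical model, not KafkaIO's code. It assumes three things:
- A job restored from a checkpoint always starts from the checkpointed offset.
- A freshly submitted job starts from the consumer group's committed offset.
- resetOffset stands in for whatever auto.offset.reset would choose when no offset was committed.

```java
import java.util.OptionalLong;

// Hypothetical model (not KafkaIO code) of where a (re)started reader begins.
final class StartOffsetModel {
    private StartOffsetModel() {}

    // checkpointedOffset: offset in the checkpoint being restored, empty for a fresh submission.
    // committedOffset:    offset last committed to the consumer group (e.g. via
    //                     commitOffsetsInFinalize), empty if nothing was committed.
    // resetOffset:        fallback when neither exists (stand-in for auto.offset.reset).
    static long startOffset(OptionalLong checkpointedOffset,
                            OptionalLong committedOffset,
                            long resetOffset) {
        if (checkpointedOffset.isPresent()) {
            // Restoring from a checkpoint: the committed group offset is ignored.
            return checkpointedOffset.getAsLong();
        }
        // Fresh submission: use the group's committed offset if there is one.
        return committedOffset.orElse(resetOffset);
    }

    // Records read again relative to the last successfully checkpointed position.
    static long replayed(long lastCheckpointedOffset, long startOffset) {
        return Math.max(0, lastCheckpointedOffset - startOffset);
    }
}
```

Suppose a checkpoint at offset 500 completes, but the commit at finalization fails, so the group is still at 400. A fresh submission that does not restore the checkpoint starts at 400 and reads 100 records again. Restoring the checkpoint starts at 500.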
>
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Thu, Mar 5, 2020 at 12:46 AM Maximilian Michels <[email protected]
> > <mailto:[email protected]>> wrote:
> >
> >     Hi Eleanore,
> >
> >     Good question. I think the easiest way is to configure this in the
> >     Flink configuration file, i.e. flink-conf.yaml. Then you don't need to
> >     set anything in Beam.
> >
> >     If you want to go with your approach, then just use
> >     getClass().getClassLoader() unless you have a custom classloader for
> >     loading your state backend.
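Here is a stdlib-only illustration of the classloader advice above. getClass().getClassLoader() returns the loader that loaded the calling class. That loader can resolve classes visible to it, typically including other classes from the same jar. BackendStub, FsBackendStub and BackendLoader are hypothetical stand-ins, not Beam or Flink types. The sketch only shows how that loader is obtained and used to load a class by name.

```java
// Hypothetical stand-ins for illustration only; not Beam's FlinkStateBackendFactory
// or any Flink state backend class.
interface BackendStub {
    String describe();
}

class FsBackendStub implements BackendStub {
    public FsBackendStub() {}

    @Override
    public String describe() {
        return "filesystem";
    }
}

class BackendLoader {
    // Loads and instantiates a class by name through this class's own classloader.
    BackendStub load(String className) throws ReflectiveOperationException {
        ClassLoader loader = getClass().getClassLoader();
        Class<? extends BackendStub> cls =
            Class.forName(className, true, loader).asSubclass(BackendStub.class);
        return cls.getDeclaredConstructor().newInstance();
    }
}
```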
> >
> >     Cheers,
> >     Max
> >
> >     On 04.03.20 01:39, Jin Yi wrote:
> >     > Hi Experts,
> >     >
> >     > I am running a Beam application with the Flink runner. I would like
> >     > to set the state backend to FsStateBackend instead of
> >     > MemoryStateBackend.
> >     >
> >     > Looking at FlinkPipelineOptions.java (excerpt below), I should be
> >     > able to call setStateBackendFactory(), but I did not find any
> >     > provided implementations of the FlinkStateBackendFactory interface.
> >     > Does that mean I have to implement my own?
> >     >
> >     > Thanks a lot!
> >     > Eleanore
> >     >
> >     > /**
> >     >  * State backend to store Beam's state during computation. Note: Only
> >     >  * applicable when executing in streaming mode.
> >     >  */
> >     > @Description(
> >     >     "Sets the state backend factory to use in streaming mode. "
> >     >         + "Defaults to the flink cluster's state.backend configuration.")
> >     > Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
> >     >
> >     > void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);
> >     >
> >
>
