Hi Max,

Thanks for the response! The reason I want to set up the state backend is to
experiment with Kafka EOS (exactly-once semantics) with Beam running on Flink.
Reading through the code and this PR <https://github.com/apache/beam/pull/7991/files>,
could you please help me check my understanding?

1. Beam uses KafkaExactlyOnceSink to publish messages with EOS. The
processElement method of ExactlyOnceWriter is annotated with
@RequiresStableInput, so all messages are buffered by
KeyedBufferingElementsHandler. Is it correct that those messages are
processed by ExactlyOnceWriter only after the checkpoint succeeds?

2. When a checkpoint is taken, are the messages buffered by
KeyedBufferingElementsHandler also included in the checkpoint?

3. It seems that the way Beam provides Kafka EOS introduces delays in
stream processing. Is that delay determined by the checkpoint interval? How
can the latency be reduced while still keeping the EOS guarantee?

4. commitOffsetsInFinalize is also enabled. Does this mean that once a
checkpoint succeeds, the checkpointed offsets are committed back to Kafka?
If that commit does not complete successfully, and the job is then
cancelled/stopped and re-submitted (with the same consumer group for the
source topics, but a different job ID), is duplicate processing still
possible, because the consumed offsets were never committed back to Kafka?
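The buffering described in question 1, and the latency concern in question 3,
can be illustrated with a toy Java model. This is only a sketch under
simplifying assumptions, not Beam's KeyedBufferingElementsHandler or Flink's
checkpointing code: checkpoints are assumed to complete instantly, and the
class and method names (StableInputBufferModel, simulate,
onCheckpointComplete) are hypothetical. Under those assumptions, the extra
latency an element sees is bounded by the checkpoint interval.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical toy model, NOT Beam's implementation: elements reaching a step
 * annotated with @RequiresStableInput are buffered and only handed to the
 * writer once the checkpoint covering them has completed.
 * Assumption: checkpoints complete instantly when triggered.
 */
public final class StableInputBufferModel {

  // Arrival times (simulated ms) of elements waiting for the next checkpoint.
  private final List<Long> bufferedArrivalsMs = new ArrayList<>();
  // Added latency of every element that has been released to the writer.
  private final List<Long> latenciesMs = new ArrayList<>();

  /** Instead of writing immediately, the element is held in the buffer. */
  void processElement(long arrivalMs) {
    bufferedArrivalsMs.add(arrivalMs);
  }

  /** On checkpoint completion, everything buffered so far is released. */
  void onCheckpointComplete(long nowMs) {
    for (long arrivalMs : bufferedArrivalsMs) {
      latenciesMs.add(nowMs - arrivalMs);
    }
    bufferedArrivalsMs.clear();
  }

  List<Long> latenciesMs() {
    return latenciesMs;
  }

  /** One element every elementEveryMs; a checkpoint completes every intervalMs. */
  static StableInputBufferModel simulate(long intervalMs, long elementEveryMs, long untilMs) {
    StableInputBufferModel model = new StableInputBufferModel();
    for (long t = 0; t <= untilMs; t += elementEveryMs) {
      if (t > 0 && t % intervalMs == 0) {
        model.onCheckpointComplete(t); // release everything received before t
      }
      model.processElement(t);
    }
    return model;
  }

  public static void main(String[] args) {
    // Hypothetical numbers: 1 s vs. 500 ms checkpoint interval, one element every 100 ms.
    for (long intervalMs : new long[] {1_000, 500}) {
      StableInputBufferModel model = simulate(intervalMs, 100, 5_000);
      System.out.println("interval " + intervalMs + " ms -> max added latency "
          + Collections.max(model.latenciesMs()) + " ms");
    }
  }
}
```

In this model a 1000 ms interval gives a worst-case added latency of 1000 ms,
and a 500 ms interval gives 500 ms. In a real job the time a checkpoint takes
to complete would add to this, and shorter intervals mean more frequent
checkpoints.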

Thanks a lot!
Eleanore

On Thu, Mar 5, 2020 at 12:46 AM Maximilian Michels wrote:

> Hi Eleanore,
>
> Good question. I think the easiest way is to configure this in the Flink
> configuration file, i.e. flink-conf.yaml. Then you don't need to set
> anything in Beam.
>
> If you want to go with your approach, then just use
> getClass().getClassLoader() unless you have some custom classloader for
> loading your state backend.
>
> Cheers,
> Max
>
> On 04.03.20 01:39, Jin Yi wrote:
> > Hi Experts,
> >
> > I am running a Beam application with the Flink Runner. I would like to set
> > the state backend to FsStateBackend instead of MemoryStateBackend.
> >
> > In FlinkPipelineOptions.java:
> >
> > I should be able to call setStateBackendFactory(), but I did not find
> > any provided implementations of the FlinkStateBackendFactory interface.
> > Does that mean I have to implement my own?
> >
> > Thanks a lot!
> > Eleanore
> >
> > /**
> >  * State backend to store Beam's state during computation. Note: Only
> >  * applicable when executing in streaming mode.
> >  */
> > @Description(
> >     "Sets the state backend factory to use in streaming mode. "
> >         + "Defaults to the flink cluster's state.backend configuration.")
> > Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
> >
> > void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);
> >
>
