Hi Max,

Thanks for the response! The reason for setting up the state backend is to experiment with Kafka EOS in Beam running on Flink. Reading through the code and this PR <https://github.com/apache/beam/pull/7991/files>, can you please help me clarify my understanding?
1. Beam uses KafkaExactlyOnceSink to publish messages with EOS. The processElement method of ExactlyOnceWriter is annotated with @RequiresStableInput. Does that mean all messages are cached by KeyedBufferingElementsHandler and are processed by ExactlyOnceWriter only after a checkpoint succeeds?
2. Upon checkpoint, are the messages cached by KeyedBufferingElementsHandler also checkpointed?
3. It seems the way Beam provides Kafka EOS introduces delays in stream processing. Is the delay determined by the checkpoint interval? How can the latency be reduced while still keeping the EOS guarantee?
4. commitOffsetsInFinalize is also enabled. Does this mean that after a successful checkpoint, the checkpointed offsets are committed back to Kafka? Suppose that commit does not finish successfully, and the job is then cancelled/stopped and re-submitted with the same consumer group for the source topics but a different job ID. Is duplicated processing still possible in that case, because the consumed offsets were never committed back to Kafka?

Thanks a lot!
Eleanore

On Thu, Mar 5, 2020 at 12:46 AM Maximilian Michels <[email protected]> wrote:
> Hi Eleanore,
>
> Good question. I think the easiest way is to configure this in the Flink
> configuration file, i.e. flink-conf.yaml. Then you don't need to set
> anything in Beam.
>
> If you want to go with your approach, then just use
> getClass().getClassLoader() unless you have some custom classloader for
> loading your state backend.
>
> Cheers,
> Max
>
> On 04.03.20 01:39, Jin Yi wrote:
> > Hi Experts,
> >
> > I am running Beam application with Flink Runner. I would like to set
> > State Backend to be FsStateBackend instead of MemoryStateBackend.
> >
> > in FlinkPipelineOptions.java
> >
> > I should be able to call setStateBackendFactory(), but I did not find
> > any provided implementations for FlinkStateBackendFactory interface, so
> > that means I have to implement my own?
> >
> > Thanks a lot!
> > Eleanore
> >
> > /**
> >  * State backend to store Beam's state during computation. Note: Only
> >  * applicable when executing in streaming mode.
> >  */
> > @Description(
> >     "Sets the state backend factory to use in streaming mode. "
> >         + "Defaults to the flink cluster's state.backend configuration.")
> > Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
> >
> > void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);
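To make questions 1 to 3 concrete, here is a toy Java model of the buffering they describe. It is a simplification under stated assumptions, not Beam's KeyedBufferingElementsHandler or the Flink runner's actual code. The class StableInputBufferModel and its methods are hypothetical. Incoming elements are held in a buffer, and that buffer is handed over as part of each snapshot. Elements reach the writer only once notifyCheckpointComplete arrives for the checkpoint that contains them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical toy model of honoring @RequiresStableInput (not Beam's implementation):
 * buffer elements, persist the buffer with each checkpoint snapshot, and release them
 * to the writer only after that checkpoint has been confirmed complete.
 */
final class StableInputBufferModel<T> {
  // Elements received since the last snapshot.
  private final List<T> pending = new ArrayList<>();
  // Per-checkpoint buffers that were snapshotted but not yet confirmed complete.
  private final TreeMap<Long, List<T>> snapshotted = new TreeMap<>();
  // Elements handed to the (simulated) writer, in arrival order.
  private final List<T> released = new ArrayList<>();

  void receive(T element) {
    pending.add(element);
  }

  /** Moves the pending elements into checkpoint {@code checkpointId}; returns what would be persisted. */
  List<T> snapshot(long checkpointId) {
    List<T> batch = new ArrayList<>(pending);
    snapshotted.put(checkpointId, batch);
    pending.clear();
    return new ArrayList<>(batch);
  }

  /** Completion of checkpoint N also covers every earlier checkpoint, so release all of them. */
  void notifyCheckpointComplete(long checkpointId) {
    Map<Long, List<T>> done = snapshotted.headMap(checkpointId, true);
    for (List<T> batch : done.values()) {
      released.addAll(batch);
    }
    done.clear(); // clearing the view removes the entries from the backing map
  }

  List<T> released() {
    return Collections.unmodifiableList(released);
  }
}
```

In this model, an element that arrives just after a snapshot waits for the next snapshot and then for that checkpoint to be confirmed. The added delay is therefore roughly the checkpoint interval plus the checkpoint duration. If the real runner behaves similarly, a shorter checkpoint interval would be the main lever for latency, at the cost of more frequent checkpoints.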

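For question 4, a second toy model captures only the source side. GroupOffsetRestartModel is a hypothetical class, not KafkaIO code. The job's checkpoint records the next offset to read. The consumer group's committed offset in Kafka advances only when the checkpoint is finalized, which is the role commitOffsetsInFinalize is meant to play. The model assumes that a job started without restoring a checkpoint or savepoint resumes from the group's committed offset. It deliberately ignores the EOS sink's own bookkeeping, so it says nothing about whether re-read records end up duplicated in the output topic.

```java
/**
 * Hypothetical toy model of the source side of question 4 (not KafkaIO code).
 * The EOS sink's own state is intentionally not modeled.
 */
final class GroupOffsetRestartModel {
  private long committedGroupOffset; // offset Kafka stores for the consumer group
  private long checkpointedOffset;   // offset stored in the job's own checkpoint

  GroupOffsetRestartModel(long initialCommittedOffset) {
    this.committedGroupOffset = initialCommittedOffset;
    this.checkpointedOffset = initialCommittedOffset;
  }

  /** A checkpoint records the next offset the source would read. */
  void checkpoint(long nextOffsetToRead) {
    checkpointedOffset = nextOffsetToRead;
  }

  /** Finalization commits the checkpointed offset back to the consumer group. */
  void finalizeCheckpoint() {
    committedGroupOffset = checkpointedOffset;
  }

  /** Restarting from the job's checkpoint/savepoint resumes at the checkpointed offset. */
  long startOffsetWhenRestoringState() {
    return checkpointedOffset;
  }

  /** Assumption: a new job without restored state starts at the group's committed offset. */
  long startOffsetForFreshJob() {
    return committedGroupOffset;
  }

  /** Number of records a fresh job would read a second time. */
  long recordsReReadByFreshJob() {
    return Math.max(0L, checkpointedOffset - committedGroupOffset);
  }
}
```

Under these assumptions, a cancelled job whose last finalize never ran leaves a gap between the checkpointed and committed offsets. A new job that does not restore state reads that gap again, while resuming from the checkpoint or savepoint avoids the re-read on the source side.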