Please see my answers inline. -Max
On 10.03.20 05:02, Eleanore Jin wrote:
> Hi Max,
>
> Thanks for the response! The reason for setting up the state backend is to
> experiment with Kafka EOS with Beam running on Flink. Reading through the
> code and this PR <https://github.com/apache/beam/pull/7991/files>, can
> you please help me clarify my understanding?
>
> 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
> EOS. ExactlyOnceWriter's processElement method is annotated
> with @RequiresStableInput, so all the messages will be cached
> by KeyedBufferingElementsHandler, and only after a checkpoint succeeds will
> those messages be processed by ExactlyOnceWriter?

That's correct.

> 2. Upon checkpoint, will the messages cached by
> KeyedBufferingElementsHandler also be checkpointed?

Yes, the buffered elements will be checkpointed.

> 3. It seems the way Beam provides Kafka EOS will introduce delays in the
> stream processing; is the delay based on the checkpoint interval? How can I
> reduce the latency while still keeping the EOS guarantee?

Indeed, the checkpoint interval and the checkpoint duration limit the
latency. Given the current design and the guarantees, there is no other way
to influence the latency.

> 4. commitOffsetsInFinalize is also enabled. Does this mean that, once a
> checkpoint succeeds, the checkpointed offset is committed back to Kafka?
> And if that commit does not finish successfully, and the job is then
> cancelled/stopped and re-submitted (with the same consumer group for the
> source topics, but a different job ID), is duplicate processing still
> possible because the consumed offset was not committed back to Kafka?

This option is for the Kafka consumer. AFAIK this is just a convenience
method to commit the latest checkpointed offset to Kafka. This offset is not
used when restoring from a checkpoint. However, if you don't restore from a
checkpoint, you can resume from that offset, which might or might not be
convenient, depending on your use case.

>
> Thanks a lot!
> Eleanore
>
> On Thu, Mar 5, 2020 at 12:46 AM Maximilian Michels <[email protected]> wrote:
>
> > Hi Eleanore,
> >
> > Good question. I think the easiest way is to configure this in the Flink
> > configuration file, i.e. flink-conf.yaml. Then you don't need to set
> > anything in Beam.
> >
> > If you want to go with your approach, then just use
> > getClass().getClassLoader() unless you have some custom classloader for
> > loading your state backend.
> >
> > Cheers,
> > Max
> >
> > On 04.03.20 01:39, Jin Yi wrote:
> > > Hi Experts,
> > >
> > > I am running a Beam application with the Flink Runner. I would like to
> > > set the state backend to FsStateBackend instead of MemoryStateBackend.
> > >
> > > In FlinkPipelineOptions.java, I should be able to call
> > > setStateBackendFactory(), but I did not find any provided
> > > implementations of the FlinkStateBackendFactory interface, so does that
> > > mean I have to implement my own?
> > >
> > > Thanks a lot!
> > > Eleanore
> > >
> > > /**
> > >  * State backend to store Beam's state during computation. Note: Only
> > >  * applicable when executing in streaming mode.
> > >  */
> > > @Description(
> > >     "Sets the state backend factory to use in streaming mode. "
> > >         + "Defaults to the flink cluster's state.backend configuration.")
> > > Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
> > >
> > > void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);
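To make the buffering behaviour from answers 1 to 3 concrete, here is a toy model in plain Java. It is not Beam's KeyedBufferingElementsHandler or the Flink runner's actual code; the class StableInputBuffer and its methods are hypothetical names for illustration only. It assumes the simplified picture described above: elements are held until the checkpoint that captured them is confirmed, everything not yet released is part of the checkpointed state, and the extra delay is therefore bounded roughly by one checkpoint interval plus one checkpoint duration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model, NOT Beam's KeyedBufferingElementsHandler: elements arriving at a
// @RequiresStableInput step are held back until the checkpoint that captured
// them is confirmed, and only then handed to the writer.
class StableInputBuffer<T> {
  // Elements received since the last checkpoint barrier.
  private List<T> pending = new ArrayList<>();
  // Elements captured by a checkpoint (keyed by checkpoint id) whose
  // completion has not been confirmed yet.
  private final TreeMap<Long, List<T>> snapshotted = new TreeMap<>();
  // Stand-in for the downstream exactly-once writer.
  private final List<T> written = new ArrayList<>();

  void add(T element) {
    pending.add(element);
  }

  // Checkpoint barrier: the current buffer is assigned to checkpoint `id`, and
  // every element not yet written is returned as the state to persist.
  List<T> snapshot(long id) {
    snapshotted.put(id, pending);
    pending = new ArrayList<>();
    List<T> state = new ArrayList<>();
    for (List<T> batch : snapshotted.values()) {
      state.addAll(batch);
    }
    return state;
  }

  // Checkpoint `id` confirmed: everything captured by it or an earlier
  // checkpoint is now stable and is released to the writer in order.
  void notifyCheckpointComplete(long id) {
    NavigableMap<Long, List<T>> done = snapshotted.headMap(id, true);
    for (List<T> batch : done.values()) {
      written.addAll(batch);
    }
    done.clear(); // removes the released batches from `snapshotted` as well
  }

  List<T> written() {
    return written;
  }

  List<T> pending() {
    return pending;
  }

  // Rough worst case for the extra delay: an element that arrives just after a
  // barrier waits about one interval for the next checkpoint to start, plus the
  // time that checkpoint takes to complete.
  static long worstCaseExtraDelayMillis(long checkpointIntervalMillis, long checkpointDurationMillis) {
    return checkpointIntervalMillis + checkpointDurationMillis;
  }
}
```

In this model, "c" arrives after the barrier for checkpoint 1, so it stays buffered when checkpoint 1 completes and is only written once a later checkpoint is confirmed. That is why shortening the checkpoint interval is the main lever on latency, at the cost of checkpointing more often.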
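On the original question: setStateBackendFactory() takes a Class, not an instance, so the runner presumably instantiates the factory reflectively, and a public no-arg constructor is the safe assumption for your own implementation. The sketch below shows that pattern with stdlib-only stand-ins so it runs without Beam or Flink on the classpath. StateBackendFactory, FsStateBackendFactory and StateBackendFactories are hypothetical names, and a String replaces both Flink's StateBackend and Beam's FlinkPipelineOptions. A real factory would implement Beam's FlinkStateBackendFactory and return something like an FsStateBackend for your checkpoint directory; check the exact interface signature for your Beam version.

```java
// Hypothetical stand-in for Beam's FlinkStateBackendFactory. The real interface
// builds a Flink StateBackend from the pipeline options; here a String
// description replaces both so the sketch runs without Beam/Flink.
interface StateBackendFactory {
  String createStateBackend(String checkpointDir);
}

// Hypothetical factory; a real one would construct an FsStateBackend.
class FsStateBackendFactory implements StateBackendFactory {
  // Public no-arg constructor: only the Class is configured, so the
  // instance has to be created reflectively.
  public FsStateBackendFactory() {}

  @Override
  public String createStateBackend(String checkpointDir) {
    return "FsStateBackend(" + checkpointDir + ")";
  }
}

class StateBackendFactories {
  // Turns a Class-valued option into an instance, roughly what a runner has
  // to do with the value returned by getStateBackendFactory().
  static StateBackendFactory instantiate(Class<? extends StateBackendFactory> clazz) {
    try {
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate " + clazz.getName(), e);
    }
  }
}
```

If you follow Max's first suggestion and configure the state backend in flink-conf.yaml instead, none of this is needed.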
