Hi,

I've read the guide below, and filed a PR: https://github.com/apache/kafka/pull/5809
I started without creating a JIRA ticket.
https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes
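The change is essentially the range constraint on `commit.interval.ms` that Guozhang suggested below. A minimal sketch of that kind of constraint (the default value and doc string here are placeholders, and I'm assuming `ConfigDef.Range.atLeast` as the validator; see the PR for the actual change):

```
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.streams.StreamsConfig;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;

public class CommitIntervalConfigSketch {
    // Sketch: define commit.interval.ms with a validator that rejects negative values.
    // The default (30000L) and the doc string are placeholders, not copied from StreamsConfig.
    static final ConfigDef CONFIG = new ConfigDef()
            .define(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
                    ConfigDef.Type.LONG,
                    30_000L,
                    atLeast(0),                     // a negative value now fails validation at startup
                    ConfigDef.Importance.LOW,
                    "The frequency with which to commit processing progress.");
}
```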
Thank you,
Tomoyuki

On Wed, Oct 17, 2018 at 9:19 AM Tomoyuki Saito <aocch...@gmail.com> wrote:

> Hi,
>
> > Would you like to contribute a PR?
>
> Yes! Sounds great.
> Should I file a JIRA ticket first?
>
> Tomoyuki
>
> On Wed, Oct 17, 2018 at 12:19 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> I think we should not allow negative values, and today it seems that this
>> is not checked against.
>>
>> In fact, it should be a one-liner fix in the `config.define` function call
>> to constrain its possible value range. Would you like to contribute a PR?
>>
>> Guozhang
>>
>> On Fri, Oct 12, 2018 at 10:56 PM Tomoyuki Saito <aocch...@gmail.com> wrote:
>>
>> > Hello Guozhang,
>> >
>> > Thank you for your reply.
>> >
>> > > setting to "0" will actually mean to commit every time.
>> >
>> > Hmm, I somehow misunderstood the code. Now I understand that is true.
>> >
>> > > You should actually set it to Long.MAX_VALUE to indicate "not commit
>> > > regularly by intervals"
>> >
>> > I see. I'd consider taking that approach!
>> >
>> > Another question, just out of curiosity:
>> >
>> > From looking at the code, setting `commit.interval.ms` to a negative value
>> > can also indicate "not commit regularly by intervals." In other words, it
>> > has the same effect as setting it to Long.MAX_VALUE.
>> > Is this true?
>> >
>> > The implication of setting the property to a negative value is
>> > undocumented, so I'm thinking setting it to Long.MAX_VALUE is preferable.
>> >
>> > Thank you.
>> > Tomoyuki
>> >
>> > On Sat, Oct 13, 2018 at 1:48 AM Guozhang Wang <wangg...@gmail.com> wrote:
>> >
>> > > Hello Tomoyuki,
>> > >
>> > > 1. Seems a good use case for Streams.
>> > > 2. You should actually set it to Long.MAX_VALUE to indicate "not commit
>> > > regularly by intervals"; setting it to "0" will actually mean to commit
>> > > every time. Then you can leverage ProcessorContext.commit() to manually
>> > > commit after the batch is done.
>> > >
>> > > Guozhang
>> > >
>> > > On Wed, Oct 10, 2018 at 11:15 PM, Tomoyuki Saito <aocch...@gmail.com> wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I'm exploring whether it is possible to use Kafka Streams for batch
>> > > > processing with at-least-once semantics.
>> > > >
>> > > > What I want to do is to insert records into external storage in bulk,
>> > > > and execute the offset-commit after the bulk insertion to achieve
>> > > > at-least-once semantics.
>> > > > A processing topology can be very simple, like:
>> > > > ```
>> > > > TopologyBuilder builder = new TopologyBuilder();
>> > > > builder.addSource("source", "topic");
>> > > > builder.addProcessor("processor", processorSupplier, "source");
>> > > > new KafkaStreams(builder, streamsConfig);
>> > > > ```
>> > > >
>> > > > My questions are:
>> > > >
>> > > > 1. Could you suggest how to achieve that? Could it be better to use
>> > > > KafkaConsumer instead of KafkaStreams?
>> > > >
>> > > > 2. From my understanding, when setting the StreamsConfig
>> > > > `commit.interval.ms` to 0, we can turn off the offset-commit by
>> > > > KafkaStreams internal logic (in StreamThread#maybeCommit), and control
>> > > > when to commit offsets with `ProcessorContext#commit`. Is my
>> > > > understanding right? Any expected issues with this approach?
>> > > > Thank you,
>> > > > Tomoyuki
>> > >
>> > > --
>> > > -- Guozhang
>>
>> --
>> -- Guozhang
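For reference, a minimal, self-contained sketch of the approach discussed in the thread: effectively disable interval-based commits with a very large `commit.interval.ms` and request an offset commit from the processor after each bulk insert. This targets the 2.x-era Processor API used above; `BulkInsertProcessor`, `flushToExternalStore`, and `BATCH_SIZE` are hypothetical names, not part of Kafka.

```
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class BulkInsertExample {

    // Hypothetical processor: buffer records, bulk-insert them into external
    // storage, and only then ask Streams to commit offsets (at-least-once).
    static class BulkInsertProcessor implements Processor<String, String> {
        static final int BATCH_SIZE = 1000;              // hypothetical batch size
        private final List<String> buffer = new ArrayList<>();
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(String key, String value) {
            buffer.add(value);
            if (buffer.size() >= BATCH_SIZE) {
                flushToExternalStore(buffer);            // hypothetical bulk insert
                buffer.clear();
                context.commit();                        // request a commit now that the batch is safe
            }
        }

        @Override
        public void close() {
            // A production version would also flush any remaining buffered
            // records here and/or from a punctuator.
        }

        private void flushToExternalStore(List<String> records) {
            // placeholder: write the batch to the external storage here
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "bulk-insert-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Effectively disable interval-based commits, as suggested in the thread.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);

        Topology topology = new Topology();
        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "topic");
        topology.addProcessor("processor", BulkInsertProcessor::new, "source");

        new KafkaStreams(topology, props).start();
    }
}
```

Note that `ProcessorContext#commit` only requests a commit; the stream thread performs it at the next opportunity, so for at-least-once semantics the bulk insert still has to tolerate re-delivered records after a failure.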