Hi Jason,

Fine by me; I wanted to be conservative with the return type, but the case you've outlined sounds enticing enough that adding a little flexibility to the API seems warranted. I've added your suggestion to the proposed admin API expansions; let me know what you think.
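To make the discussion concrete, here's a very rough sketch of the kind of shape the expanded result type might take (all names here are hypothetical and purely illustrative, not the proposed API):

import java.util.Map;

import org.apache.kafka.common.KafkaFuture;

// Hypothetical sketch only: names and shapes are illustrative, not the proposed API.
// The idea is that fenceProducers() could return per-transactional-ID futures carrying
// the producer ID and epoch from the InitProducerId response, rather than Void.
public class FenceProducersResult {

    // Placeholder for the state returned by InitProducerId for one transactional ID.
    public static class FencedProducerInfo {
        private final long producerId;
        private final short producerEpoch;

        public FencedProducerInfo(long producerId, short producerEpoch) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
        }

        public long producerId() {
            return producerId;
        }

        public short producerEpoch() {
            return producerEpoch;
        }
    }

    private final Map<String, KafkaFuture<FencedProducerInfo>> futures;

    FenceProducersResult(Map<String, KafkaFuture<FencedProducerInfo>> futures) {
        this.futures = futures;
    }

    // Per-transactional-ID result, so callers could later hand the ID and epoch
    // to a Producer instance on initialization.
    public KafkaFuture<FencedProducerInfo> fencedProducer(String transactionalId) {
        return futures.get(transactionalId);
    }

    // Convenience future that completes once every requested producer has been fenced.
    public KafkaFuture<Void> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
    }
}

Something along these lines would keep the simple "wait for everything" usage while leaving room to hand the producer ID and epoch off to a Producer instance later, as you suggested.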
Cheers,

Chris

On Mon, Feb 1, 2021 at 3:38 PM Jason Gustafson <ja...@confluent.io> wrote:

> Hi Chris,
>
> If we add the new `fenceProducers` admin API, can we return the information from the `InitProducerId` response (i.e. producer id and epoch)? We may not have a use case for it yet, but I don't see any harm in exposing it for the future. For example, we could allow this state to be provided to the Producer instance on initialization, which would save the need for the second `InitProducerId` request in the current proposal. Also, the `Void` type does not give us much room for extension.
>
> -Jason
>
> On Mon, Jan 25, 2021 at 7:29 AM Chris Egerton <chr...@confluent.io> wrote:
>
> > Hi Ning,
> >
> > Apologies for the delay in response. I realized after publishing the KIP that there were some finer points I hadn't considered in my design and that it was far from providing exactly-once guarantees. In response to your questions:
> >
> > 1) The goal of the KIP is to ensure the accuracy of the offsets that the framework provides to source tasks; if tasks choose to manage offsets outside of the framework, they're on their own. So, the source records and their offsets will be written/committed to Kafka, and the task will be provided with them on startup, but it (or really, its predecessor) may not have had time to clean up resources associated with those records before being killed.
> >
> > 2) I've cleaned up this section and removed the pseudocode, as it seems too low-level to be worth discussing in a KIP. I'll try to summarize here, though: task.commit() is not what causes offsets provided to the framework by tasks to be committed; it's simply a follow-up hook provided out of convenience to tasks so that they can clean up resources associated with the most recent batch of records (by ack'ing JMS messages, for example). The Connect framework uses an internal Kafka topic to store source task offsets.
> >
> > 3) In order to benefit from the improvements proposed in this KIP, yes, the single source of truth should be the OffsetStorageReader provided to the task by the Connect framework, at least at startup. After startup, tasks should ideally bookkeep their own offset progress, as each request to read offsets requires a read to the end of the offsets topic, which can be expensive in some cases.
> >
> > I've since expanded the KIP to include general exactly-once support for source connectors, which should cover the points I neglected in my initial design, so it should be ready for review again.
> >
> > Cheers,
> >
> > Chris
> >
> > On Mon, Jul 27, 2020 at 11:42 PM Ning Zhang <ning2008w...@gmail.com> wrote:
> >
> > > Hello Chris,
> > >
> > > That is an interesting KIP. I have a couple of questions:
> > >
> > > (1) In the pseudo-code section, what happens if a failure occurs between 4(b) and 5(a), i.e. after the producer commits the transaction but before task.commitRecord()?
> > >
> > > (2) In the "source task life time" section, what is the difference between "commit offset" and "offsets to commit"? Given that the offset storage can be a Kafka topic (/KafkaOffsetBackingStore.java) and a producer can only produce to a Kafka topic, are they the same topic (the topic that the producer writes offsets to and the topic that task.commit() commits to)?
> > >
> > > (3) The JDBC source task relies on `context.offsetStorageReader()` (https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java#L140) to retrieve the previously committed offset (whether starting fresh or resuming from failure). So it seems that the single source of truth for where to resume consumption is the last known/committed position stored in offset storage (e.g. a Kafka topic), managed by the periodic task.commit()?
> > >
> > > On 2020/05/22 06:20:51, Chris Egerton <chr...@confluent.io> wrote:
> > > > Hi all,
> > > >
> > > > I know it's a busy time with the upcoming 2.6 release and I don't expect this to get a lot of traction until that's done, but I've published a KIP for allowing atomic commit of offsets and records for source connectors and would appreciate your feedback:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
> > > >
> > > > This feature should make it possible to implement source connectors with exactly-once delivery guarantees, and even allow a wide range of existing source connectors to provide exactly-once delivery guarantees with no changes required.
> > > >
> > > > Cheers,
> > > >
> > > > Chris
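For anyone following along, here is a minimal, entirely hypothetical sketch of the startup pattern described in answer (3) above: read the committed offset from the OffsetStorageReader once at startup, then bookkeep progress in the task afterwards. The partition and offset keys are made up for illustration; real connectors define their own.

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Hypothetical example task; not part of the KIP or any real connector.
public class ExampleSourceTask extends SourceTask {

    // Made-up source partition key for illustration.
    private static final Map<String, String> SOURCE_PARTITION =
            Collections.singletonMap("table", "example_table");

    private long currentPosition;

    @Override
    public void start(Map<String, String> props) {
        // Single source of truth at startup: the offset the framework has committed.
        Map<String, Object> committed =
                context.offsetStorageReader().offset(SOURCE_PARTITION);
        currentPosition = committed != null ? (Long) committed.get("position") : 0L;
    }

    @Override
    public List<SourceRecord> poll() {
        // After startup, track progress in memory rather than re-reading the
        // offsets topic, which requires a read to the end of the topic each time.
        // (Record-building logic omitted; this is only a sketch.)
        currentPosition++;
        return Collections.emptyList();
    }

    @Override
    public void stop() {
    }

    @Override
    public String version() {
        return "sketch";
    }
}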