Status: this KIP has 2 binding +1 votes, but has not yet passed. :-(

I'm very interested in getting this into AK 2.6, but have another
suggestion to make on the KIP. The current KIP is great and allows
connectors using the latest Connect API to get access to the offset reader.
Unfortunately, this will make it difficult for connectors that want to
maintain compatibility with older versions of the Connect API without
jumping through a lot of hoops to avoid method not found exceptions or
class not found exceptions.

Here's my idea: in addition to the changes proposed on the KIP, we also
make the following changes to make it easier for connectors to use the
OffsetStorageReader only when the Connect version supports it:

   1. Define a protected
   `SourceConnector.withOffsetStorageReader(OffsetStorageReader reader)`
   method that by default does nothing.
   2. Override the two `Connector.initialize(...)` methods in
   `SourceConnector` to call super and to call the
   `withOffsetStorageReader(...)` method using the context's offset storage
   reader.

Any connector that wants to depend on the Connect API when this KIP is
introduced (e.g., maybe AK 2.6) can do that and can get the
OffsetStorageReader from the context. However, any connector that wants to
be able to be deployed on earlier versions of Connect can override the
`withOffsetStorageReader(...)` method; if the connector is deployed to
pre-KIP Connect versions the method will not be called, but if the
connector is deployed to post-KIP Connect versions the connector will have
an OffsetStorageReader it can use. No complications from having to use
reflection (or other tricks) to cast the connector context to a
`SourceConnectorContext` (which may not exist) and then call the
`OffsetStorageReader`.

Thoughts?

Randall

On Mon, Feb 25, 2019 at 9:36 AM Randall Hauch <rha...@gmail.com> wrote:

> +1 (binding)
>
> On Mon, Feb 25, 2019 at 4:32 AM Florian Hussonnois <fhussonn...@gmail.com>
> wrote:
>
>> Hi Kafka Team,
>>
>> I'd like to bring this thread back at the top of the email stack to get a
>> chance to see this KIP merge in the next minor/major release.
>>
>> Thanks.
>>
>> Le ven. 18 janv. 2019 à 01:20, Florian Hussonnois <fhussonn...@gmail.com>
>> a
>> écrit :
>>
>> > Hey folks,
>> >
>> > This KIP has start since a while but has never been merged.
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-131+-+Add+access+to+OffsetStorageReader+from+SourceConnector
>> > Would it be possible to restart the vote ? I still think this KIP could
>> be
>> > useful to implement connectors (the PR has been rebased)
>> >
>> > Thanks,
>> >
>> > Le ven. 22 sept. 2017 à 09:36, Florian Hussonnois <
>> fhussonn...@gmail.com>
>> > a écrit :
>> >
>> >> Hi team,
>> >>
>> >> Are there any more votes  ? Thanks
>> >>
>> >> Le 12 sept. 2017 20:18, "Gwen Shapira" <g...@confluent.io> a écrit :
>> >>
>> >>> Thanks for clarifying.
>> >>>
>> >>> +1 again :)
>> >>>
>> >>>
>> >>>
>> >>> On Sat, Sep 9, 2017 at 6:57 AM Randall Hauch <rha...@gmail.com>
>> wrote:
>> >>>
>> >>> > Gwen,
>> >>> >
>> >>> > I've had more time to look into the code. First, the
>> >>> OffsetStorageReader
>> >>> > JavaDoc says: "OffsetStorageReader provides access to the offset
>> >>> storage
>> >>> > used by sources. This can be used by connectors to determine
>> offsets to
>> >>> > start consuming data from. This is most commonly used during
>> >>> initialization
>> >>> > of a task, but can also be used during runtime, e.g. when
>> >>> reconfiguring a
>> >>> > task."
>> >>> >
>> >>> > Second, this KIP allows the SourceConnector implementations to
>> access
>> >>> the
>> >>> > OffsetStorageReader, but really the SourceConnector methods are
>> *only*
>> >>> > related to lifecycle changes when using the OffsetStorageReader
>> would
>> >>> be
>> >>> > appropriate per the comment above.
>> >>> >
>> >>> > In summary, I don't think there is any concern about the
>> >>> > OffsetStorageReader being used inappropriate by the SourceConnector
>> >>> > implementations.
>> >>> >
>> >>> > Randall
>> >>> >
>> >>> > On Fri, Sep 8, 2017 at 9:46 AM, Gwen Shapira <g...@confluent.io>
>> >>> wrote:
>> >>> >
>> >>> > > Basically, you are saying that the part where the comment says:
>> >>> "Offset
>> >>> > > data should only be read during startup or reconfiguration of a
>> >>> task."
>> >>> > > is incorrect? because the API extension allows reading offset data
>> >>> at any
>> >>> > > point in the lifecycle, right?
>> >>> > >
>> >>> > > On Fri, Sep 8, 2017 at 5:18 AM Florian Hussonnois <
>> >>> fhussonn...@gmail.com
>> >>> > >
>> >>> > > wrote:
>> >>> > >
>> >>> > > > Hi Shapira,
>> >>> > > >
>> >>> > > > We only expose the OffsetStorageReader to connector which
>> relies on
>> >>> > > > KafkaOffsetBackingStore. The store continuesly consumes offsets
>> >>> from
>> >>> > > kafka
>> >>> > > > so I think we can't have stale data.
>> >>> > > >
>> >>> > > >
>> >>> > > > Le 8 sept. 2017 06:13, "Randall Hauch" <rha...@gmail.com> a
>> écrit
>> >>> :
>> >>> > > >
>> >>> > > > > The KIP and PR expose the OffsetStorageReader, which is
>> already
>> >>> > exposed
>> >>> > > > to
>> >>> > > > > the tasks. The OffsetStorageWriter is part of the
>> >>> implementation, but
>> >>> > > was
>> >>> > > > > not and is not exposed thru the API.
>> >>> > > > >
>> >>> > > > > > On Sep 7, 2017, at 9:04 PM, Gwen Shapira <g...@confluent.io
>> >
>> >>> > wrote:
>> >>> > > > > >
>> >>> > > > > > I just re-read the code for the OffsetStorageWriter, and ran
>> >>> into
>> >>> > > this
>> >>> > > > > > comment:
>> >>> > > > > >
>> >>> > > > > > * Note that this only provides write functionality. This is
>> >>> > > > > > intentional to ensure stale data is
>> >>> > > > > > * never read. Offset data should only be read during
>> startup or
>> >>> > > > > > reconfiguration of a task. By
>> >>> > > > > > * always serving those requests by reading the values from
>> the
>> >>> > > backing
>> >>> > > > > > store, we ensure we never
>> >>> > > > > > * accidentally use stale data. (One example of how this can
>> >>> occur:
>> >>> > a
>> >>> > > > > > task is processing input
>> >>> > > > > > * partition A, writing offsets; reconfiguration causes
>> >>> partition A
>> >>> > to
>> >>> > > > > > be reassigned elsewhere;
>> >>> > > > > > * reconfiguration causes partition A to be reassigned to
>> this
>> >>> node,
>> >>> > > > > > but now the offset data is out
>> >>> > > > > > * of date). Since these offsets are created and managed by
>> the
>> >>> > > > > > connector itself, there's no way
>> >>> > > > > > * for the offset management layer to know which keys are
>> >>> "owned" by
>> >>> > > > > > which tasks at any given
>> >>> > > > > > * time.
>> >>> > > > > >
>> >>> > > > > >
>> >>> > > > > > I can't figure out how the KIP avoids the stale-reads
>> problem
>> >>> > > explained
>> >>> > > > > here.
>> >>> > > > > >
>> >>> > > > > > Can you talk me through it? I'm cancelling my vote since
>> right
>> >>> now
>> >>> > > > > > exposing this interface sounds risky and misleading.
>> >>> > > > > >
>> >>> > > > > >
>> >>> > > > > > Gwen
>> >>> > > > > >
>> >>> > > > > >
>> >>> > > > > >> On Thu, Sep 7, 2017 at 5:04 PM Gwen Shapira <
>> >>> g...@confluent.io>
>> >>> > > > wrote:
>> >>> > > > > >>
>> >>> > > > > >> +1 (binding)
>> >>> > > > > >>
>> >>> > > > > >> Looking forward to see how connector implementations use
>> this
>> >>> in
>> >>> > > > > practice
>> >>> > > > > >> :)
>> >>> > > > > >>
>> >>> > > > > >>> On Thu, Sep 7, 2017 at 3:49 PM Randall Hauch <
>> >>> rha...@gmail.com>
>> >>> > > > wrote:
>> >>> > > > > >>>
>> >>> > > > > >>> I'd like to open the vote for KIP-131:
>> >>> > > > > >>>
>> >>> > > > > >>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-131+-+
>> >>> > > > > Add+access+to+OffsetStorageReader+from+SourceConnector
>> >>> > > > > >>>
>> >>> > > > > >>> Thanks to Florian for submitting the KIP and the
>> >>> implementation,
>> >>> > > and
>> >>> > > > to
>> >>> > > > > >>> everyone else that helped review.
>> >>> > > > > >>>
>> >>> > > > > >>> Best regards,
>> >>> > > > > >>>
>> >>> > > > > >>> Randall
>> >>> > > > > >>>
>> >>> > > > > >>
>> >>> > > > >
>> >>> > > >
>> >>> > >
>> >>> >
>> >>>
>> >>
>> >
>> > --
>> > Florian HUSSONNOIS
>> >
>>
>>
>> --
>> Florian HUSSONNOIS
>>
>

Reply via email to