Hey Chris! Thanks for the comments. Answers inline below:

On Fri, Jan 17, 2020 at 11:47 AM Christopher Egerton <chr...@confluent.io>
wrote:

> Hi Konstantine,
>
> Thanks for the KIP! There's been a lot of productive discussion so far so
> I'll try to keep my remarks brief.
>
> 1. As far as resetting the active topics for a connector goes, it's noted
> in the KIP that this can be done for a deleted connector. Can this also be
> done for connectors that were never created to begin with? What would the
> behavior be in this case? (Can this be clarified in the KIP?)
>

Yes. The intention is to keep reset as an independent and idempotent
operation, so it can also be requested for a connector that was never created.
Keep in mind that a tombstone will be written to the status topic only if the
in-memory view (of active topics) of the worker that serves the request
contains this connector.
This should at least prevent fake reset requests from filling up the topic
with tombstone messages.
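
A minimal sketch of these reset semantics, in Python rather than the actual
Connect code. The class and method names (ActiveTopicsView, record, reset) are
hypothetical, and the status topic is modeled as a plain list of (key, value)
pairs; only the key format follows the example quoted further down in this
thread.

```python
from typing import Dict, List, Optional, Set, Tuple


def topic_key(topic: str, connector: str) -> str:
    # Key format taken from the example in this thread.
    return f"topic-{topic}-connector-{connector}"


class ActiveTopicsView:
    """Hypothetical stand-in for one worker's in-memory view of active topics."""

    def __init__(self) -> None:
        self.active: Dict[str, Set[str]] = {}
        # Stand-in for the status topic: a value of None models a tombstone.
        self.status_log: List[Tuple[str, Optional[str]]] = []

    def record(self, connector: str, topic: str) -> None:
        topics = self.active.setdefault(connector, set())
        if topic not in topics:
            topics.add(topic)
            self.status_log.append((topic_key(topic, connector), "active"))

    def reset(self, connector: str) -> int:
        # Unknown connectors (deleted, already reset, or never created) are a
        # no-op, so repeated or bogus requests write no tombstones.
        topics = self.active.pop(connector, None)
        if not topics:
            return 0
        for topic in sorted(topics):
            self.status_log.append((topic_key(topic, connector), None))
        return len(topics)
```

Calling reset twice for the same connector, or for a name the worker has never
seen, appends nothing to the log after the first call.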



> 2. What is the motivation for the `topic.tracking.allow.reset` config? Is
> there any anticipated case where it would be useful to have topic tracking
> enabled but with resets disabled? We could easily add this configuration
> later if a use case arises, but if we add it now it'll be difficult to
> remove.
>

The motivation is for operators of a Connect cluster to be able to disable
resetting the history of active topics altogether, while still allowing the
active topic sets to be viewed.


> 3. Nit - the JSON formatting in the value format/value example columns
> under the "Format of the new status record" heading is a little confusing.
> Assuming the top-level value is meant to be an object, it should be wrapped
> in braces ("{" and "}").
>

Good catch. Fixed.


> 4. The KIP focuses heavily on the use of the status topic for storage of
> connector topic information, but presumably we'd also want this information
> to be available in standalone mode. If this is the case, it'd be nice to
> tweak the language to refer explicitly to distributed mode when discussing
> the changes to the status topic and note (probably just in one place) that
> similar functionality will also be added to the standalone worker's
> in-memory status store.
>

It's true that the design is detailed w.r.t. what should happen in the
KafkaStatusBackingStore which is a Kafka-based implementation of the
StatusBackingStore interface. This is intentional because this
implementation influences and informs the semantics of topic tracking. I'd
prefer not to make the language too abstract here. A KIP is not exactly a
standard, and KIPs often discuss the impact of implementation on behavior
(this KIP is a good example). But I'm happy to add a note to mention that
these semantics will apply to standalone mode too.


> 5. As far as automatic resets for sink connectors go, I agree with your
> reasoning about the inherent asymmetry between sinks and sources, and with
> the motivation to avoid confusing users by listing no-longer-consumed
> topics in the active topics for a sink connector. I think that this
> asymmetry is worth avoiding a scenario where a connector is reconfigured to
> only consume from topic "foo" but, from a prior configuration, topic "bar"
> is still listed in its active topics.
> I do want to request clarification on the meaning of the phrase "any topics
> no longer consumed" as used under the header "Restarting, reconfiguring or
> deleting a connector". Does this mean that the current set of active topics
> for the connector will be filtered and any that are no longer contained in the
> sink connector's "topics" config or matched by its "topics.regex" config
> will be removed, or does it mean that all topics will be removed and then
> the active topics list will be repopulated as records are consumed from new
> topics?
>

The intention was to imply the former. But based on Randall's comment
above, I'm changing the KIP to include a reset parameter in the PUT
/connectors/{name}/config endpoint.
In this case, the reset will be a complete reset for both source and sink
connectors. This will help keep the behavior symmetric between the two
connector types.
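
For illustration only, the difference between the two interpretations can be
sketched as follows. The function names are hypothetical, and the sketch
assumes that "topics.regex" has to match the whole topic name; it is not the
worker's code.

```python
import re
from typing import Iterable, Optional, Set


def filtered_reset(
    active: Set[str],
    topics: Optional[Iterable[str]] = None,
    topics_regex: Optional[str] = None,
) -> Set[str]:
    """The 'former' reading: keep only topics the new sink config still covers."""
    listed = set(topics or [])
    pattern = re.compile(topics_regex) if topics_regex else None
    return {
        t
        for t in active
        if t in listed or (pattern is not None and pattern.fullmatch(t) is not None)
    }


def complete_reset(active: Set[str]) -> Set[str]:
    """The reading adopted here: drop everything, for source and sink alike.

    The set is repopulated later as records are produced or consumed.
    """
    return set()
```

With the filtered variant, a sink reconfigured to consume only "foo" would
keep "foo" and drop "bar"; with the complete reset both are dropped and "foo"
reappears once records from it are consumed.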

Best,
Konstantine


> Cheers,
>
> Chris
>
> On Thu, Jan 16, 2020 at 1:51 PM Konstantine Karantasis <
> konstant...@confluent.io> wrote:
>
> > Thanks for the new comments, Randall. Following up with my replies inline
> below.
> > I'll also go ahead and update the KIP with the suggestions that are
> > outstanding right now and post a summary of the changes.
> >
> > On Wed, Jan 15, 2020 at 2:37 PM Randall Hauch <rha...@gmail.com> wrote:
> >
> > > My responses are inline:
> > >
> > > On Wed, Jan 15, 2020 at 2:05 PM Konstantine Karantasis <
> > > konstant...@confluent.io> wrote:
> > >
> > > > Hi Randall, Tom and Almog. I'm excited to read your comments. I'll
> > reply
> > > in
> > > > separate emails, in order.
> > > >
> > > > First, to Randall's comments, I'm replying below with a reference to
> > the
> > > > comment number:
> > > >
> > > > 1. Although I can imagine we'd be interested in adding additional
> > > metadata
> > > > in the record value, I didn't see the need for a timestamp in this
> > first
> > > > draft.
> > > > Now that you mention it, the way I'd interpret a timestamp in the
> > connector
> > > > status record value would be as an approximation of since when this
> > > > connector has been using this topic.
> > > > Happy to add this if we think this info is useful. Of course,
> accuracy
> > of
> > > > this information depends on message retention in Kafka and on how
> long
> > > the
> > > > workers have been running without a restart, so this might make this
> > > > approximation less useful if it gets recomputed from time to time.
> > > > To your reference in "Recording active topics" I'll reply below,
> > because
> > > > that's Tom's question too.
> > > >
> > >
> > > Makes sense that the timestamp in the connector is the (approximate)
> time
> > > that the connector has been using the topic. I do think it's worth
> adding
> > > in the record value (not relying upon Kafka record timestamp).
> > >
> > > Regarding "message retention", by default Connect creates the status
> > topic
> > > with compaction but no deletion policy, which means infinite retention.
> > > Don't several things become problematic if finite retention is used on
> > the
> > > status topic, or do we need to worry about this for the active topic
> > > records? Do we need to periodically rewrite all of the active topic
> > > records? If so, we could just write new records using the original
> > > timestamp as originally read by the worker. If the worker does
> > periodically
> > > (maybe just on task startup) rewrite the active topic records, then
> we'd
> > > have to be sure about the semantics of and interplay with concurrent
> > > explicit "reset" calls.
> > >
> >
> > Good point. These topics are configured to have infinite retention. I'll
> > add the timestamp as type 'long'.
> >
> >
> > >
> > > >
> > > > 2. I'll explain with an example, that maybe is worth adding to the
> KIP
> > > > because what's expected to happen might not be as obvious as I
> thought
> > > when
> > > > a new topic is recorded.
> > > > Let's say we have two workers, W1 and W2, each running two worker
> tasks
> > > T11
> > > > T12 and T21 T22 respectively associated with a connector C1. All
> tasks
> > > will
> > > > run producers that will produce records to the same topic,
> > "test-topic".
> > > > When the connector starts, both workers track this connector's set of
> > > > active topics as empty. Given the absence of synchronization (that's
> > > good)
> > > > in how this information is recorded and persisted in the status
> topic,
> > > all
> > > > four tasks might race to record status messages:
> > > >
> > > > For example:
> > > >
> > > > T11, running at worker W1, will send Kafka records with:
> > > > key: topic-test-topic-connector-C1
> > > > value: "topic": { "connector": "some-source", "task": "some-source-TT11", "name": "test-topic" }
> > > >
> > > > and T22, running at worker W2, will send Kafka records with:
> > > > key: topic-test-topic-connector-C1
> > > > value: "topic": { "connector": "some-source", "task": "some-source-TT22", "name": "test-topic" }
> > > >
> > > > (similarly tasks T12 and T21 might send topic status records).
> > > >
> > > > These four records (they might not even be four but there's going to
> be
> > > at
> > > > least one) may be written in any order. Because the topic is
> compacted
> > > and
> > > > these records have the same key, eventually only one message will be
> > > > retained.
> > > > The task ID of that message will be the ID of the task that wrote
> > last. I
> > > > can see this being used mostly for troubleshooting.
> > > >
> > >
> > > Thanks for the clarification. Might be good to clarify the language a
> bit
> > > more, though I'm not convinced an example is really needed.
> > >
> >
> > I'll try to see how they both fit. Sure.
> >
> >
> > >
> > >
> > > >
> > > > 3. I believe across the whole KIP, when I'm referring to the task
> > > entity, I
> > > > imply the worker task. Not the user code that is running as
> > > implementation
> > > > of the SourceTask or SinkTask abstract classes. Didn't want to
> increase
> > > > complexity by referring to a task as worker task.
> > > > But I see your point and I'm going to prefer the terms "worker" and
> > > "worker
> > > > task" to highlight that it's the framework that is aware of this
> > feature
> > > > and not the user code.
> > > >
> > >
> > > Thank you.
> > >
> >
> >   +1
> >
> >
> > >
> > >
> > > >
> > > > 4. I assumed that absence of changes to the public API would indicate
> > > that
> > > > these interfaces/abstract classes remain unchanged. But definitely
> it's
> > > > worth to explicitly mention that.
> > > >
> > >
> > > Thanks!
> > >
> > > +1
> >
> >
> > >
> > > >
> > > > 5. That is correct. My intention is to make reset work well with the
> > > > streaming programming model. Resetting (which btw is not mandatory)
> > means
> > > > that you are cleaning the slate for a connector that is currently
> > > running,
> > > > and its currently active topics will soon be populated from scratch
> > > because
> > > > new records will be produced or consumed.
> > > > But resetting is not required. I see it more like a useful operation,
> > in
> > > > case users want to clean the active topics history, without having to
> > > > delete a connector, since delete has further implications in the
> > > > connector's progress tracking.
> > > >
> > >
> > > I do think it's worth trying to clarify in the document what happens
> when
> > > active topics are cleared for a connector that is currently running.
> > >
> >
> > Good point.
> >
> >
> > >
> > >
> > > >
> > > > 6. I fixed the typo - thanks! I'm very much in favor of preserving
> > > symmetry
> > > > between the two connector types. This has definitely more long term
> > > > benefits and may help to avoid confusion. However, the asymmetry is
> > > > inherited here by the asymmetry that exists today between source and
> > sink
> > > > connectors.
> > > > Source connectors don't list topics in their configurations but sink
> > > > connectors do. So, if a user reconfigures a sink connector with a
> > > different
> > > > set of topics, if we don't reset the topics based on the new configs
> > (and
> > > > my thought here was to match the new configuration with the set of
> > active
> > > > topics), the old topics, currently not listed in the connector's
> > > > configuration, will keep showing up as active topics. The user will
> > have
> > > to
> > > > explicitly reset the active topics after reconfiguring to avoid this.
> > If
> > > > there's consensus that preserving this asymmetry is worse than having
> > to
> > > > reset the active topics, I'm happy to change this in the KIP.
> > > >
> > >
> > > Would it be easier to keep the symmetric approach (the active topics
> are
> > > cleared only explicitly) if the POST connector method supported a new
> > query
> > > parameter to reset the topics before starting (but after stopping any
> > > already running tasks)? That makes it easy to reconfigure a connector
> > > (source or sink) and atomically clear the active topics before the
> > > connector is (re)started. Without that feature, I can't just
> reconfigure
> > a
> > > running sink connector and be sure that the active topics are cleared
> > > atomically -- unless we adopt the asymmetric behavior currently in the
> > KIP.
> > >
> > > WDYT?
> > >
> >
> > I like the idea. I'll update the KIP.
> >
> >
> > > > 7. What I try to avoid here is the following situation: For some
> reason
> > > (a
> > > > sequence of failures to write tombstones to the status topic), stale
> > > topic
> > > > status records remain in that topic even after a connector has been
> > > > deleted. Requiring to restart a connector with the same name just to
> > > apply
> > > > a follow up reset of active topics doesn't seem necessary. I like the
> > > idea
> > > > of decoupling connector existence from the maintenance of the status
> > > topic.
> > > > Of course, a similar clean up is something that the workers could
> also
> > > > perform, but to avoid complexity and potential race conditions, I'm
> > > leaving
> > > > this out for the moment (it's also an implementation detail).
> > > >
> > > > 8. Indeed, a security section is warranted. I believe the main
> > > implication
> > > > is that if you are able to query a connector's status, config, etc
> you
> > > will
> > > > be able to also see its active topics. Furthermore, if you are
> > allowing a
> > > > worker task to create topics as well as produce or consume from
> topics
> > > only
> > > > via connector config overrides, leaving the worker configs without
> > > > permissions to these topics, meaning that you assign per connector
> > > > permissions and not across the board, then this feature should
> respect
> > > > this. The topics are still stored in common data structures within
> the
> > > > worker and are persisted in the status topic. But this info should
> not
> > be
> > > > leaked to anyone who's not supposed to have access to the status
> topic
> > or
> > > > the Connect REST API endpoints. To this respect I feel this feature
> > > > inherits the assumptions and security guarantees of similar
> information
> > > > already stored by the Connect framework. I'm happy to add this to a
> > > > security section, if we agree that the above cover the subject.
> > > >
> > >
> > > I think that makes sense, and it'd be great to add that in a Security
> > > section.
> > >
> >
> > I'll go ahead and add this info to a Security section.
> >
> > >
> > > > 9. I assumed that partitioning is implied by default, because there's
> > no
> > > > requirement for complete ordering of topic status records. But I'll
> add
> > > > this fact as a separate bullet. The status.storage.topic is already a
> > > > partitioned topic.
> > > >
> > >
> > > Agreed. I think it'd be sufficient to simply mention that partition
> will
> > be
> > > chosen based upon the active topic records' keys, ensuring that all
> > active
> > > topic records for the same connector will be written to the same
> > partition
> > > and will be totally ordered.
> > >
> >
> > I'm adding a bullet point to refer to partitioning for this topic. Thanks
> >
> > - Konstantine
> >
> >
> > > >
> > > > I'm following up with the rest of the comments, shortly.
> > > > Thanks,
> > > > Konstantine
> > > >
> > > >
> > > > On Wed, Jan 15, 2020 at 9:19 AM Almog Gavra <al...@confluent.io>
> > wrote:
> > > >
> > > > > Hi Konstantine,
> > > > >
> > > > > Thanks for the KIP! This is going to make automatic integration
> with
> > > > > Connect much more powerful.
> > > > >
> > > > > My thoughts are mostly around freshness of the data and being able
> to
> > > > > expose that to users. Riffing on Randall's timestamp question -
> have
> > we
> > > > > considered adding some interval at which point a connector will
> > > republish
> > > > > any topics that it encounters and update the timestamp? That way we
> > > have
> > > > > some refreshing mechanism that isn't as powerful as the complete
> > reset
> > > > > (which may not be practical in many scenarios).
> > > > >
> > > > > I also agree with Randall's other point (Would it be better to not
> > > > > automatically reset connector's active topics when a sink connector
> > is
> > > > > restarted?). I think keeping the behavior as symmetrical between
> sink
> > > and
> > > > > source connectors is a good idea.
> > > > >
> > > > > Lastly, with regards to the API, I can imagine it is also pretty
> > useful
> > > > to
> > > > > answer the inverse question: "which connectors write to topic X".
> > > Perhaps
> > > > > we can achieve this by letting the users compute it and just expose
> > an
> > > > API
> > > > > that returns the entire mapping at once (instead of needing to call
> > the
> > > > > /connectors/{name}/topics endpoint for each connector).
> > > > >
> > > > > Otherwise, looks good to me! Hits the requirements that I had in
> mind
> > > on
> > > > > the nose.
> > > > > - Almog
> > > > >
> > > > > On Wed, Jan 15, 2020 at 1:14 AM Tom Bentley <tbent...@redhat.com>
> > > wrote:
> > > > >
> > > > > > Hi Konstantine,
> > > > > >
> > > > > > Thanks for the KIP, I can see how it could be useful.
> > > > > >
> > > > > > a) Did you consider using a metric for this? I don't think it
> would
> > > > > satisfy
> > > > > > all the use cases you have in mind, but you could mention it in
> the
> > > > > > rejected alternatives.
> > > > > >
> > > > > > b) If the topic name contains the string "-connector" then the
> key
> > > > format
> > > > > > is ambiguous. This isn't necessarily fatal because the value will
> > > > > > disambiguate, but it could be misleading. Any reason not to just
> > use
> > > a
> > > > > JSON
> > > > > > key, and simplify the value?
> > > > > >
> > > > > > c) I didn't understand this part: "As soon as a worker detects
> the
> > > > > addition
> > > > > > of a topic to a connector's set of active topics, the worker will
> > > cease
> > > > > to
> > > > > > post update messages to the status.storage.topic for that
> > connector.
> > > ".
> > > > > I'm
> > > > > > sure I'm overlooking something but why is this necessary? Is
> this
> > > where
> > > > > the
> > > > > > task id in the value is used?
> > > > > >
> > > > > > Thanks again,
> > > > > >
> > > > > > Tom
> > > > > >
> > > > > > On Wed, Jan 15, 2020 at 12:15 AM Randall Hauch <rha...@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > > > Oh, one more thing:
> > > > > > >
> > > > > > > 9. There's no mention of how the status topic is partitioned,
> or
> > > how
> > > > > > > partitioning will be used by the new topic records. The KIP
> > should
> > > > > > probably
> > > > > > > outline this for clarity and completeness.
> > > > > > >
> > > > > > > Best regards,
> > > > > > >
> > > > > > > Randall
> > > > > > >
> > > > > > > On Tue, Jan 14, 2020 at 5:25 PM Randall Hauch <
> rha...@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > Thanks, Konstantine. Overall, this KIP looks interesting and
> > > really
> > > > > > > > useful, and for the most part is spot on. I do have a number
> of
> > > > > > > > questions/comments about specifics:
> > > > > > > >
> > > > > > > >    1. The topic records have a value that includes the
> > connector
> > > > > name,
> > > > > > > >    task number that last reported the topic is used, and the
> > > topic
> > > > > > name.
> > > > > > > >    There's no mention of record timestamps, but I wonder if
> > it'd
> > > be
> > > > > > > useful to
> > > > > > > >    record this. One challenge might be that a connector does
> > not
> > > > > write
> > > > > > > to a
> > > > > > > >    topic for a while or the task remains running for long
> > periods
> > > > of
> > > > > > > time and
> > > > > > > >    therefore the worker doesn't record that this topic has
> been
> > > > newly
> > > > > > > written
> > > > > > > >    to since the task was restarted. IOW, the semantics of
> > the
> > > > > > > timestamp may
> > > > > > > >    be a bit murky. Have you thought about recording the
> > > timestamp,
> > > > > and
> > > > > > > if so
> > > > > > > >    what are the pros and cons?
> > > > > > > >    - The "Recording active topics" section says the
> following:
> > > > > > > >       "As soon as a worker detects the addition of a topic
> to a
> > > > > > > >       connector's set of active topics, all the connector's
> > tasks
> > > > > that
> > > > > > > inspect
> > > > > > > >       source or sink records will cease to post update
> messages
> > > to
> > > > > the
> > > > > > > >       status.storage.topic."
> > > > > > > >       This probably means the timestamp won't be very useful.
> > > > > > > >    2. The KIP says "the Kafka record value stores the ID of
> the
> > > > task
> > > > > > that
> > > > > > > >    succeeded to store a topic status record last." However,
> > this
> > > > is a
> > > > > > bit
> > > > > > > >    unclear: is it really storing the last task that
> > successfully
> > > > > wrote
> > > > > > > to that
> > > > > > > >    topic (as this would require very frequent writes to this
> > > > topic),
> > > > > or
> > > > > > > is it
> > > > > > > >    more that this is the task that was last *recorded* as
> > having
> > > > > > written
> > > > > > > >    to the topic? (Here, "recorded" could be a bit of a gray
> > area,
> > > > > since
> > > > > > > this
> > > > > > > >    would depend on how the worker periodically records
> this
> > > > > > > information.)
> > > > > > > >    Any kind of clarity here might be helpful.
> > > > > > > >    3. In the "Recording active topics" section (and the
> > > surrounding
> > > > > > > >    sections), the "task" is used ambiguously. For example,
> > "when
> > > > its
> > > > > > > tasks
> > > > > > > >    start processing their first records ... these tasks will
> > > start
> > > > > > > inspecting
> > > > > > > >    which is the Kafka topic of each of these records". IIUC,
> > the
> > > > > first
> > > > > > > "task"
> > > > > > > >    mentioned is the connector's task, and the second is the
> > > > worker's
> > > > > > > task. Do
> > > > > > > >    we need to distinguish this more clearly?
> > > > > > > >    4. Maybe I missed it, but does this KIP explicitly say
> that
> > > the
> > > > > > > >    Connector API is unchanged? It's probably worth pointing
> out
> > > to
> > > > > help
> > > > > > > >    assuage any concerns that connector implementations have
> to
> > > > change
> > > > > > to
> > > > > > > make
> > > > > > > >    use of this feature.
> > > > > > > >    5. In the "Resetting a connector's set of active topics"
> > > section
> > > > > the
> > > > > > > >    behavior is not exactly clear. Consider a user running
> > > connector
> > > > > > "A",
> > > > > > > the
> > > > > > > >    connector has been fully started and is processing
> records,
> > > and
> > > > > the
> > > > > > > worker
> > > > > > > >    has recorded topic usage records. Then the user resets the
> > > > active
> > > > > > > topics
> > > > > > > >    for connector A while the connector is still running? If
> the
> > > > > > connector
> > > > > > > >    writes to no new topics, before the tasks are rebalanced
> > then
> > > is
> > > > > it
> > > > > > > correct
> > > > > > > >    that Connect would report no active topics? And after the
> > > tasks
> > > > > are
> > > > > > > >    rebalanced, will the worker record any topics used by
> > connector
> > > > A?
> > > > > > > >    6. In the "Restaring" (misspelled) section:
> "Reconfiguring a
> > > > > source
> > > > > > > >    connector has also no altering effect for a source
> > connector.
> > > > > > > However, when
> > > > > > > >    reconfiguring a sink connector if the new configuration no
> > > > longer
> > > > > > > includes
> > > > > > > >    any of the previously tracked topics, these topics will be
> > > > removed
> > > > > > > from the
> > > > > > > >    set of active topics for this sink connector by appending
> > > > > tombstone
> > > > > > > >    messages appropriately after the reconfiguration of the
> > > > > connector."
> > > > > > > Would
> > > > > > > >    it be better to not automatically reset connector's active
> > > > topics
> > > > > > > when a
> > > > > > > >    sink connector is restarted? Isn't that more consistent
> with
> > > the
> > > > > > > >    "Resetting" behavior and the goals at the top of the KIP:
> > > "it'd
> > > > be
> > > > > > > useful
> > > > > > > >    for users, operators and applications to know which are
> the
> > > > topics
> > > > > > > that a
> > > > > > > >    connector has used since it was first created"?
> > > > > > > >    7. The `PUT /connectors/{name}/topics/reset` endpoint
> "this
> > > > > request
> > > > > > > >    can be reapplied after the deletion of the connector".
> IOW,
> > > even
> > > > > > > though
> > > > > > > >    connector with that name doesn't exist, we can still make
> > this
> > > > > > > request? How
> > > > > > > >    does this compare with other methods such as "status"?
> > > > > > > >    8. What are the security implications of this proposal?
> > > > > > > >
> > > > > > > > As you can see, most of these can probably be addressed
> without
> > > > much
> > > > > > > work.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > >
> > > > > > > > Randall
> > > > > > > >
> > > > > > > > On Mon, Jan 13, 2020 at 11:05 PM Konstantine Karantasis <
> > > > > > > > konstant...@confluent.io> wrote:
> > > > > > > >
> > > > > > > >> Hi all.
> > > > > > > >>
> > > > > > > >> I just posted KIP-558: Track the set of actively used topics
> > by
> > > > > > > connectors
> > > > > > > >> in Kafka Connect
> > > > > > > >>
> > > > > > > >> Wiki link here:
> > > > > > > >>
> > > > > > > >>
> > > > > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect
> > > > > > > >>
> > > > > > > >> I think it's a nice extension to follow up on KIP-158 and a
> > > useful
> > > > > > > feature
> > > > > > > >> to the ever increasing number of applications that are built
> > > > around
> > > > > > > Kafka
> > > > > > > >> Connect.
> > > > > > > >> Would love to hear what you think.
> > > > > > > >>
> > > > > > > >> Best,
> > > > > > > >> Konstantine
> > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
