Thanks for the new comments, Randall. Following up with my replies inline below.
I'll also go ahead and update the KIP with the suggestions that are
outstanding right now and post a summary of the changes.

On Wed, Jan 15, 2020 at 2:37 PM Randall Hauch <rha...@gmail.com> wrote:

> My responses are inline:
>
> On Wed, Jan 15, 2020 at 2:05 PM Konstantine Karantasis <
> konstant...@confluent.io> wrote:
>
> > Hi Randall, Tom and Almog. I'm excited to read your comments. I'll
> > reply in separate emails, in order.
> >
> > First, to Randall's comments, I'm replying below with a reference to
> > the comment number:
> >
> > 1. Although I can imagine we'd be interested in adding additional
> > metadata in the record value, I didn't see the need for a timestamp
> > in this first draft.
> > Now that you mention it, the way I'd interpret a timestamp in the
> > connector status record value would be as an approximation of when
> > this connector started using this topic.
> > Happy to add this if we think this info is useful. Of course, the
> > accuracy of this information depends on message retention in Kafka
> > and on how long the workers have been running without a restart, so
> > this might make this approximation less useful if it gets recomputed
> > from time to time.
> > To your reference in "Recording active topics" I'll reply below,
> > because that's Tom's question too.
> >
>
> Makes sense that the timestamp in the connector is the (approximate) time
> that the connector has been using the topic. I do think it's worth adding
> in the record value (not relying upon Kafka record timestamp).
>
> Regarding "message retention", by default Connect creates the status topic
> with compaction but no deletion policy, which means infinite retention.
> Don't several things become problematic if finite retention is used on the
> status topic, or do we need to worry about this for the active topic
> records? Do we need to periodically rewrite all of the active topic
> records? If so, we could just write new records using the original
> timestamp as originally read by the worker. If the worker does periodically
> (maybe just on task startup) rewrite the active topic records, then we'd
> have to be sure about the semantics of and interplay with concurrent
> explicit "reset" calls.
>

Good point. These topics are configured to have infinite retention. I'll
add the timestamp as type 'long'.
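
For reference, a topic status record value with that addition could look
something along these lines (the field name and the example epoch-millis
value are placeholders until I update the KIP):

key: topic-test-topic-connector-C1
value: "topic": { "connector": "some-source", "task": "some-source-TT11",
  "name": "test-topic", "timestamp": 1579124512000 }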


>
> >
> > 2. I'll explain with an example that is maybe worth adding to the
> > KIP, because what's expected to happen when a new topic is recorded
> > might not be as obvious as I thought.
> > Let's say we have two workers, W1 and W2, each running two worker
> > tasks, T11 T12 and T21 T22 respectively, associated with a connector
> > C1. All tasks will run producers that will produce records to the
> > same topic, "test-topic".
> > When the connector starts, both workers track this connector's set of
> > active topics as empty. Given the absence of synchronization (that's
> > good) in how this information is recorded and persisted in the status
> > topic, all four tasks might race to record status messages:
> >
> > For example:
> >
> > T11, running at worker W1, will send Kafka records with:
> > key: topic-test-topic-connector-C1
> > value: "topic": { "connector": "some-source", "task": "some-source-TT11",
> >   "name": "test-topic" }
> >
> > and T22, running at worker W2, will send Kafka records with:
> > key: topic-test-topic-connector-C1
> > value: "topic": { "connector": "some-source", "task": "some-source-TT22",
> >   "name": "test-topic" }
> >
> > (similarly, tasks T12 and T21 might send topic status records).
> >
> > These four records (they might not even be four, but there's going to
> > be at least one) may be written in any order. Because the topic is
> > compacted and these records have the same key, eventually only one
> > message will be retained.
> > The task ID of that message will be the ID of the task that wrote
> > last. I can see this being used mostly for troubleshooting.
> >
>
> Thanks for the clarification. Might be good to clarify the language a bit
> more, though I'm not convinced an example is really needed.
>

Sure, I'll see how both fit in the KIP.
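
To make the compaction outcome in the example concrete: all four records
share the key topic-test-topic-connector-C1, so after compaction only the
record written last survives, e.g. if T22 happened to write last:

key: topic-test-topic-connector-C1
value: "topic": { "connector": "some-source", "task": "some-source-TT22",
  "name": "test-topic" }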


>
>
> >
> > 3. I believe that across the whole KIP, when I refer to the task
> > entity, I mean the worker task, not the user code that runs as an
> > implementation of the SourceTask or SinkTask abstract classes. I
> > didn't want to increase complexity by referring to a task as a worker
> > task.
> > But I see your point and I'm going to prefer the terms "worker" and
> > "worker task" to highlight that it's the framework that is aware of
> > this feature and not the user code.
> >
>
> Thank you.
>

  +1


>
>
> >
> > 4. I assumed that the absence of changes to the public API would
> > indicate that these interfaces/abstract classes remain unchanged. But
> > it's definitely worth mentioning that explicitly.
> >
>
> Thanks!
>
> +1


>
> >
> > 5. That is correct. My intention is to make reset work well with the
> > streaming programming model. Resetting (which btw is not mandatory)
> > means that you are cleaning the slate for a connector that is
> > currently running, and its currently active topics will soon be
> > populated from scratch because new records will be produced or
> > consumed.
> > But resetting is not required. I see it more like a useful operation,
> > in case users want to clean the active topics history, without having
> > to delete a connector, since delete has further implications in the
> > connector's progress tracking.
> >
>
> I do think it's worth trying to clarify in the document what happens when
> active topics are cleared for a connector that is currently running.
>

Good point.


>
>
> >
> > 6. I fixed the typo - thanks! I'm very much in favor of preserving
> > symmetry between the two connector types. This definitely has more
> > long-term benefits and may help to avoid confusion. However, the
> > asymmetry here is inherited from the asymmetry that exists today
> > between source and sink connectors.
> > Source connectors don't list topics in their configurations but sink
> > connectors do. So, if a user reconfigures a sink connector with a
> > different set of topics and we don't reset the topics based on the
> > new configs (my thought here was to match the new configuration
> > against the set of active topics), the old topics, no longer listed
> > in the connector's configuration, will keep showing up as active
> > topics. The user will have to explicitly reset the active topics
> > after reconfiguring to avoid this. If there's consensus that
> > preserving this asymmetry is worse than having to reset the active
> > topics, I'm happy to change this in the KIP.
> >
>
> Would it be easier to keep the symmetric approach (the active topics are
> cleared only explicitly) if the POST connector method supported a new query
> parameter to reset the topics before starting (but after stopping any
> already running tasks)? That makes it easy to reconfigure a connector
> (source or sink) and atomically clear the active topics before the
> connector is (re)started. Without that feature, I can't just reconfigure a
> running sink connector and be sure that the active topics are cleared
> atomically -- unless we adopt the asymmetric behavior currently in the KIP.
>
> WDYT?
>

I like the idea. I'll update the KIP.
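
To illustrate the shape this could take (the query parameter name and the
connector class below are placeholders, not final API), reconfiguring a
running sink connector while atomically clearing its active topics might
look like:

PUT /connectors/my-sink/config?reset_topics=true
{
  "connector.class": "org.example.SomeSinkConnector",
  "topics": "new-topic"
}

with the worker clearing the recorded set of active topics after stopping
the connector's tasks and before (re)starting them with the new
configuration.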


> > 7. What I try to avoid here is the following situation: for some
> > reason (a sequence of failures to write tombstones to the status
> > topic), stale topic status records remain in that topic even after a
> > connector has been deleted. Requiring a restart of a connector with
> > the same name just to apply a follow-up reset of active topics
> > doesn't seem necessary. I like the idea of decoupling connector
> > existence from the maintenance of the status topic.
> > Of course, a similar clean-up is something that the workers could
> > also perform, but to avoid complexity and potential race conditions,
> > I'm leaving this out for the moment (it's also an implementation
> > detail).
> >
> > 8. Indeed, a security section is warranted. I believe the main
> > implication is that if you are able to query a connector's status,
> > config, etc. you will also be able to see its active topics.
> > Furthermore, if you allow a worker task to create topics as well as
> > produce or consume from topics only via connector config overrides,
> > leaving the worker configs without permissions to these topics
> > (meaning that you assign per-connector permissions and not
> > permissions across the board), then this feature should respect
> > this. The topics are still stored in common data structures within
> > the worker and are persisted in the status topic. But this info
> > should not be leaked to anyone who's not supposed to have access to
> > the status topic or the Connect REST API endpoints. In this respect,
> > I feel this feature inherits the assumptions and security guarantees
> > of similar information already stored by the Connect framework. I'm
> > happy to add this to a security section, if we agree that the above
> > covers the subject.
> >
>
> I think that makes sense, and it'd be great to add that in a Security
> section.
>

I'll go ahead and add this info to a Security section.

>
> > 9. I assumed that partitioning is implied by default, because there's no
> > requirement for complete ordering of topic status records. But I'll add
> > this fact as a separate bullet. The status.storage.topic is already a
> > partitioned topic.
> >
>
> Agreed. I think it'd be sufficient to simply mention that partition will be
> chosen based upon the active topic records' keys, ensuring that all active
> topic records for the same connector will be written to the same partition
> and will be totally ordered.
>

I'm adding a bullet point to refer to partitioning for this topic. Thanks.
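
For illustration, here's a quick sketch (not the actual Connect
implementation) of how the producer's default partitioning would place an
active topic record, assuming the key is the string
"topic-<topic>-connector-<connector>" serialized as UTF-8 and no explicit
partition is assigned when the record is sent:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class ActiveTopicPartitionSketch {
    public static void main(String[] args) {
        String key = "topic-test-topic-connector-C1";
        int numPartitions = 5; // hypothetical partition count for status.storage.topic
        // Same murmur2-on-key-bytes hashing that the default partitioner
        // applies to records that have a non-null key:
        int partition = Utils.toPositive(Utils.murmur2(key.getBytes(StandardCharsets.UTF_8)))
                % numPartitions;
        // Every record with this key, no matter which of C1's tasks produced
        // it, lands in the same partition and is therefore totally ordered
        // with respect to the other records of the same connector/topic pair.
        System.out.println("partition = " + partition);
    }
}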

- Konstantine


> >
> > I'm following up with the rest of the comments, shortly.
> > Thanks,
> > Konstantine
> >
> >
> > On Wed, Jan 15, 2020 at 9:19 AM Almog Gavra <al...@confluent.io> wrote:
> >
> > > Hi Konstantine,
> > >
> > > Thanks for the KIP! This is going to make automatic integration with
> > > Connect much more powerful.
> > >
> > > My thoughts are mostly around freshness of the data and being able to
> > > expose that to users. Riffing on Randall's timestamp question - have we
> > > considered adding some interval at which point a connector will
> republish
> > > any topics that it encounters and update the timestamp? That way we
> have
> > > some refreshing mechanism that isn't as powerful as the complete reset
> > > (which may not be practical in many scenarios).
> > >
> > > I also agree with Randall's other point (Would it be better to not
> > > automatically reset connector's active topics when a sink connector is
> > > restarted?). I think keeping the behavior as symmetrical between sink
> and
> > > source connectors is a good idea.
> > >
> > > Lastly, with regards to the API, I can imagine it is also pretty useful
> > to
> > > answer the inverse question: "which connectors write to topic X".
> Perhaps
> > > we can achieve this by letting the users compute it and just expose an
> > API
> > > that returns the entire mapping at once (instead of needing to call the
> > > /connectors/{name}/topics endpoint for each connector).
> > >
> > > Otherwise, looks good to me! Hits the requirements that I had in mind
> on
> > > the nose.
> > > - Almog
> > >
> > > On Wed, Jan 15, 2020 at 1:14 AM Tom Bentley <tbent...@redhat.com>
> wrote:
> > >
> > > > Hi Konstantine,
> > > >
> > > > Thanks for the KIP, I can see how it could be useful.
> > > >
> > > > a) Did you consider using a metric for this? I don't think it would
> > > satisfy
> > > > all the use cases you have in mind, but you could mention it in the
> > > > rejected alternatives.
> > > >
> > > > b) If the topic name contains the string "-connector" then the key
> > format
> > > > is ambiguous. This isn't necessarily fatal because the value will
> > > > disambiguate, but it could be misleading. Any reason not to just use
> a
> > > JSON
> > > > key, and simplify the value?
> > > >
> > > > c) I didn't understand this part: "As soon as a worker detects the
> > > addition
> > > > of a topic to a connector's set of active topics, the worker will
> cease
> > > to
> > > > post update messages to the status.storage.topic for that connector.
> ".
> > > I'm
> > > > sure I've overlooking something but why is this necessary? Is this
> were
> > > the
> > > > task id in the value is used?
> > > >
> > > > Thanks again,
> > > >
> > > > Tom
> > > >
> > > > On Wed, Jan 15, 2020 at 12:15 AM Randall Hauch <rha...@gmail.com>
> > wrote:
> > > >
> > > > > Oh, one more thing:
> > > > >
> > > > > 9. There's no mention of how the status topic is partitioned, or
> how
> > > > > partitioning will be used by the new topic records. The KIP should
> > > > probably
> > > > > outline this for clarity and completeness.
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Randall
> > > > >
> > > > > On Tue, Jan 14, 2020 at 5:25 PM Randall Hauch <rha...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Thanks, Konstantine. Overall, this KIP looks interesting and
> really
> > > > > > useful, and for the most part is spot on. I do have a number of
> > > > > > questions/comments about specifics:
> > > > > >
> > > > > >    1. The topic records have a value that includes the connector
> > > name,
> > > > > >    task number that last reported the topic is used, and the
> topic
> > > > name.
> > > > > >    There's no mention of record timestamps, but I wonder if it'd
> be
> > > > > useful to
> > > > > >    record this. One challenge might be that a connector does not
> > > write
> > > > > to a
> > > > > >    topic for a while or the task remains running for long periods
> > of
> > > > > time and
> > > > > >    therefore the worker doesn't record that this topic has been
> > newly
> > > > > written
> > > > > >    to since it the task was restarted. IOW, the semantics of the
> > > > > timestamp may
> > > > > >    be a bit murky. Have you thought about recording the
> timestamp,
> > > and
> > > > > if so
> > > > > >    what are the pros and cons?
> > > > > >    - The "Recording active topics" section says the following:
> > > > > >       "As soon as a worker detects the addition of a topic to a
> > > > > >       connector's set of active topics, all the connector's tasks
> > > that
> > > > > inspect
> > > > > >       source or sink records will cease to post update messages
> to
> > > the
> > > > > >       status.storage.topic."
> > > > > >       This probably means the timestamp won't be very useful.
> > > > > >    2. The KIP says "the Kafka record value stores the ID of the
> > task
> > > > that
> > > > > >    succeeded to store a topic status record last." However, this
> > is a
> > > > bit
> > > > > >    unclear: is it really storing the last task that successfully
> > > wrote
> > > > > to that
> > > > > >    topic (as this would require very frequent writes to this
> > topic),
> > > or
> > > > > is it
> > > > > >    more that this is the task that was last *recorded* as having
> > > > written
> > > > > >    to the topic? (Here, "recorded" could be a bit of a gray area,
> > > since
> > > > > this
> > > > > >    would depend on the how the worker periodically records this
> > > > > information.)
> > > > > >    Any kind of clarity here might be helpful.
> > > > > >    3. In the "Recording active topics" section (and the
> surrounding
> > > > > >    sections), the "task" is used ambiguously. For example, "when
> > its
> > > > > tasks
> > > > > >    start processing their first records ... these tasks will
> start
> > > > > inspecting
> > > > > >    which is the Kafka topic of each of these records". IIUC, the
> > > first
> > > > > "task"
> > > > > >    mentioned is the connector's task, and the second is the
> > worker's
> > > > > task. Do
> > > > > >    we need to distinguish this more clearly?
> > > > > >    4. Maybe I missed it, but does this KIP explicitly say that
> the
> > > > > >    Connector API is unchanged? It's probably worth pointing out
> to
> > > help
> > > > > >    assuage any concerns that connector implementations have to
> > change
> > > > to
> > > > > make
> > > > > >    use of this feature.
> > > > > >    5. In the "Resetting a connector's set of active topics"
> section
> > > the
> > > > > >    behavior is not exactly clear. Consider a user running
> connector
> > > > "A",
> > > > > the
> > > > > >    connector has been fully started and is processing records,
> and
> > > the
> > > > > worker
> > > > > >    has recorded topic usage records. Then the user resets the
> > active
> > > > > topics
> > > > > >    for connector A while the connector is still running? If the
> > > > connector
> > > > > >    writes to no new topics, before the tasks are rebalanced then
> is
> > > it
> > > > > correct
> > > > > >    that Connect would report no active topics? And after the
> tasks
> > > are
> > > > > >    rebalance, will the worker record any topics used by connector
> > A?
> > > > > >    6. In the "Restaring" (misspelled) section: "Reconfiguring a
> > > source
> > > > > >    connector has also no altering effect for a source connector.
> > > > > However, when
> > > > > >    reconfiguring a sink connector if the new configuration no
> > longer
> > > > > includes
> > > > > >    any of the previously tracked topics, these topics will be
> > removed
> > > > > from the
> > > > > >    set of active topics for this sink connector by appending
> > > tombstone
> > > > > >    messages appropriately after the reconfiguration of the
> > > connector."
> > > > > Would
> > > > > >    it be better to not automatically reset connector's active
> > topics
> > > > > when a
> > > > > >    sink connector is restarted? Isn't that more consistent with
> the
> > > > > >    "Resetting" behavior and the goals at the top of the KIP:
> "it'd
> > be
> > > > > useful
> > > > > >    for users, operators and applications to know which are the
> > topics
> > > > > that a
> > > > > >    connector has used since it was first created"?
> > > > > >    7. The `PUT /connectors/{name}/topics/reset` endpoint "this
> > > request
> > > > > >    can be reapplied after the deletion of the connector". IOW,
> even
> > > > > though
> > > > > >    connector with that name doesn't exist, we can still make this
> > > > > request? How
> > > > > >    does this compare with other methods such as "status"?
> > > > > >    8. What are the security implications of this proposal?
> > > > > >
> > > > > > As you can see, most of these can probably be addressed without
> > much
> > > > > work.
> > > > > >
> > > > > > Best regards,
> > > > > >
> > > > > > Randall
> > > > > >
> > > > > > On Mon, Jan 13, 2020 at 11:05 PM Konstantine Karantasis <
> > > > > > konstant...@confluent.io> wrote:
> > > > > >
> > > > > >> Hi all.
> > > > > >>
> > > > > >> I just posted KIP-558: Track the set of actively used topics by
> > > > > connectors
> > > > > >> in Kafka Connect
> > > > > >>
> > > > > >> Wiki link here:
> > > > > >>
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect
> > > > > >>
> > > > > >> I think it's a nice extension to follow up on KIP-158 and a
> useful
> > > > > feature
> > > > > >> to the ever increasing number of applications that are built
> > around
> > > > > Kafka
> > > > > >> Connect.
> > > > > >> Would love to hear what you think.
> > > > > >>
> > > > > >> Best,
> > > > > >> Konstantine
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>
