Thanks for the new comments, Randall. My replies are inline below. I'll also update the KIP with the suggestions that are still outstanding and post a summary of the changes.
On Wed, Jan 15, 2020 at 2:37 PM Randall Hauch <rha...@gmail.com> wrote:

> My responses are inline:
>
> On Wed, Jan 15, 2020 at 2:05 PM Konstantine Karantasis <konstant...@confluent.io> wrote:
>
> > Hi Randall, Tom and Almog. I'm excited to read your comments. I'll reply in separate emails, in order.
> >
> > First, to Randall's comments, I'm replying below with a reference to the comment number:
> >
> > 1. Although I can imagine we'd be interested in adding additional metadata in the record value, I didn't see the need for a timestamp in this first draft.
> > Now that you mention, the way I'd interpret a timestamp in the connector status record value would be as an approximation of since when this connector has been using this topic.
> > Happy to add this if we think this info is useful. Of course, accuracy of this information depends on message retention in Kafka and on how long the workers have been running without a restart, so this might make this approximation less useful if it gets recomputed from time to time.
> > To your reference in "Recording active topics" I'll reply below, because that's Tom's question too.
>
> Makes sense that the timestamp in the connector is the (approximate) time that the connector has been using the topic. I do think it's worth adding in the record value (not relying upon Kafka record timestamp).
>
> Regarding "message retention", by default Connect creates the status topic with compaction but no deletion policy, which means infinite retention. Don't several things become problematic if finite retention is used on the status topic, or do we need to worry about this for the active topic records. Do we need to periodically rewrite all of the active topic records? If so, we could just write new records using the original timestamp as originally read by the worker.
> If the worker does periodically (maybe just on task startup) rewrite the active topic records, then we'd have to be sure about the semantics of and interplay with concurrent explicit "reset" calls.

Good point. These topics are configured to have infinite retention. I'll add the timestamp as type 'long'.

> > 2. I'll explain with an example, that maybe is worth adding to the KIP because what's expected to happen might not be as obvious as I thought when a new topic is recorded.
> > Let's say we have two workers, W1 and W2, each running two worker tasks T11 T12 and T21 T22 respectively associated with a connector C1. All tasks will run producers that will produce records to the same topic, "test-topic".
> > When the connector starts, both workers track this connector's set of active topics as empty. Given the absence of synchronization (that's good) in how this information is recorded and persisted in the status topic, all four tasks might race to record status messages:
> >
> > For example:
> >
> > T11, running at worker W1, will send Kafka records with:
> > key: topic-test-topic-connector-C1
> > value: "topic": { "connector": "some-source", "task": "some-source-TT11", "name": "test-topic" }
> >
> > and T22, running at worker W2, will send Kafka records with:
> > key: topic-test-topic-connector-C1
> > value: "topic": { "connector": "some-source", "task": "some-source-TT22", "name": "test-topic" }
> >
> > (similarly tasks T12 and T21 might send topic status records).
> >
> > These four records (they might not even be four but there's going to be at least one) may be written in any order. Because the topic is compacted and these records have the same key, eventually only one message will be retained.
> > The task ID of that message will be the ID of the task that wrote last. I can see this being used mostly for troubleshooting.
>
> Thanks for the clarification.
> Might be good to clarify the language a bit more, though I'm not convinced an example is really needed.

I'll try to see how they both fit. Sure.

> > 3. I believe across the whole KIP, when I'm referring to the task entity, I imply the worker task. Not the user code that is running as implementation of the SourceTask or SinkTask abstract classes. Didn't want to increase complexity by referring to a task as worker task.
> > But I see your point and I'm going to prefer the terms "worker" and "worker task" to highlight that it's the framework that is aware of this feature and not the user code.
>
> Thank you.

+1

> > 4. I assumed that absence of changes to the public API would indicate that these interfaces/abstract classes remain unchanged. But definitely it's worth to explicitly mention that.
>
> Thanks!

+1

> > 5. That is correct. My intention is to make reset work well with the streaming programming model. Resetting (which btw is not mandatory) means that you are cleaning the slate for a connector that is currently running, and its currently active topics will soon be populated from scratch because new records will be produced or consumed.
> > But resetting is not required. I see it more like a useful operation, in case users want to clean the active topics history, without having to delete a connector, since delete has further implications in the connector's progress tracking.
>
> I do think it's worth trying to clarify in the document what happens when active topics are cleared for a connector that is currently running.

Good point.

> > 6. I fixed the typo - thanks! I'm very much in favor of preserving symmetry between the two connector types. This has definitely more long term benefits and may help to avoid confusion.
> > However, the asymmetry is inherited here by the asymmetry that exists today between source and sink connectors.
> > Source connector don't list topics in their configurations but sink connectors do. So, if a user reconfigures a sink connector with a different set of topics, if we don't reset the topics based on the new configs (and my thought here was to match the new configuration with the set of active topics), the old topics, currently not listed in the connectors configuration, will keep showing up as active topics. The user will have to explicitly reset the active topics after reconfiguring to avoid this. If there's consensus that preserving this asymmetry is worse than having to reset the active topics, I'm happy to change this in the KIP.
>
> Would it be easier to keep the symmetric approach (the active topics are cleared only explicitly) if the POST connector method supported a new query parameter to reset the topics before starting (but after stopping any already running tasks)? That makes it easy to reconfigure a connector (source or sink) and atomically clear the active topics before the connector is (re)started. Without that feature, I can't just reconfigure a running sink connector and be sure that the active topics are cleared atomically -- unless we adopt the asymmetric behavior currently in the KIP.
>
> WDYT?

I like the idea. I'll update the KIP.

> > 7. What I try to avoid here is the following situation: For some reason (a sequence of failures to write tombstones to the status topic), stale topic status records remain in that topic even after a connector has been deleted. Requiring to restart a connector with the same name just to apply a follow up reset of active topics doesn't seem necessary. I like the idea of decoupling connector existence from the maintenance of the status topic.
> > Of course, a similar clean up is something that the workers could also perform, but to avoid complexity and potential race conditions, I'm leaving this out for the moment (it's also an implementation detail).
> >
> > 8. Indeed, a security section is warranted. I believe the main implication is that if you are able to query a connector's status, config, etc you will be able to also see its active topics. Furthermore, if you are allowing a worker task to create topics as well as produce or consume from topics only via connector config overrides, leaving the worker configs without permissions to these topics, meaning that you assign per connector permissions and not across the board, then this feature should respect this. The topics are still stored in common data structures within the worker and are persisted in the status topic. But this info should not be leaked to anyone who's not supposed to have access to the status topic or the Connect REST API endpoints. To this respect I feel this feature inherits the assumptions and security guarantees of similar information already stored by the Connect framework. I'm happy to add this to a security section, if we agree that the above cover the subject.
>
> I think that makes sense, and it'd be great to add that in a Security section.

I'll go ahead and add this info to a Security section.

> > 9. I assumed that partitioning is implied by default, because there's no requirement for complete ordering of topic status records. But I'll add this fact as a separate bullet. The status.storage.topic is already a partitioned topic.
>
> Agreed. I think it'd be sufficient to simply mention that partition will be chosen based upon the active topic records' keys, ensuring that all active topic records for the same connector will be written to the same partition and will be totally ordered.
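
To make the key-based partitioning point concrete, here is a minimal sketch. It assumes a stand-in hash (CRC32) purely for illustration; Kafka's default partitioner for keyed records uses a murmur2 hash of the serialized key instead. The function name `choose_partition` and the partition count of 5 are hypothetical.

```python
import zlib


def choose_partition(key: str, num_partitions: int) -> int:
    """Map a record key to a partition index.

    Stand-in for hash-based partitioning: CRC32 is used here only for
    illustration and is NOT the hash Kafka's default partitioner uses.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Key format taken from the example earlier in this thread.
key = "topic-test-topic-connector-C1"

# Four tasks writing a status record under the same key all pick the
# same partition, so those records are ordered relative to each other.
partitions_seen = {choose_partition(key, 5) for _ in range(4)}
```

Records that share a key always map to the same partition, which is what gives them a well-defined relative order within the status topic.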

I'm adding a bullet point to refer to partitioning for this topic.

Thanks
- Konstantine

> > I'm following up with the rest of the comments, shortly.
> > Thanks,
> > Konstantine
> >
> > On Wed, Jan 15, 2020 at 9:19 AM Almog Gavra <al...@confluent.io> wrote:
> >
> > > Hi Konstantine,
> > >
> > > Thanks for the KIP! This is going to make automatic integration with Connect much more powerful.
> > >
> > > My thoughts are mostly around freshness of the data and being able to expose that to users. Riffing on Randall's timestamp question - have we considered adding some interval at which point a connector will republish any topics that it encounters and update the timestamp? That way we have some refreshing mechanism that isn't as powerful as the complete reset (which may not be practical in many scenarios).
> > >
> > > I also agree with Randall's other point (Would it be better to not automatically reset connector's active topics when a sink connector is restarted?). I think keeping the behavior as symmetrical between sink and source connectors is a good idea.
> > >
> > > Lastly, with regards to the API, I can imagine it is also pretty useful to answer the inverse question: "which connectors write to topic X". Perhaps we can achieve this by letting the users compute it and just expose an API that returns the entire mapping at once (instead of needing to call the /connectors/{name}/topics endpoint for each connector).
> > >
> > > Otherwise, looks good to me! Hits the requirements that I had in mind on the nose.
> > > - Almog
> > >
> > > On Wed, Jan 15, 2020 at 1:14 AM Tom Bentley <tbent...@redhat.com> wrote:
> > >
> > > > Hi Konstantine,
> > > >
> > > > Thanks for the KIP, I can see how it could be useful.
> > > >
> > > > a) Did you consider using a metric for this?
> > > > I don't think it would satisfy all the use cases you have in mind, but you could mention it in the rejected alternatives.
> > > >
> > > > b) If the topic name contains the string "-connector" then the key format is ambiguous. This isn't necessarily fatal because the value will disambiguate, but it could be misleading. Any reason not to just use a JSON key, and simplify the value?
> > > >
> > > > c) I didn't understand this part: "As soon as a worker detects the addition of a topic to a connector's set of active topics, the worker will cease to post update messages to the status.storage.topic for that connector. ". I'm sure I've overlooking something but why is this necessary? Is this were the task id in the value is used?
> > > >
> > > > Thanks again,
> > > >
> > > > Tom
> > > >
> > > > On Wed, Jan 15, 2020 at 12:15 AM Randall Hauch <rha...@gmail.com> wrote:
> > > >
> > > > > Oh, one more thing:
> > > > >
> > > > > 9. There's no mention of how the status topic is partitioned, or how partitioning will be used by the new topic records. The KIP should probably outline this for clarity and completeness.
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Randall
> > > > >
> > > > > On Tue, Jan 14, 2020 at 5:25 PM Randall Hauch <rha...@gmail.com> wrote:
> > > > >
> > > > > > Thanks, Konstantine. Overall, this KIP looks interesting and really useful, and for the most part is spot on. I do have a number of questions/comments about specifics:
> > > > > >
> > > > > > 1. The topic records have a value that includes the connector name, task number that last reported the topic is used, and the topic name. There's no mention of record timestamps, but I wonder if it'd be useful to record this.
> > > > > >    One challenge might be that a connector does not write to a topic for a while or the task remains running for long periods of time and therefore the worker doesn't record that this topic has been newly written to since it the task was restarted. IOW, the semantics of the timestamp may be a bit murky. Have you thought about recording the timestamp, and if so what are the pros and cons?
> > > > > >    - The "Recording active topics" section says the following:
> > > > > >      "As soon as a worker detects the addition of a topic to a connector's set of active topics, all the connector's tasks that inspect source or sink records will cease to post update messages to the status.storage.topic."
> > > > > >      This probably means the timestamp won't be very useful.
> > > > > > 2. The KIP says "the Kafka record value stores the ID of the task that succeeded to store a topic status record last." However, this is a bit unclear: is it really storing the last task that successfully wrote to that topic (as this would require very frequent writes to this topic), or is it more that this is the task that was last *recorded* as having written to the topic? (Here, "recorded" could be a bit of a gray area, since this would depend on the how the worker periodically records this information.) Any kind of clarity here might be helpful.
> > > > > > 3. In the "Recording active topics" section (and the surrounding sections), the "task" is used ambiguously. For example, "when its tasks start processing their first records ...
> > > > > >    these tasks will start inspecting which is the Kafka topic of each of these records". IIUC, the first "task" mentioned is the connector's task, and the second is the worker's task. Do we need to distinguish this more clearly?
> > > > > > 4. Maybe I missed it, but does this KIP explicitly say that the Connector API is unchanged? It's probably worth pointing out to help assuage any concerns that connector implementations have to change to make use of this feature.
> > > > > > 5. In the "Resetting a connector's set of active topics" section the behavior is not exactly clear. Consider a user running connector "A", the connector has been fully started and is processing records, and the worker has recorded topic usage records. Then the user resets the active topics for connector A while the connector is still running? If the connector writes to no new topics, before the tasks are rebalanced then is it correct that Connect would report no active topics? And after the tasks are rebalance, will the worker record any topics used by connector A?
> > > > > > 6. In the "Restaring" (misspelled) section: "Reconfiguring a source connector has also no altering effect for a source connector. However, when reconfiguring a sink connector if the new configuration no longer includes any of the previously tracked topics, these topics will be removed from the set of active topics for this sink connector by appending tombstone messages appropriately after the reconfiguration of the connector."
> > > > > >    Would it be better to not automatically reset connector's active topics when a sink connector is restarted? Isn't that more consistent with the "Resetting" behavior and the goals at the top of the KIP: "it'd be useful for users, operators and applications to know which are the topics that a connector has used since it was first created"?
> > > > > > 7. The `PUT /connectors/{name}/topics/reset` endpoint "this request can be reapplied after the deletion of the connector". IOW, even though connector with that name doesn't exist, we can still make this request? How does this compare with other methods such as "status"?
> > > > > > 8. What are the security implications of this proposal?
> > > > > >
> > > > > > As you can see, most of these can probably be addressed without much work.
> > > > > >
> > > > > > Best regards,
> > > > > >
> > > > > > Randall
> > > > > >
> > > > > > On Mon, Jan 13, 2020 at 11:05 PM Konstantine Karantasis <konstant...@confluent.io> wrote:
> > > > > >
> > > > > >> Hi all.
> > > > > >>
> > > > > >> I just posted KIP-558: Track the set of actively used topics by connectors in Kafka Connect
> > > > > >>
> > > > > >> Wiki link here:
> > > > > >>
> > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect
> > > > > >>
> > > > > >> I think it's a nice extension to follow up on KIP-158 and a useful feature to the ever increasing number of applications that are built around Kafka Connect.
> > > > > >> Would love to hear what you think.
> > > > > >>
> > > > > >> Best,
> > > > > >> Konstantine
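
The example from point 2 of the thread (several tasks racing to write a topic status record under one key) can be sketched as a tiny compaction simulation. This is an illustration only: `compact` is a hypothetical helper, not Connect or Kafka code, and the record values and task names are simplified assumptions rather than the exact format in the KIP. A `None` value stands in for a tombstone, as a reset would write.

```python
from typing import Dict, List, Optional, Tuple

# A record is (key, value); a value of None models a tombstone.
Record = Tuple[str, Optional[dict]]


def compact(log: List[Record]) -> Dict[str, dict]:
    """Return what a fully compacted log retains: the last value per key.

    Keys whose last record is a tombstone are dropped entirely.
    """
    latest: Dict[str, dict] = {}
    for key, value in log:
        if value is None:
            latest.pop(key, None)  # tombstone removes the key
        else:
            latest[key] = value    # later writes replace earlier ones
    return latest


KEY = "topic-test-topic-connector-C1"

# Two tasks race; whichever write lands last is the one retained.
log: List[Record] = [
    (KEY, {"connector": "some-source", "task": "some-source-T11", "name": "test-topic"}),
    (KEY, {"connector": "some-source", "task": "some-source-T22", "name": "test-topic"}),
]

state = compact(log)

# A reset appends a tombstone for the key, clearing the active topic.
after_reset = compact(log + [(KEY, None)])
```

Because all racing writes share one key, only a single record survives compaction, and its task field simply reflects whichever task happened to write last.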