Hi Ewen

thanks for the response, I totally agree that this KIP touches upon
multiple rather orthogonal issues, I had a hard time deciding on the name
and scope for it :)
Let me sum up the biggest pain points that we're facing with distributed
mode:

* By far the biggest issue we have is the lack of physical connector
isolation. Since all of them live in the same JVM process we've run into
issues multiple times where misbehaving connectors can either crash the
whole process or simply over-utilize cpu/memory and effectively deprive
others of resources. This is the main reason why we stopped using
distributed mode in production and switched to running each connector in
standalone mode per container. Currently we're running a patched version
of Connect in which standalone mode can store offsets in a Kafka topic (as
currently proposed in the KIP). I don't really see how this could be
easily addressed within the existing framework and given that increasingly
more users have access to mesos/k8s it would seem natural to isolate at
this level rather than solve this within Connect.

* Stop-the-world rebalancing when deploying connectors is
a-very-close-second-biggest issue we had with distributed mode. True, you
could solve this by improving the rebalancing algorithm, but just having
isolated Connect frameworks with single connectors inside sounds much safer
(bugs do happen) and easier to implement.

* The next biggest problem is the deployment workflow, which can be split
into two major issues: deploying via REST and various JAR upgrades
(connectors/converters/transforms). Again, true, you can introduce some
kind of JAR dynamic-reloading mechanism which no longer requires a global
cluster restart, but still requires some ad-hoc deployment procedure (e.g.
how would I coordinate JAR updates with configuration updates?). Once
again, all of this is trivially solved just by running statically
configured connectors-per-container: just rebuild the image with updated
jars and redeploy it to mesos/k8s.

* The per-connector configuration issues were not that painful, but they
would be nice to get rid of: mainly the inability to use different
credentials per connector. Sure, these are being gradually addressed by
various KIPs, but by running separate Connect instances per connector we
get full control out of the box.

For now, we are perfectly happy with running each connector in a
separate container in standalone mode; the only required modification to
Connect is the ability to store offsets in a Kafka topic. The REST API
becomes useless in this scenario: it can of course be disabled at the
container/k8s level or ignored, but nevertheless I've included a proposal
to make it read-only (or disable it) in the KIP for completeness.
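
To make the per-container setup a bit more concrete, here is a rough sketch (not taken from our actual patch) of a small Python helper that renders the worker properties for a single statically configured connector. The `offset.storage.kafka.topic` key and the `rest.enabled` flag are placeholders made up for illustration and are not existing Connect options; `bootstrap.servers`, `key.converter` and `value.converter` are regular worker settings. The connector name and broker address are also just example values.

```python
def render_worker_properties(connector_name, bootstrap_servers):
    """Render standalone worker properties for one connector per container."""
    props = {
        # Existing Connect worker settings.
        "bootstrap.servers": bootstrap_servers,
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        # HYPOTHETICAL key: store standalone offsets in a Kafka topic
        # instead of a local file. Not an existing Connect option.
        "offset.storage.kafka.topic": f"connect-offsets-{connector_name}",
        # HYPOTHETICAL key: switch off the REST API inside the container.
        # Not an existing Connect option.
        "rest.enabled": "false",
    }
    # One key=value pair per line, sorted by key so the output is stable.
    return "\n".join(f"{key}={value}" for key, value in sorted(props.items()))


if __name__ == "__main__":
    print(render_worker_properties("jdbc-orders", "kafka:9092"))
```

The point is only that everything the worker needs is known at image build or deploy time, so the file can be baked into the image or mounted from a configmap.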

I believe what Stephane calls Kafka Connect v2 is not that much of a "grand
redesign", but rather a logical conclusion to this KIP: I wouldn't go as
far as calling it v2, but maybe a new "distributed-isolated" runtime mode.
If connectors are isolated and statically pre-configured, the distributed
mode could in principle do away with the config/status topics and act more
like an ordinary Kafka consumer. No longer needing these topics would be a
nice side effect; I don't really think they are a major problem, more of an
annoyance.

To summarize, the core issue the KIP is addressing is physical/logical
connector isolation, which can be solved by running isolated Connect
frameworks, rather than a Connect cluster. The most reasonable method for
doing this is running connectors on a platform like mesos/k8s, hence the
configuration/deployment problems. In other words, it would be nice to run
Connect more like an application and not like a cluster. In terms of
approaching this incrementally, I believe the first step could be improving
the standalone mode as described in the KIP currently. Improving the
distributed mode is a separate story.

Hope this makes sense


2018-05-22 3:00 GMT+03:00 Ewen Cheslack-Postava <e...@confluent.io>:

> Hey all,
>
> I think this is a great discussion, and is helping to clarify the
> real pain points as well as explore a few more options than just what was
> initially proposed.
>
> Stephane, I think the reason you're ending up in a "grand redesign" state
> is that what you're highlighting (and the examples of pain points in the
> KIP's motivation section) is actually a set of a couple of different
> high-level challenges:
>
>
>    - Impact of shared resources, especially rebalancing but also just
>    shared process space/memory/etc
>    - Kafka security credentials / shared client configs
>    - How configs are specified in the two modes
>
> I actually think there isn't much in the identified problems that is
> unique to containers or k8s/mesos; that's just an environment where you'll
> commonly run into these challenges.
>
> I want to cover each of these issues to see if we can improve them in a
> more incremental way.
>
> *Impact of shared resources, especially rebalancing but also just shared
> process space/memory/etc*
> There are 2 aspects to this. The first is rebalancing. There's actually
> nothing that fundamentally requires every rebalance operation to be
> stop-the-world. I've had some thoughts about allowing partial and
> incremental rebalancing floating around for a while. As Connect clusters are
> growing larger, this is becoming increasingly important. I don't have a
> full write up ready, but it's fixable by making a straightforward change
> (and we planned for the ability to make changes like this, so it absolutely
> should be possible to do and apply to existing clusters). The basic idea is
> to change what starting a rebalance means for ownership of resources --
> instead of assuming everything might be rearranged, you assume by default
> that you will continue to own everything you currently do and continue
> processing through the rebalance. The leader who handles assignment can
> then decide what set of rebalances really are required and include that
> info in the data it sends back. If any rebalancing is required, do a
> subsequent round of rebalancing where you actually give up those resources
> if they were assigned to you. This gives you a way to do partial rebalances
> and only as needed. You can further extend this in a variety of ways, e.g.
> only rebalancing one resource per rebalance, doing a bit of "scheduling" to
> spread out the impact, etc. This is definitely not a trivial
> change/addition to make and will require very thorough testing, but it's
> definitely feasible for 1 person to implement themselves pretty quickly.
>
> The second aspect is shared resources with no control over different
> connectors' usage (and the fact that someone doing something bad might mean
> all connectors crash with the workers). Containers are great for this
> because they give you more granularity than VMs or physical hosts, but any
> of these works.
> If you want to use any of these solutions, you are going to have to have
> separate Connect clusters. There are some pain points if you go this route,
> but I think some you can just live with (e.g. 3 topics per connector isn't
> actually that insane), some are fixable and just not there yet (e.g. the
> upgrade example affecting the whole cluster is also fixable by just
> supporting dynamically loading in new jars instead of requiring a restart).
>
> *Kafka security credentials / shared client configs*
> This is definitely one we hear from time to time. In general, there's no
> reason we couldn't just expose security configs in the connector config. I
> have pushed back on just exposing all configs because a) I don't think
> users *should* have control over them, b) in a shared cluster, some
> represent potentially serious stability threats (e.g. I override buffer
> sizes and OOM everything), and c) it's actually a compatibility concern.
> However, I don't think that means we shouldn't consider exposing some of
> them if there are good use cases (such as here, where you might want to
> lock down different connectors from each other via ACLs).
>
> For the purposes of this discussion, I don't really think this is the thing
> to focus on -- given you're already committing to having separate security
> configs for each connector, it seems like whether you configure multiple
> clusters vs configure them per connector isn't that much different.
>
> *How configs are specified in the two modes*
> This one is, I think, the most interesting because it has been requested a
> few times before and I'm not sure anyone has actually worked through
> semantics that are consistent and handle all the different requirements. Of
> all the stuff mentioned so far in this discussion, this is the piece that
> is really the biggest pain point that's not easy to work around today --
> even if you split out to separate containers per connector, you still want
> distributed mode because a single connector may need to be spread across
> multiple containers. However, the default way for apps to be configured is
> for them to get config files/parameters directly with each process, and so
> it seems nice, if you are only running 1 connector per cluster, to treat the
> entire config, both worker and connector, as a single "app" config.
>
> But this breaks down with distributed mode and when you start thinking
> about upgrades. With distributed mode as it is today, how do you decide if
> you should write the config you were handed to the config topic? (For the
> deployment approach we're talking about here, disabling changes to configs
> via the REST API makes sense and avoids a whole set of confusing cases
> where it's unclear what to do, so I'll avoid them here, but for the more
> general question of just allowing connector config files for distributed
> mode, these cases would also need to be addressed). Updates to the config
> complicate things -- what happens when you're doing a rolling bounce and a
> worker crashes and fails over? Can we always guarantee that a node still
> running with the old config won't restart and overwrite a newer version of
> the config? Or maybe we don't write it at all and each worker
> just relies on the config it was given, but a) we still need to distribute
> task configs and b) what happens during an upgrade when the configs are
> different across different nodes? I think we would need to think about and
> define how all these edge cases actually behave, and ensure it would be
> both correct and intuitive.
>
> On the topic of workers having a leader & having to use a topic because
> there is no other storage: first, there is a leader and we do use that to
> control who is currently allowed to write to the config topic. Kafka
> effectively provides for this in the protocol because of the way we have a
> single member of the group do assignment in a two-round protocol. The
> reason statuses are also a topic is that we have this great pub/sub system,
> so it makes sense to use it instead of reinventing the wheel. If the number
> of topics is too painful, instead of possibly doing something like
> replacing the way we store and distribute configs/offsets/statuses, we
> could instead try to address this specific concern with, e.g., connect
> cluster IDs that are included in these topics so you can have one global
> topic similar to __consumer_offsets. That doesn't totally solve the problem
> if you want the worker clusters to also be completely security isolated,
> but I think it addresses most people's concern with this setup (while also
> not introducing additional changes in how we have to have the cluster
> achieve consensus). These are JSON data formats so should be easy to extend
> a bit -- the major question I would have is around how we make it seamless
> to upgrade given that, e.g., keys in the config topic would have to change
> between v1 and v2 formats.
>
>
> Anyway, sorry for the wall of text, but I think giving a bit more structure
> to the problem space might help us focus on what really needs to be solved
> so we can also do so incrementally. So given the above, what are the things
> that most make it painful? Is it the config thing where fundamentally it
> does not fit into your k8s/mesos/container + CI/CD workflow to do anything
> other than pass configs in via command line flags? Or is it mostly the pain
> of too many topics? Or something else?
>
> -Ewen
>
> On Fri, May 18, 2018 at 4:47 AM Saulius Valatka <saulius...@gmail.com>
> wrote:
>
> > Hi,
> >
> > thanks for the feedback, you have some valid points that I did not
> > consider thoroughly.
> >
> > Now that I think about it, my current proposal is indeed a bit hacky,
> > namely the dichotomy between running "light"/"heavy" connectors in
> > standalone/distributed mode: indeed there should be no difference, one
> > should be able to freely scale a connector by running more container
> > instances. With the proposed approach, however, going from 1 container
> > to more would require switching the runtime mode. This is not at all the
> > case with Kafka Streams apps -- you just scale them at will and I don't
> > see why connectors should behave differently.
> >
> > Your points regarding the config/status topics are also true: the config
> > topic seems irrelevant assuming connector configuration is defined
> > statically inside the container and the status topic could _probably_
> > also be dropped and the functionality implemented via the consumer group
> > protocol -- though I'm also just speculating here, I'll have to study the
> > source code in more depth (maybe someone more familiar with the code-base
> > could offer some insight?).
> >
> > Regarding Rahul's questions as to why these topics are a problem, I can
> > also re-iterate what Stephane said: it's a needless burden to manage.
> > Though I'm pretty sure different connectors can use the same topics, it's
> > still not nice, since if you're using ACLs to control access, all
> > connectors will have to be granted access to these topics and there can
> > potentially be trouble from misbehaving/malicious connectors. Also, I
> > don't think these topics offer any advantages when it comes to
> > centralization: the only reason they exist is that distributed mode has
> > no other way to store information apart from inside Kafka topics, whereas
> > if you're running connectors in containers on Kubernetes, you'd store this
> > configuration in the image/env-vars/configmaps or some other mechanism you
> > use, the point being that this becomes a concern for the container
> > platform -- once again, just like with Kafka Streams.
> >
> > Given all of the above, I'm now inclined to either extend the standalone
> > mode and enable scaling/offset storage in Kafka OR, better yet, introduce
> > a new runtime mode ("container"? not sure what it should be called). The
> > key points of this runtime mode would be:
> >
> >  - offset storage in a kafka topic
> >  - only one statically configured connector launched during startup
> >  - scaling happens via the consumer group protocol
> >  - only a lightweight "health" REST API that simply informs if the
> > connector is running
> >
> > Obviously this would extend the scope of the KIP, but I'd be willing to
> > give this a shot.
> > Waiting for your feedback; once a clearer vision is in place I could
> > update the KIP.
> >
> > Thanks
> >
> > 2018-05-17 13:17 GMT+03:00 Stephane Maarek <steph...@simplemachines.com.au>:
> >
> > > Hi Saulius
> > >
> > > I think you're on the money, but you're not pushing things far enough.
> > > This is something I've hoped for a long time.
> > > Let's talk Kafka Connect v2
> > >
> > > Kafka Connect clusters, as you said, are not convenient to work with
> > > (the KIP details the drawbacks well). I'm all about containerisation,
> > > just like Streams apps support (and boast!).
> > >
> > > Now, here's the problem with Kafka Connect. There are three backing
> > > topics. Here's the analysis of how they can evolve:
> > > - Config topic: this one is irrelevant if each connect cluster comes
> > > with a config bundled with the corresponding JAR, as you mentioned in
> > > your KIP
> > > - Status topic: this is something I wish was gone too. The consumers
> > > have a coordinator, and I believe the connect workers should have a
> > > coordinator too, for task rebalancing.
> > > - Source Offset topic: only relevant for sources. I wish there was a
> > > __connect_offsets global topic just like for consumers and a
> > > "ConnectOffsetCoordinator" to talk to in order to retrieve the latest
> > > committed offset.
> > >
> > > If we look above, with a few fundamental back-end transformations, we
> > > can probably make Connect "cluster-less".
> > >
> > > What the community would get out of it is huge:
> > > - Connect workers for a specific connector are independent and
> > > isolated, measurable (in CPU and Mem) and auto-scalable
> > > - CI/CD is super easy to integrate, as it's just another container /
> > > jar.
> > > - You can roll restart a specific connector and upgrade a JAR without
> > > interrupting your other connectors and while keeping the current
> > > connector running.
> > > - The topics backing connect are removed except the global one, which
> > > allows you to scale easily in terms of number of connectors
> > > - Running a connector in dev or prod (for people offering connectors)
> > > is as easy as doing a simple "docker run".
> > > - Each consumer / producer setting can be configured at the container
> > > level.
> > > - Each connect process is immutable in configuration.
> > > - Each connect process has its own security identity (right now, you
> > > need a connect cluster per service role, which is a lot of overhead in
> > > terms of backing topics)
> > >
> > > Now, I don't have the Kafka expertise to know exactly which changes to
> > > make in the code, but I believe the final idea is achievable.
> > > The change would be breaking for how Kafka Connect is run, but I think
> > > there's a chance to make the change non breaking to how Connect is
> > > programmed. I believe the same public API framework can be used.
> > >
> > > Finally, the REST API can be used for monitoring, or the JMX metrics as
> > > usual.
> > >
> > > I may be completely wrong, but I would see such a change drive up the
> > > utilisation and manageability of Connect by a lot while lowering the
> > > barrier to adoption.
> > >
> > > This change may be big to implement but probably worthwhile. I'd be
> > > happy to provide more "user feedback" on a PR, but probably won't be
> > > able to implement a PR myself.
> > >
> > > More than happy to discuss this
> > >
> > > Best,
> > > Stephane
> > >
> > >
> > > On 17 May 2018 at 14:42, Saulius Valatka <saulius...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > the only real use case for the REST interface I can see is providing
> > > > health/liveness checks for mesos/kubernetes. It's also true that the
> > > > API can be left as is and e.g. not exposed publicly on the platform
> > > > level, but this would still leave opportunities to accidentally mess
> > > > something up internally, so it's mostly a safety concern.
> > > >
> > > > Regarding the option renaming: I agree that it's not necessary as
> > > > it's not clashing with anything, my reasoning is that assuming some
> > > > other offset storage appears in the future, having all config
> > > > properties at the root level of offset.storage.* _MIGHT_ introduce
> > > > clashes in the future, so this is just a suggestion for introducing a
> > > > convention of offset.storage.<store>.<properties>, which the existing
> > > > property offset.storage.file.filename already adheres to. But in
> > > > general, yes -- this can be left as is.
> > > >
> > > >
> > > >
> > > > 2018-05-17 1:20 GMT+03:00 Jakub Scholz <ja...@scholz.cz>:
> > > >
> > > > > Hi,
> > > > >
> > > > > What do you plan to use the read-only REST interface for? Is there
> > > > > something that you cannot get through the metrics interface?
> > > > > Otherwise it might be easier to just disable the REST interface
> > > > > (either in the code, or just on the platform level - e.g. in
> > > > > Kubernetes).
> > > > >
> > > > > Also, I do not know what the usual approach in Kafka is ... but do
> > > > > we really have to rename the offset.storage.* options? The current
> > > > > names do not seem to have any collision with what you are adding and
> > > > > they would get "out of sync" with the other options used in connect
> > > > > (status.storage.* and config.storage.*). So it seems a somewhat
> > > > > unnecessary change to me.
> > > > >
> > > > > Jakub
> > > > >
> > > > >
> > > > >
> > > > > On Wed, May 16, 2018 at 10:10 PM Saulius Valatka
> > > > > <saulius...@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I'd like to start a discussion on the following KIP:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-304%3A+Connect+runtime+mode+improvements+for+container+platforms
> > > > > >
> > > > > > Basically the idea is to make it easier to run separate instances
> > > > > > of Kafka Connect hosting isolated connectors on container
> > > > > > platforms such as Mesos or Kubernetes.
> > > > > >
> > > > > > In particular it would be interesting to hear opinions about the
> > > > > > proposed read-only REST API mode, more specifically I'm concerned
> > > > > > about the possibility to implement it in distributed mode as it
> > > > > > appears the framework is using it internally (
> > > > > > https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1019
> > > > > > ),
> > > > > > however this particular API method appears to be undocumented(?).
> > > > > >
> > > > > > Looking forward to your feedback.
> > > > > >
> > > > >
> > > >
> > >
> >
>
