Hey all,

I think think this is a great discussion, and is helping to clarify the
real pain points as well as explore a few more options than just what was
initially proposed.

Stephane, I think why you're ending up in "grand redesign" state is because
you're highlighting (and the KIP's motivation section examples of pain
points) are actually a set of a couple of different high level challenges:


   - Impact of shared resources, especially rebalancing but also just
   shared process space/memory/etc
   - Kafka security credentials / shared client configs
   - How configs are specified in the two modes

I actually think there isn't much in the identified problems that are
unique to containers or k8s/mesos, that's just an environment you'll
commonly run into these challenges.

I want to cover each of these issues to see if we can improve them in a
more incremental way.

*Impact of shared resources, especially rebalancing but also just shared
process space/memory/etc*
There are 2 aspects to this. The first is rebalancing. There's actually
nothing that fundamentally requires every rebalance operation to be
stop-the-world. I've had some thoughts about allowing partial and
incremental rebalancing floating around for awhile. As Connect clusters are
growing larger, this is becoming increasingly important. I don't have a
full write up ready, but it's fixable by making a straightforward change
(and we planned for the ability to make changes like this, so it absolutely
should be possible to do and apply to existing clusters). The basic idea is
to change what starting a rebalance means for ownership of resources --
instead of assuming everything might be rearranged, you assume by default
that you will continue to own everything you currently do and continue
processing through the rebalance. The leader who handles assignment can
then decide what set of rebalances really are required and include that
info in the data it sends back. If any rebalancing is required, do a
subsequent round of rebalancing where you actually give up those resources
if they were assigned to you. This gives you a way to do partial rebalances
and only as needed. You can further extend this in a variety of ways, e.g.
only rebalancing one resource per rebalance, doing a bit of "scheduling" to
spread out the impact, etc. This is definitely not a trivial
change/addition to make and will require very thorough testing, but it's
definitely feasible for 1 person to implement themselves pretty quickly.

The second aspect is shared resources with no control over different
connectors usage (and the fact that someone doing something bad might mean
all connectors crash with the workers). Containers are great for this that
give you more granularity than VMs or physical hosts, but either one works.
If you want to use any of these solutions, you are going to have to have
separate Connect clusters. There are some pain points if you go this route,
but I think some you can just live with (e.g. 3 topics per connector isn't
actually that insane), some are fixable and just not there yet (e.g. the
upgrade example affecting the whole cluster is also fixable by just
supporting dynamically loading in new jars instead of requiring a restart).

*Kafka security credentials / shared client configs*
This is definitely one we hear from time to time. In general, there's no
reason we couldn't just expose security configs in the connector config. I
have pushed back on just exposing all configs because a) I don't think
users *should* have control over them, b) in a shared cluster, some
represent potentially serious stability threats (e.g. I override buffer
sizes and OOM everything), and c) it's actually a compatibility concern.
However, I don't think that means we shouldn't consider exposing some of
them if there are good use cases (such as here, where you might want to
lock down different connectors from each other via ACLs).

For the purposes of this discussion, I don't really think this is the thing
to focus on -- given you're already committing to having separate security
configs for each connector, it seems like whether you configure multiple
clusters vs configure them per connector isn't that much different.

*How configs are specified in the two modes*
This one is, I think, the most interesting because it has been requested a
few times before and I'm not sure anyone has actually worked through
semantics that are consistent and handle all the different requirements. Of
all the stuff mentioned so far in this discussion, this is the piece that
is really the biggest pain point that's not easy to work around today --
even if you split out to separate containers per connector, you still want
distributed mode because a single connector may need to be spread across
multiple containers. However, the default way for apps to be configured is
for them to get config files/parameters directly with each process, and so
it seems nice if you are only running 1 connecter per cluster, to treat the
entire config, both worker and connector, as a single "app" config.

But this breaks down with distributed mode and when you start thinking
about upgrades. With distributed mode as it is today, how do you decide if
you should write the config you were handed to the config topic? (For the
deployment approach we're talking about here, disabling changes to configs
via the REST API makes sense and avoids a whole set of confusing cases
where it's unclear what to do, so I'll avoid them here, but for the more
general question of just allowing connector config files for distributed
mode, these cases would also need to be addressed). Updates to the config
complicate things -- what happens when you're rolling bouncing and a worker
crashes and fails over? Can we always guarantee we won't restart a process
such that a node still with the old config doesn't restart and overwrite a
newer version of the config? Or maybe we don't write it at all and each
just relies on the config it was given, but a) we still need to distribute
task configs and b) what happens during an upgrade when the configs are
different across different nodes? I think we would need to think about and
define how all these edge cases actually behave, and ensure it would be
both correct and intuitive.

On the topic of workers having a leader & having to use a topic because
there is no other storage: first, there is a leader and we do use that to
control who is currently allowed to write to the config topic. Kafka
effectively provides for this in the protocol because of the way we have a
single member of the group do assignment in a two-round protocol. The
reason statuses are also a topic is that we have this great pub/sub system,
so it makes sense to use it instead of reinventing the wheel. If the number
of topics is too painful, instead of possibly doing something like
replacing the way we store and distribute configs/offsets/statuses, we
could instead try to address this specific concern with, e.g., connect
cluster IDs that are included in these topics so you can have one global
topic similar to _consumer_offsets. That doesn't totally solve the problem
if you want the worker clusters to also be completely security isolated,
but I think it addresses most people's concern with this setup (while also
not introducing additional changes in how we have to have the cluster
achieve consensus). These are JSON data formats so should be easy to extend
a bit -- the major question I would have is around how we make it seamless
to upgrade given that, e.g., keys in the config topic would have to change
between v1 and v2 formats.


Anyway, sorry for the wall of text, but I think giving a bit more structure
to the problem space might help us focus on what really needs to be solved
so we can also do so incrementally. So given the above, what are the things
that most make it painful? Is it the config thing where fundamentally it
does not fit into your k8s/mesos/container + CI/CD workflow to do anything
other than pass configs in via command line flags? Or is it mostly the pain
of too many topics? Or something else?

-Ewen

On Fri, May 18, 2018 at 4:47 AM Saulius Valatka <saulius...@gmail.com>
wrote:

> Hi,
>
> thanks for the feedback, you have some valid points that I did not consider
> thoroughly.
>
> Now that I think about it, my current proposal is indeed a bit hacky,
> namely the dichotomy between running "light"/"heavy" connectors in
> standalone/distributed mode: indeed there should be no difference, one
> should be able to freely scale a connector by running more container
> instances. Whereas with the proposed approach going from 1 container to
> more would require switching the runtime mode, e.g. this is completely not
> the case with kafka streams apps -- you just scale them at will and I don't
> see why connectors should behave differently.
>
> Your points regarding the config/status topics are also true: the config
> topic seems irrelevant assuming connector configuration is defined
> statically inside the container and the status topic could _probably_ also
> be dropped and the functionality implemented via the consumer group
> protocol -- though I'm also just speculating here, I'll have to study the
> source code in more depth (maybe someone more familiar with the code-base
> could offer some insight?).
>
> Regarding Rahul's questions as to why these topics are a problem, I can
> also re-iterate what Stephane said: it's a needless burden to manage.
> Though I'm pretty sure different connectors can use the same topics, it's
> still not nice, since if you're using ACLs to control access, all
> connectors will have to be granted access to these topics and there can
> potentially be trouble from misbehaving/malicious connectors. Also, I don't
> think these topics offer any advantages when it comes to centralization:
> the only reasons they exist is because the distributed mode has no other
> way to store information apart from inside kafka topics, whereas if you're
> running connectors in containers on kubernetes, you'd store this
> configuration in the image/env-vars/configmaps or some other mechanism you
> use, the point being that this once again becomes a concern for the
> container platform -- once again, just like with Kafka Streams.
>
> Given all of the above, I'm now inclined to either extend the standalone
> mode and enable scaling/offset storage in kafka OR better yet, introduce a
> new runtime mode ("container"? not sure what should it be called). The key
> points of this runtime mode would be:
>
>  - offset storage in a kafka topic
>  - only one statically configured connector launched during startup
>  - scaling happens via the consumer group protocol
>  - only a lightweight "health" REST API that simply informs if the
> connector is running
>
> Obviously this would extend the scope of the KIP, but I'd be willing to
> give this a shot.
> Waiting for your feedback, once a more clear vision is in place I could
> update the KIP.
>
> Thanks
>
> 2018-05-17 13:17 GMT+03:00 Stephane Maarek <steph...@simplemachines.com.au
> >:
>
> > Hi Salius
> >
> > I think you're on the money, but you're not pushing things too far.
> > This is something I've hoped for a long time.
> > Let's talk Kafka Connect v2
> >
> > Kafka Connect Cluster, as you said, are not convenient to work with (the
> > KIP details drawbacks well). I'm all about containerisation just like
> > stream apps support (and boasts!).
> >
> > Now, here's the problem with Kafka Connect. There are three backing
> topics.
> > Here's the analysis of how they can evolve:
> > - Config topic: this one is irrelevant if each connect cluster comes
> with a
> > config bundled with the corresponding JAR, as you mentioned in your KIP
> > - Status topic: this is something I wish was gone too. The consumers
> have a
> > coordinator, and I believe the connect workers should have a coordinator
> > too, for task rebalancing.
> > - Source Offset topic: only relevant for sources. I wish there was a
> > __connect_offsets global topic just like for consumers and an
> > "ConnectOffsetCoordinator" to talk to to retrieve latest committed
> offset.
> >
> > If we look above, with a few back-end fundamental transformations, we can
> > probably make Connect "cluster-less".
> >
> > What the community would get out of it is huge:
> > - Connect workers for a specific connector are independent and isolated,
> > measurable (in CPU and Mem) and auto-scalable
> > - CI/CD is super easy to integrate, as it's just another container / jar.
> > - You can roll restart a specific connector and upgrade a JAR without
> > interrupting your other connectors and while keeping the current
> connector
> > from running.
> > - The topics backing connect are removed except the global one, which
> > allows you to scale easily in terms of number of connectors
> > - Running a connector in dev or prod (for people offering connectors) is
> as
> > easy as doing a simple "docker run".
> > - Each consumer / producer settings can be configured at the container
> > level.
> > - Each connect process is immutable in configuration.
> > - Each connect process has its own security identity (right now, you
> need a
> > connect cluster per service role, which is a lot of overhead in terms of
> > backing topic)
> >
> > Now, I don't have the Kafka expertise to know exactly which changes to
> make
> > in the code, but I believe the final idea is achievable.
> > The change would be breaking for how Kafka Connect is run, but I think
> > there's a chance to make the change non breaking to how Connect is
> > programmed. I believe the same public API framework can be used.
> >
> > Finally, the REST API can be used for monitoring, or the JMX metrics as
> > usual.
> >
> > I may be completely wrong, but I would see such a change drive the
> > utilisation, management of Connect by a lot while lowering the barrier to
> > adoption.
> >
> > This change may be big to implement but probably worthwhile. I'd be happy
> > to provide more "user feedback" on a PR, but probably won't be able to
> > implement a PR myself.
> >
> > More than happy to discuss this
> >
> > Best,
> > Stephane
> >
> >
> > Kind regards,
> > Stephane
> >
> > [image: Simple Machines]
> >
> > Stephane Maarek | Developer
> >
> > +61 416 575 980
> > steph...@simplemachines.com.au
> > simplemachines.com.au
> > Level 2, 145 William Street, Sydney NSW 2010
> >
> > On 17 May 2018 at 14:42, Saulius Valatka <saulius...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > the only real usecase for the REST interface I can see is providing
> > > health/liveness checks for mesos/kubernetes. It's also true that the
> API
> > > can be left as is and e.g. not exposed publicly on the platform level,
> > but
> > > this would still leave opportunities to accidentally mess something up
> > > internally, so it's mostly a safety concern.
> > >
> > > Regarding the option renaming: I agree that it's not necessary as it's
> > not
> > > clashing with anything, my reasoning is that assuming some other offset
> > > storage appears in the future, having all config properties at the root
> > > level of offset.storage.* _MIGHT_ introduce clashes in the future, so
> > this
> > > is just a suggestion for introducing a convention of
> > > offset.storage.<store>.<properties>, which the existing
> > > property offset.storage.file.filename already adheres to. But in
> general,
> > > yes -- this can be left as is.
> > >
> > >
> > >
> > > 2018-05-17 1:20 GMT+03:00 Jakub Scholz <ja...@scholz.cz>:
> > >
> > > > Hi,
> > > >
> > > > What do you plan to use the read-only REST interface for? Is there
> > > > something what you cannot get through metrics interface? Otherwise it
> > > might
> > > > be easier to just disable the REST interface (either in the code, or
> > just
> > > > on the platform level - e.g. in Kubernetes).
> > > >
> > > > Also, I do not know what is the usual approach in Kafka ... but do we
> > > > really have to rename the offset.storage.* options? The current names
> > do
> > > > not seem to have any collision with what you are adding and they
> would
> > > get
> > > > "out of sync" with the other options used in connect
> (status.storage.*
> > > and
> > > > config.storage.*). So it seems a bit unnecessary change to me.
> > > >
> > > > Jakub
> > > >
> > > >
> > > >
> > > > On Wed, May 16, 2018 at 10:10 PM Saulius Valatka <
> saulius...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I'd like to start a discussion on the following KIP:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 304%3A+Connect+runtime+mode+improvements+for+container+platforms
> > > > >
> > > > > Basically the idea is to make it easier to run separate instances
> of
> > > > Kafka
> > > > > Connect hosting isolated connectors on container platforms such as
> > > Mesos
> > > > or
> > > > > Kubernetes.
> > > > >
> > > > > In particular it would be interesting to hear opinions about the
> > > proposed
> > > > > read-only REST API mode, more specifically I'm concerned about the
> > > > > possibility to implement it in distributed mode as it appears the
> > > > framework
> > > > > is using it internally (
> > > > >
> > > > > https://github.com/apache/kafka/blob/trunk/connect/
> > > > runtime/src/main/java/org/apache/kafka/connect/runtime/
> > > > distributed/DistributedHerder.java#L1019
> > > > > ),
> > > > > however this particular API method appears to be undocumented(?).
> > > > >
> > > > > Looking forward for your feedback.
> > > > >
> > > >
> > >
> >
>

Reply via email to