On Mon, Jul 6, 2015 at 11:40 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Ewen,
>
> I read through the KIP page and here are some comments on the design
> section:
>
> 1. "... and Copycat does not require that all partitions be enumerated".
> Not very clear about this, do you mean Copycat allows non-enumerable stream
> partitions?
>

Maybe I should change "enumerated" to just plain "listed". The point is
that the framework shouldn't ever need to ask connectors for a complete
list of their current partitions. Requiring the connector to explicitly
list all partitions can be simplifying for the framework and connectors
(e.g. we could push the work of dividing partitions over tasks into the
framework, as we do with topic-partitions in sinks), but there are some
cases where that behavior isn't ideal (e.g. JMX metrics, where an app
restart could change the set of metrics, and can cause particularly bad
behavior during a rolling restart of a service since Copycat would end up
continuously readjusting assignments).
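
To make the alternative concrete, here is a rough sketch of a connector
dividing its own partitions over tasks, so the framework only ever sees
opaque task configs. This is Python purely for illustration; the names
JdbcLikeConnector, task_configs and group_partitions are made up here and
are not the KIP's API.

```python
from typing import Dict, List


def group_partitions(partitions: List[str], max_tasks: int) -> List[List[str]]:
    """Round-robin the connector's partitions over at most max_tasks groups."""
    if max_tasks < 1:
        raise ValueError("max_tasks must be >= 1")
    num_groups = min(max_tasks, len(partitions))
    groups: List[List[str]] = [[] for _ in range(num_groups)]
    # Sort first so the same set of partitions always yields the same grouping.
    for i, partition in enumerate(sorted(partitions)):
        groups[i % num_groups].append(partition)
    return groups


class JdbcLikeConnector:
    """Hypothetical connector: only it knows which tables exist right now."""

    def __init__(self, tables: List[str]) -> None:
        self.tables = list(tables)

    def task_configs(self, max_tasks: int) -> List[Dict[str, str]]:
        # The framework receives one config per task and never needs the
        # complete partition list itself.
        return [
            {"tables": ",".join(group)}
            for group in group_partitions(self.tables, max_tasks)
        ]
```

With tables "users", "orders" and "items" and max_tasks=2, this yields two
task configs, one covering two tables and one covering the third. The
trade-off is the one described above: the grouping logic lives in every
connector instead of in the framework.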


>
> 2. "... translates the data to Copycat's format, decides the destination
> topic (and possibly partition) in Kafka." Just to confirm it seems
> indicating two destination scenarios Copycat connectors should be able to
> support:
>
> a. Specific destination topics per task (e.g. as illustrated in the diagram,
> task 1 to topics A and B, task 2 to topics B and C).
> b. Specific destination topic-partitions per task (as said in "possibly
> partition", like task 1 to topicA-partition1 and topicB-partition1, task 2
> to topicA-partition2 and topicB-partition2).
>
> I understand connector developers need to implement the dynamic mapping
> coordination from the source streams to tasks, but does the mapping from
> tasks to destination topic-partitions (for sinking Copycat I assume it
> would be stream-partitions) also need to be implemented dynamically since
> the destination stream could also change?
>

Not sure I understand what you're getting at here. Connectors can do
arbitrary shuffling to the output (which may not matter for many
connectors, e.g. HDFS, where there's only one output). Some may not need
that (e.g. reading a database commit log, you probably want to maintain
ordering within a single topic).

But as of now, there's no need to track the tasks -> destination
topic-partitions mapping at all. There are one or two things I can think of
that you could possibly optimize a bit if you knew this mapping (e.g. the
flush + offset commit process), but I don't think that info is that useful
to Copycat.


>
> 3. "Delivery Guarantees": depending on how we define the guarantees, it may
> not only depend on the output system but also on the input system. For
> example, duplicates may be generated from the input systems as well. Do we
> also need to consider these scenarios?
>

Yes, that's correct. For source connectors, if the source system introduces
duplicates, we do not deduplicate them, and if it drops data there's nothing
we can do. Same deal with the output system for sink connectors. I guess on
the sink side the expected semantics are clearer since SinkTask.flush()
spells them out, but on the source side the expectation of no
duplicates/dropped data is implicit.
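
As a rough illustration of the sink-side expectation, here is a sketch of a
flush-then-commit loop. It is Python for brevity; InMemorySinkTask and
run_once are hypothetical names, and the only thing taken from the KIP is
that the sink task exposes a flush(). The assumption is that the framework
commits an offset only after flush() returns, which gives at-least-once
delivery: a crash between flush and commit replays the batch rather than
losing it.

```python
from typing import List


class InMemorySinkTask:
    """Hypothetical sink task: put() buffers, flush() makes data durable."""

    def __init__(self) -> None:
        self.buffer: List[str] = []
        self.durable: List[str] = []  # stands in for the output system

    def put(self, records: List[str]) -> None:
        self.buffer.extend(records)

    def flush(self) -> None:
        self.durable.extend(self.buffer)
        self.buffer.clear()


def run_once(task: InMemorySinkTask, log: List[str], committed: int,
             batch_size: int, crash_after_flush: bool = False) -> int:
    """Deliver one batch starting at the committed offset.

    Returns the new committed offset. If the (simulated) crash happens after
    flush() but before the commit, the offset does not advance, so the next
    call delivers the same batch again.
    """
    batch = log[committed:committed + batch_size]
    task.put(batch)
    task.flush()  # data is durable in the sink from here on
    if crash_after_flush:
        return committed
    return committed + len(batch)
```

Running this over a three-record log with one simulated crash leaves every
record in the sink at least once, with the first batch duplicated. Removing
those duplicates is up to the output system, which is the point made above.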


> 4. "Integration with Process Management": for "Resource constrained
> connectors", I am not sure how it is different in deployment from
> "Copycat-as-a-service"? I feel there are generally three different types:
>
>   1) run-as-a-service: on a shared cluster equipped with some resource
> manager, a Copycat framework is ever-running and users submit their
> connector jobs via REST.
>   2) standalone: on a single machine, start a Copycat instance with the
> configured master + #.workers processes via some cmdline tool.
>   3) embedded library: the Copycat code will be running on whatever the
> embedding application is running on.
>

It's different from Copycat-as-a-service because you can apply resource
constraints *on a single, specific copycat connector*. In "as-a-service"
mode, all the connectors and tasks are mixed up across the workers, so if
you want to set a CPU or memory constraint on one connector's tasks, you
can't do that. In order to do that with a resource manager that works at
the process level and supports varying constraints (e.g. connector A gets 1
CPU, connector B gets 10 CPU), you need to make sure the processes you are
applying limits to only contain one connector's tasks.

Because "resource constrained connectors" mode only runs one connector and
its tasks, it is functionally the same as using embedded mode without adding
any code besides Copycat to the program, and running that under the cluster
manager.


>
> 5. Some terminology suggestions, how about the following descriptions (no
> technical difference except the CLI APIs, just some naming changes) of
> Copycat:
>
> a. Copycat developers need to implement the "*connector*" module, which
> includes the "*master*" and "*worker*" logic:
>
>   1) "master" is responsible for coordinating the assignment from the
> resource stream partitions to the workers (and possibly also the assignment
> from the workers to the destination stream partitions?) *dynamically*, and
>   2) "worker" is responsible for polling from the assigned resource stream
> partitions and pushing to the assigned destination stream partitions.
>

Hmm, I removed most discussion of distributed mode, but this terminology
seems like it will be confusing when we get back to that discussion.
"worker" is already being used for the container process, so there's
already confusion there.

For "connector module", that's fine with me. I've been using "connector
plugin" as I've tried to refine terminology a bit, but either one works --
most important part was to avoid confusion with the "connector" component.


>
> b. Copycat framework includes:
>
>   1) The interface for the connector workers polling-from-resource and
> pushing-to-destination function calls,
>

Yes, I'd call this the "connector API".


>   2) The interface for resource management integration: it leverages the
> underlying resource managers like YARN / Mesos to get a list of allocated "
> *containers*".
>

Ok, guess we ended up on different pages on this again. One of the goals
was to get rid of all this dependence on custom code for every resource
manager framework like Samza has.


>   3) A "*connector manager*" responsible for coordinating the assignment
> from the connector master / worker processes to the allocated containers
> *dynamically*.
>

Yes, this makes sense. This is what I thought might get confusing with
"master connector logic" above. But since both of these components are
doing conceptually similar things (breaking up work and assigning it), the
naming may just always be a bit confusing.


>
> c. Copycat users need to specify the *connector configurations* through
> config files or ZK / other storage systems, including #.tasks, starting
> offsets, etc, and start the *connector job* with its configurations (each
> job has its own configs) via the above mentioned three different modes:
>
>   1) submit the job via REST to a Copycat service running on a shared
> cluster with resource manager, or
>   2) start the job in standalone mode in a single machine, with all the
> master / workers running on that single machine.
>

These two sound fine.


>   3) start a copycat instance first in embedded mode and then add
> connectors, all the added connectors (i.e. their master / workers) run on
> the single machine where the embedding app code is running.
>

I'm not sure if I'm just misreading this, but the goal with embedded mode
is to still support running connectors (i.e. their master/workers in your
terminology) in a distributed fashion. On sinks this is trivial since the
consumer gives you this for free, but the point was to make sure the same
thing works for sources as well (i.e. the framework helps with *source*
partition balancing). If this is what you were thinking as well, could you
clarify what you meant by "run on the single machine where the embedding
app code is running"?


>
> d. As for the CLI APIs, we will only need one for the standalone mode since
> the run-as-a-service mode will always have some resource manager to
> allocate the containers.
>

Ok, this seems to confirm we're still not on the same page for resource
managers... Why can't run-as-a-service mode run without a resource manager?
By "container" in this case do you mean that the resource manager will run
the worker processes, which in turn are assigned connectors/tasks to
execute as threads in that process?

I tried to clarify the KIP to make it clear that the only *processes* I
expected are copycat workers (or standalone mode processes). Resource
managers can be used to start these processes (and help maintain the
cluster by restarting them if they crash), but there's no other deep
integration or custom code required given the current description in the
KIP.

-Ewen



>
> Guozhang
>
>
> On Mon, Jun 29, 2015 at 9:50 AM, Ewen Cheslack-Postava <e...@confluent.io>
> wrote:
>
> > Seems like discussion has mostly quieted down on this. Any more questions,
> > comments, or discussion? If nobody brings up any other issues, I'll start a
> > vote thread in a day or two.
> >
> > -Ewen
> >
> > On Thu, Jun 25, 2015 at 3:36 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > We were talking on the call about a logo...so here I present "The Original
> > > Copycat":
> > > http://shirtoid.com/67790/the-original-copycat/
> > >
> > > -Jay
> > >
> > > On Tue, Jun 23, 2015 at 6:28 PM, Gwen Shapira <gshap...@cloudera.com>
> > > wrote:
> > >
> > > > One more reason to have CopyCat as a separate project is to sidestep
> > > > the entire "Why CopyCat and not X" discussion :)
> > > >
> > > > On Tue, Jun 23, 2015 at 6:26 PM, Gwen Shapira <gshap...@cloudera.com>
> > > > wrote:
> > > > > Re: Flume vs. CopyCat
> > > > >
> > > > > I would love to have an automagically-parallelizing, schema-aware
> > > > > version of Flume with great reliability guarantees. Flume has good
> > > > > core architecture and I'm sure that if the Flume community is
> > > > > interested, it can be extended in that direction.
> > > > >
> > > > > However, the Apache way is not to stop new innovation just because
> > > > > > some systems already exist. We develop the best systems we can,
> and
> > > > > > users choose the ones they prefer - that's how ecosystems thrive.
> > > > > If we can have Flume and NiFi, Sentry and Argus, Flink and Storm,
> > > > > Parquet and ORC, I'm sure we can also have CopyCat in the zoo :)
> > > > >
> > > > > Gwen
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Jun 23, 2015 at 6:11 PM, Ewen Cheslack-Postava
> > > > > <e...@confluent.io> wrote:
> > > > >> On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik <
> > ros...@hortonworks.com>
> > > > wrote:
> > > > >>
> > > > >>> Thanks Jay and Ewen for the response.
> > > > >>>
> > > > >>>
> > > > >>> >@Jay
> > > > >>> >
> > > > >>> > 3. This has a built in notion of parallelism throughout.
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> It was not obvious how it will look like or differ from existing
> > > > > systems
> > > > >>> since all of existing ones do parallelize data movement.
> > > > >>>
> > > > >>
> > > > >> I'm guessing some confusion here might also be because we want
> both
> > > > >> parallelization and distribution.
> > > > >>
> > > > >> Roughly speaking, I think of Copycat making the consumer group
> > > > abstraction
> > > > >> available for any import task, and the idea is to make this
> > automatic
> > > > and
> > > > >> transparent to the user. This isn't interesting for systems that
> > > > literally
> > > > >> only have a single input stream, but Copycat source connectors
> have
> > a
> > > > >> built-in notion of parallel input streams. The connector's job is
> to
> > > > inform
> > > > > >> the Copycat framework of what input streams there are and
> > Copycat
> > > > >> handles running tasks, balancing the streams across them, handles
> > > > failures
> > > > >> by rebalancing as necessary, provides offset commit and storage so
> > > tasks
> > > > >> can resume from the last known-good state, etc.
> > > > >>
> > > > >> On the sink side, the input is the Kafka consumer group, which
> > > obviously
> > > > >> already has this parallelism built in. Depending on the output,
> this
> > > may
> > > > >> manifest in different ways. For HDFS, the effect is just that your
> > > > output
> > > > >> files are partitioned (one per topic-partition).
> > > > >>
> > > > >> As for other systems, can you be more specific? Some of them
> > obviously
> > > > do
> > > > >> (e.g. Camus), but others require you to handle this manually. I
> > don't
> > > > want
> > > > >> to pick on Flume specifically, but as an example, it requires
> either
> > > > >> configuring multiple (or multiplexed) flows in a single agent or
> > > manage
> > > > >> multiple agents independently. This isn't really the same as what
> > I've
> > > > >> described above where you hand Copycat one config and it
> > automatically
> > > > >> spreads the work across multiple, fault-tolerant tasks. But flume
> is
> > > > also
> > > > >> targeting a much different general problem, trying to build
> > > potentially
> > > > >> large, multi-stage data flows with all sorts of transformations,
> > > > filtering,
> > > > >> etc.
> > > > >>
> > > > >>
> > > > >>>
> > > > >>>
> > > > >>> @Ewen,
> > > > >>>
> > > > >>> >Import: Flume is just one of many similar systems designed
> around
> > > log
> > > > >>> >collection. See notes below, but one major point is that they
> > > > generally
> > > > >>> >don't provide any sort of guaranteed delivery semantics.
> > > > >>>
> > > > >>>
> > > > >>> I think most of them do provide guarantees of some sort (Ex.
> Flume
> > &
> > > > >>> FluentD).
> > > > >>>
> > > > >>
> > > > >> This part of the discussion gets a little bit tricky, not least
> > > because
> > > > it
> > > > >> seems people can't agree on exactly what these terms mean.
> > > > >>
> > > > >> First, some systems that you didn't mention. Logstash definitely
> > > doesn't
> > > > >> have any guarantees as it uses a simple 20-event in-memory buffer
> > > > between
> > > > >> stages. As far as I can tell, Heka doesn't provide these semantics
> > > > either,
> > > > >> although I have not investigated it as deeply.
> > > > >>
> > > > >> fluentd has an article discussing the options for it (
> > > > >> http://docs.fluentd.org/articles/high-availability), but I
> actually
> > > > think
> > > > >> the article on writing plugins is more informative
> > > > >> http://docs.fluentd.org/articles/plugin-development The most
> > > important
> > > > > >> point is that input plugins have no way to track or discover
> > > downstream
> > > > >> delivery (i.e. they cannot get acks, nor is there any sort of
> offset
> > > > >> tracked that it can lookup to discover where to restart upon
> > failure,
> > > > nor
> > > > >> is it guaranteed that after router.emit() returns that the data
> will
> > > > have
> > > > >> already been delivered downstream). So if I have a replicated
> input
> > > data
> > > > > >> store, e.g. a replicated database, and I am just reading off its
> > > > >> changelog, does fluentd actually guarantee something like at least
> > > once
> > > > >> delivery to the sink? In fact, fluentd's own documentation (the
> high
> > > > >> availability doc) describes data loss scenarios that aren't
> inherent
> > > to
> > > > >> every system (e.g., if their log aggregator dies, which not every
> > > > system is
> > > > >> susceptible to, vs. if an event is generated on a single host and
> > that
> > > > host
> > > > >> dies before reporting it anywhere, then of course the data is
> > > > permanently
> > > > >> lost).
> > > > >>
> > > > >> Flume actually does have a (somewhat confusingly named)
> transaction
> > > > concept
> > > > >> to help control this. The reliability actually depends on what
> type
> > of
> > > > >> channel implementation you use. Gwen and Jeff from Cloudera
> > integrated
> > > > >> Kafka and Flume, including a Kafka channel (see
> > > > >>
> > > >
> > >
> >
> http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/
> > > > ).
> > > > >> This does allow for better control over delivery semantics, and I
> > > think
> > > > if
> > > > >> you use something like Kafka for every channel in your pipeline,
> you
> > > can
> > > > >> get something like what Copycat can provide. I'd argue flume's
> > > approach
> > > > has
> > > > >> some other drawbacks though. In order to work correctly, every
> > source
> > > > and
> > > > >> sink has to handle the transaction semantics, which adds
> complexity
> > > > >> (although they do offer great skeleton examples in their docs!).
> > > > >>
> > > > >> Copycat tries to avoid that complexity for connector developers by
> > > > changing
> > > > >> the framework to use streams, offsets, and commits, and pushing
> the
> > > > >> complexities of dealing with any sorts of errors/failures into the
> > > > >> framework. Ideally connector developers only need to a) check for
> > > > offsets
> > > > >> at startup and rewind to the last known committed offset and b)
> load
> > > > events
> > > > >> from the source system (with stream IDs and offsets) and pass them
> > to
> > > > the
> > > > >> framework.
> > > > >>
> > > > >>
> > > > >>>
> > > > >>> >YARN: My point isn't that YARN is bad, it's that tying to any
> > > > particular
> > > > >>> >cluster manager severely limits the applicability of the tool.
> The
> > > > goal is
> > > > >>> >to make Copycat agnostic to the cluster manager so it can run
> > under
> > > > Mesos,
> > > > >>> >YARN, etc.
> > > > >>>
> > > > >>> ok. Got it. Sounds like there is plan to do some work here to
> > ensure
> > > > >>> out-of-the-box it works with more than one scheduler (as @Jay
> > listed
> > > > out).
> > > > >>> In that case, IMO it would be better to actually rephrase it in
> the
> > > KIP
> > > > >>> that it will support more than one scheduler.
> > > > >>>
> > > > >>>
> > > > >> Tried to add some wording to clarify that.
> > > > >>
> > > > >>
> > > > >>>
> > > > >>> >Exactly once: You accomplish this in any system by managing
> > offsets
> > > > in the
> > > > >>> >destination system atomically with the data or through some kind
> > of
> > > > >>> >deduplication. Jiangjie actually just gave a great talk about
> this
> > > > issue
> > > > >>> >at
> > > > >>> >a recent Kafka meetup, perhaps he can share some slides about
> it.
> > > > When you
> > > > >>> >see all the details involved, you'll see why I think it might be
> > > nice
> > > > to
> > > > >>> >have the framework help you manage the complexities of achieving
> > > > different
> > > > >>> >delivery semantics ;)
> > > > >>>
> > > > >>>
> > > > >>> Deduplication as a post processing step is a common
> recommendation
> > > done
> > > > > >>> today, but that is a workaround/fix for the inability to provide
> > > > >>> exactly-once by the delivery systems. IMO such post processing
> > should
> > > > not
> > > > > >>> be considered part of the "exactly-once" guarantee of Copycat.
> > > > >>>
> > > > >>>
> > > > >>> Will be good to know how this guarantee will be possible when
> > > > delivering
> > > > >>> to HDFS.
> > > > >>> Would be great if someone can share those slides if it is
> discussed
> > > > there.
> > > > >>>
> > > > >>>
> > > > >> For HDFS, the gist of the solution is to write to temporary files
> > and
> > > > then
> > > > >> rename atomically to their final destination, including offset
> > > > information
> > > > >> (e.g., it can just be in the filename). Readers only see files
> that
> > > have
> > > > >> been "committed". If there is a failure, any existing temp files
> get
> > > > >> cleaned up and reading is reset to the last committed offset.
> There
> > > are
> > > > >> some tricky details if you have zombie processes and depending on
> > how
> > > > you
> > > > >> organize the data across files, but this isn't really the point of
> > > this
> > > > >> KIP. If you're interested in HDFS specifically, I'd suggest
> looking
> > at
> > > > >> Camus's implementation.
> > > > >>
> > > > >>
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> Was looking for clarification on this ..
> > > > >>> - Export side - is this like a map reduce kind of job or
> something
> > > > else ?
> > > > >>> If delivering to hdfs would this be running on the hadoop cluster
> > or
> > > > >>> outside ?
> > > > >>>
> > > > >> - Import side - how does this look ? Is it a bunch of flume like
> > > > processes
> > > > >>> ? maybe just some kind of a broker that translates the incoming
> > > > protocol
> > > > >>> into outgoing Kafka producer api protocol ? If delivering to
> hdfs,
> > > will
> > > > >>> this run on the cluster or outside ?
> > > > >>>
> > > > >>
> > > > >> No mapreduce; in fact, no other frameworks required unless the
> > > connector
> > > > >> needs it for some reason. Both source and sink look structurally
> the
> > > > same.
> > > > >> Probably the most common scenario is to run a set of workers that
> > > > provide
> > > > >> the copycat service. You submit connector jobs to run on these
> > > workers.
> > > > A
> > > > >> coordinator handles distributing the work across worker nodes.
> > > > Coordinators
> > > > >> determine how to divide the tasks and generate configs for them,
> > then
> > > > the
> > > > >> framework handles distributing that work. Each individual task
> > handles
> > > > some
> > > > >> subset of the job. For source tasks, that subset is a set of input
> > > > streams
> > > > >> (in the JDBC example in the KIP, each table would have a
> > corresponding
> > > > >> stream). For sink tasks, the subset is determined automatically by
> > the
> > > > >> framework via the underlying consumer group as a subset of
> > > > topic-partitions
> > > > >> (since the input is from Kafka). Connectors are kept simple, just
> > > > >> processing streams of records (either generating them by reading
> > from
> > > > the
> > > > >> source system or recording them into the sink system). Source
> tasks
> > > also
> > > > >> include information about offsets, and sink tasks either need to
> > > manage
> > > > >> offsets themselves or implement flush() functionality. Given these
> > > > >> primitives, the framework can then handle other complexities like
> > > > different
> > > > >> delivery semantics without any additional support from the
> > connectors.
> > > > >>
> > > > >> The motivation for the additional modes of execution (agent,
> > embedded)
> > > > was
> > > > >> to support a couple of other common use cases. Agent mode is
> > > completely
> > > > >> standalone, which provides for a much simpler implementation and
> > > handles
> > > > >> use cases where there isn't an easy way to avoid running the job
> > > across
> > > > >> many machines (e.g., if you have to load logs directly from log
> > > files).
> > > > >> Embedded mode is actually a simple variant of the distributed
> mode,
> > > but
> > > > >> lets you setup and run the entire cluster alongside the rest of
> your
> > > > >> distributed app. This is useful if you want to get up and running
> > with
> > > > an
> > > > >> application where you need to, for example, import data from
> another
> > > > >> service into Kafka, then consume and process that data. You can
> > setup
> > > > the
> > > > >> worker and submit a job directly from your code, reducing the
> > > > operational
> > > > >> complexity. It's probably not the right long term solution as your
> > > usage
> > > > >> expands, but it can significantly ease adoption.
> > > > >>
> > > > >>
> > > > >>>
> > > > >>>
> > > > >>> I still think adding one or two specific end-to-end use-cases in
> > the
> > > > KIP,
> > > > >>> showing how copycat will pan out for them for import/export will
> > > really
> > > > >>> clarify things.
> > > > >>>
> > > > >>
> > > > >> There were a couple of examples already in the KIP -- JDBC, HDFS,
> > log
> > > > >> import, and now I've also added mirror maker. Were you looking for
> > > > >> something more specific? I could also explain a full source ->
> kafka
> > > ->
> > > > >> sink pipeline, but I don't know that there's much to add there
> > beyond
> > > > the
> > > > >> fact that we would like schemas to carry across the entire
> pipeline.
> > > > >> Otherwise it's just chaining connectors. Besides, I think most of
> > the
> > > > >> interesting use cases actually have additional processing steps in
> > > > between,
> > > > >> i.e. using stream processing frameworks or custom consumers +
> > > producers.
> > > > >>
> > > > >> --
> > > > >> Thanks,
> > > > >> Ewen
> > > >
> > >
> >
> >
> >
> > --
> > Thanks,
> > Ewen
> >
>
>
>
> --
> -- Guozhang
>



-- 
Thanks,
Ewen
