Seems like discussion has mostly quieted down on this. Any more questions,
comments, or discussion? If nobody brings up any other issues, I'll start a
vote thread in a day or two.

-Ewen

On Thu, Jun 25, 2015 at 3:36 PM, Jay Kreps <j...@confluent.io> wrote:

> We were talking on the call about a logo...so here I present "The Original
> Copycat":
> http://shirtoid.com/67790/the-original-copycat/
>
> -Jay
>
> On Tue, Jun 23, 2015 at 6:28 PM, Gwen Shapira <gshap...@cloudera.com>
> wrote:
>
> > One more reason to have CopyCat as a separate project is to sidestep
> > the entire "Why CopyCat and not X" discussion :)
> >
> > On Tue, Jun 23, 2015 at 6:26 PM, Gwen Shapira <gshap...@cloudera.com>
> > wrote:
> > > Re: Flume vs. CopyCat
> > >
> > > I would love to have an automagically-parallelizing, schema-aware
> > > version of Flume with great reliability guarantees. Flume has good
> > > core architecture and I'm sure that if the Flume community is
> > > interested, it can be extended in that direction.
> > >
> > > However, the Apache way is not to stop new innovation just because
> > > some systems already exists. We develop the best systems we can, and
> > > users choose the ones they prefer - thats how ecosystems thrive.
> > > If we can have Flume and NiFi, Sentry and Argus, Flink and Storm,
> > > Parquet and ORC, I'm sure we can also have CopyCat in the zoo :)
> > >
> > > Gwen
> > >
> > >
> > >
> > > On Tue, Jun 23, 2015 at 6:11 PM, Ewen Cheslack-Postava
> > > <e...@confluent.io> wrote:
> > >> On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik <ros...@hortonworks.com>
> > wrote:
> > >>
> > >>> Thanks Jay and Ewen for the response.
> > >>>
> > >>>
> > >>> >@Jay
> > >>> >
> > >>> > 3. This has a built in notion of parallelism throughout.
> > >>>
> > >>>
> > >>>
> > >>> It was not obvious how it will look like or differ from existing
> > systemsÅ 
> > >>> since all of existing ones do parallelize data movement.
> > >>>
> > >>
> > >> I'm guessing some confusion here might also be because we want both
> > >> parallelization and distribution.
> > >>
> > >> Roughly speaking, I think of Copycat making the consumer group
> > abstraction
> > >> available for any import task, and the idea is to make this automatic
> > and
> > >> transparent to the user. This isn't interesting for systems that
> > literally
> > >> only have a single input stream, but Copycat source connectors have a
> > >> built-in notion of parallel input streams. The connector's job is to
> > inform
> > >> the the Copycat framework of what input streams there are and Copycat
> > >> handles running tasks, balancing the streams across them, handles
> > failures
> > >> by rebalancing as necessary, provides offset commit and storage so
> tasks
> > >> can resume from the last known-good state, etc.
> > >>
> > >> On the sink side, the input is the Kafka consumer group, which
> obviously
> > >> already has this parallelism built in. Depending on the output, this
> may
> > >> manifest in different ways. For HDFS, the effect is just that your
> > output
> > >> files are partitioned (one per topic-partition).
> > >>
> > >> As for other systems, can you be more specific? Some of them obviously
> > do
> > >> (e.g. Camus), but others require you to handle this manually. I don't
> > want
> > >> to pick on Flume specifically, but as an example, it requires either
> > >> configuring multiple (or multiplexed) flows in a single agent or
> manage
> > >> multiple agents independently. This isn't really the same as what I've
> > >> described above where you hand Copycat one config and it automatically
> > >> spreads the work across multiple, fault-tolerant tasks. But flume is
> > also
> > >> targeting a much different general problem, trying to build
> potentially
> > >> large, multi-stage data flows with all sorts of transformations,
> > filtering,
> > >> etc.
> > >>
> > >>
> > >>>
> > >>>
> > >>> @Ewen,
> > >>>
> > >>> >Import: Flume is just one of many similar systems designed around
> log
> > >>> >collection. See notes below, but one major point is that they
> > generally
> > >>> >don't provide any sort of guaranteed delivery semantics.
> > >>>
> > >>>
> > >>> I think most of them do provide guarantees of some sort (Ex. Flume &
> > >>> FluentD).
> > >>>
> > >>
> > >> This part of the discussion gets a little bit tricky, not least
> because
> > it
> > >> seems people can't agree on exactly what these terms mean.
> > >>
> > >> First, some systems that you didn't mention. Logstash definitely
> doesn't
> > >> have any guarantees as it uses a simple 20-event in-memory buffer
> > between
> > >> stages. As far as I can tell, Heka doesn't provide these semantics
> > either,
> > >> although I have not investigated it as deeply.
> > >>
> > >> fluentd has an article discussing the options for it (
> > >> http://docs.fluentd.org/articles/high-availability), but I actually
> > think
> > >> the article on writing plugins is more informative
> > >> http://docs.fluentd.org/articles/plugin-development The most
> important
> > >> point is that input plugins have no way to track or discovery
> downstream
> > >> delivery (i.e. they cannot get acks, nor is there any sort of offset
> > >> tracked that it can lookup to discover where to restart upon failure,
> > nor
> > >> is it guaranteed that after router.emit() returns that the data will
> > have
> > >> already been delivered downstream). So if I have a replicated input
> data
> > >> store, e.g. a replicated database, and I am just reading off it's
> > >> changelog, does fluentd actually guarantee something like at least
> once
> > >> delivery to the sink? In fact, fluentd's own documentation (the high
> > >> availability doc) describes data loss scenarios that aren't inherent
> to
> > >> every system (e.g., if their log aggregator dies, which not every
> > system is
> > >> susceptible to, vs. if an event is generated on a single host and that
> > host
> > >> dies before reporting it anywhere, then of course the data is
> > permanently
> > >> lost).
> > >>
> > >> Flume actually does have a (somewhat confusingly named) transaction
> > concept
> > >> to help control this. The reliability actually depends on what type of
> > >> channel implementation you use. Gwen and Jeff from Cloudera integrated
> > >> Kafka and Flume, including a Kafka channel (see
> > >>
> >
> http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/
> > ).
> > >> This does allow for better control over delivery semantics, and I
> think
> > if
> > >> you use something like Kafka for every channel in your pipeline, you
> can
> > >> get something like what Copycat can provide. I'd argue flume's
> approach
> > has
> > >> some other drawbacks though. In order to work correctly, every source
> > and
> > >> sink has to handle the transaction semantics, which adds complexity
> > >> (although they do offer great skeleton examples in their docs!).
> > >>
> > >> Copycat tries to avoid that complexity for connector developers by
> > changing
> > >> the framework to use streams, offsets, and commits, and pushing the
> > >> complexities of dealing with any sorts of errors/failures into the
> > >> framework. Ideally connector developers only need to a) check for
> > offsets
> > >> at startup and rewind to the last known committed offset and b) load
> > events
> > >> from the source system (with stream IDs and offsets) and pass them to
> > the
> > >> framework.
> > >>
> > >>
> > >>>
> > >>> >YARN: My point isn't that YARN is bad, it's that tying to any
> > particular
> > >>> >cluster manager severely limits the applicability of the tool. The
> > goal is
> > >>> >to make Copycat agnostic to the cluster manager so it can run under
> > Mesos,
> > >>> >YARN, etc.
> > >>>
> > >>> ok. Got it. Sounds like there is plan to do some work here to ensure
> > >>> out-of-the-box it works with more than one scheduler (as @Jay listed
> > out).
> > >>> In that case, IMO it would be better to actually rephrase it in the
> KIP
> > >>> that it will support more than one scheduler.
> > >>>
> > >>>
> > >> Tried to add some wording to clarify that.
> > >>
> > >>
> > >>>
> > >>> >Exactly once: You accomplish this in any system by managing offsets
> > in the
> > >>> >destination system atomically with the data or through some kind of
> > >>> >deduplication. Jiangjie actually just gave a great talk about this
> > issue
> > >>> >at
> > >>> >a recent Kafka meetup, perhaps he can share some slides about it.
> > When you
> > >>> >see all the details involved, you'll see why I think it might be
> nice
> > to
> > >>> >have the framework help you manage the complexities of achieving
> > different
> > >>> >delivery semantics ;)
> > >>>
> > >>>
> > >>> Deduplication as a post processing step is a common recommendation
> done
> > >>> today Å  but that is a workaround/fix for the inability to provide
> > >>> exactly-once by the delivery systems. IMO such post processing should
> > not
> > >>> be considered part of the "exacty-once" guarantee of Copycat.
> > >>>
> > >>>
> > >>> Will be good to know how this guarantee will be possible when
> > delivering
> > >>> to HDFS.
> > >>> Would be great if someone can share those slides if it is discussed
> > there.
> > >>>
> > >>>
> > >> For HDFS, the gist of the solution is to write to temporary files and
> > then
> > >> rename atomically to their final destination, including offset
> > information
> > >> (e.g., it can just be in the filename). Readers only see files that
> have
> > >> been "committed". If there is a failure, any existing temp files get
> > >> cleaned up and reading is reset to the last committed offset. There
> are
> > >> some tricky details if you have zombie processes and depending on how
> > you
> > >> organize the data across files, but this isn't really the point of
> this
> > >> KIP. If you're interested in HDFS specifically, I'd suggest looking at
> > >> Camus's implementation.
> > >>
> > >>
> > >>>
> > >>>
> > >>>
> > >>> Was looking for clarification on this ..
> > >>> - Export side - is this like a map reduce kind of job or something
> > else ?
> > >>> If delivering to hdfs would this be running on the hadoop cluster or
> > >>> outside ?
> > >>>
> > >> - Import side - how does this look ? Is it a bunch of flume like
> > processes
> > >>> ? maybe just some kind of a broker that translates the incoming
> > protocol
> > >>> into outgoing Kafka producer api protocol ? If delivering to hdfs,
> will
> > >>> this run on the cluster or outside ?
> > >>>
> > >>
> > >> No mapreduce; in fact, no other frameworks required unless the
> connector
> > >> needs it for some reason. Both source and sink look structurally the
> > same.
> > >> Probably the most common scenario is to run a set of workers that
> > provide
> > >> the copycat service. You submit connector jobs to run on these
> workers.
> > A
> > >> coordinator handles distributing the work across worker nodes.
> > Coordinators
> > >> determine how to divide the tasks and generate configs for them, then
> > the
> > >> framework handles distributing that work. Each individual task handles
> > some
> > >> subset of the job. For source tasks, that subset is a set of input
> > streams
> > >> (in the JDBC example in the KIP, each table would have a corresponding
> > >> stream). For sink tasks, the subset is determined automatically by the
> > >> framework via the underlying consumer group as a subset of
> > topic-partitions
> > >> (since the input is from Kafka). Connectors are kept simple, just
> > >> processing streams of records (either generating them by reading from
> > the
> > >> source system or recording them into the sink system). Source tasks
> also
> > >> include information about offsets, and sink tasks either need to
> manage
> > >> offsets themselves or implement flush() functionality. Given these
> > >> primitives, the framework can then handle other complexities like
> > different
> > >> delivery semantics without any additional support from the connectors.
> > >>
> > >> The motivation for the additional modes of execution (agent, embedded)
> > was
> > >> to support a couple of other common use cases. Agent mode is
> completely
> > >> standalone, which provides for a much simpler implementation and
> handles
> > >> use cases where there isn't an easy way to avoid running the job
> across
> > >> many machines (e.g., if you have to load logs directly from log
> files).
> > >> Embedded mode is actually a simple variant of the distributed mode,
> but
> > >> lets you setup and run the entire cluster alongside the rest of your
> > >> distributed app. This is useful if you want to get up and running with
> > an
> > >> application where you need to, for example, import data from another
> > >> service into Kafka, then consume and process that data. You can setup
> > the
> > >> worker and submit a job directly from your code, reducing the
> > operational
> > >> complexity. It's probably not the right long term solution as your
> usage
> > >> expands, but it can significantly ease adoption.
> > >>
> > >>
> > >>>
> > >>>
> > >>> I still think adding one or two specific end-to-end use-cases in the
> > KIP,
> > >>> showing how copycat will pan out for them for import/export will
> really
> > >>> clarify things.
> > >>>
> > >>
> > >> There were a couple of examples already in the KIP -- JDBC, HDFS, log
> > >> import, and now I've also added mirror maker. Were you looking for
> > >> something more specific? I could also explain a full source -> kafka
> ->
> > >> sink pipeline, but I don't know that there's much to add there beyond
> > the
> > >> fact that we would like schemas to carry across the entire pipeline.
> > >> Otherwise it's just chaining connectors. Besides, I think most of the
> > >> interesting use cases actually have additional processing steps in
> > between,
> > >> i.e. using stream processing frameworks or custom consumers +
> producers.
> > >>
> > >> --
> > >> Thanks,
> > >> Ewen
> >
>



-- 
Thanks,
Ewen

Reply via email to