We were talking on the call about a logo...so here I present "The Original
Copycat":
http://shirtoid.com/67790/the-original-copycat/

-Jay

On Tue, Jun 23, 2015 at 6:28 PM, Gwen Shapira <gshap...@cloudera.com> wrote:

> One more reason to have CopyCat as a separate project is to sidestep
> the entire "Why CopyCat and not X" discussion :)
>
> On Tue, Jun 23, 2015 at 6:26 PM, Gwen Shapira <gshap...@cloudera.com>
> wrote:
> > Re: Flume vs. CopyCat
> >
> > I would love to have an automagically-parallelizing, schema-aware
> > version of Flume with great reliability guarantees. Flume has good
> > core architecture and I'm sure that if the Flume community is
> > interested, it can be extended in that direction.
> >
> > However, the Apache way is not to stop new innovation just because
> > some systems already exists. We develop the best systems we can, and
> > users choose the ones they prefer - thats how ecosystems thrive.
> > If we can have Flume and NiFi, Sentry and Argus, Flink and Storm,
> > Parquet and ORC, I'm sure we can also have CopyCat in the zoo :)
> >
> > Gwen
> >
> >
> >
> > On Tue, Jun 23, 2015 at 6:11 PM, Ewen Cheslack-Postava
> > <e...@confluent.io> wrote:
> >> On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik <ros...@hortonworks.com>
> wrote:
> >>
> >>> Thanks Jay and Ewen for the response.
> >>>
> >>>
> >>> >@Jay
> >>> >
> >>> > 3. This has a built in notion of parallelism throughout.
> >>>
> >>>
> >>>
> >>> It was not obvious how it will look like or differ from existing
> systemsÅ 
> >>> since all of existing ones do parallelize data movement.
> >>>
> >>
> >> I'm guessing some confusion here might also be because we want both
> >> parallelization and distribution.
> >>
> >> Roughly speaking, I think of Copycat making the consumer group
> abstraction
> >> available for any import task, and the idea is to make this automatic
> and
> >> transparent to the user. This isn't interesting for systems that
> literally
> >> only have a single input stream, but Copycat source connectors have a
> >> built-in notion of parallel input streams. The connector's job is to
> inform
> >> the the Copycat framework of what input streams there are and Copycat
> >> handles running tasks, balancing the streams across them, handles
> failures
> >> by rebalancing as necessary, provides offset commit and storage so tasks
> >> can resume from the last known-good state, etc.
> >>
> >> On the sink side, the input is the Kafka consumer group, which obviously
> >> already has this parallelism built in. Depending on the output, this may
> >> manifest in different ways. For HDFS, the effect is just that your
> output
> >> files are partitioned (one per topic-partition).
> >>
> >> As for other systems, can you be more specific? Some of them obviously
> do
> >> (e.g. Camus), but others require you to handle this manually. I don't
> want
> >> to pick on Flume specifically, but as an example, it requires either
> >> configuring multiple (or multiplexed) flows in a single agent or manage
> >> multiple agents independently. This isn't really the same as what I've
> >> described above where you hand Copycat one config and it automatically
> >> spreads the work across multiple, fault-tolerant tasks. But flume is
> also
> >> targeting a much different general problem, trying to build potentially
> >> large, multi-stage data flows with all sorts of transformations,
> filtering,
> >> etc.
> >>
> >>
> >>>
> >>>
> >>> @Ewen,
> >>>
> >>> >Import: Flume is just one of many similar systems designed around log
> >>> >collection. See notes below, but one major point is that they
> generally
> >>> >don't provide any sort of guaranteed delivery semantics.
> >>>
> >>>
> >>> I think most of them do provide guarantees of some sort (Ex. Flume &
> >>> FluentD).
> >>>
> >>
> >> This part of the discussion gets a little bit tricky, not least because
> it
> >> seems people can't agree on exactly what these terms mean.
> >>
> >> First, some systems that you didn't mention. Logstash definitely doesn't
> >> have any guarantees as it uses a simple 20-event in-memory buffer
> between
> >> stages. As far as I can tell, Heka doesn't provide these semantics
> either,
> >> although I have not investigated it as deeply.
> >>
> >> fluentd has an article discussing the options for it (
> >> http://docs.fluentd.org/articles/high-availability), but I actually
> think
> >> the article on writing plugins is more informative
> >> http://docs.fluentd.org/articles/plugin-development The most important
> >> point is that input plugins have no way to track or discovery downstream
> >> delivery (i.e. they cannot get acks, nor is there any sort of offset
> >> tracked that it can lookup to discover where to restart upon failure,
> nor
> >> is it guaranteed that after router.emit() returns that the data will
> have
> >> already been delivered downstream). So if I have a replicated input data
> >> store, e.g. a replicated database, and I am just reading off it's
> >> changelog, does fluentd actually guarantee something like at least once
> >> delivery to the sink? In fact, fluentd's own documentation (the high
> >> availability doc) describes data loss scenarios that aren't inherent to
> >> every system (e.g., if their log aggregator dies, which not every
> system is
> >> susceptible to, vs. if an event is generated on a single host and that
> host
> >> dies before reporting it anywhere, then of course the data is
> permanently
> >> lost).
> >>
> >> Flume actually does have a (somewhat confusingly named) transaction
> concept
> >> to help control this. The reliability actually depends on what type of
> >> channel implementation you use. Gwen and Jeff from Cloudera integrated
> >> Kafka and Flume, including a Kafka channel (see
> >>
> http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/
> ).
> >> This does allow for better control over delivery semantics, and I think
> if
> >> you use something like Kafka for every channel in your pipeline, you can
> >> get something like what Copycat can provide. I'd argue flume's approach
> has
> >> some other drawbacks though. In order to work correctly, every source
> and
> >> sink has to handle the transaction semantics, which adds complexity
> >> (although they do offer great skeleton examples in their docs!).
> >>
> >> Copycat tries to avoid that complexity for connector developers by
> changing
> >> the framework to use streams, offsets, and commits, and pushing the
> >> complexities of dealing with any sorts of errors/failures into the
> >> framework. Ideally connector developers only need to a) check for
> offsets
> >> at startup and rewind to the last known committed offset and b) load
> events
> >> from the source system (with stream IDs and offsets) and pass them to
> the
> >> framework.
> >>
> >>
> >>>
> >>> >YARN: My point isn't that YARN is bad, it's that tying to any
> particular
> >>> >cluster manager severely limits the applicability of the tool. The
> goal is
> >>> >to make Copycat agnostic to the cluster manager so it can run under
> Mesos,
> >>> >YARN, etc.
> >>>
> >>> ok. Got it. Sounds like there is plan to do some work here to ensure
> >>> out-of-the-box it works with more than one scheduler (as @Jay listed
> out).
> >>> In that case, IMO it would be better to actually rephrase it in the KIP
> >>> that it will support more than one scheduler.
> >>>
> >>>
> >> Tried to add some wording to clarify that.
> >>
> >>
> >>>
> >>> >Exactly once: You accomplish this in any system by managing offsets
> in the
> >>> >destination system atomically with the data or through some kind of
> >>> >deduplication. Jiangjie actually just gave a great talk about this
> issue
> >>> >at
> >>> >a recent Kafka meetup, perhaps he can share some slides about it.
> When you
> >>> >see all the details involved, you'll see why I think it might be nice
> to
> >>> >have the framework help you manage the complexities of achieving
> different
> >>> >delivery semantics ;)
> >>>
> >>>
> >>> Deduplication as a post processing step is a common recommendation done
> >>> today Å  but that is a workaround/fix for the inability to provide
> >>> exactly-once by the delivery systems. IMO such post processing should
> not
> >>> be considered part of the "exacty-once" guarantee of Copycat.
> >>>
> >>>
> >>> Will be good to know how this guarantee will be possible when
> delivering
> >>> to HDFS.
> >>> Would be great if someone can share those slides if it is discussed
> there.
> >>>
> >>>
> >> For HDFS, the gist of the solution is to write to temporary files and
> then
> >> rename atomically to their final destination, including offset
> information
> >> (e.g., it can just be in the filename). Readers only see files that have
> >> been "committed". If there is a failure, any existing temp files get
> >> cleaned up and reading is reset to the last committed offset. There are
> >> some tricky details if you have zombie processes and depending on how
> you
> >> organize the data across files, but this isn't really the point of this
> >> KIP. If you're interested in HDFS specifically, I'd suggest looking at
> >> Camus's implementation.
> >>
> >>
> >>>
> >>>
> >>>
> >>> Was looking for clarification on this ..
> >>> - Export side - is this like a map reduce kind of job or something
> else ?
> >>> If delivering to hdfs would this be running on the hadoop cluster or
> >>> outside ?
> >>>
> >> - Import side - how does this look ? Is it a bunch of flume like
> processes
> >>> ? maybe just some kind of a broker that translates the incoming
> protocol
> >>> into outgoing Kafka producer api protocol ? If delivering to hdfs, will
> >>> this run on the cluster or outside ?
> >>>
> >>
> >> No mapreduce; in fact, no other frameworks required unless the connector
> >> needs it for some reason. Both source and sink look structurally the
> same.
> >> Probably the most common scenario is to run a set of workers that
> provide
> >> the copycat service. You submit connector jobs to run on these workers.
> A
> >> coordinator handles distributing the work across worker nodes.
> Coordinators
> >> determine how to divide the tasks and generate configs for them, then
> the
> >> framework handles distributing that work. Each individual task handles
> some
> >> subset of the job. For source tasks, that subset is a set of input
> streams
> >> (in the JDBC example in the KIP, each table would have a corresponding
> >> stream). For sink tasks, the subset is determined automatically by the
> >> framework via the underlying consumer group as a subset of
> topic-partitions
> >> (since the input is from Kafka). Connectors are kept simple, just
> >> processing streams of records (either generating them by reading from
> the
> >> source system or recording them into the sink system). Source tasks also
> >> include information about offsets, and sink tasks either need to manage
> >> offsets themselves or implement flush() functionality. Given these
> >> primitives, the framework can then handle other complexities like
> different
> >> delivery semantics without any additional support from the connectors.
> >>
> >> The motivation for the additional modes of execution (agent, embedded)
> was
> >> to support a couple of other common use cases. Agent mode is completely
> >> standalone, which provides for a much simpler implementation and handles
> >> use cases where there isn't an easy way to avoid running the job across
> >> many machines (e.g., if you have to load logs directly from log files).
> >> Embedded mode is actually a simple variant of the distributed mode, but
> >> lets you setup and run the entire cluster alongside the rest of your
> >> distributed app. This is useful if you want to get up and running with
> an
> >> application where you need to, for example, import data from another
> >> service into Kafka, then consume and process that data. You can setup
> the
> >> worker and submit a job directly from your code, reducing the
> operational
> >> complexity. It's probably not the right long term solution as your usage
> >> expands, but it can significantly ease adoption.
> >>
> >>
> >>>
> >>>
> >>> I still think adding one or two specific end-to-end use-cases in the
> KIP,
> >>> showing how copycat will pan out for them for import/export will really
> >>> clarify things.
> >>>
> >>
> >> There were a couple of examples already in the KIP -- JDBC, HDFS, log
> >> import, and now I've also added mirror maker. Were you looking for
> >> something more specific? I could also explain a full source -> kafka ->
> >> sink pipeline, but I don't know that there's much to add there beyond
> the
> >> fact that we would like schemas to carry across the entire pipeline.
> >> Otherwise it's just chaining connectors. Besides, I think most of the
> >> interesting use cases actually have additional processing steps in
> between,
> >> i.e. using stream processing frameworks or custom consumers + producers.
> >>
> >> --
> >> Thanks,
> >> Ewen
>

Reply via email to