We were talking on the call about a logo...so here I present "The Original Copycat": http://shirtoid.com/67790/the-original-copycat/
-Jay

On Tue, Jun 23, 2015 at 6:28 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> One more reason to have CopyCat as a separate project is to sidestep the entire "Why CopyCat and not X" discussion :)
>
> On Tue, Jun 23, 2015 at 6:26 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > Re: Flume vs. CopyCat
> >
> > I would love to have an automagically-parallelizing, schema-aware version of Flume with great reliability guarantees. Flume has a good core architecture and I'm sure that if the Flume community is interested, it can be extended in that direction.
> >
> > However, the Apache way is not to stop new innovation just because some systems already exist. We develop the best systems we can, and users choose the ones they prefer - that's how ecosystems thrive. If we can have Flume and NiFi, Sentry and Argus, Flink and Storm, Parquet and ORC, I'm sure we can also have CopyCat in the zoo :)
> >
> > Gwen
> >
> > On Tue, Jun 23, 2015 at 6:11 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
> >> On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik <ros...@hortonworks.com> wrote:
> >>
> >>> Thanks Jay and Ewen for the response.
> >>>
> >>> >@Jay
> >>> >
> >>> > 3. This has a built in notion of parallelism throughout.
> >>>
> >>> It was not obvious what it will look like or how it will differ from existing systems… since all of the existing ones do parallelize data movement.
> >>
> >> I'm guessing some confusion here might also be because we want both parallelization and distribution.
> >>
> >> Roughly speaking, I think of Copycat as making the consumer group abstraction available for any import task, and the idea is to make this automatic and transparent to the user. This isn't interesting for systems that literally only have a single input stream, but Copycat source connectors have a built-in notion of parallel input streams.
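To make the parallel-input-streams idea concrete, here is a minimal sketch of how a framework might divide a connector's declared streams across tasks, much like consumer-group partition assignment. The function and stream names are hypothetical, not the actual Copycat API:

```python
# Hypothetical sketch: a source connector declares its parallel input
# streams, and the framework partitions them across a fixed number of
# tasks (similar in spirit to consumer-group partition assignment).

def assign_streams(streams, max_tasks):
    """Round-robin the connector's input streams across tasks."""
    assignments = [[] for _ in range(min(max_tasks, len(streams)))]
    for i, stream in enumerate(streams):
        assignments[i % len(assignments)].append(stream)
    return assignments

# e.g. a JDBC-style connector might report one stream per table:
tables = ["users", "orders", "payments", "inventory", "audit_log"]
print(assign_streams(tables, 2))
# → [['users', 'payments', 'audit_log'], ['orders', 'inventory']]
```

On a worker failure, the framework would simply recompute this assignment over the surviving tasks, which is the rebalancing behavior described below.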
> >> The connector's job is to inform the Copycat framework of what input streams exist; Copycat handles running tasks, balancing the streams across them, handling failures by rebalancing as necessary, providing offset commit and storage so tasks can resume from the last known-good state, etc.
> >>
> >> On the sink side, the input is the Kafka consumer group, which obviously already has this parallelism built in. Depending on the output, this may manifest in different ways. For HDFS, the effect is just that your output files are partitioned (one per topic-partition).
> >>
> >> As for other systems, can you be more specific? Some of them obviously do (e.g. Camus), but others require you to handle this manually. I don't want to pick on Flume specifically, but as an example, it requires either configuring multiple (or multiplexed) flows in a single agent or managing multiple agents independently. This isn't really the same as what I've described above, where you hand Copycat one config and it automatically spreads the work across multiple, fault-tolerant tasks. But Flume is also targeting a much different general problem, trying to build potentially large, multi-stage data flows with all sorts of transformations, filtering, etc.
> >>
> >>> @Ewen,
> >>>
> >>> >Import: Flume is just one of many similar systems designed around log collection. See notes below, but one major point is that they generally don't provide any sort of guaranteed delivery semantics.
> >>>
> >>> I think most of them do provide guarantees of some sort (Ex. Flume & FluentD).
> >>
> >> This part of the discussion gets a little bit tricky, not least because it seems people can't agree on exactly what these terms mean.
> >>
> >> First, some systems that you didn't mention.
> >> Logstash definitely doesn't have any guarantees, as it uses a simple 20-event in-memory buffer between stages. As far as I can tell, Heka doesn't provide these semantics either, although I have not investigated it as deeply.
> >>
> >> fluentd has an article discussing the options for it (http://docs.fluentd.org/articles/high-availability), but I actually think the article on writing plugins is more informative: http://docs.fluentd.org/articles/plugin-development. The most important point is that input plugins have no way to track or discover downstream delivery (i.e. they cannot get acks, nor is there any sort of offset tracked that they can look up to discover where to restart upon failure, nor is it guaranteed that after router.emit() returns the data will have already been delivered downstream). So if I have a replicated input data store, e.g. a replicated database, and I am just reading off its changelog, does fluentd actually guarantee something like at-least-once delivery to the sink? In fact, fluentd's own documentation (the high availability doc) describes data loss scenarios that aren't inherent to every system (e.g., their log aggregator dying, which not every system is susceptible to, vs. an event being generated on a single host that dies before reporting it anywhere, in which case of course the data is permanently lost).
> >>
> >> Flume actually does have a (somewhat confusingly named) transaction concept to help control this. The reliability actually depends on what type of channel implementation you use. Gwen and Jeff from Cloudera integrated Kafka and Flume, including a Kafka channel (see http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/).
> >> This does allow for better control over delivery semantics, and I think if you use something like Kafka for every channel in your pipeline, you can get something like what Copycat can provide. I'd argue Flume's approach has some other drawbacks, though. In order to work correctly, every source and sink has to handle the transaction semantics, which adds complexity (although they do offer great skeleton examples in their docs!).
> >>
> >> Copycat tries to avoid that complexity for connector developers by changing the framework to use streams, offsets, and commits, and pushing the complexities of dealing with any sort of errors/failures into the framework. Ideally, connector developers only need to a) check for offsets at startup and rewind to the last known committed offset, and b) load events from the source system (with stream IDs and offsets) and pass them to the framework.
> >>
> >>> >YARN: My point isn't that YARN is bad, it's that tying to any particular cluster manager severely limits the applicability of the tool. The goal is to make Copycat agnostic to the cluster manager so it can run under Mesos, YARN, etc.
> >>>
> >>> ok. Got it. Sounds like there is a plan to do some work here to ensure that out of the box it works with more than one scheduler (as @Jay listed out). In that case, IMO it would be better to actually rephrase the KIP to say that it will support more than one scheduler.
> >>
> >> Tried to add some wording to clarify that.
> >>
> >>> >Exactly once: You accomplish this in any system by managing offsets in the destination system atomically with the data or through some kind of deduplication. Jiangjie actually just gave a great talk about this issue at a recent Kafka meetup; perhaps he can share some slides about it.
> >>> >When you see all the details involved, you'll see why I think it might be nice to have the framework help you manage the complexities of achieving different delivery semantics ;)
> >>>
> >>> Deduplication as a post-processing step is a common recommendation today… but that is a workaround/fix for the delivery system's inability to provide exactly-once. IMO such post-processing should not be considered part of the "exactly-once" guarantee of Copycat.
> >>>
> >>> Will be good to know how this guarantee will be possible when delivering to HDFS. Would be great if someone can share those slides if it is discussed there.
> >>
> >> For HDFS, the gist of the solution is to write to temporary files and then rename them atomically to their final destination, including offset information (e.g., it can just be in the filename). Readers only see files that have been "committed". If there is a failure, any existing temp files get cleaned up and reading is reset to the last committed offset. There are some tricky details if you have zombie processes, and depending on how you organize the data across files, but this isn't really the point of this KIP. If you're interested in HDFS specifically, I'd suggest looking at Camus's implementation.
> >>
> >>> Was looking for clarification on this:
> >>> - Export side - is this like a map-reduce kind of job or something else? If delivering to HDFS, would this be running on the Hadoop cluster or outside?
> >>> - Import side - how does this look? Is it a bunch of Flume-like processes? Maybe just some kind of a broker that translates the incoming protocol into the outgoing Kafka producer API protocol? If delivering to HDFS, will this run on the cluster or outside?
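The temp-file-plus-atomic-rename recipe described a few paragraphs up can be sketched roughly as follows, with a local filesystem rename standing in for HDFS's rename; the paths, filename convention, and function names here are all hypothetical:

```python
import os
import tempfile

def commit_records(out_dir, topic_partition, start_offset, records):
    """Write a batch to a temp file, then atomically rename it into place
    with the offset range encoded in the final filename. Readers only ever
    see fully 'committed' files; on failure, leftover temp files are
    deleted and reading restarts from the last committed offset, which is
    recoverable from the committed filenames."""
    end_offset = start_offset + len(records) - 1
    fd, tmp_path = tempfile.mkstemp(dir=out_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        for r in records:
            f.write(r + "\n")
    final = os.path.join(
        out_dir, "%s+%d+%d" % (topic_partition, start_offset, end_offset))
    os.rename(tmp_path, final)  # atomic on POSIX; HDFS rename is analogous
    return end_offset + 1       # next offset to consume

def last_committed_offset(out_dir, topic_partition):
    """Recover the restart point by scanning committed filenames."""
    ends = [int(name.rsplit("+", 1)[1])
            for name in os.listdir(out_dir)
            if name.startswith(topic_partition + "+")]
    return max(ends) + 1 if ends else 0
```

Because the rename is atomic and the filename carries the offset range, a restarted task can rebuild its position from the directory listing alone, which is the essence of the exactly-once approach Ewen describes (zombie-writer fencing is the tricky detail left out here).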
> >> No MapReduce; in fact, no other frameworks are required unless the connector needs them for some reason. Both source and sink look structurally the same. Probably the most common scenario is to run a set of workers that provide the Copycat service. You submit connector jobs to run on these workers. A coordinator handles distributing the work across worker nodes. Coordinators determine how to divide the tasks and generate configs for them, then the framework handles distributing that work. Each individual task handles some subset of the job. For source tasks, that subset is a set of input streams (in the JDBC example in the KIP, each table would have a corresponding stream). For sink tasks, the subset is determined automatically by the framework via the underlying consumer group, as a subset of topic-partitions (since the input is from Kafka). Connectors are kept simple, just processing streams of records (either generating them by reading from the source system or recording them into the sink system). Source tasks also include offset information, and sink tasks either need to manage offsets themselves or implement flush() functionality. Given these primitives, the framework can then handle other complexities, like different delivery semantics, without any additional support from the connectors.
> >>
> >> The motivation for the additional modes of execution (agent, embedded) was to support a couple of other common use cases. Agent mode is completely standalone, which provides for a much simpler implementation and handles use cases where there isn't an easy way to avoid running the job across many machines (e.g., if you have to load logs directly from log files).
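The source-task contract described above (rewind to the last committed offset at startup, then hand records tagged with stream IDs and offsets to the framework) can be sketched as follows; the class and method names are illustrative, not the real Copycat API:

```python
# Illustrative sketch of the source-task contract: the framework stores
# committed offsets; the task rewinds to them at startup and then just
# emits (stream_id, offset, value) records. All delivery-semantics
# machinery lives in the framework, not in the connector.

class InMemoryOffsetStore:
    """Stand-in for the framework's offset commit/storage service."""
    def __init__(self):
        self._committed = {}

    def committed(self, stream_id):
        return self._committed.get(stream_id, 0)

    def commit(self, stream_id, offset):
        self._committed[stream_id] = offset

def run_source_task(stream_id, source_log, store, batch_size=2):
    """Resume from the last committed offset and forward new records."""
    pos = store.committed(stream_id)      # a) rewind at startup
    delivered = []
    while pos < len(source_log):          # b) load events, hand to framework
        batch = source_log[pos:pos + batch_size]
        delivered.extend(
            (stream_id, pos + i, v) for i, v in enumerate(batch))
        pos += len(batch)
        store.commit(stream_id, pos)      # framework commits progress
    return delivered
```

Running the task twice over the same source delivers nothing the second time, since the committed offset already points past the end; that resume-from-offset behavior is what lets the framework, rather than each connector, supply the delivery guarantees.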
> >> Embedded mode is actually a simple variant of the distributed mode, but it lets you set up and run the entire cluster alongside the rest of your distributed app. This is useful if you want to get up and running with an application where you need to, for example, import data from another service into Kafka, then consume and process that data. You can set up the worker and submit a job directly from your code, reducing the operational complexity. It's probably not the right long-term solution as your usage expands, but it can significantly ease adoption.
> >>
> >>> I still think adding one or two specific end-to-end use cases to the KIP, showing how Copycat will pan out for them for import/export, will really clarify things.
> >>
> >> There were a couple of examples already in the KIP -- JDBC, HDFS, log import, and now I've also added mirror maker. Were you looking for something more specific? I could also explain a full source -> kafka -> sink pipeline, but I don't know that there's much to add there beyond the fact that we would like schemas to carry across the entire pipeline. Otherwise it's just chaining connectors. Besides, I think most of the interesting use cases actually have additional processing steps in between, i.e. using stream processing frameworks or custom consumers + producers.
> >>
> >> --
> >> Thanks,
> >> Ewen