Seems like discussion has mostly quieted down on this. Any more questions, comments, or discussion? If nobody brings up any other issues, I'll start a vote thread in a day or two.
-Ewen

On Thu, Jun 25, 2015 at 3:36 PM, Jay Kreps <j...@confluent.io> wrote:

> We were talking on the call about a logo... so here I present "The Original
> Copycat": http://shirtoid.com/67790/the-original-copycat/
>
> -Jay
>
> On Tue, Jun 23, 2015 at 6:28 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
>
> > One more reason to have CopyCat as a separate project is to sidestep the
> > entire "Why CopyCat and not X" discussion :)
> >
> > On Tue, Jun 23, 2015 at 6:26 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> >
> > > Re: Flume vs. CopyCat
> > >
> > > I would love to have an automagically-parallelizing, schema-aware
> > > version of Flume with great reliability guarantees. Flume has a good
> > > core architecture and I'm sure that if the Flume community is
> > > interested, it can be extended in that direction.
> > >
> > > However, the Apache way is not to stop new innovation just because some
> > > systems already exist. We develop the best systems we can, and users
> > > choose the ones they prefer - that's how ecosystems thrive. If we can
> > > have Flume and NiFi, Sentry and Argus, Flink and Storm, Parquet and
> > > ORC, I'm sure we can also have CopyCat in the zoo :)
> > >
> > > Gwen
> > >
> > > On Tue, Jun 23, 2015 at 6:11 PM, Ewen Cheslack-Postava
> > > <e...@confluent.io> wrote:
> > >> On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik <ros...@hortonworks.com>
> > >> wrote:
> > >>
> > >>> Thanks Jay and Ewen for the response.
> > >>>
> > >>> >@Jay
> > >>> >
> > >>> > 3. This has a built in notion of parallelism throughout.
> > >>>
> > >>> It was not obvious how it would look or differ from existing systems,
> > >>> since all of the existing ones do parallelize data movement.
> > >>
> > >> I'm guessing some confusion here might also be because we want both
> > >> parallelization and distribution.
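Since the point about wanting both parallelization and distribution recurs through the thread, here is a minimal Python sketch of the idea: a connector's input streams are balanced across parallel tasks, and losing a task just means re-running the same assignment over the survivors. All names here are hypothetical illustrations, not actual Copycat APIs.

```python
# Hypothetical sketch: balance a connector's input streams across parallel
# tasks, consumer-group style. Streams are assigned round-robin; when a task
# dies, the framework rebalances the same streams over the remaining tasks.

def assign_streams(streams, tasks):
    """Round-robin assignment of input streams to task IDs."""
    assignment = {task: [] for task in tasks}
    for i, stream in enumerate(streams):
        assignment[tasks[i % len(tasks)]].append(stream)
    return assignment

streams = ["table_users", "table_orders", "table_items", "table_events"]

# Initial assignment across three tasks.
assignment = assign_streams(streams, ["task-0", "task-1", "task-2"])

# task-2 fails: rebalance the same streams across the surviving tasks.
assignment = assign_streams(streams, ["task-0", "task-1"])
```

The user hands over one config listing the streams; spreading them over tasks (parallelization) and placing those tasks on workers (distribution) are the framework's job, not the connector author's.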
> > >> Roughly speaking, I think of Copycat as making the consumer group
> > >> abstraction available for any import task, and the idea is to make
> > >> this automatic and transparent to the user. This isn't interesting for
> > >> systems that literally only have a single input stream, but Copycat
> > >> source connectors have a built-in notion of parallel input streams.
> > >> The connector's job is to inform the Copycat framework of what input
> > >> streams there are; Copycat handles running tasks, balancing the
> > >> streams across them, rebalancing as necessary on failures, and
> > >> providing offset commit and storage so tasks can resume from the last
> > >> known-good state, etc.
> > >>
> > >> On the sink side, the input is the Kafka consumer group, which
> > >> obviously already has this parallelism built in. Depending on the
> > >> output, this may manifest in different ways. For HDFS, the effect is
> > >> just that your output files are partitioned (one per topic-partition).
> > >>
> > >> As for other systems, can you be more specific? Some of them obviously
> > >> do (e.g. Camus), but others require you to handle this manually. I
> > >> don't want to pick on Flume specifically, but as an example, it
> > >> requires either configuring multiple (or multiplexed) flows in a
> > >> single agent or managing multiple agents independently. This isn't
> > >> really the same as what I've described above, where you hand Copycat
> > >> one config and it automatically spreads the work across multiple,
> > >> fault-tolerant tasks. But Flume is also targeting a much different
> > >> general problem, trying to build potentially large, multi-stage data
> > >> flows with all sorts of transformations, filtering, etc.
> > >>
> > >>> @Ewen,
> > >>>
> > >>> >Import: Flume is just one of many similar systems designed around
> > >>> >log collection.
> > >>> >See notes below, but one major point is that they generally don't
> > >>> >provide any sort of guaranteed delivery semantics.
> > >>>
> > >>> I think most of them do provide guarantees of some sort (e.g. Flume &
> > >>> FluentD).
> > >>
> > >> This part of the discussion gets a little tricky, not least because it
> > >> seems people can't agree on exactly what these terms mean.
> > >>
> > >> First, some systems that you didn't mention. Logstash definitely
> > >> doesn't have any guarantees, as it uses a simple 20-event in-memory
> > >> buffer between stages. As far as I can tell, Heka doesn't provide
> > >> these semantics either, although I have not investigated it as deeply.
> > >>
> > >> fluentd has an article discussing the options for it
> > >> (http://docs.fluentd.org/articles/high-availability), but I actually
> > >> think the article on writing plugins is more informative:
> > >> http://docs.fluentd.org/articles/plugin-development. The most
> > >> important point is that input plugins have no way to track or discover
> > >> downstream delivery (i.e. they cannot get acks, nor is there any sort
> > >> of tracked offset they can look up to discover where to restart upon
> > >> failure, nor is it guaranteed that after router.emit() returns the
> > >> data will have already been delivered downstream). So if I have a
> > >> replicated input data store, e.g. a replicated database, and I am just
> > >> reading off its changelog, does fluentd actually guarantee something
> > >> like at-least-once delivery to the sink? In fact, fluentd's own
> > >> documentation (the high availability doc) describes data loss
> > >> scenarios that aren't inherent to every system (e.g., if their log
> > >> aggregator dies, which not every system is susceptible to, vs.
> > >> if an event is generated on a single host and that host dies before
> > >> reporting it anywhere, then of course the data is permanently lost).
> > >>
> > >> Flume actually does have a (somewhat confusingly named) transaction
> > >> concept to help control this. The reliability actually depends on what
> > >> type of channel implementation you use. Gwen and Jeff from Cloudera
> > >> integrated Kafka and Flume, including a Kafka channel (see
> > >> http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/).
> > >> This does allow for better control over delivery semantics, and I
> > >> think if you use something like Kafka for every channel in your
> > >> pipeline, you can get something like what Copycat can provide. I'd
> > >> argue Flume's approach has some other drawbacks, though. In order to
> > >> work correctly, every source and sink has to handle the transaction
> > >> semantics, which adds complexity (although they do offer great
> > >> skeleton examples in their docs!).
> > >>
> > >> Copycat tries to avoid that complexity for connector developers by
> > >> changing the framework to use streams, offsets, and commits, and by
> > >> pushing the complexities of dealing with any sorts of errors/failures
> > >> into the framework. Ideally connector developers only need to a) check
> > >> for offsets at startup and rewind to the last known committed offset,
> > >> and b) load events from the source system (with stream IDs and
> > >> offsets) and pass them to the framework.
> > >>
> > >>> >YARN: My point isn't that YARN is bad, it's that tying to any
> > >>> >particular cluster manager severely limits the applicability of the
> > >>> >tool. The goal is to make Copycat agnostic to the cluster manager so
> > >>> >it can run under Mesos, YARN, etc.
> > >>>
> > >>> ok. Got it.
> > >>> Sounds like there is a plan to do some work here to ensure that it
> > >>> works out of the box with more than one scheduler (as @Jay listed
> > >>> out). In that case, IMO it would be better to actually rephrase the
> > >>> KIP to say that it will support more than one scheduler.
> > >>
> > >> Tried to add some wording to clarify that.
> > >>
> > >>> >Exactly once: You accomplish this in any system by managing offsets
> > >>> >in the destination system atomically with the data or through some
> > >>> >kind of deduplication. Jiangjie actually just gave a great talk
> > >>> >about this issue at a recent Kafka meetup; perhaps he can share some
> > >>> >slides about it. When you see all the details involved, you'll see
> > >>> >why I think it might be nice to have the framework help you manage
> > >>> >the complexities of achieving different delivery semantics ;)
> > >>>
> > >>> Deduplication as a post-processing step is a common recommendation
> > >>> today, but that is a workaround for the delivery systems' inability
> > >>> to provide exactly-once. IMO such post-processing should not be
> > >>> considered part of the "exactly-once" guarantee of Copycat.
> > >>>
> > >>> Will be good to know how this guarantee will be possible when
> > >>> delivering to HDFS. Would be great if someone can share those slides
> > >>> if it is discussed there.
> > >>
> > >> For HDFS, the gist of the solution is to write to temporary files and
> > >> then rename them atomically to their final destination, including
> > >> offset information (e.g., it can just be in the filename). Readers
> > >> only see files that have been "committed". If there is a failure, any
> > >> existing temp files get cleaned up and reading is reset to the last
> > >> committed offset.
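The temp-file-plus-atomic-rename scheme described here is easy to sketch. Below is an illustrative Python mock that uses a local directory in place of HDFS; the filename format and helper names are invented for the example, not taken from any actual implementation.

```python
import os
import tempfile

# Sketch of the commit protocol: write records to a temp file, then
# atomically rename it into place with the offset range encoded in the
# filename. Readers only ever see committed files, and on restart the last
# committed offset is recovered from the filenames alone.

def commit_chunk(out_dir, topic, start_offset, records):
    """Write records to a temp file, then atomically 'commit' via rename."""
    fd, tmp_path = tempfile.mkstemp(dir=out_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        for record in records:
            f.write(record + "\n")
    end_offset = start_offset + len(records) - 1
    final_path = os.path.join(out_dir, f"{topic}+{start_offset}+{end_offset}")
    os.replace(tmp_path, final_path)  # atomic on POSIX; HDFS rename is similar
    return end_offset

def recover_offset(out_dir, topic):
    """On restart: drop leftover temp files, resume after the last commit."""
    last = -1
    for name in os.listdir(out_dir):
        if name.endswith(".tmp"):
            os.remove(os.path.join(out_dir, name))  # uncommitted data
        elif name.startswith(topic + "+"):
            last = max(last, int(name.rsplit("+", 1)[1]))
    return last + 1  # next offset to consume from Kafka
```

The guarantee rests entirely on the atomicity of the rename: a chunk of records and its ending offset become visible together or not at all.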
> > >> There are some tricky details if you have zombie processes, and
> > >> depending on how you organize the data across files, but this isn't
> > >> really the point of this KIP. If you're interested in HDFS
> > >> specifically, I'd suggest looking at Camus's implementation.
> > >>
> > >>> Was looking for clarification on this:
> > >>> - Export side - is this like a map-reduce kind of job or something
> > >>> else? If delivering to HDFS, would this be running on the Hadoop
> > >>> cluster or outside?
> > >>> - Import side - how does this look? Is it a bunch of Flume-like
> > >>> processes? Maybe just some kind of a broker that translates the
> > >>> incoming protocol into the outgoing Kafka producer API protocol? If
> > >>> delivering to HDFS, will this run on the cluster or outside?
> > >>
> > >> No MapReduce; in fact, no other frameworks are required unless the
> > >> connector needs one for some reason. Both source and sink look
> > >> structurally the same. Probably the most common scenario is to run a
> > >> set of workers that provide the Copycat service. You submit connector
> > >> jobs to run on these workers. A coordinator handles distributing the
> > >> work across worker nodes. Coordinators determine how to divide the
> > >> tasks and generate configs for them, then the framework handles
> > >> distributing that work. Each individual task handles some subset of
> > >> the job. For source tasks, that subset is a set of input streams (in
> > >> the JDBC example in the KIP, each table would have a corresponding
> > >> stream). For sink tasks, the subset is determined automatically by the
> > >> framework via the underlying consumer group, as a subset of
> > >> topic-partitions (since the input is from Kafka).
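The "coordinators determine how to divide the tasks and generate configs" step can be sketched for the JDBC example, where each table is an input stream. The function name and config shape below are invented for illustration, not the proposed API.

```python
# Hypothetical sketch: a coordinator chunks a source connector's input
# streams (JDBC tables) into at most max_tasks per-task configs. The
# framework then runs one task per config, each handling its subset.

def task_configs(tables, max_tasks):
    """Split the table list into contiguous groups, one per task."""
    n = min(max_tasks, len(tables))
    size, rem = divmod(len(tables), n)
    configs, start = [], 0
    for i in range(n):
        end = start + size + (1 if i < rem else 0)
        configs.append({"task": i, "tables": tables[start:end]})
        start = end
    return configs
```

So `task_configs(["users", "orders", "items"], 2)` yields one task for `["users", "orders"]` and one for `["items"]`; sink tasks need no such step because the consumer group already hands each task its topic-partitions.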
> > >> Connectors are kept simple, just processing streams of records
> > >> (either generating them by reading from the source system or
> > >> recording them into the sink system). Source tasks also include
> > >> information about offsets, and sink tasks either need to manage
> > >> offsets themselves or implement flush() functionality. Given these
> > >> primitives, the framework can then handle other complexities like
> > >> different delivery semantics without any additional support from the
> > >> connectors.
> > >>
> > >> The motivation for the additional modes of execution (agent,
> > >> embedded) was to support a couple of other common use cases. Agent
> > >> mode is completely standalone, which provides for a much simpler
> > >> implementation and handles use cases where there isn't an easy way to
> > >> avoid running the job across many machines (e.g., if you have to load
> > >> logs directly from log files). Embedded mode is actually a simple
> > >> variant of the distributed mode, but lets you set up and run the
> > >> entire cluster alongside the rest of your distributed app. This is
> > >> useful if you want to get up and running with an application where
> > >> you need to, for example, import data from another service into
> > >> Kafka, then consume and process that data. You can set up the worker
> > >> and submit a job directly from your code, reducing the operational
> > >> complexity. It's probably not the right long-term solution as your
> > >> usage expands, but it can significantly ease adoption.
> > >>
> > >>> I still think adding one or two specific end-to-end use cases to the
> > >>> KIP, showing how Copycat will pan out for them for import/export,
> > >>> will really clarify things.
> > >>
> > >> There were a couple of examples already in the KIP -- JDBC, HDFS, log
> > >> import, and now I've also added mirror maker. Were you looking for
> > >> something more specific?
> > >> I could also explain a full source -> kafka -> sink pipeline, but I
> > >> don't know that there's much to add there beyond the fact that we
> > >> would like schemas to carry across the entire pipeline. Otherwise
> > >> it's just chaining connectors. Besides, I think most of the
> > >> interesting use cases actually have additional processing steps in
> > >> between, i.e. using stream processing frameworks or custom consumers
> > >> + producers.
> > >>
> > >> --
> > >> Thanks,
> > >> Ewen