One more reason to have CopyCat as a separate project is to sidestep the entire "Why CopyCat and not X" discussion :)
On Tue, Jun 23, 2015 at 6:26 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> Re: Flume vs. CopyCat
>
> I would love to have an automagically-parallelizing, schema-aware version of Flume with great reliability guarantees. Flume has a good core architecture and I'm sure that if the Flume community is interested, it can be extended in that direction.
>
> However, the Apache way is not to stop new innovation just because some systems already exist. We develop the best systems we can, and users choose the ones they prefer - that's how ecosystems thrive. If we can have Flume and NiFi, Sentry and Argus, Flink and Storm, Parquet and ORC, I'm sure we can also have CopyCat in the zoo :)
>
> Gwen
>
> On Tue, Jun 23, 2015 at 6:11 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
>> On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik <ros...@hortonworks.com> wrote:
>>
>>> Thanks Jay and Ewen for the response.
>>>
>>> >@Jay
>>> > 3. This has a built in notion of parallelism throughout.
>>>
>>> It was not obvious how it will look like or differ from existing systems… since all of the existing ones do parallelize data movement.
>>
>> I'm guessing some confusion here might also be because we want both parallelization and distribution.
>>
>> Roughly speaking, I think of Copycat as making the consumer group abstraction available for any import task, and the idea is to make this automatic and transparent to the user. This isn't interesting for systems that literally only have a single input stream, but Copycat source connectors have a built-in notion of parallel input streams. The connector's job is to inform the Copycat framework of what input streams there are; Copycat handles running tasks, balancing the streams across them, rebalancing as necessary when there are failures, and providing offset commit and storage so tasks can resume from the last known-good state, etc.
>>
>> On the sink side, the input is the Kafka consumer group, which obviously already has this parallelism built in. Depending on the output, this may manifest in different ways. For HDFS, the effect is just that your output files are partitioned (one per topic-partition).
>>
>> As for other systems, can you be more specific? Some of them obviously do (e.g. Camus), but others require you to handle this manually. I don't want to pick on Flume specifically, but as an example, it requires either configuring multiple (or multiplexed) flows in a single agent or managing multiple agents independently. This isn't really the same as what I've described above, where you hand Copycat one config and it automatically spreads the work across multiple, fault-tolerant tasks. But Flume is also targeting a much different general problem, trying to build potentially large, multi-stage data flows with all sorts of transformations, filtering, etc.
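>>
>> To make the source side a bit more concrete, here is a very rough sketch of the shape a connector might take for the JDBC example -- the class and method names below are purely illustrative stand-ins, not an actual or proposed API:
>>
>>   import java.util.ArrayList;
>>   import java.util.Arrays;
>>   import java.util.List;
>>   import java.util.Properties;
>>
>>   // Hypothetical sketch only -- names are made up for illustration.
>>   public class JdbcSourceConnectorSketch {
>>
>>       // The connector's main job: tell the framework which parallel input
>>       // streams exist. For a JDBC source that is one stream per table; a
>>       // real connector would read this list from the database catalog.
>>       public List<String> streams() {
>>           return Arrays.asList("users", "orders", "payments");
>>       }
>>
>>       // Produce one config per task, up to maxTasks. The framework runs the
>>       // tasks, assigns streams to them, and rebalances the streams onto the
>>       // remaining tasks when a worker fails.
>>       public List<Properties> taskConfigs(int maxTasks) {
>>           List<String> streams = streams();
>>           int numTasks = Math.min(maxTasks, streams.size());
>>           List<Properties> configs = new ArrayList<>();
>>           for (int i = 0; i < numTasks; i++)
>>               configs.add(new Properties());
>>           for (int i = 0; i < streams.size(); i++) {
>>               Properties cfg = configs.get(i % numTasks);
>>               String tables = cfg.getProperty("tables", "");
>>               cfg.setProperty("tables", tables.isEmpty() ? streams.get(i) : tables + "," + streams.get(i));
>>           }
>>           return configs;
>>       }
>>   }
>>
>> The point is just that the connector only describes the streams and how to split them; everything about actually running and supervising the tasks lives in the framework.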
>>
>>> @Ewen,
>>>
>>> >Import: Flume is just one of many similar systems designed around log collection. See notes below, but one major point is that they generally don't provide any sort of guaranteed delivery semantics.
>>>
>>> I think most of them do provide guarantees of some sort (Ex. Flume & FluentD).
>>
>> This part of the discussion gets a little bit tricky, not least because it seems people can't agree on exactly what these terms mean.
>>
>> First, some systems that you didn't mention. Logstash definitely doesn't have any guarantees, as it uses a simple 20-event in-memory buffer between stages. As far as I can tell, Heka doesn't provide these semantics either, although I have not investigated it as deeply.
>>
>> fluentd has an article discussing the options for it (http://docs.fluentd.org/articles/high-availability), but I actually think the article on writing plugins is more informative (http://docs.fluentd.org/articles/plugin-development). The most important point is that input plugins have no way to track or discover downstream delivery (i.e. they cannot get acks, nor is there any sort of offset tracked that they can look up to discover where to restart upon failure, nor is it guaranteed that after router.emit() returns the data will have already been delivered downstream). So if I have a replicated input data store, e.g. a replicated database, and I am just reading off its changelog, does fluentd actually guarantee something like at-least-once delivery to the sink? In fact, fluentd's own documentation (the high availability doc) describes data loss scenarios that aren't inherent to every system (e.g., if their log aggregator dies, which not every system is susceptible to, vs. if an event is generated on a single host and that host dies before reporting it anywhere, then of course the data is permanently lost).
>>
>> Flume actually does have a (somewhat confusingly named) transaction concept to help control this. The reliability actually depends on what type of channel implementation you use. Gwen and Jeff from Cloudera integrated Kafka and Flume, including a Kafka channel (see http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/). This does allow for better control over delivery semantics, and I think if you use something like Kafka for every channel in your pipeline, you can get something like what Copycat can provide. I'd argue Flume's approach has some other drawbacks, though. In order to work correctly, every source and sink has to handle the transaction semantics, which adds complexity (although they do offer great skeleton examples in their docs!).
>>
>> Copycat tries to avoid that complexity for connector developers by changing the framework to use streams, offsets, and commits, and pushing the complexities of dealing with any sort of errors/failures into the framework. Ideally connector developers only need to a) check for offsets at startup and rewind to the last known committed offset, and b) load events from the source system (with stream IDs and offsets) and pass them to the framework.
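>>
>> As a rough illustration of those two steps (again, the record type and offset storage below are just stand-ins, not real Copycat classes):
>>
>>   import java.util.Map;
>>
>>   // Hypothetical sketch only -- illustrates the startup rewind and the
>>   // "records carry stream + offset" idea, nothing more.
>>   public class ChangelogSourceTaskSketch {
>>
>>       private long position;
>>
>>       // (a) At startup, ask the framework for the last committed offset for
>>       // this stream and rewind to it. The framework persisted it; the task
>>       // needs no offset store or transaction handling of its own.
>>       public void start(String stream, Map<String, Long> committedOffsets) {
>>           position = committedOffsets.getOrDefault(stream, 0L);
>>       }
>>
>>       // (b) Load the next event from the source system and hand it to the
>>       // framework tagged with its stream and offset. The framework handles
>>       // producing to Kafka, committing offsets, and retrying after failures.
>>       public SourceRecordSketch poll(String stream) {
>>           String event = readChangelogEntry(position);   // placeholder read
>>           return new SourceRecordSketch(stream, position++, event);
>>       }
>>
>>       private String readChangelogEntry(long offset) {
>>           return "change-" + offset;   // stands in for a real changelog read
>>       }
>>
>>       public static class SourceRecordSketch {
>>           public final String stream;
>>           public final long offset;
>>           public final String value;
>>           public SourceRecordSketch(String stream, long offset, String value) {
>>               this.stream = stream;
>>               this.offset = offset;
>>               this.value = value;
>>           }
>>       }
>>   }
>>
>> Everything else -- delivery to Kafka, commit timing, recovery after a crash -- stays in the framework, which is what lets it offer the same delivery semantics across connectors.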
>>
>>> >YARN: My point isn't that YARN is bad, it's that tying to any particular cluster manager severely limits the applicability of the tool. The goal is to make Copycat agnostic to the cluster manager so it can run under Mesos, YARN, etc.
>>>
>>> Ok. Got it. Sounds like there is a plan to do some work here to ensure out-of-the-box it works with more than one scheduler (as @Jay listed out). In that case, IMO it would be better to actually rephrase it in the KIP that it will support more than one scheduler.
>>
>> Tried to add some wording to clarify that.
>>
>>> >Exactly once: You accomplish this in any system by managing offsets in the destination system atomically with the data or through some kind of deduplication. Jiangjie actually just gave a great talk about this issue at a recent Kafka meetup, perhaps he can share some slides about it. When you see all the details involved, you'll see why I think it might be nice to have the framework help you manage the complexities of achieving different delivery semantics ;)
>>>
>>> Deduplication as a post-processing step is a common recommendation today… but that is a workaround/fix for the inability of the delivery systems to provide exactly-once. IMO such post-processing should not be considered part of the "exactly-once" guarantee of Copycat.
>>>
>>> It will be good to know how this guarantee will be possible when delivering to HDFS. Would be great if someone can share those slides if it is discussed there.
>>
>> For HDFS, the gist of the solution is to write to temporary files and then rename them atomically to their final destination, including offset information (e.g., it can just be in the filename). Readers only see files that have been "committed". If there is a failure, any existing temp files get cleaned up and reading is reset to the last committed offset. There are some tricky details if you have zombie processes, and depending on how you organize the data across files, but this isn't really the point of this KIP. If you're interested in HDFS specifically, I'd suggest looking at Camus's implementation.
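>>
>> Roughly, the commit path looks something like the following. This is just a sketch of the pattern -- the paths and file-naming scheme are made up, and the zombie-fencing and file-layout details mentioned above are omitted:
>>
>>   import org.apache.hadoop.conf.Configuration;
>>   import org.apache.hadoop.fs.FSDataOutputStream;
>>   import org.apache.hadoop.fs.FileStatus;
>>   import org.apache.hadoop.fs.FileSystem;
>>   import org.apache.hadoop.fs.Path;
>>
>>   // Sketch of the temp-file + atomic-rename pattern; layout and naming are made up.
>>   public class HdfsCommitSketch {
>>
>>       // Write a batch to a temp file readers never look at, then "commit" it
>>       // with an atomic rename. The offsets encoded in the final filename
>>       // record exactly what has been delivered.
>>       public void writeAndCommit(byte[] batch, String topic, int partition,
>>                                  long startOffset, long endOffset) throws Exception {
>>           FileSystem fs = FileSystem.get(new Configuration());
>>           Path tmp = new Path("/data/_tmp/" + topic + "-" + partition + "." + endOffset);
>>           Path committed = new Path("/data/" + topic + "/" + partition + "/"
>>                   + startOffset + "-" + endOffset);
>>           try (FSDataOutputStream out = fs.create(tmp, true)) {
>>               out.write(batch);
>>           }
>>           fs.mkdirs(committed.getParent());
>>           if (!fs.rename(tmp, committed))
>>               throw new RuntimeException("commit failed for " + committed);
>>       }
>>
>>       // On restart: delete any uncommitted temp files and resume from the
>>       // highest committed offset found in the filenames.
>>       public long recover(String topic, int partition) throws Exception {
>>           FileSystem fs = FileSystem.get(new Configuration());
>>           fs.delete(new Path("/data/_tmp"), true);
>>           long last = -1L;
>>           for (FileStatus f : fs.listStatus(new Path("/data/" + topic + "/" + partition))) {
>>               String name = f.getPath().getName();   // "<startOffset>-<endOffset>"
>>               last = Math.max(last, Long.parseLong(name.substring(name.indexOf('-') + 1)));
>>           }
>>           return last + 1;   // next offset to consume from Kafka
>>       }
>>   }
>>
>> The filename doubles as the offset commit, so there is no separate offset store that has to be kept consistent with the data.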
>>
>>> Was looking for clarification on this:
>>> - Export side - is this like a MapReduce kind of job or something else? If delivering to HDFS, would this be running on the Hadoop cluster or outside?
>>> - Import side - how does this look? Is it a bunch of Flume-like processes? Maybe just some kind of a broker that translates the incoming protocol into the outgoing Kafka producer API protocol? If delivering to HDFS, will this run on the cluster or outside?
>>
>> No MapReduce; in fact, no other frameworks are required unless the connector needs them for some reason. Both source and sink look structurally the same. Probably the most common scenario is to run a set of workers that provide the Copycat service. You submit connector jobs to run on these workers. A coordinator handles distributing the work across worker nodes. Coordinators determine how to divide the tasks and generate configs for them, then the framework handles distributing that work. Each individual task handles some subset of the job. For source tasks, that subset is a set of input streams (in the JDBC example in the KIP, each table would have a corresponding stream). For sink tasks, the subset is determined automatically by the framework via the underlying consumer group as a subset of topic-partitions (since the input is from Kafka). Connectors are kept simple, just processing streams of records (either generating them by reading from the source system or recording them into the sink system). Source tasks also include information about offsets, and sink tasks either need to manage offsets themselves or implement flush() functionality. Given these primitives, the framework can then handle other complexities like different delivery semantics without any additional support from the connectors.
>>
>> The motivation for the additional modes of execution (agent, embedded) was to support a couple of other common use cases. Agent mode is completely standalone, which provides for a much simpler implementation and handles use cases where there isn't an easy way to avoid running the job across many machines (e.g., if you have to load logs directly from log files). Embedded mode is actually a simple variant of the distributed mode, but lets you set up and run the entire cluster alongside the rest of your distributed app. This is useful if you want to get up and running with an application where you need to, for example, import data from another service into Kafka, then consume and process that data. You can set up the worker and submit a job directly from your code, reducing the operational complexity. It's probably not the right long-term solution as your usage expands, but it can significantly ease adoption.
>>
>>> I still think adding one or two specific end-to-end use cases in the KIP, showing how Copycat will pan out for them for import/export, will really clarify things.
>>
>> There were a couple of examples already in the KIP -- JDBC, HDFS, log import, and now I've also added mirror maker. Were you looking for something more specific? I could also explain a full source -> Kafka -> sink pipeline, but I don't know that there's much to add there beyond the fact that we would like schemas to carry across the entire pipeline. Otherwise it's just chaining connectors. Besides, I think most of the interesting use cases actually have additional processing steps in between, i.e. using stream processing frameworks or custom consumers + producers.
>>
>> --
>> Thanks,
>> Ewen