On Mon, Jul 6, 2015 at 6:24 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> On Mon, Jul 6, 2015 at 4:33 PM, Ewen Cheslack-Postava <e...@confluent.io> > wrote: > > > On Mon, Jul 6, 2015 at 11:40 AM, Guozhang Wang <wangg...@gmail.com> > wrote: > > > > > Hi Ewen, > > > > > > I read through the KIP page and here are some comments on the design > > > section: > > > > > > 1. "... and Copycat does not require that all partitions be > enumerated". > > > Not very clear about this, do you mean Copycat allows non-enumerable > > stream > > > partitions? > > > > > > > Maybe I should change "enumerated" to just plain "listed". The point is > > that the framework shouldn't ever need to ask connectors for a complete > > list of their current partitions. Requiring the connector to explicitly > > list all partitions can be simplifying for the framework and connectors > > (e.g. we could push the work of dividing partitions over tasks into the > > framework, as we do with topic-partitions in sinks), but there are some > > cases where that behavior isn't ideal (e.g. JMX metrics, where an app > > restart could change the set of metrics, and can cause particularly bad > > behavior during a rolling restart of a service since Copycat would end up > > continuously readjusting assignments). > > > > > > > Makes sense. > > > > > > > > 2. "... translates the data to Copycat's format, decides the > destination > > > topic (and possibly partition) in Kafka." Just to confirm it seems > > > indicating two destination scenarios Copycat connectors should be able > to > > > support: > > > > > > a. Specific destination topics per task (e.g. as illustrated in the > > digram, > > > task 1 to topics A and B, task 2 to topics B and C). > > > b. Specific destination topic-partitions per task (as said in "possibly > > > partition", like task 1 to topicA-partition1 and topicB-partition1, > task > > 2 > > > to topicA-partition2 and topicB-partition2). > > > > > > I understand connector developers needs to implement the dynamic > mapping > > > coordination from the source streams to tasks, but does the mapping > from > > > tasks to destination topic-partitions (for sinking Copycat I assume it > > > would be stream-partitions) also need to be implemented dynamically > since > > > the destination stream could also change? > > > > > > > Not sure I understand what you're getting at here. Connectors can do > > arbitrary shuffling to the output (which may not matter for many > > connectors, e.g. HDFS, where there's only one output). Some may not need > > that (e.g. reading a database commit log, you probably want to maintain > > ordering within a single topic). > > > > But as of now, there's no need to track the tasks -> destination > > topic-partitions at all. There's one or two things I can think of where > you > > could possibly optimize them a bit in a a couple of cases if you knew > this > > mapping (e.g. the flush + offset commit process), but I don't think that > > info is that useful to copycat. > > > > > > > From your diagrams different tasks can push to different output streams > (for source Copycat they are just output topics) or stream-partitions, so I > was asking how this is done in practice. But from your reply it seems at > least the first version of Copycat would not support that, i.e. all tasks > will be pushing to the same stream(s), and if the streams are partitioned > all tasks will be pushing to all partitions? > Ah, I think I see. Unlike the inputs where partitions are balanced over workers, there's no equivalent concept for outputs. In practice, source connectors write to different topics/partitions just as they would with a normal Kafka producer (the actual class is different since it carries a bit of extra info about the source partition & offset, but the relevant info just gets converted directly to a ProducerRecord). So in general, yes, all tasks can push to all partitions. In practice, tasks may end up only writing to a subset because of their assignment of partitions. The JDBC sample is a good example of this -- one topic per table might be reasonable, in which case each task "owns" and is the only writer to the set of topics corresponding to its tables. In the case of sink connectors/tasks, the mapping to a partitioned stream isn't even all that strict since writing outputs is entirely under the control of the tasks. For something like HDFS, there are output "streams" (which are a series of files), in which case there is strict ownership since there can only be one writer at a time. Something like Elasticsearch or any of the NoSQL data stores might lie at the other end of the spectrum where there is, in a sense, only one big partition that all tasks write to (i.e. the entire data store). > > > > > > > > > 3. "Delivery Guarantees": depending on how we define the guarantees, it > > may > > > not only depends on the output system but also the input system. For > > > example, duplicates may be generated from the input systems as well. Do > > we > > > also need to consider these scenarios? > > > > > > > Yes, that's correct. For source connectors, if the source system > introduces > > duplicates then we are not doing deduplication and if it drops data > there's > > nothing we can do. Same deal with the output system for sink connectors. > I > > guess on the sink side the expected semantics are more clear since > > SinkTask.flush() makes the expectations pretty clear, but on the source > > side the expectation of no duplicates/dropped data is implicit. > > > > > OK. > > > > > > > 4. "Integration with Process Management": for "Resource constrained > > > connectors", I am not sure how it is different in deployment from > > > "Copycat-as-a-service"? I feel there are generally three different > types: > > > > > > 1) run-as-a-service: on a shared cluster equipped with some resource > > > manager, a Copycat framework is ever-running and users submit their > > > connector jobs via REST. > > > 2) standalone: on a single machine, start a Copycat instance with the > > > configured master + #.workers processes via some cmdline tool. > > > 3) embedded library: the Copycat code will be running on whatever the > > > embedding application is running on. > > > > > > > The reason it's different from Copycat-as-a-service is because you can > > apply resource constraints *on a single, specific copycat connector*. In > > "as-a-service" mode, all the connectors and tasks are mixed up across the > > workers, so if you want to set a CPU or memory constraint on one > > connector's tasks, you can't do that. In order to do that with a resource > > manager that works at the process level and support varying constraints > > (e.g. connector A gets 1 CPU, connector B gets 10 CPU), you need to make > > sure the processes you are applying limits to only contain one > connector's > > tasks. > > > > Because "resource constrained connectors" only runs one connector and > it's > > tasks, it is functionally the same as using embedded mode, not adding any > > code besides Copycat to the program, and running that under the cluster > > manager. > > > > > > > OK. So it sounds the difference is that in the first case the cluster is > shared and users can only specify the #.tasks but cannot control how many > containers will be allocated to these tasks, and in the latter case the > cluster is "private" and users can specify the #.containers (i.e. number of > processors) while submitting their connector jobs, is that right? > Yes, and just to be clear, the idea is that in the second "private" mode, specifying the # of containers is *outside Copycat's scope*. If you're running in that mode, you almost certainly need to do some config around whatever resource management/deployment scheme you use, and setting the # of container/worker processes in that configuration (rather than in Copycat config) shouldn't be a big deal. The goal is to cleanly separate those two so we avoid all the deployment-specific stuff in copycat. I think there probably are some ways we could nicely integrate with these frameworks by providing ApplicationMaster/Framework implementations that are more intelligent than just running under marathon/slider. I think that's worth doing, but probably just needs to expose the right info from Copycat rather than deep integration inside Copycat. > > > > > > > > 5. Some terminology suggestions, how about the following descriptions > (no > > > technical difference except the CLI APIs, just some naming changes) of > > > Copycat: > > > > > > a. Copycat developers needs to implement the "*connector*" module, > which > > > include the "*master*" and "*worker*" logic: > > > > > > 1) "master" is responsible for coordinating the assignment from the > > > resource stream partitions to the workers (and possibly also the > > assignment > > > from the workers to the destination stream partitions?) *dynamically*, > > and > > > 2) "worker" is responsible for polling from the assigned resource > > stream > > > partitions and pushing to the assigned destination stream partitions. > > > > > > > Hmm, I removed most discussion of distributed mode, but this terminology > > seems like it will be confusing when we get back to that discussion. > > "worker" is already being used for the container process, so there's > > already confusion there. > > > > For "connector module", that's fine with me. I've been using "connector > > plugin" as I've tried to refine terminology a bit, but either one works > -- > > most important part was to avoid confusion with the "connector" > component. > > > > > > > The main difference is that in the KIP wiki the physical computation unit > of a single process is referred to as the "worker", while I name it as > "container" since it sounds more "physical" and worker sounds more logical. > But either is fine I guess. > Maybe we should just go with "process" since that's what we're actually referring to :) I think I just muddled the naming of that because I happened to have a prototype with a class called Worker and that was the class that you ultimately called .run() on to start things off. Documentation-wise, "process" is probably the clearest, helps explain the interaction with resource managers, and where necessary can be qualified by "standalone-mode", "clustered mode", or "embedded-mode" to specify the way copycat is running within that process. > > > > > > > > b. Copycat framework includes: > > > > > > 1) The interface for the connector workers polling-from-resource and > > > pushing-to-destination function calls, > > > > > > > Yes, I'd call this the "connector API". > > > > > > > 2) The interface for resource management integration: it leverages > the > > > underlying resource managers like YARN / Mesos to get a list of > > allocated " > > > *containers*". > > > > > > > Ok, guess we ended up on different pages on this again. One of the goals > > was to get rid of all this dependence on custom code for every resource > > manager framework like Samza has. > > > > > > > 3) A "*connector manager*" responsible for coordinating the > assignment > > > from the connector master / worker processes to the allocated > containers > > > *dynamically*. > > > > > > > Yes, this makes sense. This is what I thought might get confusing with > > "master connector logic" above. But since both of these components are > > doing conceptually similar things (breaking up work and assigning it), > the > > naming may just always be a bit confusing. > > > > > > > > > > c. Copycat users need to specify the *connector configurations* through > > > config files or ZK / other storage systems, including #.tasks, starting > > > offsets, etc, and start the *connector job* with its configurations > (each > > > job as its own configs) via the above mentioned three different modes: > > > > > > 1) submit the job via REST to a Copycat service running on a shared > > > cluster with resource manager, or > > > 2) start the job in standalone mode in a single machine, with all the > > > master / workers running on that single machine. > > > > > > > These two sound fine. > > > > > > > 3) start a copycat instance first in embedded mode and then add > > > connectors, all the added connectors (i.e. their master / workers) run > on > > > the single machine where the embedding app code is running. > > > > > > > I'm not sure if I'm just misreading this, but the goal with embedded mode > > is to still support running connectors (i.e. their master/workers in your > > terminology) in a distributed fashion. On sinks this is trivial since the > > consumer gives you this for free, but the point was to make sure the same > > thing works for sources as well (i.e. the framework helps with *source* > > partition balancing). If this is what you were thinking as well, could > you > > clarify what you meant by "run on the single machine where the embedding > > app code is running"? > > > Hmm, I had the impression that embedded mode runs on single host because I > saw the following lines in the sample code: > > final Copycat copycat = new Copycat(“app-id”); > copycat.start(); > String[] inputTopics = copycat.addConnector(importConfig); > > which starts a copycat instance and then add a connector job to that > copycat instance, so if this Java code runs on a single machine then all of > these will just happen on a single machine. Maybe that could be clarified a > bit on the wiki? > I'll see what I can do to clarify. To be honest, it's not entirely clear how to handle this code executing on multiple servers since it's just my pseudo code :) Specifically, I the key config that can conflict for Kafka consumers is the topic subscription, but there's a simple rule for resolving this. For connector configs, we'll need to think through how different configs are handled since there are valid use cases for conflicts (e.g. config updates). > > > > > > > > d. As for the CLI APIs, we will only need one for the standalone mode > > since > > > the run-as-a-service mode will always have some resource manager to > > > allocate the containers. > > > > > > > Ok, this seems to confirm we're still not on the same page for resource > > managers... Why can't run-as-a-service mode run without a resource > manager? > > By "container" in this case do you mean that the resource manager will > run > > the worker processes, which in turn are assigned connectors/tasks to > > execute as threads in that process? > > > > I tried to clarify the KIP to make it clear that the only *processes* I > > expected are copycat workers (or standalone mode processes). Resource > > managers can be used to start these processes (and help maintain the > > cluster by restarting them if they crash), but there's no other deep > > integration or custom code required given the current description in the > > KIP. > > > > > OK got it, so just to confirm: > > The copycat-worker command is used to startup some containers in the > cluster, but only one "copycat connector manager" will be elected and > running within the same cluster, and then copycat connector jobs can be > submitted via any container's REST proxy and get assigned by the connector > manager to the containers in the cluster, right? > Yes, although that assumes some design of the distributed version of copycat which I backed off from in the KIP so it can undergo further discussion and not derail the major first step we were trying to address w/ this KIP which was agreeing on the inclusion of Copycat and it's scope at a very high level. -Ewen > > > > -Ewen > > > > > > > > > > > > Guozhang > > > > > > > > > On Mon, Jun 29, 2015 at 9:50 AM, Ewen Cheslack-Postava < > > e...@confluent.io> > > > wrote: > > > > > > > Seems like discussion has mostly quieted down on this. Any more > > > questions, > > > > comments, or discussion? If nobody brings up any other issues, I'll > > > start a > > > > vote thread in a day or two. > > > > > > > > -Ewen > > > > > > > > On Thu, Jun 25, 2015 at 3:36 PM, Jay Kreps <j...@confluent.io> wrote: > > > > > > > > > We were talking on the call about a logo...so here I present "The > > > > Original > > > > > Copycat": > > > > > http://shirtoid.com/67790/the-original-copycat/ > > > > > > > > > > -Jay > > > > > > > > > > On Tue, Jun 23, 2015 at 6:28 PM, Gwen Shapira < > gshap...@cloudera.com > > > > > > > > wrote: > > > > > > > > > > > One more reason to have CopyCat as a separate project is to > > sidestep > > > > > > the entire "Why CopyCat and not X" discussion :) > > > > > > > > > > > > On Tue, Jun 23, 2015 at 6:26 PM, Gwen Shapira < > > gshap...@cloudera.com > > > > > > > > > > wrote: > > > > > > > Re: Flume vs. CopyCat > > > > > > > > > > > > > > I would love to have an automagically-parallelizing, > schema-aware > > > > > > > version of Flume with great reliability guarantees. Flume has > > good > > > > > > > core architecture and I'm sure that if the Flume community is > > > > > > > interested, it can be extended in that direction. > > > > > > > > > > > > > > However, the Apache way is not to stop new innovation just > > because > > > > > > > some systems already exists. We develop the best systems we > can, > > > and > > > > > > > users choose the ones they prefer - thats how ecosystems > thrive. > > > > > > > If we can have Flume and NiFi, Sentry and Argus, Flink and > Storm, > > > > > > > Parquet and ORC, I'm sure we can also have CopyCat in the zoo > :) > > > > > > > > > > > > > > Gwen > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Jun 23, 2015 at 6:11 PM, Ewen Cheslack-Postava > > > > > > > <e...@confluent.io> wrote: > > > > > > >> On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik < > > > > ros...@hortonworks.com> > > > > > > wrote: > > > > > > >> > > > > > > >>> Thanks Jay and Ewen for the response. > > > > > > >>> > > > > > > >>> > > > > > > >>> >@Jay > > > > > > >>> > > > > > > > >>> > 3. This has a built in notion of parallelism throughout. > > > > > > >>> > > > > > > >>> > > > > > > >>> > > > > > > >>> It was not obvious how it will look like or differ from > > existing > > > > > > systemsŠ > > > > > > >>> since all of existing ones do parallelize data movement. > > > > > > >>> > > > > > > >> > > > > > > >> I'm guessing some confusion here might also be because we want > > > both > > > > > > >> parallelization and distribution. > > > > > > >> > > > > > > >> Roughly speaking, I think of Copycat making the consumer group > > > > > > abstraction > > > > > > >> available for any import task, and the idea is to make this > > > > automatic > > > > > > and > > > > > > >> transparent to the user. This isn't interesting for systems > that > > > > > > literally > > > > > > >> only have a single input stream, but Copycat source connectors > > > have > > > > a > > > > > > >> built-in notion of parallel input streams. The connector's job > > is > > > to > > > > > > inform > > > > > > >> the the Copycat framework of what input streams there are and > > > > Copycat > > > > > > >> handles running tasks, balancing the streams across them, > > handles > > > > > > failures > > > > > > >> by rebalancing as necessary, provides offset commit and > storage > > so > > > > > tasks > > > > > > >> can resume from the last known-good state, etc. > > > > > > >> > > > > > > >> On the sink side, the input is the Kafka consumer group, which > > > > > obviously > > > > > > >> already has this parallelism built in. Depending on the > output, > > > this > > > > > may > > > > > > >> manifest in different ways. For HDFS, the effect is just that > > your > > > > > > output > > > > > > >> files are partitioned (one per topic-partition). > > > > > > >> > > > > > > >> As for other systems, can you be more specific? Some of them > > > > obviously > > > > > > do > > > > > > >> (e.g. Camus), but others require you to handle this manually. > I > > > > don't > > > > > > want > > > > > > >> to pick on Flume specifically, but as an example, it requires > > > either > > > > > > >> configuring multiple (or multiplexed) flows in a single agent > or > > > > > manage > > > > > > >> multiple agents independently. This isn't really the same as > > what > > > > I've > > > > > > >> described above where you hand Copycat one config and it > > > > automatically > > > > > > >> spreads the work across multiple, fault-tolerant tasks. But > > flume > > > is > > > > > > also > > > > > > >> targeting a much different general problem, trying to build > > > > > potentially > > > > > > >> large, multi-stage data flows with all sorts of > transformations, > > > > > > filtering, > > > > > > >> etc. > > > > > > >> > > > > > > >> > > > > > > >>> > > > > > > >>> > > > > > > >>> @Ewen, > > > > > > >>> > > > > > > >>> >Import: Flume is just one of many similar systems designed > > > around > > > > > log > > > > > > >>> >collection. See notes below, but one major point is that > they > > > > > > generally > > > > > > >>> >don't provide any sort of guaranteed delivery semantics. > > > > > > >>> > > > > > > >>> > > > > > > >>> I think most of them do provide guarantees of some sort (Ex. > > > Flume > > > > & > > > > > > >>> FluentD). > > > > > > >>> > > > > > > >> > > > > > > >> This part of the discussion gets a little bit tricky, not > least > > > > > because > > > > > > it > > > > > > >> seems people can't agree on exactly what these terms mean. > > > > > > >> > > > > > > >> First, some systems that you didn't mention. Logstash > definitely > > > > > doesn't > > > > > > >> have any guarantees as it uses a simple 20-event in-memory > > buffer > > > > > > between > > > > > > >> stages. As far as I can tell, Heka doesn't provide these > > semantics > > > > > > either, > > > > > > >> although I have not investigated it as deeply. > > > > > > >> > > > > > > >> fluentd has an article discussing the options for it ( > > > > > > >> http://docs.fluentd.org/articles/high-availability), but I > > > actually > > > > > > think > > > > > > >> the article on writing plugins is more informative > > > > > > >> http://docs.fluentd.org/articles/plugin-development The most > > > > > important > > > > > > >> point is that input plugins have no way to track or discovery > > > > > downstream > > > > > > >> delivery (i.e. they cannot get acks, nor is there any sort of > > > offset > > > > > > >> tracked that it can lookup to discover where to restart upon > > > > failure, > > > > > > nor > > > > > > >> is it guaranteed that after router.emit() returns that the > data > > > will > > > > > > have > > > > > > >> already been delivered downstream). So if I have a replicated > > > input > > > > > data > > > > > > >> store, e.g. a replicated database, and I am just reading off > > it's > > > > > > >> changelog, does fluentd actually guarantee something like at > > least > > > > > once > > > > > > >> delivery to the sink? In fact, fluentd's own documentation > (the > > > high > > > > > > >> availability doc) describes data loss scenarios that aren't > > > inherent > > > > > to > > > > > > >> every system (e.g., if their log aggregator dies, which not > > every > > > > > > system is > > > > > > >> susceptible to, vs. if an event is generated on a single host > > and > > > > that > > > > > > host > > > > > > >> dies before reporting it anywhere, then of course the data is > > > > > > permanently > > > > > > >> lost). > > > > > > >> > > > > > > >> Flume actually does have a (somewhat confusingly named) > > > transaction > > > > > > concept > > > > > > >> to help control this. The reliability actually depends on what > > > type > > > > of > > > > > > >> channel implementation you use. Gwen and Jeff from Cloudera > > > > integrated > > > > > > >> Kafka and Flume, including a Kafka channel (see > > > > > > >> > > > > > > > > > > > > > > > > > > > > > http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/ > > > > > > ). > > > > > > >> This does allow for better control over delivery semantics, > and > > I > > > > > think > > > > > > if > > > > > > >> you use something like Kafka for every channel in your > pipeline, > > > you > > > > > can > > > > > > >> get something like what Copycat can provide. I'd argue flume's > > > > > approach > > > > > > has > > > > > > >> some other drawbacks though. In order to work correctly, every > > > > source > > > > > > and > > > > > > >> sink has to handle the transaction semantics, which adds > > > complexity > > > > > > >> (although they do offer great skeleton examples in their > docs!). > > > > > > >> > > > > > > >> Copycat tries to avoid that complexity for connector > developers > > by > > > > > > changing > > > > > > >> the framework to use streams, offsets, and commits, and > pushing > > > the > > > > > > >> complexities of dealing with any sorts of errors/failures into > > the > > > > > > >> framework. Ideally connector developers only need to a) check > > for > > > > > > offsets > > > > > > >> at startup and rewind to the last known committed offset and > b) > > > load > > > > > > events > > > > > > >> from the source system (with stream IDs and offsets) and pass > > them > > > > to > > > > > > the > > > > > > >> framework. > > > > > > >> > > > > > > >> > > > > > > >>> > > > > > > >>> >YARN: My point isn't that YARN is bad, it's that tying to > any > > > > > > particular > > > > > > >>> >cluster manager severely limits the applicability of the > tool. > > > The > > > > > > goal is > > > > > > >>> >to make Copycat agnostic to the cluster manager so it can > run > > > > under > > > > > > Mesos, > > > > > > >>> >YARN, etc. > > > > > > >>> > > > > > > >>> ok. Got it. Sounds like there is plan to do some work here to > > > > ensure > > > > > > >>> out-of-the-box it works with more than one scheduler (as @Jay > > > > listed > > > > > > out). > > > > > > >>> In that case, IMO it would be better to actually rephrase it > in > > > the > > > > > KIP > > > > > > >>> that it will support more than one scheduler. > > > > > > >>> > > > > > > >>> > > > > > > >> Tried to add some wording to clarify that. > > > > > > >> > > > > > > >> > > > > > > >>> > > > > > > >>> >Exactly once: You accomplish this in any system by managing > > > > offsets > > > > > > in the > > > > > > >>> >destination system atomically with the data or through some > > kind > > > > of > > > > > > >>> >deduplication. Jiangjie actually just gave a great talk > about > > > this > > > > > > issue > > > > > > >>> >at > > > > > > >>> >a recent Kafka meetup, perhaps he can share some slides > about > > > it. > > > > > > When you > > > > > > >>> >see all the details involved, you'll see why I think it > might > > be > > > > > nice > > > > > > to > > > > > > >>> >have the framework help you manage the complexities of > > achieving > > > > > > different > > > > > > >>> >delivery semantics ;) > > > > > > >>> > > > > > > >>> > > > > > > >>> Deduplication as a post processing step is a common > > > recommendation > > > > > done > > > > > > >>> today Š but that is a workaround/fix for the inability to > > provide > > > > > > >>> exactly-once by the delivery systems. IMO such post > processing > > > > should > > > > > > not > > > > > > >>> be considered part of the "exacty-once" guarantee of Copycat. > > > > > > >>> > > > > > > >>> > > > > > > >>> Will be good to know how this guarantee will be possible when > > > > > > delivering > > > > > > >>> to HDFS. > > > > > > >>> Would be great if someone can share those slides if it is > > > discussed > > > > > > there. > > > > > > >>> > > > > > > >>> > > > > > > >> For HDFS, the gist of the solution is to write to temporary > > files > > > > and > > > > > > then > > > > > > >> rename atomically to their final destination, including offset > > > > > > information > > > > > > >> (e.g., it can just be in the filename). Readers only see files > > > that > > > > > have > > > > > > >> been "committed". If there is a failure, any existing temp > files > > > get > > > > > > >> cleaned up and reading is reset to the last committed offset. > > > There > > > > > are > > > > > > >> some tricky details if you have zombie processes and depending > > on > > > > how > > > > > > you > > > > > > >> organize the data across files, but this isn't really the > point > > of > > > > > this > > > > > > >> KIP. If you're interested in HDFS specifically, I'd suggest > > > looking > > > > at > > > > > > >> Camus's implementation. > > > > > > >> > > > > > > >> > > > > > > >>> > > > > > > >>> > > > > > > >>> > > > > > > >>> Was looking for clarification on this .. > > > > > > >>> - Export side - is this like a map reduce kind of job or > > > something > > > > > > else ? > > > > > > >>> If delivering to hdfs would this be running on the hadoop > > cluster > > > > or > > > > > > >>> outside ? > > > > > > >>> > > > > > > >> - Import side - how does this look ? Is it a bunch of flume > like > > > > > > processes > > > > > > >>> ? maybe just some kind of a broker that translates the > incoming > > > > > > protocol > > > > > > >>> into outgoing Kafka producer api protocol ? If delivering to > > > hdfs, > > > > > will > > > > > > >>> this run on the cluster or outside ? > > > > > > >>> > > > > > > >> > > > > > > >> No mapreduce; in fact, no other frameworks required unless the > > > > > connector > > > > > > >> needs it for some reason. Both source and sink look > structurally > > > the > > > > > > same. > > > > > > >> Probably the most common scenario is to run a set of workers > > that > > > > > > provide > > > > > > >> the copycat service. You submit connector jobs to run on these > > > > > workers. > > > > > > A > > > > > > >> coordinator handles distributing the work across worker nodes. > > > > > > Coordinators > > > > > > >> determine how to divide the tasks and generate configs for > them, > > > > then > > > > > > the > > > > > > >> framework handles distributing that work. Each individual task > > > > handles > > > > > > some > > > > > > >> subset of the job. For source tasks, that subset is a set of > > input > > > > > > streams > > > > > > >> (in the JDBC example in the KIP, each table would have a > > > > corresponding > > > > > > >> stream). For sink tasks, the subset is determined > automatically > > by > > > > the > > > > > > >> framework via the underlying consumer group as a subset of > > > > > > topic-partitions > > > > > > >> (since the input is from Kafka). Connectors are kept simple, > > just > > > > > > >> processing streams of records (either generating them by > reading > > > > from > > > > > > the > > > > > > >> source system or recording them into the sink system). Source > > > tasks > > > > > also > > > > > > >> include information about offsets, and sink tasks either need > to > > > > > manage > > > > > > >> offsets themselves or implement flush() functionality. Given > > these > > > > > > >> primitives, the framework can then handle other complexities > > like > > > > > > different > > > > > > >> delivery semantics without any additional support from the > > > > connectors. > > > > > > >> > > > > > > >> The motivation for the additional modes of execution (agent, > > > > embedded) > > > > > > was > > > > > > >> to support a couple of other common use cases. Agent mode is > > > > > completely > > > > > > >> standalone, which provides for a much simpler implementation > and > > > > > handles > > > > > > >> use cases where there isn't an easy way to avoid running the > job > > > > > across > > > > > > >> many machines (e.g., if you have to load logs directly from > log > > > > > files). > > > > > > >> Embedded mode is actually a simple variant of the distributed > > > mode, > > > > > but > > > > > > >> lets you setup and run the entire cluster alongside the rest > of > > > your > > > > > > >> distributed app. This is useful if you want to get up and > > running > > > > with > > > > > > an > > > > > > >> application where you need to, for example, import data from > > > another > > > > > > >> service into Kafka, then consume and process that data. You > can > > > > setup > > > > > > the > > > > > > >> worker and submit a job directly from your code, reducing the > > > > > > operational > > > > > > >> complexity. It's probably not the right long term solution as > > your > > > > > usage > > > > > > >> expands, but it can significantly ease adoption. > > > > > > >> > > > > > > >> > > > > > > >>> > > > > > > >>> > > > > > > >>> I still think adding one or two specific end-to-end use-cases > > in > > > > the > > > > > > KIP, > > > > > > >>> showing how copycat will pan out for them for import/export > > will > > > > > really > > > > > > >>> clarify things. > > > > > > >>> > > > > > > >> > > > > > > >> There were a couple of examples already in the KIP -- JDBC, > > HDFS, > > > > log > > > > > > >> import, and now I've also added mirror maker. Were you looking > > for > > > > > > >> something more specific? I could also explain a full source -> > > > kafka > > > > > -> > > > > > > >> sink pipeline, but I don't know that there's much to add there > > > > beyond > > > > > > the > > > > > > >> fact that we would like schemas to carry across the entire > > > pipeline. > > > > > > >> Otherwise it's just chaining connectors. Besides, I think most > > of > > > > the > > > > > > >> interesting use cases actually have additional processing > steps > > in > > > > > > between, > > > > > > >> i.e. using stream processing frameworks or custom consumers + > > > > > producers. > > > > > > >> > > > > > > >> -- > > > > > > >> Thanks, > > > > > > >> Ewen > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > Thanks, > > > > Ewen > > > > > > > > > > > > > > > > -- > > > -- Guozhang > > > > > > > > > > > -- > > Thanks, > > Ewen > > > > > > -- > -- Guozhang > -- Thanks, Ewen