or even better - if topic creation is done dynamically by the replicator, setting the initial offsets for partitions could be made part of topic creation? Even fewer API changes this way.
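[Editorial illustration] For illustration only, here is a minimal sketch of what "initial offsets as part of topic creation" could look like from the replicator's side. Everything below is hypothetical - CreateTopicWithInitialOffsets, SourceCluster and their methods are not existing Kafka APIs, just a way to make the idea concrete:

    import java.util.HashMap;
    import java.util.Map;

    public class CreateMirroredTopicSketch {

        /** Hypothetical request: topic metadata plus a starting offset per partition. */
        static class CreateTopicWithInitialOffsets {
            final String topic;
            final int numPartitions;
            final Map<Integer, Long> initialOffsets; // partition -> first offset the leader will assign

            CreateTopicWithInitialOffsets(String topic, int numPartitions, Map<Integer, Long> initialOffsets) {
                this.topic = topic;
                this.numPartitions = numPartitions;
                this.initialOffsets = initialOffsets;
            }
        }

        /** Placeholder for whatever client the replicator uses to inspect the source cluster. */
        interface SourceCluster {
            int partitionCount(String topic);
            long oldestRetainedOffset(String topic, int partition);
            long logEndOffset(String topic, int partition);
        }

        /** Build one create request that already carries the initial offsets,
            so no separate "reset starting offset" call is needed afterwards. */
        static CreateTopicWithInitialOffsets buildCreateRequest(SourceCluster src, String topic,
                                                                boolean replicateAllRetainedHistory) {
            int numPartitions = src.partitionCount(topic);
            Map<Integer, Long> offsets = new HashMap<>();
            for (int p = 0; p < numPartitions; p++) {
                long offset = replicateAllRetainedHistory
                        ? src.oldestRetainedOffset(topic, p) // history will be copied too
                        : src.logEndOffset(topic, p);        // mirror only from "now" on
                offsets.put(p, offset);
            }
            return new CreateTopicWithInitialOffsets(topic, numPartitions, offsets);
        }
    }

The two-step alternative (create the topic, then call a new resetStartingOffset-style management API per partition) is what the pseudocode further down the thread sketches.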
On Thu, Dec 29, 2016 at 10:49 PM, radai <radai.rosenbl...@gmail.com> wrote:

> ah, I didn't realize we were limiting the discussion to master --> slave.
>
> but - if we're talking about master-slave replication, and under the conditions I outlined earlier (src and dest match in #partitions, no foreign writes to dest) it "just works" - it seems to me the only thing you're really missing is not an explicit desired-offset param on each and every request, but just the ability to "reset" the starting offset on the dest cluster at topic creation.
>
> let me try and run through a more detailed scenario:
>
> 1. suppose I set up the original cluster (src). no remote cluster yet. let's say over some period of time I produce 1 million msgs to topic X on this src cluster.
> 2. the company grows, a 2nd site is opened, the dest cluster is created, and topic X is created on the (brand new) dest cluster.
> 3. offsets are manually set on every partition of X on the dest cluster to match either the oldest retained or the current offset of the matching partition of X in src. in pseudo code:
>
>     for (partI in numPartitions) {
>         partIOffset
>         if (replicateAllRetainedHistory) {
>             partIOffset = src.getOldestRetained(partI)
>         } else {
>             partIOffset = src.getCurrent(partI) // will not copy over history
>         }
>         dest.resetStartingOffset(partI, partIOffset) // <---- new mgmt API
>     }
>
> 4. now you are free to start replicating. under master --> slave assumptions offsets will match from this point forward.
>
> seems to me something like this could be made part of the replicator component (mirror maker, or whatever else you want to use) - if topic X does not exist in the destination, create it, reset initial offsets to match the source, and start replication.
>
> On Thu, Dec 29, 2016 at 12:41 PM, Andrey L. Neporada <anepor...@yandex-team.ru> wrote:
>
>> > On 29 Dec 2016, at 20:43, radai <radai.rosenbl...@gmail.com> wrote:
>> >
>> > so, if I follow your suggested logic correctly, there would be some sort of:
>> >
>> >     produce(partition, msg, requestedOffset)
>> >
>> > which would fail if requestedOffset is already taken (by another previous such explicit call, or by another regular call that just happened to get assigned that offset by the partition leader on the target cluster).
>>
>> Yes. More formally, my proposal is to extend ProduceRequest by adding MessageSetStartOffset:
>>
>>     ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetStartOffset MessageSetSize MessageSet]]
>>       RequiredAcks => int16
>>       Timeout => int32
>>       Partition => int32
>>       MessageSetSize => int32
>>       MessageSetStartOffset => int64
>>
>> If MessageSetStartOffset is -1, ProduceRequest should work exactly as before - i.e. assign the next available offset to the given MessageSet.
>>
>> > how would you meaningfully handle this failure?
>> >
>> > suppose this happens to some cross-cluster replicator (like mirror maker). there is no use in retrying. the options would be:
>> >
>> > 1. get the next available offset - which would violate what you're trying to achieve
>> > 2. skip msgs - so replication is incomplete; any offset "already taken" on the destination is not replicated from the source
>> > 3. stop replication for this partition completely - because starting from now _ALL_ offsets will be taken - 1 foreign msg ruins everything for the entire partition.
>> >
>> > none of these options look good to me.
>>
>> Since we are discussing master-slave replication, the only client writing to the slave cluster is the replicator itself. In this case a ProduceRequest failure is some kind of replication logic error - for example, when two replication instances are somehow launched for a single partition. The best option here is just to stop the replication process.
>>
>> So the answer to your question is (3), but this scenario should never happen.
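[Editorial illustration] The rule discussed in the exchange above can be stated very compactly. The sketch below is illustrative only - it is not broker code, and the behaviour for a requested offset beyond the log end is an assumption here, since the thread only discusses the "already taken" case:

    // Illustrative sketch of the proposed leader-side check, not actual Kafka broker code.
    public final class StartOffsetCheck {

        enum Result { APPEND_AT_LOG_END, APPEND_AT_REQUESTED, REJECT }

        // messageSetStartOffset == -1 keeps today's behaviour: the leader assigns offsets.
        // Otherwise the requested start offset must be exactly the partition's current
        // log end offset; a lower value is already taken (e.g. by a foreign write), and
        // a higher value would leave a gap (a case the thread does not discuss).
        static Result validate(long messageSetStartOffset, long logEndOffset) {
            if (messageSetStartOffset == -1L) {
                return Result.APPEND_AT_LOG_END;
            }
            return messageSetStartOffset == logEndOffset
                    ? Result.APPEND_AT_REQUESTED
                    : Result.REJECT;
        }
    }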
>> > On Thu, Dec 29, 2016 at 3:22 AM, Andrey L. Neporada <anepor...@yandex-team.ru> wrote:
>> >
>> >> Hi!
>> >>
>> >>> On 27 Dec 2016, at 19:35, radai <radai.rosenbl...@gmail.com> wrote:
>> >>>
>> >>> IIUC, if you replicate from a single source cluster to a single target cluster, the topic has the same number of partitions on both, and no one writes directly to the target cluster (so master --> slave), the offsets would be preserved.
>> >>
>> >> Yes, exactly. When you
>> >> 1) create the topic with the same number of partitions on both the master and slave clusters,
>> >> 2) write only to the master,
>> >> 3) replicate partition to partition from master to slave,
>> >> - in this case the offsets will be preserved.
>> >>
>> >> However, you usually already have a cluster that works and want to replicate some topics to another one. IMHO, in this scenario there should be a way to make message offsets equal on both clusters.
>> >>
>> >>> but in the general case - how would you handle the case where multiple producers "claim" the same offset?
>> >>
>> >> The same way Kafka handles concurrent produce requests for the same partition - produce requests for a partition are serialized. If the next produce request "overlaps" with the previous one, it fails.
>> >>
>> >>> On Mon, Dec 26, 2016 at 4:52 AM, Andrey L. Neporada <anepor...@yandex-team.ru> wrote:
>> >>>
>> >>>> Hi all!
>> >>>>
>> >>>> Suppose you have two Kafka clusters and want to replicate topics from the primary cluster to a secondary one. It would be very convenient for readers if the message offsets for the replicated topics were the same as for the primary topics.
>> >>>>
>> >>>> As far as I know, there is currently no way to achieve this. I wonder if it is possible/reasonable to add a message offset to ProduceRequest?
>> >>>>
>> >>>> —
>> >>>> Andrey Neporada
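[Editorial illustration] To show the reader-side benefit described in the original message, here is a minimal failover sketch with the standard Java consumer, assuming offsets really are identical on both clusters. The bootstrap addresses ("primary:9092", "secondary:9092") and the use of topic X, partition 0, are placeholders, and the newer poll(Duration) consumer API is assumed:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class FailoverReader {

        private static KafkaConsumer<String, String> consumerFor(String bootstrapServers) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return new KafkaConsumer<>(props);
        }

        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("X", 0);

            // Read from the primary cluster and remember how far we got.
            long nextOffset;
            try (KafkaConsumer<String, String> primary = consumerFor("primary:9092")) {
                primary.assign(Collections.singletonList(tp));
                ConsumerRecords<String, String> records = primary.poll(Duration.ofSeconds(1));
                System.out.println("processed " + records.count() + " records from the primary");
                nextOffset = primary.position(tp); // offset of the next record we have not seen
            }

            // Primary becomes unavailable: because offsets match, the same position is
            // meaningful on the secondary cluster, so the reader can simply seek to it there.
            try (KafkaConsumer<String, String> secondary = consumerFor("secondary:9092")) {
                secondary.assign(Collections.singletonList(tp));
                secondary.seek(tp, nextOffset);
                ConsumerRecords<String, String> resumed = secondary.poll(Duration.ofSeconds(1));
                System.out.println("resumed with " + resumed.count() + " records, no gaps and no re-reads");
            }
        }
    }

Without preserved offsets, the reader would instead need some external mapping from primary offsets to secondary offsets before it could resume.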