I know this is a really old thread, but it looked like the only pertinent one that came up when searching for ‘exactly once’ in the archives. I just want to confirm my understanding that 0.8 still doesn't completely support exactly-once semantics. Even with the producer configured in sync mode and quorum commits, there are still edge-case failure modes where the producer doesn't receive the ack and re-sends the message(s). I think I read that consumers don't see uncommitted messages in the log, but I don't think that addresses this producer case. Please correct me if I am missing something here.
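To make the edge case concrete, here is roughly what I mean (a sketch against my reading of the 0.8 producer configs and Java API; the manual retry is our own wrapper, not anything Kafka does for you):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class DuplicateOnRetry {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("producer.type", "sync");          // block until the broker responds
        props.put("request.required.acks", "-1");    // wait for the full ISR, i.e. "quorum commit"
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        KeyedMessage<String, String> msg =
                new KeyedMessage<String, String>("orders", "order-42", "order payload");
        try {
            producer.send(msg);      // the broker can append this and then the ack can be lost...
        } catch (RuntimeException e) {
            producer.send(msg);      // ...so this "safe" retry appends the same message a second time
        } finally {
            producer.close();
        }
    }
}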
Don't get me wrong, we are very thankful for the 0.8 features. It offers by far the best message delivery guarantees of the products we evaluated, such as RabbitMQ and ActiveMQ. We attempt to make our downstream consumer processes idempotent to mitigate this edge case, but it isn't always feasible. Also, Milind's suggestion in this thread of using Storm for exactly-once guarantees makes a lot of sense. Trident State seems to address this very issue (https://github.com/nathanmarz/storm/wiki/Trident-state), so we could just have it mediate the topics that require exactly once. (I've also tacked a few rough sketches of ideas from the quoted thread onto the very bottom of this mail.)

-Jonathan

On Sat, Nov 3, 2012 at 1:53 PM, Milind Parikh <milindpar...@gmail.com> wrote:

> Why wouldn't the storm approach provide semantics of exactly once delivery? https://github.com/nathanmarz/storm
>
> Nathan actually credits the Kafka devs for the basic idea of transaction persisting in one of his talks.
>
> Regards
> Milind
>
> On Nov 3, 2012 11:51 AM, "Rohit Prasad" <rohit.prasa...@gmail.com> wrote:
>
> > I agree that this approach only prevents duplicate messages to a partition from the producer side. There needs to be a similar approach on the consumer side too. Using ZK can be one solution, or other non-ZK approaches.
> >
> > Even if the consumer reads either none or all of the messages of a transaction, that does not solve the transaction problem yet, because the business/application logic inside the consumer thread may execute partially and fail. So it becomes tricky to decide the point at which you can say that you have "consumed" the message and increase the consumption offset. If your consumer thread is saving some value into a DB/HDFS/etc., ideally you want this save operation and the consumption-offset increment to happen atomically. That's why it boils down to the application logic implementing transactions and dealing with duplicates.
> > Maybe a journalling or redo-log approach on the consumer side can help build such a system.
> >
> > It will be nice if eventually Kafka can be a transport which provides "exactly once" semantics for message delivery. Then consumer threads can be sure that they receive messages once, and can build application logic on top of that.
> >
> > I have a use case similar to what Jay mentioned in a previous mail. I want to do aggregation, but I want the aggregated data to be correct, possibly avoiding duplicates in case of failures/crashes.
> >
> > On Fri, Nov 2, 2012 at 4:12 PM, Tom Brown <tombrow...@gmail.com> wrote:
> >
> > > That approach allows a producer to prevent duplicate messages to the partition, but what about the consumer? In my case, I don't want the consumer to be able to read any of the messages unless it can read all of the messages from a transaction.
> > >
> > > I also like the idea of there being multiple types of Kafka transaction, though, just to accommodate different performance, reliability, and consumption patterns. Of course, the added complexity of that might just sink the whole thing.
> > >
> > > --Tom
> > >
> > > On Fri, Nov 2, 2012 at 4:11 PM, Rohit Prasad <rohit.prasa...@gmail.com> wrote:
> > >
> > > > Getting transactional support is quite a hard problem. There will always be corner cases where the solution will not work, unless you want to go down the path of 2PC, Paxos, etc., which of course will degrade Kafka's performance. It is best to reconcile data and deal with duplicate messages in the application layer.
> > > > Having said that, it would be amazing if we can build "at most once" semantics in Kafka!!
> > > >
> > > > Regarding the above approaches: the producer will always have a doubt whether its commit went through, i.e. if the ack for the "commit" is not received by the producer, or if the producer dies immediately after calling commit. When it is restarted, how does it know whether the last operation went through?
> > > >
> > > > I suggest the following:
> > > > 1. The producer should attach a timestamp at the beginning of each message and send it to the server.
> > > > 2. On restarts/timeouts/re-connections, the producer should first read the last committed message from the leader of the partition.
> > > > 3. From the timestamp, it can know how many messages went through before it died (or the connection was broken), and it can infer how many messages to replay.
> > > >
> > > > The above approach can be used with the existing Kafka libraries, since you can have a producer and a consumer thread together in an application to implement this logic. Or someone could take the initiative to write a transactional producer (which internally has both a producer and a consumer to read the last committed message). I will be developing one for Kafka 0.8 in C++.
> > > >
> > > > The above approach will work even if you batch messages for a single partition. It will work only if a single producer is writing to a partition. I want to hear opinions on the above approach; I'm sure there can be corner cases where it may break.
> > > >
> > > > If there are multiple producers to a partition, then some bookkeeping on the server side with regard to the last message committed per "correlation id" (to identify a unique producer) may be needed.
> > > >
> > > > Regards,
> > > > Rohit
> > > >
> > > > On Sun, Oct 28, 2012 at 10:31 PM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > > > If you use Kafka just as a redo log, you can't undo anything that's written to the log. Write-ahead logs in typical database systems are both redo and undo logs. Transaction commits and rollbacks are implemented on top of the logs. However, general-purpose write-ahead logs for transactions are much more complicated.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Fri, Oct 26, 2012 at 11:08 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > >
> > > > > > This is an important feature and I am interested in helping out in the design and implementation, though I am working on 0.8 features for the next month so I may not be of too much use. I have thought a little bit about this, but I am not yet sure of the best approach.
> > > > > >
> > > > > > Here is a specific use case I think is important to address: consider a case where you are processing one or more streams and producing an output stream. This processing may involve some kind of local state (say counters or other intermediate local aggregation state). This is a common scenario. The problem is to give reasonable semantics to this computation in the presence of failures.
> > > > > > The processor effectively has a position/offset in each of its input streams as well as whatever local state. The problem is that if this process fails, it needs to restore to a state that matches the last produced messages. There are several solutions to this problem. One is to make the output somehow idempotent; this will solve some cases, but it is not a general solution, as many things cannot be made idempotent easily.
> > > > > >
> > > > > > I think the two proposals you give outline a couple of basic approaches:
> > > > > > 1. Store the messages on the server somewhere, but don't add them to the log until the commit call.
> > > > > > 2. Store the messages in the log, but don't make them available to the consumer until the commit call.
> > > > > > Another option you didn't mention:
> > > > > >
> > > > > > I can give several subtleties to these approaches.
> > > > > >
> > > > > > One advantage of the second approach is that messages are in the log and can be made available for reading or not. This makes it possible to support a kind of "dirty read" that allows the consumer to specify whether they want to immediately see all messages with low latency but potentially see uncommitted messages, or only see committed messages.
> > > > > >
> > > > > > The problem with the second approach, at least in the way you describe it, is that you have to lock the log until the commit occurs; otherwise you can't roll back (because someone else may have appended their own messages and you can't truncate the log). This would have all the problems of remote locks. I think this might be a deal-breaker.
> > > > > >
> > > > > > Another variation on the second approach would be the following: have each producer maintain an id and generation number. Keep a schedule of valid offset/id/generation numbers on the broker and only hand these out. This solution would support non-blocking multi-writer appends, but it requires more participation from the producer (i.e. getting a generation number and id).
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > -Jay
> > > > > >
> > > > > > On Thu, Oct 25, 2012 at 7:04 PM, Tom Brown <tombrow...@gmail.com> wrote:
> > > > > >
> > > > > > > I have come up with two different possibilities, both with different trade-offs.
> > > > > > >
> > > > > > > The first would be to support "true" transactions by writing transactional data into a temporary file and then copying it directly to the end of the partition when the commit command is created. The upside to this approach is that individual transactions can be larger than a single batch, and more than one producer could conduct transactions at once. The downside is the extra IO involved in writing it to and reading it from disk an extra time.
> > > > > > >
> > > > > > > The second would be to allow any number of messages to be appended to a topic, but not move the "end of topic" offset until the commit was received.
> > > > > > > If a rollback was received, or the producer timed out, the partition could be truncated at the most recently recognized "end of topic" offset. The upside is that there is very little extra IO (only to store the official "end of topic" metadata), and it seems like it should be easy to implement. The downside is that this "transaction" feature is incompatible with anything but a single producer per partition.
> > > > > > >
> > > > > > > I am interested in your thoughts on these.
> > > > > > >
> > > > > > > --Tom
> > > > > > >
> > > > > > > On Thu, Oct 25, 2012 at 9:31 PM, Philip O'Toole <phi...@loggly.com> wrote:
> > > > > > >
> > > > > > > > On Thu, Oct 25, 2012 at 06:19:04PM -0700, Neha Narkhede wrote:
> > > > > > > > > The closest concept of a transaction on the publisher side that I can think of is using a batch of messages in a single call to the synchronous producer.
> > > > > > > > >
> > > > > > > > > Precisely, you can configure a Kafka producer to use "sync" mode and batch the messages that require transactional guarantees into a single send() call. That will ensure that either all the messages in the batch are sent or none.
> > > > > > > >
> > > > > > > > This is an interesting feature -- something I wasn't aware of. Still, it doesn't solve the problem *completely*. As many people realise, it's still possible for the batch of messages to get into Kafka fine, but for the ack from Kafka to be lost on its way back to the producer. In that case the producer erroneously believes the messages didn't get in, and might re-send them.
> > > > > > > >
> > > > > > > > You guys *haven't* solved that issue, right? I believe you write about it on the Kafka site.
> > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Neha
> > > > > > > > >
> > > > > > > > > On Thu, Oct 25, 2012 at 2:44 PM, Tom Brown <tombrow...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Is there an accepted or recommended way to make writes to a Kafka queue idempotent, or within a transaction?
> > > > > > > > > >
> > > > > > > > > > I can configure my system such that each queue has exactly one producer.
> > > > > > > > > >
> > > > > > > > > > (If there are no accepted/recommended ways, I have a few ideas I would like to propose. I would also be willing to implement them if needed.)
> > > > > > > > > >
> > > > > > > > > > Thanks in advance!
> > > > > > > > > >
> > > > > > > > > > --Tom
> > > > > > > >
> > > > > > > > --
> > > > > > > > Philip O'Toole
> > > > > > > >
> > > > > > > > Senior Developer
> > > > > > > > Loggly, Inc.
> > > > > > > > San Francisco, Calif.
> > > > > > > > www.loggly.com
> > > > > > > >
> > > > > > > > Come join us!
> > > > > > > > http://loggly.com/company/careers/
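P.S. As promised above, a few rough sketches of ideas from the quoted thread, in case they are useful to anyone searching the archives later. First, Rohit's point about saving the consumer's output and its offset atomically. This is plain JDBC; the table names, the Postgres-style upsert, and the offset bookkeeping are all made up for illustration, not anything Kafka provides:

import java.sql.Connection;
import java.sql.PreparedStatement;

// Sketch: make "apply the message" and "advance the offset" a single atomic step
// by putting both writes in one database transaction. On restart the consumer
// re-reads its starting offset from consumer_offsets, so a redelivered message
// is never applied twice to the aggregates table.
public class TransactionalApplySketch {

    void apply(Connection db, String topic, int part, long offset,
               String key, String value) throws Exception {
        db.setAutoCommit(false);
        try (PreparedStatement upsert = db.prepareStatement(
                 "INSERT INTO aggregates(k, v) VALUES (?, ?) "
               + "ON CONFLICT (k) DO UPDATE SET v = excluded.v");   // Postgres-style upsert
             PreparedStatement bookmark = db.prepareStatement(
                 "UPDATE consumer_offsets SET next_offset = ? "
               + "WHERE topic = ? AND part = ?")) {                 // row assumed to exist

            upsert.setString(1, key);            // the application's own write...
            upsert.setString(2, value);
            upsert.executeUpdate();

            bookmark.setLong(1, offset + 1);     // ...and the offset bump, in the same transaction
            bookmark.setString(2, topic);
            bookmark.setInt(3, part);
            bookmark.executeUpdate();

            db.commit();                         // both happen, or neither does
        } catch (Exception e) {
            db.rollback();
            throw e;
        }
    }
}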
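Second, Rohit's replay-on-restart idea for the producer side. Again only a sketch of the bookkeeping: PartitionClient is a stand-in for a real producer plus a consumer that fetches the last message in the partition, and a sequence number plays the role of his timestamp. As he notes, this only works with a single producer per partition:

import java.util.List;

// Sketch of the replay-on-restart idea: every message carries a monotonically
// increasing sequence number, and after a crash or reconnect the producer reads
// the stamp of the last message the leader committed to decide what to re-send.
public class ReplayingProducerSketch {

    // Stand-in for a real producer plus a consumer that fetches the partition tail.
    interface PartitionClient {
        void send(long seq, byte[] payload);   // append one stamped message
        long lastCommittedSeq();               // stamp of the last message in the log, or -1 if empty
    }

    private final PartitionClient client;
    private long nextSeq;

    ReplayingProducerSketch(PartitionClient client, long startSeq) {
        this.client = client;
        this.nextSeq = startSeq;
    }

    void send(byte[] payload) {
        client.send(nextSeq++, payload);
    }

    // After a restart or timeout: replay only the buffered messages whose stamps
    // are not already in the log, instead of blindly re-sending all of them.
    void recover(List<byte[]> unacknowledged, long firstUnackedSeq) {
        long lastCommitted = client.lastCommittedSeq();
        for (int i = 0; i < unacknowledged.size(); i++) {
            long seq = firstUnackedSeq + i;
            if (seq > lastCommitted) {
                client.send(seq, unacknowledged.get(i));
            }
        }
        nextSeq = Math.max(nextSeq, lastCommitted + 1);
    }
}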
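Finally, the shape of Tom's second proposal (and Jay's option 2): append eagerly, but only expose messages to consumers once a commit advances a separate "end of topic" marker, and truncate back to that marker on rollback or timeout. This is a toy in-memory sketch of the broker-side bookkeeping, nothing to do with Kafka's actual log implementation:

import java.util.ArrayList;
import java.util.List;

// Toy sketch of a partition whose "end of topic" is a separate committed-end
// marker: appends are staged past the marker, commit() publishes them, and
// rollback() truncates back to the marker. Consumers never see offsets at or
// beyond committedEnd. Only workable with a single producer per partition.
public class DeferredCommitLogSketch {
    private final List<byte[]> entries = new ArrayList<>();
    private int committedEnd = 0;   // consumers may read offsets [0, committedEnd)

    synchronized long append(byte[] message) {   // staged, not yet visible
        entries.add(message);
        return entries.size() - 1;
    }

    synchronized void commit() {                 // move the visible end of the topic forward
        committedEnd = entries.size();
    }

    synchronized void rollback() {               // drop everything appended since the last commit
        entries.subList(committedEnd, entries.size()).clear();
    }

    synchronized List<byte[]> read(int fromOffset, int maxMessages) {
        int to = Math.min(committedEnd, fromOffset + maxMessages);
        if (fromOffset >= to) {
            return new ArrayList<>();
        }
        return new ArrayList<>(entries.subList(fromOffset, to));
    }
}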