Small correction in my third point - 3. IO and memory constraints - We would want a solution that *does not take* 2x the number of writes.
On Mon, Dec 19, 2016 at 12:37 PM, Sriram Subramanian <r...@confluent.io> wrote:

> Radai,
>
> I think it is important to understand the key requirements that we don't want to compromise. We can then understand the tradeoffs of the different approaches. We did in fact start with the double journal approach a couple of years back. I will highlight the must-have requirements first and then explain the tradeoffs based on my understanding.
>
> 1. End to end latency for stream processing - This is probably one of the biggest reasons to support transactions in Kafka. We would like to support very low latency for end to end processing across stream topologies. This means you would want your downstream processors to see the output of your processing immediately. The low latency is a requirement even if we only expose committed messages.
>
> 2. Speculative execution - We would like to go one step further for stream processing. 99% of the transactions will always succeed. We would like to take advantage of this and process the messages optimistically even if the transactions are still unfinished. If the transactions abort, we would do a cascading abort across the topology. This helps us to complete all the processing, keep the output ready, and expose it once the transactions are committed. This will help us to significantly bring down the latency for end to end stream processing and provide the ability to keep exactly once as the default setting.
>
> 3. IO and memory constraints - We would want a solution that takes 2x the number of writes. This will bring down broker utilization by half. I don't really understand the in-memory solution (would be useful if you can explain it more if you think it solves these goals) but the same resource constraints apply. What has made Kafka successful is the ability to run very high throughput clusters with very few machines. We would like to keep this true when a cluster is largely dominated by stream processing workloads.
>
> 4. Provide both read committed and read uncommitted isolation levels - This is actually a desired feature. This is similar to database isolation levels (except that we provide only two of them for now). Downstream systems that need strong guarantees with some performance impact can choose the read committed isolation level. Systems that want to optimize for performance and can live with approximations would choose the read uncommitted option. This helps to nicely decouple downstream users that would like to share topics but have different end goals.
>
> There are other obvious goals like correctness of the protocol and simplicity of the design that need to be true by default.
>
> Given these goals, the double journal approach was a non-starter to enable low end to end latency and did not provide the ability to do speculative execution in the future. We also found the resource constraints (specifically IO/Network) to be unacceptable.
>
> We did understand the complexity of the consumers but it was the best tradeoff considering the other must-have goals. We also thought of another approach to push the consumer buffering to the broker side. This would enable multiple consumer groups to share the same buffer pool for a specific topic partition. However, in the worst case, you would need to bring the entire log into memory to remove the aborted transaction (for a consumer that is catching up from time 0). This would also make us lose zero copy semantics.
>
> I would be excited to hear an option that can solve our must-have goals and still keep the consumer really thin. The abstraction seems fine since we allow the end users to pick the guarantees they need.
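To make point 4 concrete for application developers, here is a minimal sketch of how a consumer might opt into one of the two isolation levels Sriram describes. The `isolation.level` property name and its values are assumptions for illustration; the thread itself only discusses the levels conceptually.

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class IsolationLevelExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "analytics");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");

            // Strong guarantees: only messages from committed transactions are returned.
            props.put("isolation.level", "read_committed");
            // Latency/throughput first: return messages as soon as they are written,
            // including those from transactions that may later abort.
            // props.put("isolation.level", "read_uncommitted");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Subscribe and poll as usual; the isolation level only changes
                // which messages are handed back to the application.
            }
        }
    }

Systems that need the strong guarantee would pick read_committed and accept the extra latency; latency-sensitive stream stages would pick read_uncommitted, matching the decoupling Sriram describes.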
> On Mon, Dec 19, 2016 at 10:56 AM, Jay Kreps <j...@confluent.io> wrote:
>
>> Hey Radai,
>>
>> I'm not sure if I fully understand what you are proposing, but I interpreted it to be similar to a proposal we worked through back at LinkedIn. The proposal was to commit to a central txlog topic, and then recopy to the destination topic upon transaction commit. The observations on that approach at the time were the following:
>>
>> 1. It is cleaner since the output topics have only committed data!
>> 2. You need full replication on the txlog topic to ensure atomicity. We weren't able to come up with a solution where you buffer in memory or use renaming tricks the way you are describing. The reason is that once you begin committing you must ensure that the commit eventually succeeds to guarantee atomicity. If you use a transient store you might commit some data and then have a server failure that causes you to lose the rest of the transaction.
>> 3. Having a single log allows the reader to choose a "read uncommitted" mode that hands out messages immediately. This is important for cases where latency is important, especially for stream processing topologies where these latencies stack up across multiple stages.
>>
>> For the stream processing use case, item (2) is a bit of a deal killer. This takes the cost of a transient message write (say the intermediate result of a stream processing topology) from 3x writes (assuming 3x replication) to 6x writes. This means you basically can't default it on. If we can in fact get the cost down to a single buffered write (i.e. 1x the data is written to memory and buffered to disk if the transaction is large) as in the KIP-98 proposal without too many other negative side effects I think that could be compelling.
>>
>> -Jay
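A quick worked version of Jay's write-amplification arithmetic, under his stated assumption of 3x replication (pure illustration; nothing here comes from the KIP itself):

    public class WriteAmplification {
        public static void main(String[] args) {
            int replicationFactor = 3;

            // Single-log design (KIP-98 style): the message is written once to its
            // destination partition and then replicated.
            int singleLogWrites = 1 * replicationFactor;       // 3 broker writes

            // Double-journal design: the message is first written to a fully
            // replicated txlog topic, then recopied to the destination topic on commit.
            int doubleJournalWrites = 2 * replicationFactor;   // 6 broker writes

            System.out.printf("single log: %dx, double journal: %dx%n",
                              singleLogWrites, doubleJournalWrites);
        }
    }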
>> On Mon, Dec 19, 2016 at 9:36 AM, radai <radai.rosenbl...@gmail.com> wrote:
>>
>>> regarding efficiency:
>>>
>>> I'd like to distinguish between server efficiency (resource utilization of the broker machine alone) and overall network efficiency (resource utilization on brokers, producers and consumers, including network traffic). my proposal is not as resource-efficient on the broker (although it can be, depends on a few trade offs and implementation details). HOWEVER, if i look at the overall efficiency:
>>>
>>> 1. clients would need to either buffer or double-read uncommitted msgs. for N clients reading the stream M times (after re-starts and reconsumes) this would mean a M*N factor in either network BW or disk/memory space (depends on if buffer vs re-read). potentially N*M more broker-side reads too.
>>> 2. to reduce the broker side cost several things can be done (this is not an either-or list, these are cumulative):
>>>    2.1 - keep TX logs in mem (+overflow to disk) - trades disk writes for TX resiliency
>>>    2.2 - when "appending" TX logs to real partitions - instead of reading from (disk-based) TX log and writing to partition log (x2 disk writes) the TX log can be made a segment file (so file rename, with associated protocol changes). this would avoid double writing by simply making the TX file part of the partition (for large enough TXs; smaller ones can be rewritten).
>>>    2.3 - the approach above could be combined with a background "defrag" - similar in concept to compaction - to further reduce the total number of resulting files.
>>>
>>> I think my main issue with the current proposal, more important than performance, is lack of proper "encapsulation" of transactions - I don't think downstream consumers should see uncommitted msgs. ever.
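The rename trick in radai's point 2.2 is easiest to see at the filesystem level. Below is a hypothetical sketch of the idea only; the staging-file layout, paths, and commit methods are invented for illustration and are not part of Kafka or the KIP. Committing a large transaction becomes a metadata-only rename of the staged transaction file into the partition directory, while a small transaction falls back to the rewrite path radai mentions.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;
    import java.nio.file.StandardOpenOption;

    public class TxCommitSketch {
        // Hypothetical: a per-transaction staging file the broker has been writing to.
        static final Path STAGED_TX = Paths.get("/tmp/kafka-logs/topic-0/.tx-42.staging");

        // Commit a large transaction by renaming the staged file into the partition
        // directory as a new segment: one metadata operation, no second data write.
        static void commitLargeTx(long baseOffset) throws IOException {
            Path segment = Paths.get("/tmp/kafka-logs/topic-0/",
                                     String.format("%020d.log", baseOffset));
            Files.move(STAGED_TX, segment, StandardCopyOption.ATOMIC_MOVE);
        }

        // Small transactions are not worth a dedicated segment; their bytes are
        // appended to the active segment instead (the "rewrite" path, 2x writes).
        static void commitSmallTx(Path activeSegment) throws IOException {
            byte[] records = Files.readAllBytes(STAGED_TX);
            Files.write(activeSegment, records, StandardOpenOption.APPEND);
            Files.delete(STAGED_TX);
        }
    }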
>>> On Sun, Dec 18, 2016 at 10:19 PM, Becket Qin <becket....@gmail.com> wrote:
>>>
>>>> @Jason
>>>>
>>>> Yes, on second thought, regarding the number of messages included, the offset delta will probably be sufficient. The use case I encountered before for the number of messages in a message set is an embedded mirror maker on the destination broker side which fetches messages directly from the source cluster. Ideally the destination cluster only needs to check the CRC and assign the offsets because all the message verification has been done by the source cluster, but due to the lack of the number of messages in the message set, we have to decompress the message set to increment offsets correctly. By knowing the number of messages in the message set, we can avoid doing that. The offset delta will also help. It's just that the offsets may then have holes for log compacted topics, but that may be fine.
>>>>
>>>> @Apurva
>>>>
>>>> I am not sure if it is true that the consumer will either deliver all the messages for the entire transaction or none of them from one poll() call. If we allow transactions to be across partitions, unless the consumer consumes from all the partitions involved in a transaction, it seems impossible for it to deliver *all* the messages in a transaction, right? A weaker guarantee is that we will deliver all or none of the messages that belong to the same transaction in ONE partition, but this would be different from the guarantee from the producer side.
>>>>
>>>> My two cents on Radai's sideways partition design:
>>>>
>>>> 1. If we consider the producer side behavior as doing a two phase commit, which includes committing the consumer offsets, it is a little awkward that we allow uncommitted messages to go into the main log and rely on the consumer to filter them out. So semantics-wise I think it would be better if we can avoid this. Radai's suggestion is actually intuitive because if the brokers do not want to expose uncommitted transactions to the consumer, the brokers have to buffer them.
>>>>
>>>> 2. Regarding the efficiency, I think it may be worth looking at the efficiency cost vs. benefit. The efficiency includes both server side efficiency and consumer side efficiency.
>>>>
>>>> Regarding the server side efficiency, the current proposal would probably have better efficiency regardless of whether something goes wrong. Radai's suggestion would put more burden on the server side. If nothing goes wrong we always pay the cost of having a double copy of the transactional messages and do not get the semantic benefit. But if something goes wrong, the efficiency cost we pay gets us better semantics.
>>>>
>>>> For the consumer side efficiency, with Radai's suggestion there is no need to buffer the uncommitted messages. The current proposal may have to potentially buffer uncommitted messages, so it would be less efficient than Radai's suggestion when a transaction aborts. When everything goes well, both designs seem to have similar performance. However, it depends on whether we are willing to loosen the consumer side transaction guarantee that I mentioned earlier to Apurva.
>>>>
>>>> Currently the biggest pressure on the consumer side is that it has to buffer incomplete transactions. There are two reasons for it:
>>>> A. A transaction may be aborted, so we cannot expose the messages to the users.
>>>> B. We want to return all or none of the messages in a transaction in ONE partition.
>>>>
>>>> While reason A is mandatory, I think reason B may be discussable. Radai's design actually removes reason A because there are no uncommitted messages exposed to the consumers. This may potentially give us a chance to significantly improve consumer side efficiency in normal cases. It again depends on the use case, i.e. whether the user can process a transaction progressively (message by message) or it has to be buffered and returned all together. If in most cases users can process the transactions message by message (most stream processing tasks probably can do so), then with Radai's proposal we don't need to buffer the transactions for the users anymore, which is a big difference. For the latter case, the consumer may have to buffer the incomplete transactions, otherwise we are just throwing the burden onto the users.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Fri, Dec 16, 2016 at 4:56 PM, Jay Kreps <j...@confluent.io> wrote:
>>>>
>>>>> Yeah good point. I relent!
>>>>>
>>>>> -jay
>>>>>
>>>>> On Fri, Dec 16, 2016 at 1:46 PM Jason Gustafson <ja...@confluent.io> wrote:
>>>>>
>>>>>> Jay/Ismael,
>>>>>>
>>>>>> I agree that lazy initialization of metadata seems unavoidable. Ideally, we could follow the same pattern for transactions, but remember that in the consumer+producer use case, the initialization needs to be completed prior to setting the consumer's position. Otherwise we risk reading stale offsets. But it would be pretty awkward if you have to begin a transaction first to ensure that your consumer can read the right offset from the consumer, right? It's a bit easier to explain that you should always call `producer.init()` prior to initializing the consumer. Users would probably get this right without any special effort.
>>>>>>
>>>>>> -Jason
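A minimal sketch of the initialization ordering Jason describes for the consumer+producer case. The method and property names here (`initTransactions()` standing in for the `producer.init()` mentioned above, plus `transactional.id` and `isolation.level`) are assumptions for illustration rather than anything finalized in this thread.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public class InitOrderSketch {
        public static void main(String[] args) {
            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "localhost:9092");
            producerProps.put("transactional.id", "my-processor-0");  // hypothetical id
            producerProps.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");

            // 1. Initialize the transactional producer FIRST. This resolves any
            //    in-flight transaction left by a previous incarnation, so the offsets
            //    it committed (or aborted) are settled before we read them.
            KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
            producer.initTransactions();

            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "localhost:9092");
            consumerProps.put("group.id", "my-processor");
            consumerProps.put("isolation.level", "read_committed");
            consumerProps.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProps.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            // 2. Only now create the consumer and let it fetch its position; the
            //    offsets it sees cannot be made stale by an unresolved transaction.
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
            consumer.subscribe(Collections.singletonList("input-topic"));
        }
    }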
>>>>>> On Wed, Dec 14, 2016 at 1:52 AM, Rajini Sivaram <rsiva...@pivotal.io> wrote:
>>>>>>
>>>>>>> Hi Apurva,
>>>>>>>
>>>>>>> Thank you for the answers. Just one follow-on.
>>>>>>>
>>>>>>> 15. Let me rephrase my original question. If all control messages (messages to transaction logs and markers on user logs) were acknowledged only after flushing the log segment, will transactions become durable in the traditional sense (i.e. not restricted to min.insync.replicas failures)? This is not a suggestion to update the KIP. It seems to me that the design enables full durability, if required in the future, with a rather non-intrusive change. I just wanted to make sure I haven't missed anything fundamental that prevents Kafka from doing this.
>>>>>>>
>>>>>>> On Wed, Dec 14, 2016 at 5:30 AM, Ben Kirwin <b...@kirw.in> wrote:
>>>>>>>
>>>>>>>> Hi Apurva,
>>>>>>>>
>>>>>>>> Thanks for the detailed answers... and sorry for the late reply!
>>>>>>>>
>>>>>>>> It does sound like, if the input-partitions-to-app-id mapping never changes, the existing fencing mechanisms should prevent duplicates. Great! I'm a bit concerned the proposed API will be delicate to program against successfully -- even in the simple case, we need to create a new producer instance per input partition, and anything fancier is going to need its own implementation of the Streams/Samza-style 'task' idea -- but that may be fine for this sort of advanced feature.
>>>>>>>>
>>>>>>>> For the second question, I notice that Jason also elaborated on this downthread:
>>>>>>>>
>>>>>>>>> We also looked at removing the producer ID. This was discussed somewhere above, but basically the idea is to store the AppID in the message set header directly and avoid the mapping to producer ID altogether. As long as batching isn't too bad, the impact on total size may not be too bad, but we were ultimately more comfortable with a fixed size ID.
>>>>>>>>
>>>>>>>> ...which suggests that the distinction is useful for performance, but not necessary for correctness, which makes good sense to me. (Would a 128-bit ID be a reasonable compromise? That's enough room for a UUID, or a reasonable hash of an arbitrary string, and has only a marginal increase on the message size.)
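Ben's 128-bit suggestion is easy to picture: a short sketch of deriving a fixed-size 128-bit ID from an arbitrary AppID string (illustration only; no such helper is proposed anywhere in the thread).

    import java.nio.charset.StandardCharsets;
    import java.util.UUID;

    public class AppIdSketch {
        // Map an arbitrary AppID string onto 128 bits: enough room for a random UUID
        // or, as here, a name-based (MD5) UUID derived deterministically from the string.
        static UUID toFixedSizeId(String appId) {
            return UUID.nameUUIDFromBytes(appId.getBytes(StandardCharsets.UTF_8));
        }

        public static void main(String[] args) {
            System.out.println(toFixedSizeId("payments-processor-partition-7"));
            // Either way the per-message-set overhead stays at 16 bytes, instead of
            // carrying a variable-length string in every message set header.
        }
    }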
>> > > > > >> > > > > > > >> > > > > >> > > > > > > On Tue, Dec 6, 2016 at 11:44 PM, Apurva Mehta < >> > apu...@confluent.io >> > > > >> > > > > >> > > > > > wrote: >> > > > > >> > > > > > > >> > > > > >> > > > > > > > Hi Ben, >> > > > > >> > > > > > > > >> > > > > >> > > > > > > > Now, on to your first question of how deal with consumer >> > > > rebalances. >> > > > > >> > > > > > The >> > > > > >> > > > > > > > short answer is that the application needs to ensure that >> the >> > the >> > > > > >> > > > > > > > assignment of input partitions to appId is consistent across >> > > > > >> > > > > > rebalances. >> > > > > >> > > > > > > > >> > > > > >> > > > > > > > For Kafka streams, they already ensure that the mapping of >> > input >> > > > > >> > > > > > > partitions >> > > > > >> > > > > > > > to task Id is invariant across rebalances by implementing a >> > > custom >> > > > > >> > > > > > sticky >> > > > > >> > > > > > > > assignor. Other non-streams apps can trivially have one >> > producer >> > > > per >> > > > > >> > > > > > > input >> > > > > >> > > > > > > > partition and have the appId be the same as the partition >> > number >> > > to >> > > > > >> > > > > > > achieve >> > > > > >> > > > > > > > the same effect. >> > > > > >> > > > > > > > >> > > > > >> > > > > > > > With this precondition in place, we can maintain >> transactions >> > > > across >> > > > > >> > > > > > > > rebalances. >> > > > > >> > > > > > > > >> > > > > >> > > > > > > > Hope this answers your question. >> > > > > >> > > > > > > > >> > > > > >> > > > > > > > Thanks, >> > > > > >> > > > > > > > Apurva >> > > > > >> > > > > > > > >> > > > > >> > > > > > > > On Tue, Dec 6, 2016 at 3:22 PM, Ben Kirwin <b...@kirw.in> >> > wrote: >> > > > > >> > > > > > > > >> > > > > >> > > > > > > > > Thanks for this! I'm looking forward to going through the >> > full >> > > > > >> > > > > > proposal >> > > > > >> > > > > > > > in >> > > > > >> > > > > > > > > detail soon; a few early questions: >> > > > > >> > > > > > > > > >> > > > > >> > > > > > > > > First: what happens when a consumer rebalances in the >> middle >> > > of a >> > > > > >> > > > > > > > > transaction? The full documentation suggests that such a >> > > > > transaction >> > > > > >> > > > > > > > ought >> > > > > >> > > > > > > > > to be rejected: >> > > > > >> > > > > > > > > >> > > > > >> > > > > > > > > > [...] if a rebalance has happened and this consumer >> > > > > >> > > > > > > > > > instance becomes a zombie, even if this offset message >> is >> > > > > appended >> > > > > >> > > > > > in >> > > > > >> > > > > > > > the >> > > > > >> > > > > > > > > > offset topic, the transaction will be rejected later on >> > when >> > > it >> > > > > >> > > > > > tries >> > > > > >> > > > > > > > to >> > > > > >> > > > > > > > > > commit the transaction via the EndTxnRequest. >> > > > > >> > > > > > > > > >> > > > > >> > > > > > > > > ...but it's unclear to me how we ensure that a transaction >> > > can't >> > > > > >> > > > > > > complete >> > > > > >> > > > > > > > > if a rebalance has happened. (It's quite possible I'm >> missing >> > > > > >> > > > > > something >> > > > > >> > > > > > > > > obvious!) 
>> > > > > >> > > > > > > > > >> > > > > >> > > > > > > > > As a concrete example: suppose a process with PID 1 adds >> > > offsets >> > > > > for >> > > > > >> > > > > > > some >> > > > > >> > > > > > > > > partition to a transaction; a consumer rebalance happens >> that >> > > > > assigns >> > > > > >> > > > > > > the >> > > > > >> > > > > > > > > partition to a process with PID 2, which adds some >> offsets to >> > > its >> > > > > >> > > > > > > current >> > > > > >> > > > > > > > > transaction; both processes try and commit. Allowing both >> > > commits >> > > > > >> > > > > > would >> > > > > >> > > > > > > > > cause the messages to be processed twice -- how is that >> > > avoided? >> > > > > >> > > > > > > > > >> > > > > >> > > > > > > > > Second: App IDs normally map to a single PID. It seems >> like >> > one >> > > > > could >> > > > > >> > > > > > > do >> > > > > >> > > > > > > > > away with the PID concept entirely, and just use App IDs >> in >> > > most >> > > > > >> > > > > > places >> > > > > >> > > > > > > > > that require a PID. This feels like it would be >> significantly >> > > > > >> > > > > > simpler, >> > > > > >> > > > > > > > > though it does increase the message size. Are there other >> > > reasons >> > > > > why >> > > > > >> > > > > > > the >> > > > > >> > > > > > > > > App ID / PID split is necessary? >> > > > > >> > > > > > > > > >> > > > > >> > > > > > > > > On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang < >> > > > wangg...@gmail.com >> > > > > > >> > > > > >> > > > > > > > wrote: >> > > > > >> > > > > > > > > >> > > > > >> > > > > > > > > > Hi all, >> > > > > >> > > > > > > > > > >> > > > > >> > > > > > > > > > I have just created KIP-98 to enhance Kafka with exactly >> > once >> > > > > >> > > > > > > delivery >> > > > > >> > > > > > > > > > semantics: >> > > > > >> > > > > > > > > > >> > > > > >> > > > > > > > > > *https://cwiki.apache.org/confluence/display/KAFKA/KIP- >> > > > > >> > > > > > > > > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging >> > > > > >> > > > > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP- >> > > > > >> > > > > > > > > > 98+-+Exactly+Once+Delivery+and >> +Transactional+Messaging>* >> > > > > >> > > > > > > > > > >> > > > > >> > > > > > > > > > This KIP adds a transactional messaging mechanism along >> > with >> > > an >> > > > > >> > > > > > > > > idempotent >> > > > > >> > > > > > > > > > producer implementation to make sure that 1) duplicated >> > > > messages >> > > > > >> > > > > > sent >> > > > > >> > > > > > > > > from >> > > > > >> > > > > > > > > > the same identified producer can be detected on the >> broker >> > > > side, >> > > > > >> > > > > > and >> > > > > >> > > > > > > > 2) a >> > > > > >> > > > > > > > > > group of messages sent within a transaction will >> atomically >> > > be >> > > > > >> > > > > > either >> > > > > >> > > > > > > > > > reflected and fetchable to consumers or not as a whole. >> > > > > >> > > > > > > > > > >> > > > > >> > > > > > > > > > The above wiki page provides a high-level view of the >> > > proposed >> > > > > >> > > > > > > changes >> > > > > >> > > > > > > > as >> > > > > >> > > > > > > > > > well as summarized guarantees. 
>>>>>>>>>> On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> I have just created KIP-98 to enhance Kafka with exactly once delivery semantics:
>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>>>>>>>>>>>
>>>>>>>>>>> This KIP adds a transactional messaging mechanism along with an idempotent producer implementation to make sure that 1) duplicated messages sent from the same identified producer can be detected on the broker side, and 2) a group of messages sent within a transaction will atomically be either reflected and fetchable to consumers or not as a whole.
>>>>>>>>>>>
>>>>>>>>>>> The above wiki page provides a high-level view of the proposed changes as well as summarized guarantees. An initial draft of the detailed implementation design is described in this Google doc:
>>>>>>>>>>>
>>>>>>>>>>> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8
>>>>>>>>>>>
>>>>>>>>>>> We would love to hear your comments and suggestions.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> -- Guozhang