Hi Team,

I have written down all my thoughts around supporting transactional
streaming in Pulsar:
https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx

Please take a look and feel free to comment on the google doc. We can start
from there. Also, apologies in advance if there are inconsistencies, typos,
or language errors in the doc; feel free to fix them.

Thanks,
Sijie

On Tue, Mar 5, 2019 at 1:49 PM Sijie Guo <guosi...@gmail.com> wrote:

> Will send the detailed proposal. We can go from there.
>
> One interesting question I would like to reply to here:
>
> > But this is more microbatching than streaming.
>
> I think people usually have the wrong impression about "microbatching"
> vs "streaming". That distinction usually comes up in the context of
> Spark Streaming vs Storm/Flink, where it is about how the computing
> engine schedules computations.
>
> In reality, "batching" (microbatching) is almost everywhere in a
> "streaming" pipeline, e.g. even in the Pulsar client and the bookie
> journal. In the streaming world, you will still do "microbatching" for
> many reasons (such as throughput, windowing semantics and so on), but
> the "microbatching" there is not about "scheduling" anymore.
>
> - Sijie
>
> On Tue, Mar 5, 2019 at 4:20 AM Ivan Kelly <iv...@apache.org> wrote:
>
>> > > My replies inline assume the above, so if you have a different view
>> > > of the general shape let me know.
>> >
>> > Yes. We are on the same view of the general shape. I will write down
>> > the details of my proposal and will share it with the community
>> > tomorrow.
>>
>> Please do. I think there are a lot of details missing.
>>
>> > Diagrams can easily be drawn to compare the differences here. I will
>> > incorporate them into the proposal and show the differences.
>>
>> Diagrams and a detailed design would be very useful. I still see a
>> bunch of unknowns in your design.
>>
>> > I don't think it is a hard dependency. All these components should be
>> > defined by interfaces.
>>
>> The architecture you're proposing requires a metadata service that can
>> scale horizontally in order for the whole system to scale, so it is a
>> hard dependency. An implementation backed by ZK would be only a toy.
>>
>> > I think I know why you think interleaving is okay now. In your mind,
>> > transactions are carrying one message per partition.
>>
>> Well yes, or only a few. I think we have very different views of how
>> transactions will be used. You seem to favour few large transactions,
>> whereas what Matteo and I have discussed is many smaller transactions,
>> and this informs both designs. With large transactions, you're
>> basically microbatching, and you can afford to make the individual
>> transactions more expensive since you have fewer of them. For many
>> smaller transactions, we need to make the transaction itself as cheap
>> as possible.
>>
>> > A common case of transactions in streaming is read-transfer-write:
>> > read a batch of messages, process them, write the results to Pulsar,
>> > and acknowledge the input messages (see the sketch below). Even
>> > within a 100ms window the data can still be large, especially since
>> > the results can be several times the size of the input messages.
>> > With that being said, in high-throughput transactional streaming the
>> > data per transaction can be large, so storing entries of the same
>> > transaction contiguously will have a huge benefit.
>>
>> Ok, sure, I'll give you that. If you are reusing a single transaction
>> for a lot of inputs, there may be a lot of output messages on the same
>> topic. But this is more microbatching than streaming.
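For concreteness, the read-transfer-write pattern above might look like
the following with a transactional client API. All of the names here
(Transaction, newTransaction, newMessage(txn), acknowledgeAsync(id, txn),
process) are illustrative assumptions for the sake of discussion, not an
agreed interface:

    import java.util.concurrent.TimeUnit;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;

    class ReadTransferWrite {
        // Placeholder transform standing in for real processing logic.
        static byte[] process(byte[] in) { return in; }

        static void run(PulsarClient client, Consumer<byte[]> consumer,
                        Producer<byte[]> producer) throws Exception {
            while (true) {
                // Hypothetical API: open a transaction with a bounded
                // lifetime, so the system can eventually clean it up.
                Transaction txn = client.newTransaction()
                        .withTransactionTimeout(1, TimeUnit.MINUTES)
                        .build();
                try {
                    for (int i = 0; i < 100; i++) {             // read a batch
                        Message<byte[]> msg = consumer.receive();
                        byte[] result = process(msg.getValue()); // transfer
                        producer.newMessage(txn)                 // write
                                .value(result)
                                .sendAsync();
                        // Ack the input within the same transaction.
                        consumer.acknowledgeAsync(msg.getMessageId(), txn);
                    }
                    // Outputs and acks become visible atomically, or not
                    // at all.
                    txn.commit();
                } catch (Exception e) {
                    txn.abort(); // outputs discarded, inputs redelivered
                }
            }
        }
    }

The point of contention in the thread maps directly onto this loop: with
a large batch per transaction you amortize the commit cost
(microbatching), while with a small batch the commit itself must be made
as cheap as possible.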
>> > Another large category of use cases that should be considered is
>> > "batch" data processing. You definitely don't want a future
>> > "batch"-ish data processing workload to jump back and forth in
>> > ledgers. There too, storing entries of the same transaction in the
>> > same partition contiguously will have huge benefits.
>>
>> Why would you use a transaction here? This is more of a use case for an
>> idempotent producer.
>>
>> > Transaction coordinators are responsible for committing, aborting and
>> > cleaning up transactions.
>>
>> That's the "where", not the "how". In my experience, the trickiest part
>> of transaction systems is the cleanup.
>>
>> > > I think having a ledger, or a shadow topic, per topic would work
>> > > fine. There's no need for indexing. We can already look up a
>> > > message by message id. This message id should be part of the
>> > > transaction commit message, and subsequently part of the commit
>> > > marker written to each topic involved in the transaction.
>> >
>> > The assumption you have here is that all the message ids can be
>> > stored within one commit message or commit marker.
>>
>> Are you expecting to have more than 100k messages per transaction?
>>
>> > > Caching is no different to any other choice of storage for the
>> > > message data.
>> >
>> > Caching behaviour is very different from the normal streaming case,
>> > since accessing the entries of the same transaction will jump back
>> > and forth in the shadow topic.
>>
>> Why would you be jumping back and forth for the same transaction
>> within the same topic? You jump to the first message from that topic
>> in the transaction and read forward.
>>
>> > How are you going to delete the data of aborted transactions in any
>> > form of interleaved storage? If we don't compact, the data of
>> > aborted transactions will be left there forever.
>>
>> Aborted transactions should be very rare. The only conflict should be
>> on acknowledge, and acknowledges should only conflict if multiple
>> consumers get the same message, which shouldn't be the case. In any
>> case, aborted data should be naturally cleaned up with the topic
>> itself (see below).
>>
>> > If you don't rewrite the shadow topic to the main topic, you either
>> > have to do compaction or retain the shadow topic forever, no?
>>
>> No. Let's say the maximum age of a single transaction is 1h. That
>> means you only need to retain an hour more of data in the shadow
>> topic than you retain in the main topic. Of course we wouldn't use
>> wallclock time for this, but something like a low watermark; that's
>> the basic idea. I haven't worked out all the details.
>>
>> I look forward to seeing your full design.
>>
>> -Ivan
>>
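For reference, a rough sketch of the per-topic commit marker shape
discussed above: the marker appended to each topic at commit time carries
the ids of that transaction's messages, so a reader that hits the marker
can fetch them by id without any separate index. Field and type names are
assumptions for illustration only:

    import java.util.List;
    import org.apache.pulsar.client.api.MessageId;

    // Illustrative only: written to each topic involved in a transaction
    // when the transaction commits.
    class CommitMarker {
        long txnIdMostBits;            // transaction id, high 64 bits
        long txnIdLeastBits;           // transaction id, low 64 bits
        List<MessageId> messageIds;    // ids of the txn's messages in this
                                       // partition's shadow topic/ledger
    }

The 100k figure above is the relevant sizing question for this shape: at
tens of bytes per serialized message id, a marker for a 100k-message
transaction would be on the order of a few megabytes, which bounds how
large a single marker can get.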