Thanks @Sijie for the PIP. It has enough detail for me, and it looks great, especially the sidecar approach. Left some comments.
Best Regards.
Jia Zhai
Beijing, China
Mobile: +86 15810491983

On Fri, Mar 8, 2019 at 9:58 PM Sijie Guo <guosi...@gmail.com> wrote:

> Hi Team,
>
> I have written down all my thoughts around supporting transactional
> streaming at Pulsar.
>
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
>
> Please take a look and feel free to comment on the google doc. We can
> start from there.
>
> Also, apologies in advance if there are inconsistencies, typos, or
> language errors in the doc. Feel free to fix them.
>
> Thanks,
> Sijie
>
> On Tue, Mar 5, 2019 at 1:49 PM Sijie Guo <guosi...@gmail.com> wrote:
>
> > Will send the detailed proposal. We can go from there.
> >
> > One interesting question I would like to reply to here:
> >
> > > But this is more microbatching than streaming.
> >
> > I think people usually have a wrong impression about "microbatching"
> > vs "streaming". That distinction usually comes up in the context of
> > spark streaming vs storm/flink, and that context is more about how the
> > computing engine "schedules" computations.
> >
> > In reality, "batching" (microbatching) is almost everywhere in a
> > "streaming" pipeline, e.g. even in the pulsar client and the bookie
> > journal. In the streaming world, you will still do "microbatching" for
> > many reasons (such as throughput, windowing semantics and so on), but
> > the "microbatching" there is not about "scheduling" anymore.
> >
> > - Sijie
> >
> > On Tue, Mar 5, 2019 at 4:20 AM Ivan Kelly <iv...@apache.org> wrote:
> >
> >> > > My replies inline assume the above, so if you have a different
> >> > > view of the general shape let me know.
> >> >
> >> > Yes. We share the same view of the general shape. I will write down
> >> > the details of my proposal and share it with the community tomorrow.
> >>
> >> Please do. I think there are a lot of details missing.
> >>
> >> > Diagrams can easily be drawn to compare the differences here. I will
> >> > incorporate them into the proposal and show the differences.
> >>
> >> Diagrams and a detailed design would be very useful. I still see a
> >> bunch of unknowns in your design.
> >>
> >> > I don't think it is a hard dependency. All these components should
> >> > be abstracted behind interfaces.
> >>
> >> The architecture you're proposing requires a metadata service that
> >> can scale horizontally in order to scale itself, so it is a hard
> >> dependency. An implementation backed by ZK would be only a toy.
> >>
> >> > I think I know why you think interleaving is okay now. In your mind,
> >> > transactions carry one message per partition.
> >>
> >> Well yes, or only a few. I think we have very different views of how
> >> transactions will be used. You seem to be favouring few large
> >> transactions, whereas what Matteo and I have discussed is many smaller
> >> transactions, and this informs both designs. With large transactions,
> >> you're basically micro batching, and you can afford to make the
> >> individual transactions more expensive since you have fewer. For many
> >> smaller transactions, we need to make the transaction itself as cheap
> >> as possible.
> >>
> >> > A common case of transactions in streaming is read-transfer-write:
> >> > read a batch of messages, process them, write the results to Pulsar,
> >> > and acknowledge the input messages. If you are doing this in a 100ms
> >> > window, the data can still be fairly large, especially since the
> >> > results can be multiple times the size of the input messages. With
> >> > that being said, in high-throughput transactional streaming the data
> >> > per transaction can be large, so storing entries of the same
> >> > transaction contiguously will have a huge benefit.
> >>
> >> Ok, sure, I'll give you that. If you are reusing a single transaction
> >> for a lot of inputs, there may be a lot of output messages on the same
> >> topic. But this is more microbatching than streaming.
> >>
> >> > Another large category of use cases that should be considered is
> >> > "batch" data processing. You definitely don't want your future
> >> > "batch"-ish data processing workload to jump back and forth in
> >> > ledgers. In that case, storing entries of the same transaction in
> >> > the same partition contiguously will bring huge benefits.
> >>
> >> Why would you use a transaction here? This is more of a use case for
> >> an idempotent producer.
> >>
> >> > Transaction coordinators are responsible for committing, aborting
> >> > and cleaning up transactions.
> >>
> >> How, not where. In my experience, the trickiest part of transaction
> >> systems is the cleanup.
> >>
> >> > > I think having a ledger, or a shadow topic, per topic would work
> >> > > fine. There's no need for indexing. We can already look up a
> >> > > message by message id. This message id should be part of the
> >> > > transaction commit message, and subsequently part of the commit
> >> > > marker written to each topic involved in the transaction.
> >> >
> >> > The assumption you have here is that all the message ids can be
> >> > stored within one commit message or the commit markers.
> >>
> >> Are you expecting to have more than 100k messages per transaction?
> >>
> >> > > Caching is no different to any other choice of storage for the
> >> > > message data.
> >> >
> >> > Caching behavior is very different from the normal streaming case,
> >> > since accessing the entries of the same transaction will jump back
> >> > and forth in the shadow topic.
> >>
> >> Why would you be jumping back and forth for the same transaction
> >> within the same topic? You jump to the first message from that topic
> >> in the transaction and read forward.
> >>
> >> > How are you going to delete the data of aborted transactions in any
> >> > form of interleaved storage? If we don't compact, the data of
> >> > aborted transactions will be left there forever.
> >>
> >> Aborted transactions should be very rare. The only conflict should be
> >> on acknowledge, and these should only conflict if multiple consumers
> >> get the same message, which shouldn't be the case. In any case, they
> >> should be naturally cleaned up with the topic itself (see below).
> >>
> >> > If you don't rewrite the shadow topic into the main topic, you
> >> > either have to do compaction or retain the shadow topic forever, no?
> >>
> >> No. Let's say that the maximum age of a single transaction is 1h. That
> >> means you only need to retain an hour more of data in the shadow topic
> >> than you would have to retain in the main topic. Of course we wouldn't
> >> use wallclock time for this, but rather something around the low
> >> watermark; that's the basic idea anyway. I haven't worked out all the
> >> details.
> >>
> >> I look forward to seeing your full design.
> >>
> >> -Ivan
> >>
> >
>
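For concreteness, the read-transfer-write pattern debated above (consume a
batch of input messages, produce the processed results, and acknowledge the
inputs, all atomically) could look roughly like the Java sketch below. The
transaction API shown here (enableTransaction, newTransaction,
newMessage(txn), acknowledgeAsync(msgId, txn)) is an assumed shape for
illustration only; no such API existed in Pulsar at the time of this thread.

    import java.util.concurrent.TimeUnit;

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.api.transaction.Transaction;

    public class ReadTransferWrite {
        public static void main(String[] args) throws Exception {
            // Assumed client-side switch to enable transaction support.
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .enableTransaction(true)
                    .build();

            Consumer<String> consumer = client.newConsumer(Schema.STRING)
                    .topic("input-topic")
                    .subscriptionName("transfer-sub")
                    .subscribe();

            Producer<String> producer = client.newProducer(Schema.STRING)
                    .topic("output-topic")
                    .create();

            while (true) {
                // One transaction per small batch, e.g. per 100ms window.
                Transaction txn = client.newTransaction()
                        .withTransactionTimeout(1, TimeUnit.MINUTES)
                        .build()
                        .get();
                try {
                    for (int i = 0; i < 100; i++) {
                        Message<String> msg =
                                consumer.receive(10, TimeUnit.MILLISECONDS);
                        if (msg == null) {
                            break;
                        }
                        // "Transfer": produce the processed result and
                        // acknowledge the input within the same transaction.
                        producer.newMessage(txn)
                                .value(process(msg.getValue()))
                                .sendAsync();
                        consumer.acknowledgeAsync(msg.getMessageId(), txn);
                    }
                    // Outputs become visible and acks take effect atomically
                    // only when the commit succeeds.
                    txn.commit().get();
                } catch (Exception e) {
                    // On failure, abort so the acks are released and the
                    // produced messages are never exposed to consumers.
                    txn.abort().get();
                }
            }
        }

        private static String process(String input) {
            return input.toUpperCase();
        }
    }

The property both designs in this thread are after is the same: the produced
messages and the acknowledgements only take effect together when the commit
succeeds; if the transaction aborts, consumers never see the outputs and the
inputs are redelivered.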