On Mon, Mar 18, 2019 at 6:54 PM Ivan Kelly <iv...@apache.org> wrote:

> Sorry it took me so long to take a look at this, the last few weeks
> have been hectic.
>
> I still haven't gone through it fully, but putting the transaction
> buffer outside of the partition is fine with me. The thing I objected
> to most in previous discussions was having a separate transaction
> buffer per transaction, as this would create a huge amount of
> metadata.

Sure. The per-transaction buffer might still be useful for some use
cases, so I left it extensible in the message header; we can still add
that implementation in the future.
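(Purely for illustration, a selector along these lines in the message
header could pick the buffer implementation. The enum and its name are
hypothetical, not part of Pulsar's wire protocol or of the proposal
text:)

    // Hypothetical header field sketching what "extensible" means here;
    // not an actual Pulsar metadata field.
    enum TxnBufferType {
        PARTITION_SIDECAR, // one transaction buffer per partition (this proposal)
        PER_TRANSACTION    // reserved for a possible future implementation
    }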
> So, I would suggest going with the sidecar approach and removing the
> inline approach from the document to keep things simple.

The inline approach is there just for comparison, to make sure the
decisions are made based on tradeoffs. The full proposal is described
in the "A Full Proposal" section.

> I still have a bunch of questions about the design, but I need to
> chew it over for a while. It would be good to have some sequence
> diagrams of the interactions (https://sequencediagram.org/ <- this is
> a really handy tool for building them).

Does the diagram in the overview work? If it doesn't, I can try this
tool.

> We should precisely define how retention will work between the
> partition and the partition's transaction buffer, as it seems that
> that is the biggest can of worms in the whole thing.

The retention is done via a retention cursor, so we don't reimplement
any retention logic just for the transaction buffer. A simple way to
think about it:

- If a transaction is aborted, its messages in the transaction buffer
are marked as `DELETED` in the retention cursor.
- If a transaction is committed, its messages are kept in the
transaction buffer. They are only marked as `DELETED` in the retention
cursor when the segment that contains the transaction marker is
deleted.

We simply maintain a mapping between each transaction and its
transaction marker in the data partition, so we can quickly figure out
whether the transaction markers have been deleted or not. All of this
is done via the "cursors" in Pulsar, so there is really nothing
reimplemented.
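(A minimal sketch of that bookkeeping, assuming hypothetical names
throughout -- the cursor interface, the callbacks and the long-valued
entry ids below are illustrative, not actual Pulsar APIs:)

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch: retention bookkeeping for one partition's transaction buffer.
    class TxnBufferRetention {
        interface RetentionCursor { void markDeleted(long entryId); }

        private final RetentionCursor retentionCursor;
        // txn id -> entry id of the txn's marker in the data partition
        private final Map<Long, Long> markerEntryOfTxn = new HashMap<>();
        // txn id -> entries the txn wrote into the transaction buffer
        private final Map<Long, List<Long>> bufferedEntriesOfTxn = new HashMap<>();

        TxnBufferRetention(RetentionCursor retentionCursor) {
            this.retentionCursor = retentionCursor;
        }

        void onBufferedEntry(long txnId, long entryId) {
            bufferedEntriesOfTxn.computeIfAbsent(txnId, k -> new ArrayList<>())
                    .add(entryId);
        }

        void onAborted(long txnId) {
            release(txnId); // aborted data is reclaimable immediately
        }

        void onCommitted(long txnId, long markerEntryId) {
            // committed data must outlive its marker in the data partition
            markerEntryOfTxn.put(txnId, markerEntryId);
        }

        // invoked when the data partition deletes a segment [first, last]
        void onSegmentDeleted(long firstEntryId, long lastEntryId) {
            markerEntryOfTxn.entrySet().removeIf(e -> {
                boolean markerGone =
                        e.getValue() >= firstEntryId && e.getValue() <= lastEntryId;
                if (markerGone) {
                    release(e.getKey());
                }
                return markerGone;
            });
        }

        private void release(long txnId) {
            List<Long> entries = bufferedEntriesOfTxn.remove(txnId);
            if (entries != null) {
                entries.forEach(retentionCursor::markDeleted);
            }
        }
    }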
> -Ivan
>
>
> On Fri, Mar 15, 2019 at 4:01 AM Sijie Guo <guosi...@gmail.com> wrote:
> >
> > Any other more comments on this topic?
> >
> > - Sijie
> >
> > On Sun, Mar 10, 2019 at 8:57 PM Jia Zhai <zhaiji...@gmail.com> wrote:
> >
> > > Thanks @Sijie for the PIP. It has enough details for me, and it
> > > looks great, especially the sidecar approach. Left some comments.
> > >
> > > Best Regards.
> > >
> > > Jia Zhai
> > >
> > > Beijing, China
> > >
> > > Mobile: +86 15810491983
> > >
> > > On Fri, Mar 8, 2019 at 9:58 PM Sijie Guo <guosi...@gmail.com> wrote:
> > >
> > > > Hi Team,
> > > >
> > > > I have written down all my thoughts around supporting
> > > > transactional streaming in Pulsar:
> > > >
> > > > https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > > >
> > > > Please take a look and feel free to comment on the google doc.
> > > > We can start from there.
> > > >
> > > > Also, apologies first if there are inconsistencies, typos or
> > > > language errors in the doc. Feel free to fix them.
> > > >
> > > > Thanks,
> > > > Sijie
> > > >
> > > > On Tue, Mar 5, 2019 at 1:49 PM Sijie Guo <guosi...@gmail.com> wrote:
> > > >
> > > > > Will send the detailed proposal. We can go from there.
> > > > >
> > > > > One interesting question I would like to reply to here:
> > > > >
> > > > > > But this is more microbatching than streaming.
> > > > >
> > > > > I think people usually have a wrong impression about
> > > > > "microbatching" vs "streaming". The two usually come up in the
> > > > > context of spark streaming vs storm/flink, where the discussion
> > > > > is about how the computing engine "schedules" computations.
> > > > >
> > > > > In reality, "batching" (microbatching) is almost everywhere in
> > > > > a "streaming" pipeline, e.g. even in the pulsar client and the
> > > > > bookie journal. In the streaming world, you will still do
> > > > > "microbatching" for many reasons (such as throughput, windowing
> > > > > semantics and so on), but the "microbatching" there is not
> > > > > about "scheduling" anymore.
> > > > >
> > > > > - Sijie
> > > > >
> > > > > On Tue, Mar 5, 2019 at 4:20 AM Ivan Kelly <iv...@apache.org> wrote:
> > > > >
> > > > >> > > My replies inline assume the above, so if you have a
> > > > >> > > different view of the general shape let me know.
> > > > >> >
> > > > >> > Yes. We are on the same view of the general shape. I will
> > > > >> > write down the details of my proposal and share it with the
> > > > >> > community tomorrow.
> > > > >>
> > > > >> Please do. I think there are a lot of details missing.
> > > > >>
> > > > >> > Diagrams can easily be drawn to compare the differences
> > > > >> > here. I will incorporate them into the proposal and show the
> > > > >> > differences.
> > > > >>
> > > > >> Diagrams and a detailed design would be very useful. I still
> > > > >> see a bunch of unknowns in your design.
> > > > >>
> > > > >> > I don't think it is a hard dependency. All these components
> > > > >> > should be defined by interfaces.
> > > > >>
> > > > >> The architecture you're proposing requires a metadata service
> > > > >> that can scale horizontally in order to scale, so it is a hard
> > > > >> dependency. An implementation backed by ZK would be only a toy.
> > > > >>
> > > > >> > I think I know why you think interleaving is okay now. In
> > > > >> > your mind, transactions are carrying one message per
> > > > >> > partition.
> > > > >>
> > > > >> Well yes, or only a few. I think we have a very different view
> > > > >> of how transactions will be used. You seem to be favouring few
> > > > >> large transactions, whereas what Matteo and I have discussed
> > > > >> is many smaller transactions, and this informs both designs.
> > > > >> With large transactions, you're basically microbatching, and
> > > > >> you can afford to make the individual transactions more
> > > > >> expensive since you have fewer. For many smaller transactions,
> > > > >> we need to make the transaction itself as cheap as possible.
> > > > >>
> > > > >> > A common case of transactions in streaming is
> > > > >> > read-transfer-write: read a batch of messages, process them,
> > > > >> > write the results to pulsar and acknowledge the messages. If
> > > > >> > you are doing this in a 100ms window, the data can still be
> > > > >> > large enough, especially since the results can be multiple
> > > > >> > times the size of the input messages. That said, in
> > > > >> > high-throughput transactional streaming, data can be large
> > > > >> > per transaction, so storing entries of the same transaction
> > > > >> > contiguously will have a huge benefit.
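(To make the quoted read-transfer-write pattern concrete, one iteration
could look roughly like the sketch below. The API shown --
newTransaction(), the txn-scoped newMessage() and acknowledgeAsync()
overloads, batchReceive(), and the process() helper -- is assumed for
illustration, not a settled interface:)

    // Sketch of one read-transfer-write iteration under a hypothetical
    // transactional client API.
    void readTransferWrite(PulsarClient client,
                           Consumer<byte[]> consumer,
                           Producer<byte[]> producer) throws Exception {
        Transaction txn = client.newTransaction()               // hypothetical
                .withTransactionTimeout(1, TimeUnit.MINUTES)
                .build();
        Messages<byte[]> batch = consumer.batchReceive();       // read
        for (Message<byte[]> msg : batch) {
            byte[] result = process(msg.getValue());            // transfer
            producer.newMessage(txn).value(result).sendAsync(); // write into txn
        }
        for (Message<byte[]> msg : batch) {
            consumer.acknowledgeAsync(msg.getMessageId(), txn); // ack within txn
        }
        txn.commit(); // results and acks become visible atomically, or not at all
    }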
> > > > >>
> > > > >> Ok, sure, I'll give you that. If you are reusing a single
> > > > >> transaction for a lot of inputs, there may be a lot of output
> > > > >> messages on the same topic. But this is more microbatching
> > > > >> than streaming.
> > > > >>
> > > > >> > Another large category of use cases that should be
> > > > >> > considered is "batch" data processing. You definitely don't
> > > > >> > want your future "batch"-ish data processing workload to
> > > > >> > jump back-and-forth in ledgers. In that case, storing
> > > > >> > entries of the same transaction in the same partition
> > > > >> > contiguously will have huge benefits.
> > > > >>
> > > > >> Why would you use a transaction here? This is more of a use
> > > > >> case for an idempotent producer.
> > > > >>
> > > > >> > Transaction coordinators are responsible for committing,
> > > > >> > aborting and cleaning up transactions.
> > > > >>
> > > > >> How, not where. In my experience, the trickiest part of
> > > > >> transaction systems is the cleanup.
> > > > >>
> > > > >> > > I think having a ledger, or a shadow topic, per topic
> > > > >> > > would work fine. There's no need for indexing. We can
> > > > >> > > already look up a message by message id. This message id
> > > > >> > > should be part of the transaction commit message, and
> > > > >> > > subsequently part of the commit marker written to each
> > > > >> > > topic involved in the transaction.
> > > > >> >
> > > > >> > The assumption you have here is that all the message ids
> > > > >> > can be stored within one commit message or the commit
> > > > >> > markers.
> > > > >>
> > > > >> Are you expecting to have more than 100k messages per
> > > > >> transaction?
> > > > >>
> > > > >> > > Caching is no different to any other choice of storage
> > > > >> > > for the message data.
> > > > >> >
> > > > >> > Caching behavior is very different from the normal
> > > > >> > streaming case, since accessing the entries of the same
> > > > >> > transaction will jump back-and-forth in the shadow topic.
> > > > >>
> > > > >> Why would you be jumping back and forth for the same
> > > > >> transaction within the same topic? You jump to the first
> > > > >> message from that topic in the transaction and read forward.
> > > > >>
> > > > >> > How are you going to delete the data of aborted
> > > > >> > transactions in any form of interleaved storage? If we
> > > > >> > don't compact, the data of aborted transactions will be
> > > > >> > left there forever.
> > > > >>
> > > > >> Aborted transactions should be very rare. The only conflict
> > > > >> should be on acknowledge, and these should only conflict if
> > > > >> multiple consumers get the same message, which shouldn't be
> > > > >> the case. In any case, they should be naturally cleaned up
> > > > >> with the topic itself (see below).
> > > > >>
> > > > >> > If you don't rewrite the shadow topic to the main topic,
> > > > >> > you either have to do compaction or retain the shadow topic
> > > > >> > forever, no?
> > > > >>
> > > > >> No. Let's say that the maximum age of a single transaction is
> > > > >> 1h. That means you only need to retain an hour more of data
> > > > >> in the shadow topic than you would have to retain in the main
> > > > >> topic. Of course we wouldn't use wallclock time for this, but
> > > > >> something around the low watermark, but that's the basic
> > > > >> idea. I haven't worked out all the details.
> > > > >>
> > > > >> I look forward to seeing your full design.
> > > > >>
> > > > >> -Ivan
> > > > >>
> > > > >
> > > >
> > >
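(For reference, a minimal sketch of the trimming rule Ivan outlines
above, assuming we track a "low watermark" -- here, the first entry
written by the oldest transaction that is still open; the function and
its parameters are illustrative, not a worked-out design:)

    // Sketch: the shadow topic can be trimmed up to whichever comes
    // first -- the main topic's own trim position or the first entry of
    // the oldest still-open transaction. Everything before that point
    // belongs to transactions that have already been resolved.
    long shadowTopicTrimPosition(long mainTopicTrimPosition,
                                 long oldestOpenTxnFirstEntry) {
        return Math.min(mainTopicTrimPosition, oldestOpenTxnFirstEntry);
    }

With a bounded transaction lifetime, the gap between these two
positions stays bounded, which is why the shadow topic never needs
unbounded retention.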