I will send the detailed proposal, and we can go from there. There is one interesting question I would like to reply to here.
> But this is more microbatching than streaming.

I think people usually have a wrong impression about "microbatching" vs "streaming". That comparison usually comes up when discussing Spark Streaming vs Storm/Flink, and in that context it is about how the computing engine *schedules* computations. In reality, batching (microbatching) is almost everywhere in a streaming pipeline, e.g. even in the Pulsar client and the bookie journal. In the streaming world you will still do microbatching for many reasons (throughput, windowing semantics and so on), but that kind of microbatching is not about scheduling anymore. I have appended a few rough sketches below the quoted mail to make this and some of the other points more concrete.

- Sijie

On Tue, Mar 5, 2019 at 4:20 AM Ivan Kelly <iv...@apache.org> wrote:

> > > My replies inline assume the above, so if you have a different view of the general shape let me know.
> >
> > Yes. We are on the same view of the general shape. I will write down the details of my proposal and will share it with the community tomorrow.
>
> Please do. I think there are a lot of details missing.
>
> > Diagrams can easily be drawn to compare the differences here. I will incorporate them into the proposal and show the differences.
>
> Diagrams and a detailed design would be very useful. I still see a bunch of unknowns in your design.
>
> > I don't think it is a hard dependency. All these components should be done by interfaces.
>
> The architecture you're proposing requires a metadata service that can scale horizontally in order to scale, so it is a hard dependency. An implementation backed by ZK would only be a toy.
>
> > I think I know why you think interleaving is okay now. In your mind, transactions are carrying one message per partition.
>
> Well yes, or only a few. I think we have a very different view of how transactions will be used. You seem to be favouring few large transactions, whereas what Matteo and I have discussed is many smaller transactions, and this informs both designs. With large transactions you're basically microbatching, and you can afford to make the individual transactions more expensive since you have fewer. For many smaller transactions, we need to make the transaction itself as cheap as possible.
>
> > A common case of transactions in streaming is read-transfer-write: read a batch of messages, process them, write the results to Pulsar and acknowledge the messages. Even if you are doing this in a 100ms window, the data can still be large, especially since the results can be several times the size of the input messages. With that being said, at high-throughput transactional streaming the data per transaction can be large, so storing entries of the same transaction contiguously will have a huge benefit.
>
> Ok, sure, I'll give you that. If you are reusing a single transaction for a lot of inputs there may be a lot of output messages on the same topic. But this is more microbatching than streaming.
>
> > Another large category of use cases that should be considered is "batch" data processing. You definitely don't want your future "batch"-ish data processing workload to jump back and forth in ledgers. In that case, storing entries of the same transaction in the same partition contiguously will have huge benefits.
>
> Why would you use a transaction here? This is more of a use case for the idempotent producer.
>
> > Transaction coordinators are responsible for committing, aborting and cleaning up transactions.
>
> How, not where.
> In my experience, the trickiest part in transaction systems is the cleanup.
>
> > > I think having a ledger, or a shadow topic, per topic would work fine. There's no need for indexing. We can already look up a message by message id. This message id should be part of the transaction commit message, and subsequently part of the commit marker written to each topic involved in the transaction.
> >
> > The assumption you have here is that all the message ids can be stored within one commit message or the commit markers.
>
> Are you expecting to have more than 100k messages per transaction?
>
> > > Caching is no different to any other choice of storage for the message data.
> >
> > Caching behavior is very different from the normal streaming case, since accessing entries of the same transaction will jump back and forth in the shadow topic.
>
> Why would you be jumping back and forth for the same transaction within the same topic? You jump to the first message from that topic in the transaction and read forward.
>
> > How are you going to delete the data of aborted transactions in any form of interleaved storage? If we don't compact, the data of aborted transactions will be left there forever.
>
> Aborted transactions should be very rare. The only conflict should be on acknowledge, and these should only conflict if multiple consumers get the same message, which shouldn't be the case. In any case, they should be naturally cleaned up with the topic itself (see below).
>
> > If you don't rewrite the shadow topic to the main topic, you either have to do compaction or retain the shadow topic forever, no?
>
> No. Let's say the maximum age of a single transaction is 1h. That means you only need to retain an hour more of data in the shadow topic than you would have to retain in the main topic. Of course we wouldn't use wallclock time for this, but something around the low watermark, but that's the basic idea. I haven't worked out all the details.
>
> I look forward to seeing your full design.
>
> -Ivan
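Sketch 1: to make the "batching is already everywhere" point concrete, this is roughly how batching is enabled in the Pulsar Java producer today. The service URL, topic name and batching numbers below are placeholders; only the shape of the configuration matters.

    import java.util.concurrent.TimeUnit;

    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;

    public class ProducerBatchingExample {
        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")   // placeholder URL
                    .build();

            // The client groups messages into batches before they reach the
            // broker, purely for throughput. This "microbatching" says nothing
            // about how a computing engine schedules its work.
            Producer<byte[]> producer = client.newProducer()
                    .topic("persistent://public/default/my-topic") // placeholder topic
                    .enableBatching(true)
                    .batchingMaxMessages(1000)
                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                    .create();

            producer.send("hello".getBytes());
            producer.close();
            client.close();
        }
    }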
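Sketch 2: the read-transfer-write pattern discussed above. There is no transaction API in the client yet, so every transaction-related name here (enableTransaction, newTransaction, acknowledging with a txn, commit/abort) is only illustrative of one possible shape such an API could take, not a proposal of the exact interface.

    import java.util.concurrent.TimeUnit;

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.transaction.Transaction;

    public class ReadTransferWrite {
        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")   // placeholder
                    .enableTransaction(true)                  // illustrative flag
                    .build();

            Consumer<byte[]> input = client.newConsumer()
                    .topic("input-topic")                     // placeholder
                    .subscriptionName("transfer")
                    .subscribe();
            Producer<byte[]> output = client.newProducer()
                    .topic("output-topic")                    // placeholder
                    .create();

            while (true) {
                // One transaction per small window of input messages.
                Transaction txn = client.newTransaction()
                        .withTransactionTimeout(1, TimeUnit.MINUTES)
                        .build()
                        .get();
                try {
                    for (int i = 0; i < 100; i++) {           // illustrative window size
                        Message<byte[]> msg = input.receive();
                        byte[] result = process(msg.getData());
                        // Output messages and acknowledgements are tied to the same
                        // transaction, so they become visible (or are discarded) atomically.
                        output.newMessage(txn).value(result).send();
                        input.acknowledgeAsync(msg.getMessageId(), txn);
                    }
                    txn.commit().get();
                } catch (Exception e) {
                    txn.abort().get();
                }
            }
        }

        private static byte[] process(byte[] data) {
            return data; // placeholder transformation
        }
    }

Even in a 100ms window, the output of such a loop can be several times the input volume, which is the case where contiguous storage of a transaction's entries matters most.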
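Sketch 3: the commit marker Ivan describes, written to each topic involved in the transaction and carrying the ids of that transaction's messages in that topic (or just the first id, from which a reader can seek and read forward). The class below is purely hypothetical, not an existing Pulsar type; it only makes the size question concrete, since the marker grows linearly with the number of message ids it carries.

    import java.util.List;

    import org.apache.pulsar.client.api.MessageId;

    // Hypothetical shape of a per-topic commit marker.
    public class CommitMarker {
        private final long txnId;
        // Ids of the transaction's messages in this topic. My concern is that for
        // large transactions this list may not fit in a single marker entry; Ivan's
        // counter is that this only matters beyond ~100k messages per transaction.
        private final List<MessageId> messageIds;

        public CommitMarker(long txnId, List<MessageId> messageIds) {
            this.txnId = txnId;
            this.messageIds = messageIds;
        }

        public long getTxnId() { return txnId; }
        public List<MessageId> getMessageIds() { return messageIds; }
    }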
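Sketch 4: the retention argument at the end, made concrete. If the shadow topic is never rewritten into the main topic, it still does not need compaction; it only needs to keep roughly maxTransactionAge more data than the main topic. The numbers are illustrative, and as Ivan notes the real cut-off would follow something like the transaction low watermark rather than wallclock time.

    import java.time.Duration;

    public class ShadowTopicRetention {
        // Anything in the shadow topic older than (main-topic retention +
        // maximum open-transaction age) can no longer be referenced by a
        // commit marker, so it is safe to trim.
        public static Duration shadowRetention(Duration mainTopicRetention,
                                               Duration maxTransactionAge) {
            return mainTopicRetention.plus(maxTransactionAge);
        }

        public static void main(String[] args) {
            System.out.println(shadowRetention(Duration.ofHours(24), Duration.ofHours(1)));
            // -> PT25H: a 24h main-topic retention needs ~25h in the shadow topic
        }
    }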