> I am not sure I understand the part of making it configurable via a classname.
> I believe it is better to simply have a flag "transactionEnableBatchWrites".
> Otherwise the matrix of possible implementations will grow without limits.
Good idea, I've modified the design and added a switch in the Configuration Changes section.

On Fri, Jun 10, 2022 at 7:14 PM Enrico Olivelli <eolive...@gmail.com> wrote:

> I have read the PIP, and overall I agree with the design.
> Good work!
>
> I am not sure I understand the part of making it configurable via a classname.
> I believe it is better to simply have a flag "transactionEnableBatchWrites".
> Otherwise the matrix of possible implementations will grow without limits.
>
> Enrico
>
> On Fri, Jun 10, 2022 at 11:35 Yubiao Feng
> <yubiao.f...@streamnative.io.invalid> wrote:
> >
> > Hi Pulsar community:
> >
> > I have opened a PIP to discuss "Batch writing ledger for transaction operation".
> >
> > Proposal Link: https://github.com/apache/pulsar/issues/15370
> >
> > ## Motivation
> >
> > Before reading the background, I suggest you read the “Transaction Flow”
> > section of [PIP-31: Transactional Streaming](https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx).
> >
> > ### <p id="normalFlowVsTransaction">Normal Flow vs. Transaction Flow</p>
> >
> > [Figure 1. Normal Flow vs. Transaction Flow (image not included)]
> >
> > In *Figure 1. Normal Flow vs. Transaction Flow*:
> > - The gray square boxes represent logical components.
> > - All the blue boxes represent logs; the logs are usually managed ledgers.
> > - Each arrow represents a request flow or message flow. These operations
> >   occur in the sequence indicated by the numbers next to each arrow.
> > - The black arrows indicate flows shared by transactional and normal messages.
> > - The blue arrows represent normal-message-specific flows.
> > - The orange arrows represent transaction-message-specific flows.
> > - The sections below are numbered to match the operations shown in the
> >   diagram (the numbering differs from [PIP-31: Transactional Streaming](https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx)).
> >
> > #### 2.4a Write logs whose acknowledgement state is PENDING_ACK to the ledger
> > The [Acknowledgement State Machine](https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.4bikq6sjiy8u)
> > describes how the acknowledgement state changes and why we need to persist
> > the log whose acknowledgement state is PENDING_ACK.
> >
> > #### 2.4a’ Mark messages as no longer useful for the current subscription
> > Update the `Cursor` to mark the messages as DELETED, so they can be deleted.
> >
> > #### 3.2b Mark messages as no longer useful for the current subscription
> > The implementation here is exactly the same as 2.4a’, except that the
> > execution is triggered later, after the transaction has been committed.
> >
> > ### Analyze the performance cost of transactions
> > As you can see in <a href="#normalFlowVsTransaction">Figure 1. Normal Flow
> > vs. Transaction Flow</a>, 2.4a’ and 3.2b are exactly the same logic, so the
> > remaining orange arrows are the additional performance overhead of every
> > transaction.
> > Based on how many times each flow executes per transaction, we can split
> > the flows into two classes (optimizing a step that executes multiple times
> > yields more benefit):
> > - Executed once per transaction: flow-1.x and flow-3.x
> > - Executed multiple times per transaction: flow-2.x
> >
> > So optimizing flow-2.x, which executes many times, is a good choice.
> > Let's split flow-2.x into two groups: those that cost more and those that
> > cost less:
> > - No disk write: flow-2.1 and flow-2.3
> > - Disk write: flow-2.1a, flow-2.3a, flow-2.4a
> >
> > From the previous analysis, we found that optimizing flow-2.1a, flow-2.3a,
> > and flow-2.4a would bring the most benefit, and batch writing is an
> > excellent solution for multiple disk writes. Flow-2.1a and flow-2.3a both
> > write to the transaction log, so we can combine them into one batch; 2.4a
> > writes the pending ack log, so we combine multiple 2.4a operations into
> > one batch for processing.
> > As we can see from the “Transaction Flow” section of [PIP-31: Transactional
> > Streaming](https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx),
> > these instructions are strictly sequential (guaranteed by the client):
> > - flow-1.x ends before flow-2.x starts
> > - flow-2.x ends before flow-3.x starts
> > - flow-3.1a ends before flow-3.3a starts
> >
> > Therefore, the broker does not need to worry about dependencies between
> > these flows, and we can put flow-1.1a, flow-3.1a, and flow-3.3a into the
> > transaction log batch too.
> >
> > ## Goal
> > Provide a mechanism for the Transaction Log Store and the Pending Ack
> > Store: accept multiple write requests, buffer all those records, and
> > persist them to a single BK entry (aka a “batched entry”). This will
> > improve broker->BK throughput.
> > - Allow users to control the max size and max record count of the buffer.
> > - Allow users to control the max delay time of a write request.
> > - Multiple raw records can be recovered from a batched entry.
> > - Configurable switch between the “batched implementation” and the
> >   “common implementation”.
> >
> > ## Approach
> > ### Buffer requests and write to Bookie
> > Create a new protobuf record called “Batched Transaction Data” with an
> > array inside. When we receive a request, we put it into the array.
> >
> > Request:
> > ```
> > [Request 1]{ data, callback }
> > [Request 2]{ data, callback }
> > …
> > [Request N]{ data, callback }
> > ```
> > Buffer:
> > ```
> > [BatchedTransactionData]{ list=[Request 1, Request 2 … Request N] }
> > ```
> > Write to Bookie:
> > ```
> > LedgerHandle async write ( BatchedTransactionData serialized to byteBuf )
> > LedgerHandle callback: ledgerId=1, entryId=1
> > ```
> > Request callbacks:
> > ```
> > Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > …
> > Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > ```
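> >
> > For illustration, here is a minimal, self-contained sketch of this
> > buffering idea. All names in it are hypothetical, not the proposed API: a
> > real implementation would serialize the buffered records into one
> > BatchedTransactionData entry, write it with a single asynchronous ledger
> > add, and fan the callbacks out when that write completes.
> >
> > ```java
> > import java.util.ArrayList;
> > import java.util.List;
> > import java.util.concurrent.Executors;
> > import java.util.concurrent.ScheduledExecutorService;
> > import java.util.concurrent.TimeUnit;
> > import java.util.function.BiConsumer;
> >
> > /**
> >  * Hypothetical sketch of the request buffer. A real implementation would
> >  * serialize the buffered records into one BatchedTransactionData protobuf
> >  * and asynchronously write it as a single ledger entry; here the "write"
> >  * is simulated so the flow of callbacks stays visible.
> >  */
> > public class BatchedWriteBuffer {
> >
> >     /** One buffered write request: the payload and its completion callback. */
> >     private static final class Request {
> >         final byte[] data;
> >         // Receives (batchIndex, batchSize) when the shared entry is persisted;
> >         // the real BatchAddDataCallback would also receive the entry position.
> >         final BiConsumer<Integer, Integer> callback;
> >         Request(byte[] data, BiConsumer<Integer, Integer> callback) {
> >             this.data = data;
> >             this.callback = callback;
> >         }
> >     }
> >
> >     private final int maxRecords;
> >     private final long maxBytes;
> >     private final long maxDelayMillis;
> >     private final ScheduledExecutorService timer =
> >             Executors.newSingleThreadScheduledExecutor();
> >     private final List<Request> pending = new ArrayList<>();
> >     private long pendingBytes = 0;
> >
> >     public BatchedWriteBuffer(int maxRecords, long maxBytes, long maxDelayMillis) {
> >         this.maxRecords = maxRecords;
> >         this.maxBytes = maxBytes;
> >         this.maxDelayMillis = maxDelayMillis;
> >     }
> >
> >     /** Buffers one record; flushes when a count or size threshold is crossed. */
> >     public synchronized void asyncAdd(byte[] data,
> >                                       BiConsumer<Integer, Integer> callback) {
> >         if (pending.isEmpty()) {
> >             // First record of a new batch: arm the max-delay timer so a
> >             // half-full batch is still flushed promptly.
> >             timer.schedule(this::flushIfPending, maxDelayMillis, TimeUnit.MILLISECONDS);
> >         }
> >         pending.add(new Request(data, callback));
> >         pendingBytes += data.length;
> >         if (pending.size() >= maxRecords || pendingBytes >= maxBytes) {
> >             flush();
> >         }
> >     }
> >
> >     private synchronized void flushIfPending() {
> >         if (!pending.isEmpty()) {
> >             flush(); // simplification: a real impl would track batch generations
> >         }
> >     }
> >
> >     /** Caller must hold the lock. Persists the batch and fans out callbacks. */
> >     private void flush() {
> >         List<Request> batch = new ArrayList<>(pending);
> >         pending.clear();
> >         pendingBytes = 0;
> >         // Real code: build BatchedTransactionData from `batch`, write it as one
> >         // entry, and invoke each callback from the single add-complete event.
> >         int batchSize = batch.size();
> >         for (int batchIndex = 0; batchIndex < batchSize; batchIndex++) {
> >             batch.get(batchIndex).callback.accept(batchIndex, batchSize);
> >         }
> >     }
> > }
> > ```
> > The three thresholds mirror the maxRecords/maxSize/maxDelayInMillis
> > settings introduced under Configuration Changes below.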
> >
> > ### Delete BatchedTransactionMeta
> > [PIP-54](https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level)
> > added support for deletion at the batch-index level. So the raw records
> > added to one batch share the same ledger ID and entry ID but carry
> > different batch indexes.
> >
> > Read:
> > ```
> > [BatchedTransactionData]
> > ```
> > After split:
> > ```
> > {data 1, ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > {data 2, ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > …
> > {data N, ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > ```
> > Users can delete the whole batched entry:
> > ```java
> > cursor.delete( Position {ledgerId = 1, entryId = 1} )
> > ```
> > Users can also delete only part of the batched entry:
> > ```java
> > cursor.delete( Position {ledgerId = 1, entryId = 1, batchIndex = 1} )
> > ```
> >
> > ## Changes
> >
> > ### Protocol Changes
> > New protobuf records to buffer requests.
> >
> > BatchedTransactionMetadataEntry
> > ```
> > message BatchedTransactionMetadataEntry {
> >   // Array for buffered transaction log data.
> >   repeated TransactionMetadataEntry transaction_log = 12;
> > }
> > ```
> >
> > BatchedPendingAckMetadataEntry
> > ```
> > message BatchedPendingAckMetadataEntry {
> >   // Array for buffered pending ack data.
> >   repeated PendingAckMetadataEntry pending_ack_log = 6;
> > }
> > ```
> >
> > Note: To ensure compatibility, we need to distinguish the old
> > TransactionMetadataEntry/PendingAckMetadataEntry records from the new
> > batched records, so we add a magic number in front of the bytes that
> > protobuf serializes:
> > ```
> > [Magic Num] [BatchedPendingAckMetadataEntry proto bytes] ==> [Entry]
> > ```
> > Read Entry:
> > ```
> >              /-- true  --> [BatchedTransactionMetadataEntry]
> > [Entry] --> has Magic Num?
> >              \-- false --> [TransactionMetadataEntry]
> > ```
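> >
> > A minimal sketch of this framing rule, assuming an illustrative magic
> > value (the PIP does not fix the concrete number; it only requires a
> > marker that cannot be mistaken for the start of a valid old-format
> > record):
> >
> > ```java
> > import java.nio.ByteBuffer;
> >
> > /**
> >  * Hypothetical sketch of the entry framing. Batched entries get a 2-byte
> >  * magic number prefix; old-format entries start directly with protobuf
> >  * bytes, which is why the read path must branch on the marker.
> >  */
> > public final class BatchedEntryFormat {
> >     private static final short MAGIC = (short) 0x0e01; // illustrative value
> >
> >     /** [Magic Num][Batched... proto bytes] ==> [Entry] */
> >     public static byte[] frame(byte[] batchedProtoBytes) {
> >         ByteBuffer buf = ByteBuffer.allocate(2 + batchedProtoBytes.length);
> >         buf.putShort(MAGIC);
> >         buf.put(batchedProtoBytes);
> >         return buf.array();
> >     }
> >
> >     /** Read side: the "has Magic Num?" decision from the diagram above. */
> >     public static boolean isBatched(byte[] entry) {
> >         return entry.length >= 2
> >                 && ByteBuffer.wrap(entry, 0, 2).getShort() == MAGIC;
> >     }
> > }
> > ```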
> >
> > ### API Changes
> >
> > BatchAddDataCallback
> > The Transaction Coordinator does not operate on the managed ledger
> > directly; it uses the Transaction Log Store to do so. The managed ledger
> > write API provides a callback class, AddEntryCallback; likewise, the
> > Transaction Log Store, which now provides batched writes, provides a
> > callback class, BatchAddDataCallback. See <a
> > href="#BatchedAddDataCallbackExplains">why we need BatchAddDataCallback</a>.
> >
> > [Figure: BatchAddDataCallback in Write Flow (image not included)]
> >
> > ```java
> > interface BatchAddDataCallback {
> >     /**
> >      * Success callback function for “add data asynchronously”.
> >      *
> >      * @param position   a pointer to the specific entry in the managed ledger.
> >      * @param byteBuf    the raw data which was added.
> >      * @param batchIndex the index of this raw datum within the batched entry.
> >      * @param batchSize  the total count of raw data in the batched entry.
> >      * @param ctx        opaque context
> >      */
> >     void addComplete(Position position, ByteBuf byteBuf, int batchIndex,
> >                      int batchSize, Object ctx);
> >
> >     /**
> >      * Failure callback function for “add data asynchronously”.
> >      *
> >      * @param exception the failure cause
> >      * @param ctx       opaque context
> >      */
> >     void addFailed(ManagedLedgerException exception, Object ctx);
> > }
> > ```
> >
> > ### Configuration Changes
> > Add batch threshold parameters to control the flush frequency.
> >
> > broker.conf
> > ```
> > transactionLogBatchedWriteEnabled=false
> > transactionLogBatchedWriteMaxRecords=512
> > transactionLogBatchedWriteMaxSize=4194304   # 4 MB (1024 * 1024 * 4)
> > transactionLogBatchedWriteMaxDelayInMillis=1
> >
> > pendingAckBatchedWriteEnabled=false
> > pendingAckBatchedWriteMaxRecords=512
> > pendingAckBatchedWriteMaxSize=4194304   # 4 MB (1024 * 1024 * 4)
> > pendingAckBatchedWriteMaxDelayInMillis=1
> > ```
> >
> > ### Compatibility
> > After the batch feature is enabled, users can only downgrade to a version
> > no older than the first version that supports reading
> > BatchedTransactionMeta; a broker on a lower version cannot parse the
> > data, which results in data loss. We also provide a <a
> > href="#librarySupports">library for compatibility with older-version
> > brokers</a>. If users deploy this library on an older-version
> > broker<sup>[0]</sup>, all new data can be processed correctly and none of
> > it will be lost.
> >
> > ----
> > **[0] Older-version broker**: 2.9.2 or later, and 2.10 or later.
> >
> > ### Observability
> > When using the batch feature, users will tune the flush frequency to
> > achieve optimal performance. We provide two metrics for users' reference:
> >
> > ```
> > BatchedDataStoreMXBeanImpl {
> >     /** The number of logs in each batch. **/
> >     Rate batchRecordCount;
> >     /** The size of each batch. **/
> >     Rate batchSizeCount;
> > }
> > ```
> >
> > ## Test plan
> > The tests should cover the following cases:
> >
> > - The batch mechanism abides by the total count, total size, and max
> >   delay limits.
> > - The returned position for written data is correct.
> > - The managed cursor can delete and mark-delete the BatchedTransactionMeta.
> > - Performance tests comparing the before/after improvement.
> >
> > ## Appendix
> >
> > ### <p id="BatchedAddDataCallbackExplains">Why we need BatchAddDataCallback</p>
> > After all produced messages and acknowledgements to all partitions are
> > committed or aborted, the TC writes the final COMMITTED or ABORTED
> > transaction status message to its transaction log, indicating that the
> > transaction is complete (shown as 3.3a in the diagram). At this point,
> > all the messages pertaining to the transaction in its transaction log
> > can safely be removed.
> > E.g., there are two transactions:
> >
> > Transaction log writes:
> > ```
> > transaction_1: start transaction
> > transaction_2: start transaction
> > transaction_1: add partition to tx
> > transaction_1: add subscription to tx
> > transaction_2: add partition to tx
> > transaction_1: commit
> > ```
> > Bookie write:
> > ```
> > [Entry]{ BatchedTransactionData={LogRecordSize=6} }
> > ```
> > Bookie response:
> > ```
> > {ledgerId=2, entryId=3}
> > ```
> > Transaction log callbacks:
> > ```
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=0, ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=1, ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=2, ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=3, ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=4, ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=5, ctx}
> > ```
> > Entry (2,3) actually holds six transaction log records: transaction_1
> > relates to indexes [0, 2, 3, 5] and transaction_2 to indexes [1, 4].
> > After transaction_1 is committed, records [0, 2, 3, 5] of entry (2,3)
> > are no longer needed because the transaction has completed, but we
> > cannot delete entry (2,3) yet: records [1, 4] are still needed for the
> > recovery operation because transaction_2 is not finished.
> > The batchIndex and batchSize clearly indicate the location of each
> > transaction log record in the ledger, so when a record is no longer
> > used, users can accurately delete it by position and batchIndex.
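> >
> > A small, hypothetical model of this example (the types are simplified
> > stand-ins for the managed-ledger API, written with Java 17 record
> > syntax): the point is only that entry (2,3) stays alive until every
> > batch index in it has been deleted.
> >
> > ```java
> > import java.util.HashSet;
> > import java.util.Set;
> >
> > public class BatchIndexDeleteExample {
> >     // Illustrative stand-in for a position that carries a batch index.
> >     record BatchPosition(long ledgerId, long entryId, int batchIndex) {}
> >
> >     public static void main(String[] args) {
> >         int batchSize = 6;
> >         Set<BatchPosition> deleted = new HashSet<>();
> >
> >         // transaction_1 commits: delete its records [0, 2, 3, 5] in entry (2,3).
> >         for (int i : new int[]{0, 2, 3, 5}) {
> >             deleted.add(new BatchPosition(2, 3, i));
> >         }
> >         // Entry (2,3) must survive: transaction_2 still owns indexes [1, 4].
> >         System.out.println("entry removable: " + (deleted.size() == batchSize)); // false
> >
> >         // transaction_2 finishes: delete the remaining indexes.
> >         for (int i : new int[]{1, 4}) {
> >             deleted.add(new BatchPosition(2, 3, i));
> >         }
> >         // All six indexes are gone; only now may entry (2,3) be marked
> >         // deleted as a whole and trimmed from the ledger.
> >         System.out.println("entry removable: " + (deleted.size() == batchSize)); // true
> >     }
> > }
> > ```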
> >
> > ### <p id="librarySupports">Library support for compatibility with older-version brokers</p>
> > In broker.conf we can configure
> > [transactionMetadataStoreProviderClassName](https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2535)
> > to replace the implementation of TransactionLog, and
> > [transactionPendingAckStoreProviderClassName](https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2549)
> > to replace the implementation of PendingAckStore.
> > We will provide a library containing classes that can read batched
> > transaction logs and pending ack logs (a sketch of the shared read path
> > follows at the end of this section):
> >
> > #### TransactionLogBatchEntryReadableImpl
> > ```java
> > public class TransactionLogBatchEntryReadableImpl extends MLTransactionLogImpl {
> >     /**
> >      * Different from the parent class: when this method reads an entry, it
> >      * can identify whether the entry is a plain transaction log or a
> >      * batched transaction log. If the entry is a plain transaction log, it
> >      * keeps the same logic as the parent class. If the entry is batched,
> >      * it is split into individual transaction logs and processed according
> >      * to the original logic.
> >      */
> >     @Override
> >     void replayAsync(TransactionLogReplayCallback transactionLogReplayCallback);
> > }
> > ```
> >
> > #### PendingAckStoreBatchEntryReadableImpl
> > ```java
> > public class PendingAckStoreBatchEntryReadableImpl extends MLPendingAckStore {
> >     /**
> >      * Different from the parent class: when this method reads an entry, it
> >      * can identify whether the entry is a plain pending ack log or a
> >      * batched pending ack log. If the entry is a plain pending ack log, it
> >      * keeps the same logic as the parent class. If the entry is batched,
> >      * it is split into individual pending ack logs and processed according
> >      * to the original logic.
> >      */
> >     @Override
> >     void replayAsync(PendingAckHandle pendingAckHandle, ExecutorService executorService);
> > }
> > ```
> >
> > How to use this library:
> > 1. Copy pulsar-transaction-logs-batch-support.jar to ${PULSAR_HOME}/lib.
> > 2. Edit broker.conf: set transactionMetadataStoreProviderClassName to
> >    “org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider”
> >    and transactionPendingAckStoreProviderClassName to
> >    “org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider”.
> > 3. Restart the broker.
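> >
> > To make the split-and-replay behavior of these classes concrete, here is
> > a minimal, hypothetical sketch of the shared read path (the protobuf
> > parsing is elided, and MAGIC is the same illustrative value used in the
> > framing sketch earlier):
> >
> > ```java
> > import java.nio.ByteBuffer;
> > import java.util.List;
> > import java.util.function.Consumer;
> >
> > /**
> >  * Hypothetical sketch: peek for the magic number, then either replay the
> >  * entry as one old-format record or split the batched record and replay
> >  * each element in order.
> >  */
> > public final class BatchAwareReplay {
> >     private static final short MAGIC = (short) 0x0e01; // illustrative value
> >
> >     public static void replayEntry(ByteBuffer entry,
> >                                    Consumer<ByteBuffer> replayOneRecord) {
> >         if (entry.remaining() >= 2 && entry.getShort(entry.position()) == MAGIC) {
> >             entry.position(entry.position() + 2); // consume the magic number
> >             // Batched entry: parse BatchedTransactionMetadataEntry and
> >             // replay every inner record in its original order.
> >             for (ByteBuffer record : parseBatched(entry)) {
> >                 replayOneRecord.accept(record);
> >             }
> >         } else {
> >             // Old-format entry: the whole payload is one record, so the
> >             // behavior matches the parent class.
> >             replayOneRecord.accept(entry);
> >         }
> >     }
> >
> >     /** Stand-in for parsing the `repeated ... transaction_log` field. */
> >     private static List<ByteBuffer> parseBatched(ByteBuffer batched) {
> >         return List.of(batched); // placeholder: real code uses the protobuf parser
> >     }
> > }
> > ```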