Hi Pulsar community: I am opening a PIP to discuss "Batch writing ledger for transaction operation".
Proposal Link: https://github.com/apache/pulsar/issues/15370

## Motivation

Before reading the background, I suggest you read the “Transaction Flow” section of [PIP-31: Transactional Streaming](https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx).

### <p id="normalFlowVsTransaction">Normal Flow vs. Transaction Flow</p>

In *Figure 1. Normal Flow vs. Transaction Flow*:

- The gray square boxes represent logical components.
- All the blue boxes represent logs. The logs are usually managed ledgers.
- Each arrow represents a request flow or message flow. These operations occur in the sequence indicated by the numbers next to each arrow.
- The black arrows indicate flows shared by transactional and normal messages.
- The blue arrows represent normal-message-specific flows.
- The orange arrows represent transaction-message-specific flows.
- The sections below are numbered to match the operations shown in the diagram (the numbering differs from [PIP-31: Transactional Streaming](https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx)).

#### 2.4a Write logs whose Acknowledgement State is PENDING_ACK to the ledger

[Acknowledgement State Machine](https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.4bikq6sjiy8u) describes how the Acknowledgement State changes and why we need to persist the logs whose Acknowledgement State is PENDING_ACK.

#### 2.4a’ Mark messages as no longer useful for the current subscription

Update the `Cursor` to mark the messages as DELETED, so they can be deleted.

#### 3.2b Mark messages as no longer useful for the current subscription

The implementation here is exactly the same as 2.4a’, except that the execution is triggered later, after the transaction has been committed.

### Analyze the performance cost of transactions

As you can see in <a href="#normalFlowVsTransaction">Figure 1. Normal Flow vs. Transaction Flow</a>, 2.4a’ and 3.2b are exactly the same logic, so the remaining orange arrows represent the additional performance overhead of transactions.

Based on whether a flow is executed multiple times per transaction, we can split the flows into two classes (optimizing a flow that is executed multiple times yields more benefit):

- Executed once per transaction: flow-1.x and flow-3.x
- Executed multiple times per transaction: flow-2.x

So optimizing flow-2.x, which is executed many times, is a good choice. Let's split flow-2.x into two groups: those that cost more and those that cost less:

- No disk write: flow-2.1 and flow-2.3
- Disk write: flow-2.1a, flow-2.3a, and flow-2.4a

From the previous analysis, we found that optimizing flow-2.1a, flow-2.3a, and flow-2.4a would bring the most benefit, and batched writes are an excellent solution for multiple disk writes. Flow-2.1a and flow-2.3a are both writes to the transaction log, so we can combine them into one batch; flow-2.4a is a write to the pending ack log, so we combine multiple 2.4a writes into one batch for processing.
As we can see from the “Transaction Flow” section of [PIP-31: Transactional Streaming](https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx), these instructions are strictly sequential (guaranteed by the client):

- flow-1.x ends before flow-2.x starts
- flow-2.x ends before flow-3.x starts
- flow-3.1a ends before flow-3.3a starts

Therefore, the broker does not need to worry about dependencies between these flows, and we can put flow-1.1a, flow-3.1a, and flow-3.3a into the transaction log batch too.

## Goal

Provide a mechanism for the Transaction Log Store and the Pending Ack Store: accept multiple write requests, buffer all those records, and persist them to a single BK entry (aka a “Batched Entry”). This will improve broker->BK throughput.

- Allow users to control the max size and max record count of the buffer.
- Allow users to control the max delay time of a write request.
- Multiple raw records can be recovered from a Batched Entry.
- A configurable switch between the “batched implementation” and the “common implementation”.

## Approach

### Buffer requests and write to Bookie

Create a new protobuf record called “Batched Transaction Data” with an array inside. When we receive a request, we put it into the array.

Request:
```
[Request 1]{ data, callback }
[Request 2]{ data, callback }
… …
[Request N]{ data, callback }
```
Buffer:
```
[BatchedTransactionData]{ list=[Request 1, Request 2 … Request N] }
```
Write to Bookie:
```
LedgerHandle async write ( BatchedTransactionData to byteBuf )
LedgerHandle callback: ledgerId=1, entryId=1
```
Request-Callback:
```
Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
… …
Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
```
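To make the batching mechanics concrete, here is a minimal sketch of such a buffer. All names (`BatchedWriteBuffer`, `Pending`, `writeEntry`) are hypothetical; the real implementation would serialize the batch into a `BatchedTransactionData` record and write it through the managed ledger:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Minimal sketch of the request buffer described above (names are hypothetical). */
public class BatchedWriteBuffer<T> {

    /** One buffered record plus the callback to complete after the flush. */
    private record Pending<T>(T data, int sizeBytes, BiConsumer<Long, Integer> callback) { }

    private final int maxRecords;
    private final long maxBytes;
    private final Function<List<T>, Long> writeEntry; // writes one entry, returns its entryId
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final List<Pending<T>> buffer = new ArrayList<>();
    private long bufferedBytes;

    public BatchedWriteBuffer(int maxRecords, long maxBytes, long maxDelayMs,
                              Function<List<T>, Long> writeEntry) {
        this.maxRecords = maxRecords;
        this.maxBytes = maxBytes;
        this.writeEntry = writeEntry;
        // Time trigger: flush whatever has accumulated every maxDelayMs.
        timer.scheduleAtFixedRate(this::flush, maxDelayMs, maxDelayMs, TimeUnit.MILLISECONDS);
    }

    /** Buffer one record; the callback later receives (entryId, batchIndex). */
    public synchronized void asyncAdd(T data, int sizeBytes, BiConsumer<Long, Integer> callback) {
        buffer.add(new Pending<>(data, sizeBytes, callback));
        bufferedBytes += sizeBytes;
        // Count/size triggers: flush as soon as either threshold is reached.
        if (buffer.size() >= maxRecords || bufferedBytes >= maxBytes) {
            flush();
        }
    }

    /** One ledger write for N buffered records, then N callbacks. */
    private synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer.stream().map(Pending::data).toList();
        long entryId = writeEntry.apply(batch);
        for (int i = 0; i < buffer.size(); i++) {
            buffer.get(i).callback().accept(entryId, i); // batchIndex = i, batchSize = buffer.size()
        }
        buffer.clear();
        bufferedBytes = 0;
    }
}
```

The flush is triggered by whichever threshold (record count, byte size, or delay) is hit first, which is the behavior the `*BatchedWriteMax*` parameters in the Configuration Changes section control.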
### Delete BatchedTransactionMeta

[PIP-54](https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level) added support for batch-index-level deletion, so the raw records added to a batch can have different batch indexes while sharing the same ledger ID and entry ID.

Read:
```
[BatchedTransactionData]
```
After split:
```
{data 1, ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
{data 2, ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
…
{data N, ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
```
Users can delete the whole Batched Entry:
```java
cursor.delete( Position {ledgerId = 1, entryId = 1} )
```
Users can also delete only part of the Batched Entry:
```java
cursor.delete( Position {ledgerId = 1, entryId = 1, batchIndex = 1} )
```

## Changes

### Protocol Changes

New protobuf records to buffer requests.

BatchedTransactionMetadataEntry:
```
message BatchedTransactionMetadataEntry {
  // Array for buffering transaction log records.
  repeated TransactionMetadataEntry transaction_log = 12;
}
```
BatchedPendingAckMetadataEntry:
```
message BatchedPendingAckMetadataEntry {
  // Array for buffering pending ack records.
  repeated PendingAckMetadataEntry pending_ack_log = 6;
}
```
Note: to ensure forward compatibility, we need to distinguish the old TransactionMetadataEntry/PendingAckMetadataEntry data from the new batched data, so we add a magic number in front of the bytes that proto serializes:
```
[Magic Num] [BatchedTransactionMetadataEntry proto bytes] ==> [Entry]
```
Read Entry:
```
                            /-- true  --> [BatchedTransactionMetadataEntry]
[Entry] --> has Magic Num ?
                            \-- false --> [TransactionMetadataEntry]
```
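To illustrate the framing, here is a minimal sketch of writing and dispatching entries by magic number. The two-byte width and the value of `BATCHED_ENTRY_MAGIC` are assumptions for the example, not part of the proposal; the real code would parse the payload with the generated protobuf classes:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class EntryFraming {
    // Hypothetical 2-byte magic value, chosen so it cannot be confused
    // with the first bytes of a valid old-format serialized record.
    static final short BATCHED_ENTRY_MAGIC = (short) 0x0e01;

    /** Prefix the serialized BatchedTransactionMetadataEntry bytes with the magic number. */
    static ByteBuf serializeBatchedEntry(byte[] protoBytes) {
        ByteBuf buf = Unpooled.buffer(2 + protoBytes.length);
        buf.writeShort(BATCHED_ENTRY_MAGIC);
        buf.writeBytes(protoBytes);
        return buf;
    }

    /** On read, peek at the first two bytes to decide which record type to parse. */
    static void readEntry(ByteBuf entry) {
        entry.markReaderIndex();
        boolean batched = entry.readableBytes() >= 2
                && entry.readShort() == BATCHED_ENTRY_MAGIC;
        if (batched) {
            // Remaining bytes are a BatchedTransactionMetadataEntry; parse it and
            // split it into individual TransactionMetadataEntry records.
        } else {
            // Old-format entry: rewind and parse the whole buffer as a
            // plain TransactionMetadataEntry.
            entry.resetReaderIndex();
        }
    }
}
```

Because old-format entries carry no magic number, an upgraded broker keeps reading them unchanged, which is what the Compatibility section below relies on.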
### API Changes

BatchAddDataCallback

The Transaction Coordinator does not operate the Managed Ledger directly; it uses the Transaction Log Store to operate on the Managed Ledger. Just as the Managed Ledger write API provides a callback class, AddEntryCallback, the Transaction Log Store that provides batched writes supplies a callback class: BatchAddDataCallback. <a href="#BatchedAddDataCallbackExplains">The appendix explains why we need BatchAddDataCallback</a>.

Figure. BatchAddDataCallback in the Write Flow

```java
interface BatchAddDataCallback {
    /**
     * Success callback function for “add data asynchronously”.
     *
     * @param position A Position is a pointer to a specific entry in the managed ledger.
     * @param byteBuf The raw data that was added.
     * @param batchIndex The index of this raw record within the whole Batched Entry.
     * @param batchSize The total count of raw records in the Batched Entry.
     * @param ctx opaque context
     */
    void addComplete(Position position, ByteBuf byteBuf, int batchIndex, int batchSize, Object ctx);

    /**
     * Failure callback function for “add data asynchronously”.
     *
     * @param exception the failure cause
     * @param ctx opaque context
     */
    void addFailed(ManagedLedgerException exception, Object ctx);
}
```
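For illustration only, a caller might implement the callback like this, assuming the store passes a per-record `CompletableFuture` as the opaque context (that assumption is mine, not part of the proposal):

```java
import java.util.concurrent.CompletableFuture;

import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;

/** Illustrative caller-side use of BatchAddDataCallback (not part of the proposal). */
public class BatchAddExample implements BatchAddDataCallback {

    @Override
    public void addComplete(Position position, ByteBuf byteBuf,
                            int batchIndex, int batchSize, Object ctx) {
        // Assumption: ctx is the future the store handed out for this record.
        @SuppressWarnings("unchecked")
        CompletableFuture<Position> future = (CompletableFuture<Position>) ctx;
        // position plus batchIndex identifies exactly where this record landed,
        // which is what per-record deletion needs (see "Delete BatchedTransactionMeta").
        future.complete(position);
    }

    @Override
    public void addFailed(ManagedLedgerException exception, Object ctx) {
        ((CompletableFuture<?>) ctx).completeExceptionally(exception);
    }
}
```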
### Configuration Changes

Add batch threshold parameters to control the flush frequency.

broker.conf:
```
transactionLogBatchedWriteEnabled = false;
transactionLogBatchedWriteMaxRecords = 512;
transactionLogBatchedWriteMaxSize = 1024 * 1024 * 4;
transactionLogBatchedWriteMaxDelayInMillis = 1;

pendingAckBatchedWriteEnabled = false;
pendingAckBatchedWriteMaxRecords = 512;
pendingAckBatchedWriteMaxSize = 1024 * 1024 * 4;
pendingAckBatchedWriteMaxDelayInMillis = 1;
```

### Compatibility

After the batch feature is enabled, users can only downgrade to a version no lower than the first version that supports reading BatchedTransactionMeta. Data in a lower-version broker cannot be parsed, resulting in data loss. We also provide a <a href="#librarySupports">library for compatibility with older-version brokers</a>. If users deploy this library on an older-version broker<sup>[0]</sup>, all new data can be processed correctly and none of it will be lost.

----
**[0] Older-version broker**: not less than 2.9.2 and 2.10

### Observability

When using the batch feature, users will tune the flush frequency to achieve optimal performance. We provide two observable metrics for users' reference:

```
BatchedDataStoreMXBeanImpl {
    /** The number of records in each batch. **/
    Rate batchRecordCount;
    /** The size of each batch. **/
    Rate batchSizeCount;
}
```

## Test plan

The tests should cover the following cases:

- The batch mechanism abides by the total count, total size, and max delay limits.
- The returned position for written data is correct.
- The managed cursor can delete and mark-delete a BatchedTransactionMeta.
- Performance tests comparing the before-and-after improvement.

## The appendix

### <p id="BatchedAddDataCallbackExplains">Explains why we need BatchAddDataCallback</p>

After all produced messages and acknowledgements to all partitions are committed or aborted, the TC writes the final COMMITTED or ABORTED transaction status message to its transaction log, indicating that the transaction is complete (shown as 3.3a in the diagram). At this point, all the messages pertaining to the transaction in its transaction log can safely be removed.

For example, there are two transactions:

Transaction Log Write:
```
transaction_1: start transaction
transaction_2: start transaction
transaction_1: add partition to tx
transaction_1: add subscription to tx
transaction_2: add partition to tx
transaction_1: commit
```
Bookie Write:
```
[Entry]{ BatchedTransactionData={LogRecordSize=6} }
```
Bookie Response:
```
{ledgerId=2, entryId=3}
```
Transaction Log callback:
```
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=0, ctx}
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=1, ctx}
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=2, ctx}
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=3, ctx}
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=4, ctx}
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=5, ctx}
```

Entry (2,3) actually holds 6 transaction log records: transaction_1 relates to indexes [0,2,3,5] and transaction_2 relates to indexes [1,4]. After transaction_1 is committed, the records [0,2,3,5] of entry (2,3) are no longer needed because the transaction has already completed, but we cannot delete entry (2,3) yet, because the records [1,4] are still useful: transaction_2 is not finished, and we still need them for recovery. The batchIndex and batchSize clearly indicate the location of each transaction log record in the ledger. When a transaction log record is no longer used, users can accurately delete it by position and batchIndex.

### <p id="librarySupports">Library support for compatibility with older-version brokers</p>

In broker.conf we can configure [transactionMetadataStoreProviderClassName](https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2535) to replace the implementation of TransactionLog, and we can configure [transactionPendingAckStoreProviderClassName](https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2549) to replace the implementation of PendingAckStore. We will provide a library containing classes that can read batched transaction logs and pending ack logs:

#### TransactionLogBatchEntryReadableImpl

```java
public class TransactionLogBatchEntryReadableImpl extends MLTransactionLogImpl {
    /**
     * Different from the parent class, when this method reads an entry, it can identify
     * whether the entry is a plain transaction log or a batched transaction log. If the
     * entry is a plain transaction log, it keeps the same logic as the parent class. If
     * the transaction log is batched, it is split into individual transaction log records
     * and processed according to the original logic.
     */
    @Override
    void replayAsync(TransactionLogReplayCallback transactionLogReplayCallback);
}
```

#### PendingAckStoreBatchEntryReadableImpl

```java
public class PendingAckStoreBatchEntryReadableImpl extends MLPendingAckStore {
    /**
     * Different from the parent class, when this method reads an entry, it can identify
     * whether the entry is a plain pending ack log or a batched pending ack log. If the
     * entry is a plain pending ack log, it keeps the same logic as the parent class. If
     * the pending ack log is batched, it is split into individual pending ack log records
     * and processed according to the original logic.
     */
    void replayAsync(PendingAckHandle pendingAckHandle, ExecutorService executorService);
}
```

How to use this library:

1. Copy pulsar-transaction-logs-batch-support.jar to ${PULSAR_HOME}/lib.
2. Edit broker.conf: set transactionMetadataStoreProviderClassName to “org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider” and set transactionPendingAckStoreProviderClassName to “org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider” (see the snippet after this list).
3. Restart the broker.
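Concretely, step 2 amounts to the following two lines in broker.conf (class names taken verbatim from the steps above):

```
transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider
transactionPendingAckStoreProviderClassName=org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider
```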