I have read the PIP, and overall I agree with the design. Good work! I am not sure I understand the part about making it configurable via a class name. I believe it is better to simply have a flag "transactionEnableBatchWrites". Otherwise the matrix of possible implementations will grow without limits.
Enrico

On Fri, 10 Jun 2022 at 11:35, Yubiao Feng
<yubiao.f...@streamnative.io.invalid> wrote:
>
> Hi Pulsar community:
>
> I have opened a PIP to discuss "Batch writing ledger for transaction operation".
>
> Proposal Link: https://github.com/apache/pulsar/issues/15370
>
> ## Motivation
>
> Before reading the background, I suggest you read the section "Transaction
> Flow" of [PIP-31: Transactional Streaming](
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> )
>
> ### <p id="normalFlowVsTransaction"> Normal Flow vs. Transaction Flow </p>
> [Figure 1. Normal Flow vs. Transaction Flow]
>
> In *Figure 1. Normal Flow vs. Transaction Flow*:
> - The gray square boxes represent logical components.
> - All the blue boxes represent logs. The logs are usually Managed Ledgers.
> - Each arrow represents a request flow or message flow. These operations
>   occur in the sequence indicated by the numbers next to each arrow.
> - The black arrows indicate the flows shared by transaction and normal flow.
> - The blue arrows represent normal-message-specific flow.
> - The orange arrows represent transaction-message-specific flow.
> - The sections below are numbered to match the operations shown in the
>   diagram (differing from [PIP-31: Transactional Streaming](
>   https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
>   ))
>
> #### 2.4a Write logs whose Acknowledgement State is PENDING_ACK to the ledger
> [Acknowledgement State Machine](
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.4bikq6sjiy8u)
> explains the changes of the Acknowledgement State and why we need to persist
> "the log whose Acknowledgement State is PENDING_ACK".
> #### 2.4a' Mark messages as no longer useful with the current subscription
> Update the `Cursor` to mark the messages as DELETED, so they can be deleted.
> #### 3.2b Mark messages as no longer useful with the current subscription
> The implementation here is exactly the same as 2.4a', except that the
> execution is triggered later, after the transaction has been committed.
>
> ### Analyze the performance cost of transactions
> As you can see in <a href="#normalFlowVsTransaction">Figure 1. Normal Flow
> vs. Transaction Flow</a>, 2.4a' and 3.2b are exactly the same logic, so the
> remaining orange arrows are the additional performance overhead of all
> transactions.
> In terms of how many times each step is executed per transaction, we can
> split the flow into two classes (optimizing a step that is executed multiple
> times will yield more benefits):
> - Executed once per transaction: flow-1.x and flow-3.x
> - Executed multiple times per transaction: flow-2.x
>
> So optimizing flow-2.x, which is executed many times, is a good choice.
> Let's split flow-2.x into two groups: those that cost more and those that
> cost less:
> - No disk write: flow-2.1 and flow-2.3
> - Disk write: flow-2.1a, flow-2.3a, and flow-2.4a
>
> From the previous analysis, we found that optimizing flow-2.1a, flow-2.3a,
> and flow-2.4a would bring the most benefits, and batch writes are an
> excellent solution for multiple disk writes. Flow-2.1a and flow-2.3a are
> both operations that write to the transaction log, so we can combine them in
> one batch; 2.4a is the operation that writes the pending-ack log, so we
> combine multiple 2.4a's into one batch for processing.
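To make the batching idea concrete before the proposal's details, here is a minimal sketch of the kind of bounded buffer the PIP implies (illustrative only, not code from the PIP; `BatchBuffer` and all of its names are hypothetical). Records accumulate until a record-count, byte-size, or delay threshold is reached, then the whole batch is handed to a single flush:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// A minimal threshold-based batch buffer: records accumulate until the batch
// reaches a max record count, a max byte size, or a max delay, and the whole
// batch is then flushed as one write. All names here are illustrative.
public class BatchBuffer<T> {
    private final int maxRecords;
    private final long maxBytes;
    private final Consumer<List<T>> flusher;
    private final List<T> records = new ArrayList<>();
    private long bufferedBytes = 0;

    public BatchBuffer(int maxRecords, long maxBytes, long maxDelayMillis,
                       Consumer<List<T>> flusher, ScheduledExecutorService scheduler) {
        this.maxRecords = maxRecords;
        this.maxBytes = maxBytes;
        this.flusher = flusher;
        // Delay-based trigger (simplified: a real implementation would arm the
        // timer when the first record arrives, not on a fixed schedule).
        scheduler.scheduleWithFixedDelay(this::flush, maxDelayMillis,
                maxDelayMillis, TimeUnit.MILLISECONDS);
    }

    public synchronized void add(T record, long sizeInBytes) {
        records.add(record);
        bufferedBytes += sizeInBytes;
        // Count/size-based triggers: flush as soon as either threshold is hit.
        if (records.size() >= maxRecords || bufferedBytes >= maxBytes) {
            flush();
        }
    }

    private synchronized void flush() {
        if (records.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(records)); // one write for N records
        records.clear();
        bufferedBytes = 0;
    }
}
```

The three thresholds correspond to the max-records, max-size, and max-delay knobs the PIP later exposes in its Configuration Changes section.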
> As we can see from "Transaction Flow" of [PIP-31: Transactional Streaming](
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx),
> these instructions are strictly sequential (guaranteed by the client):
> - flow-1.x ends before flow-2.x starts
> - flow-2.x ends before flow-3.x starts
> - flow-3.1a ends before flow-3.3a starts
>
> Therefore, the broker does not need to worry about the dependencies between
> these flows, and we can put flow-1.1a, flow-3.1a, and flow-3.3a into the
> Transaction Log batch as well.
>
> ## Goal
> Provide a mechanism for the Transaction Log Store and the Pending Ack Store:
> accept multiple write requests, buffer all those records, and persist them
> to a single BK entry (aka "Batched Entry"). This will improve broker->BK
> throughput.
> - Allow users to control the max size and max record count of the buffer.
> - Allow users to control the max delay time of a write request.
> - Multiple raw data records can be recovered from a Batched Entry.
> - Configurable switch between the "batched implementation" and the "common
>   implementation".
>
> ## Approach
> ### Buffer requests and write to Bookie
> Create a new protobuf record called "Batched Transaction Data" with an
> array inside. When we receive a request, we put it in the array.
>
> Request:
> ```
> [Request 1]{ data, callback }
> [Request 2]{ data, callback }
> …
> …
> [Request N]{ data, callback }
> ```
> Buffer:
> ```
> [BatchedTransactionData]{ list=[Request 1, Request 2 … Request N] }
> ```
> Write to Bookie:
> ```
> LedgerHandle async write ( BatchedTransactionData to byteBuf )
> LedgerHandle callback: ledgerId=1, entryId=1
> ```
> Request-Callback:
> ```
> Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> …
> …
> Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> ```
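The Request-Callback step above is the key move: one bookie write completes, and the store fans the single (ledgerId, entryId) result out to every buffered request, attaching each record's index within the batch. A minimal sketch of that fan-out (illustrative only; `BatchCallbackDispatcher` and `WriteCallback` are hypothetical names, not the PIP's API):

```java
import java.util.List;

// Fan one ledger-write result out to the N callbacks buffered into the batch.
// Every callback receives the shared (ledgerId, entryId) plus its own
// batchIndex, so each record remains individually addressable later.
public class BatchCallbackDispatcher {

    /** The caller-supplied completion hook for one buffered request. */
    public interface WriteCallback {
        void complete(long ledgerId, long entryId, int batchSize, int batchIndex);
    }

    /** Invoked from the LedgerHandle completion for the whole batched entry. */
    public static void dispatch(long ledgerId, long entryId,
                                List<WriteCallback> batchedCallbacks) {
        int batchSize = batchedCallbacks.size();
        for (int i = 0; i < batchSize; i++) {
            // batchIndex runs 0..N-1 in the order the requests were buffered.
            batchedCallbacks.get(i).complete(ledgerId, entryId, batchSize, i);
        }
    }
}
```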
> ### Delete BatchedTransactionMeta
> [PIP 54](
> https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level)
> added support for batch-index-level deletes. So the raw data records added
> to a batch can have different batch indexes but the same ledger ID and entry
> ID.
>
> Read:
> ```
> [BatchedTransactionData]
> ```
> After split:
> ```
> {data 1, ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> {data 2, ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> …
> {data N, ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> ```
> Users can delete the whole batched Entry:
> ```java
> cursor.delete( Position {ledgerId = 1, entryId = 1} )
> ```
> Users can also delete only part of the batched Entry:
> ```java
> cursor.delete( Position {ledgerId = 1, entryId = 1, batchIndex = 1} )
> ```
>
> ## Changes
>
> ### Protocol Changes
> New protobuf records to buffer requests.
>
> BatchedTransactionMetadataEntry
> ```
> message BatchedTransactionMetadataEntry {
>     // Array for buffering transaction log data.
>     repeated TransactionMetadataEntry transaction_log = 12;
> }
> ```
>
> BatchedPendingAckMetadataEntry
> ```
> message BatchedPendingAckMetadataEntry {
>     // Array for buffering pending ack data.
>     repeated PendingAckMetadataEntry pending_ack_log = 6;
> }
> ```
>
> Note: To ensure forward compatibility, we need to distinguish the old
> TransactionMetadataEntry/PendingAckMetadataEntry data from the new batched
> data, so we add a magic number in front of the bytes that proto serializes:
> ```
> [Magic Num] [BatchedPendingAckMetadataEntry proto bytes] ==> [Entry]
> ```
> Read Entry:
> ```
>                             /-- true  --> [BatchedTransactionMetadataEntry]
> [Entry] --> has Magic Num ?
>                             \-- false --> [TransactionMetadataEntry]
> ```
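A sketch of how that magic-number check might look on the wire (illustrative only; the PIP does not specify the magic value or width, so the 2-byte `MAGIC` below is a placeholder):

```java
import java.nio.ByteBuffer;

// Distinguish old-format entries (raw proto bytes) from new batched entries
// (magic number + proto bytes). The magic value below is a placeholder; the
// PIP only says a magic number is prepended to the serialized bytes.
public class EntryFraming {
    private static final short MAGIC = 0x0e01; // hypothetical value

    /** Write side: prefix the serialized batched record with the magic number. */
    public static ByteBuffer writeBatched(byte[] batchedProtoBytes) {
        ByteBuffer buf = ByteBuffer.allocate(2 + batchedProtoBytes.length);
        buf.putShort(MAGIC);
        buf.put(batchedProtoBytes);
        buf.flip();
        return buf;
    }

    /** Read side: peek at the first two bytes without consuming them. */
    public static boolean hasMagic(ByteBuffer entry) {
        return entry.remaining() >= 2
                && entry.getShort(entry.position()) == MAGIC;
    }
}
```

Whichever branch `hasMagic` selects then feeds the remaining bytes to the batched or the legacy protobuf parser.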
> ### API Changes
>
> BatchAddDataCallback
> The Transaction Coordinator does not operate on the Managed Ledger directly;
> it uses the Transaction Log Store to operate on the Managed Ledger. The
> Managed Ledger write API provides a callback class, AddEntryCallback; in the
> same way, the Transaction Log Store that provides batched writes provides a
> callback class, BatchAddDataCallback. <a
> href="#BatchedAddDataCallbackExplains">The appendix explains why we need
> BatchAddDataCallback</a>.
>
> [Figure. BatchAddDataCallback in Write Flow]
>
> ```java
> interface BatchAddDataCallback {
>     /**
>      * Success callback function for "add data asynchronously".
>      *
>      * @param position A Position is a pointer to a specific entry in the
>      *                 managed ledger.
>      * @param byteBuf The raw data which was added.
>      * @param batchIndex The index of the current raw data record within the
>      *                   batch.
>      * @param batchSize The count of raw data records in the whole Batched
>      *                  Entry.
>      * @param ctx opaque context
>      */
>     void addComplete(Position position, ByteBuf byteBuf, int batchIndex,
>                      int batchSize, Object ctx);
>
>     /**
>      * Failure callback function for "add data asynchronously".
>      *
>      * @param ctx opaque context
>      */
>     void addFailed(ManagedLedgerException exception, Object ctx);
> }
> ```
>
> ### Configuration Changes
> Add batch threshold parameters to control the flush frequency.
>
> broker.conf
> ```
> transactionLogBatchedWriteEnabled=false
> transactionLogBatchedWriteMaxRecords=512
> # 4 MB (1024 * 1024 * 4)
> transactionLogBatchedWriteMaxSize=4194304
> transactionLogBatchedWriteMaxDelayInMillis=1
>
> pendingAckBatchedWriteEnabled=false
> pendingAckBatchedWriteMaxRecords=512
> # 4 MB (1024 * 1024 * 4)
> pendingAckBatchedWriteMaxSize=4194304
> pendingAckBatchedWriteMaxDelayInMillis=1
> ```
>
> ### Compatibility
> After the batch feature is enabled, users can only downgrade to a version no
> older than the first version that supports reading BatchedTransactionMeta:
> batched data cannot be parsed by a lower-version broker, resulting in data
> loss. We also provide a <a href="#librarySupports">library for compatibility
> with older-version brokers</a>. If the user uses this library on an
> older-version broker<sup>[0]</sup>, all new data can be processed correctly
> and none of the data will be lost.
>
> ----
> **[0] old version Broker**: not less than 2.9.2 and 2.10
>
> ### Observability
> When using the batch feature, users will tune the flush frequency to achieve
> optimal performance. We provide two observability metrics for users'
> reference:
>
> ```
> BatchedDataStoreMXBeanImpl {
>     /** The number of log records in each batch. **/
>     Rate batchRecordCount;
>     /** The size of each batch. **/
>     Rate batchSizeCount;
> }
> ```
>
> ## Test plan
> The tests should cover the following cases:
>
> - The batch mechanism abides by the total count, total size, and max delay
>   limitations.
> - The returned position for written data is correct.
> - The managed cursor can delete and mark-delete the BatchedTransactionMeta.
> - Performance tests comparing the before-and-after improvement.
>
> ## The appendix
>
> ### <p id="BatchedAddDataCallbackExplains">Why we need BatchAddDataCallback</p>
> After all produced messages and acknowledgements to all partitions are
> committed or aborted, the TC writes the final COMMITTED or ABORTED
> transaction status message to its transaction log, indicating that the
> transaction is complete (shown as 3.3a in the diagram). At this point, all
> the messages pertaining to the transaction in its transaction log can safely
> be removed.
> E.g. there are two transactions:
>
> Transaction Log Write:
> ```
> transaction_1: start transaction
> transaction_2: start transaction
> transaction_1: add partition to tx
> transaction_1: add subscription to tx
> transaction_2: add partition to tx
> transaction_1: commit
> ```
> Bookie Write:
> ```
> [Entry]{ BatchedTransactionData={LogRecordSize=6} }
> ```
> Bookie Response:
> ```
> {ledgerId=2, entryId=3}
> ```
> Transaction Log callback:
> ```
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=0, ctx}
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=1, ctx}
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=2, ctx}
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=3, ctx}
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=4, ctx}
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=5, ctx}
> ```
> Entry (2,3) actually contains 6 transaction log records; transaction_1
> relates to indexes [0,2,3,5] and transaction_2 relates to [1,4].
> After transaction_1 is committed, the logs [0,2,3,5] of entry (2,3) are no
> longer needed because the transaction has already completed, but we cannot
> delete entry (2,3) yet, because the logs [1,4] are still useful:
> transaction_2 is not finished and we still need them for the recovery
> operation.
> The batchIndex and batchSize clearly indicate the location of each
> transaction log record in the ledger. When a transaction log record is no
> longer used, users can accurately delete it according to its position and
> batchIndex.
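One way to picture the bookkeeping this enables (purely illustrative; the class below is not part of the PIP): track which slots of which batched entry each transaction's records occupy, and release only those slots when the transaction completes.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative bookkeeping for the appendix example: remember which
// (ledgerId, entryId, batchIndex) slots each transaction's log records occupy,
// and release only those slots when the transaction completes. Records of
// other transactions sharing the same entry remain untouched.
public class TxnLogIndexTracker {
    /** One individually addressable slot inside a batched entry. */
    public record Slot(long ledgerId, long entryId, int batchIndex) {}

    private final Map<Long, Set<Slot>> slotsByTxn = new HashMap<>();

    public void recordWrite(long txnId, Slot slot) {
        slotsByTxn.computeIfAbsent(txnId, id -> new HashSet<>()).add(slot);
    }

    /** On commit/abort: the slots that may now be individually deleted. */
    public Set<Slot> completeTxn(long txnId) {
        Set<Slot> released = slotsByTxn.remove(txnId);
        return released == null ? Set.of() : released;
    }
}
```

In the example above, completing transaction_1 would release the slots of entry (2,3) with batchIndex 0, 2, 3, and 5 (each mapping to a position-plus-batchIndex cursor delete), while indexes 1 and 4 stay live for transaction_2.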
> ### <p id="librarySupports">Library support for compatibility with older version Brokers</p>
> In broker.conf we can configure
> [transactionMetadataStoreProviderClassName](
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2535)
> to replace the implementation of TransactionLog, and we can also configure
> [transactionPendingAckStoreProviderClassName](
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2549)
> to replace the implementation of PendingAckStore.
> We will provide a library containing classes that can read batched
> transaction logs and pending ack logs:
>
> #### TransactionLogBatchEntryReadableImpl
> ```java
> public class TransactionLogBatchEntryReadableImpl extends MLTransactionLogImpl {
>     /**
>      * Different from the parent class: when this method reads an Entry, it
>      * can identify whether the Entry is a transaction log or a batched
>      * transaction log. If the Entry is a transaction log, it keeps the same
>      * logic as the parent class. If the transaction log is batched, it is
>      * split into transaction logs and processed according to the original
>      * logic.
>      */
>     @Override
>     void replayAsync(TransactionLogReplayCallback transactionLogReplayCallback);
> }
> ```
>
> #### PendingAckStoreBatchEntryReadableImpl
> ```java
> public class PendingAckStoreBatchEntryReadableImpl extends MLPendingAckStore {
>     /**
>      * Different from the parent class: when this method reads an Entry, it
>      * can identify whether the Entry is a pending ack log or a batched
>      * pending ack log. If the Entry is a pending ack log, it keeps the same
>      * logic as the parent class. If the pending ack log is batched, it is
>      * split into pending ack logs and processed according to the original
>      * logic.
>      */
>     @Override
>     void replayAsync(PendingAckHandle pendingAckHandle,
>                      ExecutorService executorService);
> }
> ```
>
> How to use this library:
> 1. Copy pulsar-transaction-logs-batch-support.jar to ${PULSAR_HOME}/lib.
> 2. Edit broker.conf: set transactionMetadataStoreProviderClassName to
> "org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider",
> and set transactionPendingAckStoreProviderClassName to
> "org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider".
> 3. Restart the broker.
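Spelling out step 2, the resulting broker.conf lines (using the class names quoted above) would be:

```
transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider
transactionPendingAckStoreProviderClassName=org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider
```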