Hi Xiangying

> I noticed you write conf transactionMetadataStoreProviderClassName is
> configured in broker.conf. Should this be added to `Configuration Changes`?
I have rewritten the design. In the new design I removed the section that
mentioned "transactionMetadataStoreProviderClassName" and instead added two
new sections: "Upgrade" and "Downgrade".

> This proposal will add a batch mechanism for pending ack, but this
> proposal has no detailed description for pending ack similar to the
> description of the metadata log batch.

In the new design, I have appended a "detail of pending ack" subsection under
"Acknowledge entry with the aggregated records".

On Fri, Jun 10, 2022 at 9:19 PM Xiangying Meng <xiangy...@apache.org> wrote:

> I think this is a nice optimization of transaction internal components.
> But I have two little questions:
> 1. I noticed you write conf transactionMetadataStoreProviderClassName is
> configured in broker.conf. Should this be added to `Configuration Changes`?
> 2. This proposal will add a batch mechanism for pending ack, but this
> proposal has no detailed description for pending ack similar to the
> description of the metadata log batch.
>
> On Fri, Jun 10, 2022 at 7:15 PM Enrico Olivelli <eolive...@gmail.com>
> wrote:
>
> > I have read the PIP, and overall I agree with the design.
> > Good work!
> >
> > I am not sure I understand the part about making it configurable via a
> > class name.
> > I believe it is better to simply have a flag
> > "transactionEnableBatchWrites".
> > Otherwise the matrix of possible implementations will grow without
> > limits.
> >
> > Enrico
> >
> > On Fri, Jun 10, 2022 at 11:35 AM Yubiao Feng
> > <yubiao.f...@streamnative.io.invalid> wrote:
> > >
> > > Hi Pulsar community:
> > >
> > > I open a PIP to discuss "Batch writing ledger for transaction operation".
> > >
> > > Proposal Link: https://github.com/apache/pulsar/issues/15370
> > >
> > > ## Motivation
> > >
> > > Before reading the background, I suggest you read the section "Transaction
> > > Flow" of [PIP-31: Transactional Streaming](
> > > https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > > )
> > >
> > > ### <p id="normalFlowVsTransaction"> Normal Flow vs. Transaction Flow </p>
> > > 
> > > In *Figure 1. Normal Flow vs. Transaction Flow*:
> > > - The gray square boxes represent logical components.
> > > - All the blue boxes represent logs; the logs are usually managed ledgers.
> > > - Each arrow represents a request flow or message flow. These operations
> > > occur in the sequence indicated by the numbers next to each arrow.
> > > - The black arrows indicate flows shared by transactions and normal messages.
> > > - The blue arrows represent normal-message-specific flow.
> > > - The orange arrows represent transaction-message-specific flow.
> > > - The sections below are numbered to match the operations shown in the
> > > diagram (the numbering differs from [PIP-31: Transactional Streaming](
> > > https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > > )).
> > >
> > > #### 2.4a Write logs whose Acknowledgement State is PENDING_ACK to the ledger
> > > The [Acknowledgement State Machine](
> > > https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.4bikq6sjiy8u
> > > )
> > > describes how the Acknowledgement State changes and why we need to persist
> > > "the log whose Acknowledgement State is PENDING_ACK".
> > >
> > > #### 2.4a' Mark messages as no longer useful for the current subscription
> > > Update the `Cursor` to mark the messages as DELETED so that they can be
> > > removed, as in the sketch below.
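> > >
> > > A hedged illustration (not the exact broker code) of marking a message
> > > as DELETED through the managed-ledger cursor API; the wrapper class and
> > > method are hypothetical, the cursor calls are the real API:
> > > ```java
> > > import org.apache.bookkeeper.mledger.AsyncCallbacks;
> > > import org.apache.bookkeeper.mledger.ManagedCursor;
> > > import org.apache.bookkeeper.mledger.ManagedLedgerException;
> > > import org.apache.bookkeeper.mledger.Position;
> > > import org.apache.bookkeeper.mledger.impl.PositionImpl;
> > >
> > > class PendingAckCleanup {
> > >     /** Mark one message as DELETED on the subscription cursor (flow 2.4a'). */
> > >     void markDeleted(ManagedCursor cursor, long ledgerId, long entryId) {
> > >         Position position = PositionImpl.get(ledgerId, entryId);
> > >         cursor.asyncDelete(position, new AsyncCallbacks.DeleteCallback() {
> > >             @Override
> > >             public void deleteComplete(Object ctx) {
> > >                 // The entry can be trimmed once all cursors have passed it.
> > >             }
> > >
> > >             @Override
> > >             public void deleteFailed(ManagedLedgerException exception, Object ctx) {
> > >                 // Retry or surface the error to the caller.
> > >             }
> > >         }, null);
> > >     }
> > > }
> > > ```
> > >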
> > > #### 3.2b Mark messages as no longer useful for the current subscription
> > > The implementation here is exactly the same as 2.4a', except that the
> > > execution is triggered later, after the transaction has been committed.
> > >
> > > ### Analyze the performance cost of transactions
> > > As you can see in <a href="#normalFlowVsTransaction">Figure 1. Normal Flow
> > > vs. Transaction Flow</a>, 2.4a' and 3.2b are exactly the same logic, so
> > > the remaining orange arrows are the additional performance overhead of all
> > > transactions.
> > > Based on whether each flow is executed multiple times per transaction, we
> > > can split the flows into two classes (optimizing a flow that is executed
> > > multiple times will yield more benefit):
> > > - Executed once per transaction: flow-1.x and flow-3.x
> > > - Executed multiple times per transaction: flow-2.x
> > >
> > > So optimizing the frequently executed flow-2.x is a good choice. Let's
> > > split flow-2.x into two groups: those that cost more and those that cost
> > > less:
> > > - No disk write: flow-2.1 and flow-2.3
> > > - Disk write: flow-2.1a, flow-2.3a, flow-2.4a
> > >
> > > From the previous analysis, we found that optimizing flow-2.1a, flow-2.3a,
> > > and flow-2.4a would bring the most benefit, and batching is an excellent
> > > way to reduce multiple disk writes. Flow-2.1a and flow-2.3a are both
> > > operations that write to the transaction log, so we can combine them into
> > > one batch; flow-2.4a writes the pending ack log, so we combine multiple
> > > 2.4a operations into one batch as well.
> > > As we can see from "Transaction Flow" of [PIP-31: Transactional Streaming](
> > > https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > > ),
> > > these instructions are strictly sequential (guaranteed by the client):
> > > - flow-1.x ends before flow-2.x starts
> > > - flow-2.x ends before flow-3.x starts
> > > - flow-3.1a ends before flow-3.3a starts
> > >
> > > Therefore, the broker does not need to worry about dependencies between
> > > these flows, and we can put flow-1.1a, flow-3.1a, and flow-3.3a into the
> > > transaction log batch too.
> > >
> > > ## Goal
> > > Provide a mechanism for the Transaction Log Store and the Pending Ack
> > > Store: accept multiple write requests, buffer all those records, and
> > > persist them to a single BK entry (aka "Batched Entry"). This will improve
> > > broker->BK throughput.
> > > - Allow users to control the max size and max record count of the buffer.
> > > - Allow users to control the max delay time of a write request.
> > > - Multiple raw records can be recovered from a batched entry.
> > > - Provide a configurable switch between the "batched implementation" and
> > > the "common implementation".
> > >
> > > ## Approach
> > > ### Buffer requests and write to Bookie
> > > Create a new protobuf record called "Batched Transaction Data" with an
> > > array inside. When we receive a request, we put it in the array. A minimal
> > > sketch of the buffering loop follows, before the schematic request flow.
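> > >
> > > A minimal sketch of that buffering, assuming hypothetical names
> > > (BatchedWriteBuffer, asyncAdd); the thresholds mirror the parameters in
> > > "Configuration Changes" below, and the actual serialization and ledger
> > > write are elided:
> > > ```java
> > > import io.netty.buffer.ByteBuf;
> > > import java.util.ArrayList;
> > > import java.util.List;
> > > import java.util.concurrent.CompletableFuture;
> > > import java.util.concurrent.Executors;
> > > import java.util.concurrent.ScheduledExecutorService;
> > > import java.util.concurrent.TimeUnit;
> > > import org.apache.bookkeeper.mledger.Position;
> > >
> > > class BatchedWriteBuffer {
> > >     private static final int MAX_RECORDS = 512;           // ...MaxRecords
> > >     private static final int MAX_BYTES = 4 * 1024 * 1024; // ...MaxSize
> > >     private static final long MAX_DELAY_MS = 1;           // ...MaxDelayInMillis
> > >
> > >     private final List<ByteBuf> records = new ArrayList<>();
> > >     private final List<CompletableFuture<Position>> callbacks = new ArrayList<>();
> > >     private int bufferedBytes;
> > >     private final ScheduledExecutorService timer =
> > >             Executors.newSingleThreadScheduledExecutor();
> > >
> > >     /** Buffer one record; the future completes when its batch is persisted. */
> > >     synchronized CompletableFuture<Position> asyncAdd(ByteBuf record) {
> > >         if (records.isEmpty()) {
> > >             // The first record of a batch arms the max-delay timer.
> > >             timer.schedule(this::flush, MAX_DELAY_MS, TimeUnit.MILLISECONDS);
> > >         }
> > >         CompletableFuture<Position> cb = new CompletableFuture<>();
> > >         records.add(record);
> > >         callbacks.add(cb);
> > >         bufferedBytes += record.readableBytes();
> > >         if (records.size() >= MAX_RECORDS || bufferedBytes >= MAX_BYTES) {
> > >             flush(); // record-count or size threshold reached
> > >         }
> > >         return cb;
> > >     }
> > >
> > >     private synchronized void flush() {
> > >         if (records.isEmpty()) {
> > >             return; // the timer fired after a threshold flush already ran
> > >         }
> > >         // Serialize all buffered records into one "Batched Transaction
> > >         // Data" entry, write it with a single ledger add, and complete
> > >         // each callback with the shared position plus its batchIndex
> > >         // (elided here).
> > >         records.clear();
> > >         callbacks.clear();
> > >         bufferedBytes = 0;
> > >     }
> > > }
> > > ```
> > >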
> > > Request:
> > > ```
> > > [Request 1]{ data, callback }
> > > [Request 2]{ data, callback }
> > > …
> > > …
> > > [Request N]{ data, callback }
> > > ```
> > > Buffer:
> > > ```
> > > [BatchedTransactionData]{ list=[Request 1, Request 2 … Request N] }
> > > ```
> > > Write to Bookie:
> > > ```
> > > LedgerHandle async write ( BatchedTransactionData to byteBuf )
> > > LedgerHandle callback: ledgerId=1, entryId=1
> > > ```
> > > Request-Callback:
> > > ```
> > > Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > > Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > > …
> > > …
> > > Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > > ```
> > >
> > > ### Delete BatchedTransactionMeta
> > > [PIP 54](
> > > https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level
> > > )
> > > supports batch-index-level acknowledgment, so the raw records added to a
> > > batch can have different batch indexes while sharing the same ledger ID
> > > and entry ID.
> > >
> > > Read:
> > > ```
> > > [BatchedTransactionData]
> > > ```
> > > After split:
> > > ```
> > > {data 1, ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > > {data 2, ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > > …
> > > {data N, ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > > ```
> > > Users can delete the whole batched entry:
> > > ```java
> > > cursor.delete( Position {ledgerId = 1, entryId = 1} )
> > > ```
> > > Users can also delete only part of the batched entry:
> > > ```java
> > > cursor.delete( Position {ledgerId = 1, entryId = 1, batchIndex = 1} )
> > > ```
> > >
> > > ## Changes
> > >
> > > ### Protocol Changes
> > > New protobuf records to buffer requests.
> > >
> > > BatchedTransactionMetadataEntry
> > > ```
> > > message BatchedTransactionMetadataEntry {
> > >   // Array that buffers transaction log records.
> > >   repeated TransactionMetadataEntry transaction_log = 12;
> > > }
> > > ```
> > >
> > > BatchedPendingAckMetadataEntry
> > > ```
> > > message BatchedPendingAckMetadataEntry {
> > >   // Array that buffers pending ack records.
> > >   repeated PendingAckMetadataEntry pending_ack_log = 6;
> > > }
> > > ```
> > >
> > > Note: To ensure forward compatibility, we need to distinguish the old
> > > TransactionMetadataEntry/PendingAckMetadataEntry data from the new batched
> > > data, so we add a magic number in front of the bytes that proto
> > > serializes:
> > > ```
> > > [Magic Num] [BatchedPendingAckMetadataEntry proto bytes] ==> [Entry]
> > > ```
> > > Read Entry:
> > > ```
> > >                            /-- true  --> [BatchedTransactionMetadataEntry]
> > > [Entry] --> has Magic Num ?
> > >                            \-- false --> [TransactionMetadataEntry]
> > > ```
> > > A minimal sketch of this framing follows.
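> > >
> > > The sketch below illustrates the framing under the assumptions above; the
> > > magic value is hypothetical, and a real implementation must choose a
> > > magic that cannot be confused with the first bytes of a legacy proto
> > > record:
> > > ```java
> > > import io.netty.buffer.ByteBuf;
> > > import io.netty.buffer.Unpooled;
> > >
> > > final class BatchedEntryFormat {
> > >     // Hypothetical 2-byte magic value marking a batched entry.
> > >     private static final short BATCHED_ENTRY_MAGIC = (short) 0x0e01;
> > >
> > >     /** Write side: prepend the magic number to the serialized batched record. */
> > >     static ByteBuf serialize(byte[] batchedProtoBytes) {
> > >         ByteBuf buf = Unpooled.buffer(2 + batchedProtoBytes.length);
> > >         buf.writeShort(BATCHED_ENTRY_MAGIC);
> > >         buf.writeBytes(batchedProtoBytes);
> > >         return buf;
> > >     }
> > >
> > >     /** Read side: peek at the first two bytes to decide how to parse. */
> > >     static boolean isBatchedEntry(ByteBuf entry) {
> > >         if (entry.readableBytes() < 2) {
> > >             return false; // too short to carry a magic number
> > >         }
> > >         entry.markReaderIndex();
> > >         short magic = entry.readShort();
> > >         entry.resetReaderIndex();
> > >         return magic == BATCHED_ENTRY_MAGIC;
> > >     }
> > > }
> > > ```
> > >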
> > > ### API Changes
> > >
> > > BatchAddDataCallback
> > > The Transaction Coordinator does not operate the managed ledger directly;
> > > it uses the Transaction Log Store to operate on the managed ledger. The
> > > managed ledger write API provides a callback class, AddEntryCallback; in
> > > the same way, the Transaction Log Store that provides batched writes
> > > provides a callback class, BatchAddDataCallback. <a
> > > href="#BatchedAddDataCallbackExplains">See why we need
> > > BatchAddDataCallback</a>.
> > >
> > > 
> > > Figure. BatchAddDataCallback in Write Flow
> > >
> > > ```java
> > > interface BatchAddDataCallback {
> > >     /**
> > >      * Success callback function for "add data asynchronously".
> > >      *
> > >      * @param position a pointer to the specific entry in the managed ledger
> > >      * @param byteBuf the raw data that was added
> > >      * @param batchIndex the index of this raw record within the batched entry
> > >      * @param batchSize the total number of raw records in the batched entry
> > >      * @param ctx opaque context
> > >      */
> > >     void addComplete(Position position, ByteBuf byteBuf, int batchIndex,
> > >             int batchSize, Object ctx);
> > >
> > >     /**
> > >      * Failure callback function for "add data asynchronously".
> > >      *
> > >      * @param exception the failure cause
> > >      * @param ctx opaque context
> > >      */
> > >     void addFailed(ManagedLedgerException exception, Object ctx);
> > > }
> > > ```
> > >
> > > ### Configuration Changes
> > > Add batch threshold parameters to control the flush frequency.
> > >
> > > broker.conf
> > > ```
> > > transactionLogBatchedWriteEnabled=false
> > > transactionLogBatchedWriteMaxRecords=512
> > > # 4 MiB (1024 * 1024 * 4)
> > > transactionLogBatchedWriteMaxSize=4194304
> > > transactionLogBatchedWriteMaxDelayInMillis=1
> > >
> > > pendingAckBatchedWriteEnabled=false
> > > pendingAckBatchedWriteMaxRecords=512
> > > # 4 MiB (1024 * 1024 * 4)
> > > pendingAckBatchedWriteMaxSize=4194304
> > > pendingAckBatchedWriteMaxDelayInMillis=1
> > > ```
> > >
> > > ### Compatibility
> > > After the batch feature is enabled, users can only downgrade to a version
> > > not older than the first version that supports reading
> > > BatchedTransactionMeta; a lower-version broker cannot parse the batched
> > > data, resulting in data loss. We also provide a <a
> > > href="#librarySupports">library for compatibility with older-version
> > > brokers</a>. If users deploy this library on an older-version
> > > broker<sup>[0]</sup>, all new data can be processed correctly and none of
> > > the data will be lost.
> > >
> > > ----
> > > **[0] old version broker**: not less than 2.9.2 and 2.10
> > >
> > > ### Observability
> > > When using the batch feature, users will tune the disk flush frequency to
> > > achieve optimal performance. We provide two metrics for users' reference:
> > >
> > > ```
> > > BatchedDataStoreMXBeanImpl {
> > >     /** The number of records in each batch. **/
> > >     Rate batchRecordCount;
> > >     /** The size of each batch. **/
> > >     Rate batchSizeCount;
> > > }
> > > ```
> > >
> > > ## Test plan
> > > The tests should cover the following cases:
> > >
> > > - The batch mechanism abides by the total count, total size, and max
> > > delay limitations.
> > > - The returned position for written data is correct.
> > > - The managed cursor can delete and mark-delete the
> > > BatchedTransactionMeta.
> > > - Performance tests comparing before-and-after improvement.
> > >
> > > ## The appendix
> > >
> > > ### <p id="BatchedAddDataCallbackExplains"> Why we need
> > > BatchAddDataCallback </p>
> > > After all produced messages and acknowledgements to all partitions are
> > > committed or aborted, the TC writes the final COMMITTED or ABORTED
> > > transaction status message to its transaction log, indicating that the
> > > transaction is complete (shown as 3.3a in the diagram). At this point, all
> > > the messages pertaining to the transaction in its transaction log can
> > > safely be removed. A sketch of a position-tracking callback follows, and
> > > then a worked example.
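> > >
> > > As a hedged sketch (the holder class and future-based wiring are
> > > hypothetical), a BatchAddDataCallback implementation can record, for each
> > > raw record, where it landed inside the batched entry:
> > > ```java
> > > import io.netty.buffer.ByteBuf;
> > > import java.util.concurrent.CompletableFuture;
> > > import org.apache.bookkeeper.mledger.ManagedLedgerException;
> > > import org.apache.bookkeeper.mledger.Position;
> > >
> > > /** Hypothetical holder for "entry position + location inside the batch". */
> > > class TxnLogLocation {
> > >     final Position position;
> > >     final int batchIndex;
> > >     final int batchSize;
> > >
> > >     TxnLogLocation(Position position, int batchIndex, int batchSize) {
> > >         this.position = position;
> > >         this.batchIndex = batchIndex;
> > >         this.batchSize = batchSize;
> > >     }
> > > }
> > >
> > > class PositionTrackingCallback implements BatchAddDataCallback {
> > >     @Override
> > >     @SuppressWarnings("unchecked")
> > >     public void addComplete(Position position, ByteBuf byteBuf,
> > >             int batchIndex, int batchSize, Object ctx) {
> > >         // Every record of one batch shares (ledgerId, entryId); the
> > >         // batchIndex identifies this record for later partial deletion.
> > >         ((CompletableFuture<TxnLogLocation>) ctx)
> > >                 .complete(new TxnLogLocation(position, batchIndex, batchSize));
> > >     }
> > >
> > >     @Override
> > >     @SuppressWarnings("unchecked")
> > >     public void addFailed(ManagedLedgerException exception, Object ctx) {
> > >         ((CompletableFuture<TxnLogLocation>) ctx).completeExceptionally(exception);
> > >     }
> > > }
> > > ```
> > >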
> > > For example, there are two transactions:
> > > 
> > > Transaction Log Write:
> > > ```
> > > transaction_1: start transaction
> > > transaction_2: start transaction
> > > transaction_1: add partition to tx
> > > transaction_1: add subscription to tx
> > > transaction_2: add partition to tx
> > > transaction_1: commit
> > > ```
> > > Bookie Write:
> > > ```
> > > [Entry]{ BatchedTransactionData={LogRecordSize=6} }
> > > ```
> > > Bookie Response:
> > > ```
> > > {ledgerId=2, entryId=3}
> > > ```
> > > Transaction Log callback:
> > > ```
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=0, ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=1, ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=2, ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=3, ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=4, ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=5, ctx}
> > > ```
> > > The entry (2,3) actually holds 6 transaction logs: transaction_1 relates
> > > to indexes [0,2,3,5] and transaction_2 relates to indexes [1,4].
> > > After transaction_1 is committed, the logs [0,2,3,5] of entry (2,3) are
> > > no longer needed because the transaction has already completed, but we
> > > still cannot delete entry (2,3), because the logs [1,4] remain useful:
> > > transaction_2 is not finished, and we still need them for the recovery
> > > operation.
> > > The batchIndex and batchSize clearly indicate the location of each
> > > transaction log in the ledger. When a transaction log is no longer used,
> > > users can accurately delete it according to position and batchIndex, as
> > > sketched below.
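> > >
> > > To make that bookkeeping concrete, a hedged sketch (all names
> > > hypothetical): track the still-live batch indexes of each batched entry,
> > > and delete the entry from the cursor only once every index in it has been
> > > released:
> > > ```java
> > > import java.util.BitSet;
> > > import java.util.Map;
> > > import java.util.concurrent.ConcurrentHashMap;
> > > import org.apache.bookkeeper.mledger.AsyncCallbacks;
> > > import org.apache.bookkeeper.mledger.ManagedCursor;
> > > import org.apache.bookkeeper.mledger.ManagedLedgerException;
> > > import org.apache.bookkeeper.mledger.Position;
> > >
> > > class BatchedEntryDeleter {
> > >     /** Live (not yet deletable) batch indexes of each batched entry. */
> > >     private final Map<Position, BitSet> liveIndexes = new ConcurrentHashMap<>();
> > >
> > >     /** Remember that a freshly written batched entry holds batchSize records. */
> > >     void onBatchWritten(Position position, int batchSize) {
> > >         BitSet live = new BitSet(batchSize);
> > >         live.set(0, batchSize); // all records start out live
> > >         liveIndexes.put(position, live);
> > >     }
> > >
> > >     /** Called when one record (e.g. one of transaction_1's logs) is done. */
> > >     void release(ManagedCursor cursor, Position position, int batchIndex) {
> > >         BitSet live = liveIndexes.get(position);
> > >         if (live == null) {
> > >             return; // entry already deleted
> > >         }
> > >         live.clear(batchIndex);
> > >         if (!live.isEmpty()) {
> > >             return; // e.g. transaction_2's logs [1,4] still block deletion
> > >         }
> > >         liveIndexes.remove(position);
> > >         cursor.asyncDelete(position, new AsyncCallbacks.DeleteCallback() {
> > >             @Override
> > >             public void deleteComplete(Object ctx) { /* entry reclaimed */ }
> > >
> > >             @Override
> > >             public void deleteFailed(ManagedLedgerException e, Object ctx) {
> > >                 // Retry later; the entry stays until deletion succeeds.
> > >             }
> > >         }, null);
> > >     }
> > > }
> > > ```
> > >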
> > > ### <p id="librarySupports">Library support for compatibility with
> > > older-version brokers</p>
> > > In broker.conf we can configure
> > > [transactionMetadataStoreProviderClassName](
> > > https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2535
> > > )
> > > to replace the implementation of TransactionLog, and we can also configure
> > > [transactionPendingAckStoreProviderClassName](
> > > https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2549
> > > )
> > > to replace the implementation of PendingAckStore.
> > > We will provide a library containing the classes that can read batched
> > > transaction logs and pending ack logs:
> > >
> > > #### TransactionLogBatchEnryReadableImpl
> > > ```java
> > > public class TransactionLogBatchEnryReadableImpl extends
> > >         MLTransactionLogImpl {
> > >     /**
> > >      * Different from the parent class, when this method reads an entry,
> > >      * it can identify whether the entry is a transaction log or a batched
> > >      * transaction log. If the entry is a plain transaction log, it keeps
> > >      * the same logic as the parent class. If the transaction log is
> > >      * batched, it is split into individual transaction logs and processed
> > >      * according to the original logic.
> > >      */
> > >     @Override
> > >     void replayAsync(TransactionLogReplayCallback transactionLogReplayCallback);
> > > }
> > > ```
> > >
> > > #### PendingAckStoreBatchEntryReadableImpl
> > > ```java
> > > public class PendingAckStoreBatchEntryReadableImpl extends
> > >         MLPendingAckStore {
> > >     /**
> > >      * Different from the parent class, when this method reads an entry,
> > >      * it can identify whether the entry is a pending ack log or a batched
> > >      * pending ack log. If the entry is a plain pending ack log, it keeps
> > >      * the same logic as the parent class. If the pending ack log is
> > >      * batched, it is split into individual pending ack logs and processed
> > >      * according to the original logic.
> > >      */
> > >     void replayAsync(PendingAckHandle pendingAckHandle, ExecutorService executorService);
> > > }
> > > ```
> > >
> > > How to use this library:
> > > 1. Copy pulsar-transaction-logs-batch-support.jar to ${PULSAR_HOME}/lib.
> > > 2. In broker.conf, set transactionMetadataStoreProviderClassName to
> > > "org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider"
> > > and set transactionPendingAckStoreProviderClassName to
> > > "org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider",
> > > as shown below.
> > > 3. Restart the broker.
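> > >
> > > Concretely, step 2 amounts to the following broker.conf entries (class
> > > names as given above):
> > > ```
> > > transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider
> > > transactionPendingAckStoreProviderClassName=org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider
> > > ```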