I have read the PIP, and overall I agree with the design.
Good work !

I am not sure I understand the part of making it configurable via a classname.
I believe it is better to simply have a flag "transactionEnableBatchWrites".
Otherwise the matrix of possible implementations will grow without limits.

Enrico

Il giorno ven 10 giu 2022 alle ore 11:35 Yubiao Feng
<yubiao.f...@streamnative.io.invalid> ha scritto:
>
> Hi Pulsar community:
>
> I open a pip to discuss "Batch writing ledger for transaction operation"
>
> Proposal Link: https://github.com/apache/pulsar/issues/15370
>
> ## Motivation
>
> Before reading the background, I suggest you read section “Transaction
> Flow” of [PIP-31: Transactional Streaming](
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> )
>
> ### <p id="normalFlowVsTransaction"> Normal Flow vs. Transaction Flow </p>
> ![MG3](
> https://user-images.githubusercontent.com/25195800/172985866-25e496a4-ea93-42ec-aa0d-e6a02aa0635e.jpeg
> )
> In *Figure 1. Normal Flow vs. Transaction Flow*:
> - The gray square boxes represent logical components.
> - All the blue boxes represent logs. The logs are usually Managed ledger
> - Each arrow represents the request flow or message flow. These operations
> occur in sequence indicated by the numbers next to each arrow.
> - The black arrows indicate those shared by transaction and normal flow.
> - The blue arrows represent normal-message-specific flow.
> - The orange arrows represent transaction-message-specific flow.
> - The sections below are numbered to match the operations showed in the
> diagram(differ from [PIP-31: Transactional Streaming](
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> ))
>
>
> #### 2.4a Write logs to ledger which Acknowledgement State is PENDING_ACK
> [Acknowledgement State Machine](
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.4bikq6sjiy8u)
> tells about the changes of the Acknowledge State and why we need persistent
> “The Log which the Acknowledgement State is PENDING_ACK”.
> #### 2.4a’ Mark messages is no longer useful with current subscription
> Update `Cursor` to mark the messages as DELETED. So they can be deleted.
> #### 3.2b Mark messages is no longer useful with current subscription
> The implementation here is exactly the same as 2.4a’, except that the
> execution is triggered later, after the Transaction has been committed.
>
>
> ### Analyze the performance cost of transaction
> As you can see <a href="#normalFlowVsTransaction">Figure 1. Normal Flow vs.
> Transaction Flow]</a>: 2.4a 'and 3.2b are exactly the same logic, so the
> remaining orange arrows are the additional performance overhead of all
> transactions.
> In terms of whether or not each transaction is executed multiple times, we
> can split the flow into two classes(Optimizing a process that is executed
> multiple times will yield more benefits):
> - Executed once each transaction: flow-1.x and flow-3.x
> - Executed multiple times each transaction: flow-2.x
>
> So optimizing the flow 2.x with a lot of execution is a good choice. Let's
> split flow-2.x into two groups: those that cost more and those that cost
> less:
> - No disk written: flow-2.1 and fow-2.3
> - Disk written: fow-2.1a, fow-2.3a, flow-2.4a
>
> From the previous analysis, we found that optimizing flow-2.1a, flow-2.3a,
> flow-2.4a would bring the most benefits, and batch writes would be an
> excellent solution for multiple disk writes. Flow-2.1a and Flow-2.3a are
> both manipulations written into the transaction log, we can combine them in
> one batch; 2.4a is the operation of writing pending ACK log, we combine
> multiple 2.4a's into one batch for processing.
> As we can see from “Transaction Flow” of [PIP-31: Transactional Streaming](
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx),
> these instructions are strictly sequential (guaranteed by the client):
> - flow-1.x end before flow-2.x start
> - flow-2.x end before flow-3.x start
> - flow-3.1a end before flow-3.3a start
>
> Therefore, the broker does not need to worry about the dependency of these
> flows, we can also put flow-1a flow-31a and flow-3.3a into The Transaction
> Log Batch too.
>
> ## Goal
> Provide a mechanism for Transaction Log Store and Pending Ack Store: accept
> multiple write requests, buffer all those records, and persist to a single
> BK entry(aka “Batched Entry”). This will improve broker->BK throughput.
> - Allow users to specify control of the max size, max record of The Buffer.
> - Allow users to specify control max delay time of The Write Request.
> - Multiple raw data can be recovered from Batched Entry.
> - Configurable “batched implementation” and “common implementation” switch.
>
> ## Approach
> ### Buffer requests and write Bookie
> Create a new protobuf record called “Batched Transaction Data” with an
> array inside. When receive a request, we put it in the array.
>
> Request:
> ```
> [Request 1]{ data, callback }
> [Request 2]]{ data, callback }
> …
> …
> [Request N]]{ data, callback }
> ```
> Buffer:
> ```
> [BatchedTransactionData]{ list=[Request 1, Request 2 … Request N] }
> ```
> Write Bookie:
> ```
> LedgerHandle async write ( BatchedTransactionData to byteBuf )
> LedgerHandle callback: ledgerId=1, entryId=1
> ```
> Request-Callback:
> ```
> Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> …
> …
> Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> ```
>
> ### Delete BatchedTransactionMeta
> [PIP 45](
> https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level)
> has supported batch index delete. So the Raw Data added to a batch can be
> with different batch indexes but with the same ledger ID and entry ID.
>
> Read:
> ```
> [BatchedTransactionData]
> ```
> After split:
> ```
> {data 1, ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> {data 2, ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> …
> {data 3, ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> ```
> Users can delete whole of the batched Entry:
> ```java
> cursor.delete( Position {ledgerId = 1, entryId = 1} )
> ```
> Users can also delete only part of the batched Entry:
> ```java
> cursor.delete( Position {ledgerId =1, entryId = 1, batchIndex=1} )
> ```
>
> ## Changes
>
> ### Protocol Changes
> New protobuf record to buffer requests.
>
> BatchedTransactionMetadataEntry
> ```
> message BatchedTransactionMetadataEntry{
>   // Array for buffer transaction log data.
>   repeated TransactionMetadataEntry transaction_log = 12;
> }
> ```
>
> BatchedPendingAckMetadataEntry
> ```
> message BatchedPendingAckMetadataEntry{
>   // Array for buffer pending ack data.
>   repeated PendingAckMetadataEntry pending_ack_log=6;
> }
> ```
>
> Note: To ensure forward compatibility, we need to distinguish the old
> TransactionMetadataEntry/PendingAckMetadataEntry data from the new
> BatchedTransactionData data, and we add A magic number in front of the
> bytes that proto serializes:
> ```
> [Magic Num] [PendingAckMetadataEntry proto bytes]  ==>  [Enrty]
> ```
> Read Entry:
> ```
>                            /-- true --> [BatchedTransactionMetadataEntry]
> [Entry] --> has Magic Num ?
>                            \-- false --> [TransactionMetadataEntry]
> ```
>
>
> ### API Changes
>
> BatchAddDataCallback
> The Transaction Coordinator does not directly operate the Managed Ledger,
> uses the Transaction Log Store to operate on Managed Ledger. The Managed
> Ledger write API provides a callback class: AddEntryCallback, the same
> Transaction Log Store that provides bulk writes, provides a callback class:
> BatchAddDataCallback. <a href="#BatchedAddDataCallbackExplains">Explains
> why do we need BatchAddDataCallback </a>.
>
> ![WechatIMG7](
> https://user-images.githubusercontent.com/25195800/173034341-8d44a8b1-9dde-45ee-8525-b72365def640.jpeg
> )
> Figure.BatchAddDataCallback in Write Flow
>
> ```java
> interface BatchAddDataCallback {
>     /**
>      * Successed callback function for “add data asynchronously”
>      *
>      * @param posotion A Position is a pointer to a specific entry into the
> managed ledger.
>      * @param byteBuf The raw data which added.
>      * @param batchIndex Raw data count in The whole Batched Entry.
>      * @param batchSize The current raw data index in the batch.
>      * @param ctx opaque context
>      */
>     void addComplete(Position position, ByteBuf byteBuf, int batchIndex,
> int batchSize, Object context);
>     /**
>      * Failure callback function for “add data asynchronously”
>      *
>      * @param ctx opaque context
>      */
>     void addFailed(ManagedLedgerException exception, Object ctx);
> }
> ```
>
> ### Configuration Changes
> Add the Batch threshold parameters to control the refresh frequency.
>
> broker.conf
> ```
> transactionLogBatchedWriteEnabled = false;
> transactionLogBatchedWriteMaxRecords= 512;
> transactionLogBatchedWriteMaxSize= 1024 * 1024 * 4;
> transactionLogBatchedWriteMaxDelayInMillis= 1;
>
> pendingAckBatchedWriteEnabled = false;
> pendingAckBatchedWriteMaxRecords= 512;
> pendingAckBatchedWriteMaxSize= 1024 * 1024 * 4;
> pendingAckBatchedWriteMaxDelayInMillis= 1;
> ```
>
> ### Compatibility
> After the batch feature is enabled, users can only downgrade to the larger
> than “first version that supports BatchedTransactionMeta reading” to
> consume data. Data in a lower version broker cannot be parsed, resulting in
> data loss. We also provide <a href="#librarySupports"> Library support for
> Compatibility with older versions Broker</a>, If the user uses this library
> on older version Broker<sup>[0]</sup>, all new data results can be
> processed correctly and none of the data will be lost.
>
> ----
> **[0]old version Broker**: Not less than 2.9.2 and 2.10
>
> ### Observability
> When using the Batch feature, users will adjust the frequency of disk
> brushing to achieve the optimal performance. We provide two observable
> indicators for users' reference
>
> ```
> BatchedDataStoreMXBeanImpl {
>     /** The number of logs in each batch. **/
>     Rate batchRecordCount;
>     /** The size of each batch. **/
>     Rate batchSizeCount;
> }
> ```
>
> ## Test plan
> The test should cover the following cases:
>
> - The batch mechanism works abides by the total count, total size, and max
> delay limitation.
> - The returned position for writing data is correct.
> - The managedCursor can delete and mark delete the BatchedTransactionMeta.
> - Performance tests and compare before-after improvement.
>
> ## The appendix
>
> ### <p id="BatchedAddDataCallbackExplains"> Explains why do we need
> BatchAddDataCallback  </p>
> After all produced messages and acknowledgements to all partitions are
> committed or aborted, the TC writes the final COMMITTED or ABORTED
> transaction status message to its transaction log, indicating that the
> transaction is complete (shown as 3.3a in the diagram). At this point, all
> the messages pertaining to the transaction in its transaction log can
> safely be removed.
> e.g. There are two transactions:
> ![截屏2022-06-10 11 56 49](
> https://user-images.githubusercontent.com/25195800/172987382-fc4ddf9a-e21c-437f-900b-cd681d8d9364.png
> )
> Transaction Log Write:
> ```
> transaction_1: start transaction
> transaction_2: start transaction
> transaction_1: add partition to tx
> transaction_1: add subscription to tx
> transaction_2: add partition to tx
> transaction_1: commit
> ```
> Bookie Write:
> ```
> [Entry]{ BatchedTransactionData={LogRecordSize=6} }
> ```
> Bookie Response:
> ```
> {ledgerId=2, entryId=3}
> ```
> Transaction Log callback:
> ```
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=0, ctx}
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=1, ctx}
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=2, ctx}
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=3, ctx}
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=4, ctx}
> {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=5, ctx}
> ```
> The entry(2,3) actually has 6 transaction logs, transaction_1 relations to
> [0,2,3,5] and transaction_2 relations to [1,4].
> After transaction_1 is committed,the logs [0,2,3,5] of Entry(2,3) are not
> needed because the transaction has already been completed, but now we could
> not delete Entry(2,3), Because the logs [1,4] are still useful that
> transaction_2 is not finished and we still need them for the recovery
> operation.
> The BatchIndex and BatchSize can clearly indicate the location of each
> transaction log in the ledger. When the transaction log is no longer used,
> Users can accurately delete it according to position and batchIndex.
>
> ### <p id="librarySupports">Library support for Compatibility with older
> versions Broker</p>
> In broker.conf we can configure the
> [transactionMetadataStoreProviderClassName](
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2535)
> to replace the implementation of TransactionLog, we can also configure the
> [transactionPendingAckStoreProviderClassName](
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2549)
> to replace the implementation of PendingAckStore,
> We will provide a library containing the classes which can read batched
> transaction logs and pending ack logs:
>
> #### TransactionLogBatchEnryReadableImpl
> ```java
> public class TransactionLogBatchEnryReadableImpl extends
> MLTransactionLogImpl {
>     /**
>      * Different from the parent class, when this method reads an Entry, it
> can identify
>      * whether the Entry is transction log or batched transaction log. If
> the Entry is a
>     * transaction log, it maintains the same logic as the parent class. If
> the transaction log
>     * is batched, it will be split into transaction log and processed
> according to the original
>     * logic
>      */
>     @Override
>     void replayAsync(TransactionLogReplayCallback
> transactionLogReplayCallback);
> }
> ```
>
> #### PendingAckStoreBatchEntryReadableImpl
> ```java
> public class PendingAckStoreBatchEntryReadableImpl extends
> MLPendingAckStore {
>     /**
>      * Different from the parent class, when this method reads an Entry, it
> can identify
>      * whether the Entry is pending ack log or batched pending ack log. If
> the Entry is a
>     * pending ack log, it maintains the same logic as the parent class. If
> the pending ack
>     * log is batched, it will be split into pending ack log and processed
> according to the
>     * original logic
>      */
>     void replayAsync(pendingAckHandle, executorService);
> }
> ```
>
> How to use this library
> 1. Copy pulsar-transaction-logs-batch-support.jar to ${PULSAR_HOME}/lib
> 2. Edit broker.conf. Set transactionMetadataStoreProviderClassName is
> “org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider”,
> set transactionPendingAckStoreProviderClassName is
> “org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider”.
> 3. Restart broker.

Reply via email to