Hi Enrico,

> I am not sure I understand the part of making it configurable via a
> classname.
> I believe it is better to simply have a flag "transactionEnableBatchWrites".
> Otherwise the matrix of possible implementations will grow without limits.

Good idea. I've modified the design and added a switch in the "Configuration
Changes" section. Could you take another look?

On Fri, Jun 10, 2022 at 7:14 PM Enrico Olivelli <eolive...@gmail.com> wrote:

> I have read the PIP, and overall I agree with the design.
> Good work !
>
> I am not sure I understand the part of making it configurable via a
> classname.
> I believe it is better to simply have a flag
> "transactionEnableBatchWrites".
> Otherwise the matrix of possible implementations will grow without limits.
>
> Enrico
>
> On Fri, Jun 10, 2022 at 11:35 AM Yubiao Feng
> <yubiao.f...@streamnative.io.invalid> wrote:
> >
> > Hi Pulsar community:
> >
> > I opened a PIP to discuss "Batch writing ledger for transaction operation".
> >
> > Proposal Link: https://github.com/apache/pulsar/issues/15370
> >
> > ## Motivation
> >
> > Before reading the background, I suggest you read section “Transaction
> > Flow” of [PIP-31: Transactional Streaming](
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > )
> >
> > ### <p id="normalFlowVsTransaction"> Normal Flow vs. Transaction Flow </p>
> > ![MG3](
> >
> https://user-images.githubusercontent.com/25195800/172985866-25e496a4-ea93-42ec-aa0d-e6a02aa0635e.jpeg
> > )
> > In *Figure 1. Normal Flow vs. Transaction Flow*:
> > - The gray square boxes represent logical components.
> > - All the blue boxes represent logs. The logs are usually managed ledgers.
> > - Each arrow represents a request flow or message flow. These operations
> > occur in the sequence indicated by the numbers next to each arrow.
> > - The black arrows indicate those shared by transaction and normal flow.
> > - The blue arrows represent normal-message-specific flow.
> > - The orange arrows represent transaction-message-specific flow.
> > - The sections below are numbered to match the operations shown in the
> > diagram (the numbering differs from [PIP-31: Transactional Streaming](
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > )).
> >
> >
> > #### 2.4a Write logs to the ledger whose Acknowledgement State is PENDING_ACK
> > [Acknowledgement State Machine](
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.4bikq6sjiy8u
> > )
> > describes the changes of the Acknowledgement State and why we need to
> > persist “the log whose Acknowledgement State is PENDING_ACK”.
> > #### 2.4a’ Mark messages as no longer useful for the current subscription
> > Update the `Cursor` to mark the messages as DELETED, so that they can be
> > deleted.
> > #### 3.2b Mark messages as no longer useful for the current subscription
> > The implementation here is exactly the same as 2.4a’, except that the
> > execution is triggered later, after the transaction has been committed.
> >
> >
> > ### Analyze the performance cost of transaction
> > As you can see in <a href="#normalFlowVsTransaction">Figure 1. Normal Flow
> > vs. Transaction Flow</a>, 2.4a’ and 3.2b are exactly the same logic, so the
> > remaining orange arrows are the additional performance overhead of
> > transactions.
> > Depending on how many times each step is executed per transaction, we
> > can split the flow into two classes (optimizing a step that is executed
> > multiple times yields more benefit):
> > - Executed once per transaction: flow-1.x and flow-3.x
> > - Executed multiple times per transaction: flow-2.x
> >
> > So optimizing flow-2.x, which is executed many times, is a good choice.
> > Let's split flow-2.x into two groups, those that cost less and those that
> > cost more:
> > - No disk write: flow-2.1 and flow-2.3
> > - Disk write: flow-2.1a, flow-2.3a, flow-2.4a
> >
> > From the previous analysis, we found that optimizing flow-2.1a, flow-2.3a,
> > and flow-2.4a would bring the most benefit, and batch writes are an
> > excellent fit for multiple disk writes. Flow-2.1a and flow-2.3a both
> > write to the transaction log, so we can combine them in one batch;
> > flow-2.4a writes the pending ack log, so we combine multiple 2.4a
> > writes into one batch.
> > As we can see from “Transaction Flow” of [PIP-31: Transactional
> Streaming](
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> ),
> > these instructions are strictly sequential (guaranteed by the client):
> > - flow-1.x end before flow-2.x start
> > - flow-2.x end before flow-3.x start
> > - flow-3.1a end before flow-3.3a start
> >
> > Therefore, the broker does not need to worry about dependencies between
> > these flows, and we can also put flow-1a, flow-3.1a and flow-3.3a into the
> > Transaction Log Batch.
> >
> > ## Goal
> > Provide a mechanism for the Transaction Log Store and the Pending Ack
> > Store: accept multiple write requests, buffer all those records, and
> > persist them to a single BK entry (aka “Batched Entry”). This will improve
> > broker->BK throughput.
> > - Allow users to control the max size and max record count of the buffer.
> > - Allow users to control the max delay time of a write request.
> > - Multiple raw data records can be recovered from a Batched Entry.
> > - Provide a configurable switch between the “batched implementation” and
> > the “common implementation”.
> >
> > ## Approach
> > ### Buffer requests and write Bookie
> > Create a new protobuf record called “Batched Transaction Data” with an
> > array inside. When we receive a request, we put it in the array.
> >
> > Request:
> > ```
> > [Request 1]{ data, callback }
> > [Request 2]{ data, callback }
> > …
> > …
> > [Request N]{ data, callback }
> > ```
> > Buffer:
> > ```
> > [BatchedTransactionData]{ list=[Request 1, Request 2 … Request N] }
> > ```
> > Write Bookie:
> > ```
> > LedgerHandle async write ( BatchedTransactionData to byteBuf )
> > LedgerHandle callback: ledgerId=1, entryId=1
> > ```
> > Request-Callback:
> > ```
> > Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > …
> > …
> > Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > ```
> >
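
To make the buffer-and-callback flow above concrete, here is a minimal Java sketch. It is only an illustration under stated assumptions: `BatchBufferSketch`, `Callback` and `EntryWriter` are hypothetical names, the ledger write is modelled as a synchronous call that returns `{ledgerId, entryId}`, and only the record-count threshold is shown.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: every name here is hypothetical, not a Pulsar class. */
final class BatchBufferSketch {
    /** Per-request callback carrying the shared position plus the batch coordinates. */
    interface Callback {
        void addComplete(long ledgerId, long entryId, int batchSize, int batchIndex);
    }

    /** Stand-in for the ledger write; returns {ledgerId, entryId} of the written entry. */
    interface EntryWriter {
        long[] write(List<byte[]> batchedPayloads);
    }

    private static final class Pending {
        final byte[] data;
        final Callback callback;

        Pending(byte[] data, Callback callback) {
            this.data = data;
            this.callback = callback;
        }
    }

    private final List<Pending> buffer = new ArrayList<>();
    private final int maxRecords;
    private final EntryWriter writer;

    BatchBufferSketch(int maxRecords, EntryWriter writer) {
        this.maxRecords = maxRecords;
        this.writer = writer;
    }

    /** Buffers one request; flushes as soon as maxRecords requests are waiting. */
    synchronized void add(byte[] data, Callback callback) {
        buffer.add(new Pending(data, callback));
        if (buffer.size() >= maxRecords) {
            flush();
        }
    }

    /** Writes everything buffered as one entry, then completes each callback in order. */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<Pending> batch = new ArrayList<>(buffer);
        buffer.clear();
        List<byte[]> payloads = new ArrayList<>();
        for (Pending p : batch) {
            payloads.add(p.data);
        }
        long[] position = writer.write(payloads);
        for (int i = 0; i < batch.size(); i++) {
            batch.get(i).callback.addComplete(position[0], position[1], batch.size(), i);
        }
    }
}
```

Every callback in one batch receives the same ledgerId and entryId, and only batchIndex differs, which matches the Request-Callback listing above.
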
> > ### Delete BatchedTransactionMeta
> > [PIP 54](
> >
> https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level
> > )
> > already supports deleting at the batch-index level, so the raw data records
> > added to a batch can have different batch indexes but the same ledger ID
> > and entry ID.
> >
> > Read:
> > ```
> > [BatchedTransactionData]
> > ```
> > After split:
> > ```
> > {data 1, ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > {data 2, ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > …
> > {data N, ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > ```
> > Users can delete the whole batched Entry:
> > ```java
> > cursor.delete( Position {ledgerId = 1, entryId = 1} )
> > ```
> > Users can also delete only part of the batched Entry:
> > ```java
> > cursor.delete( Position {ledgerId =1, entryId = 1, batchIndex=1} )
> > ```
> >
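
The "Read" and "After split" steps above can be sketched as follows. This is a hedged illustration, not the proposed implementation: `BatchSplitSketch` and `Record` are hypothetical names, and the batched entry is assumed to be already decoded into a list of raw payloads.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: hypothetical names, batched entry assumed already decoded. */
final class BatchSplitSketch {
    /** One raw record recovered from a batched entry, with its batch coordinates. */
    static final class Record {
        final byte[] data;
        final long ledgerId;
        final long entryId;
        final int batchSize;
        final int batchIndex;

        Record(byte[] data, long ledgerId, long entryId, int batchSize, int batchIndex) {
            this.data = data;
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.batchSize = batchSize;
            this.batchIndex = batchIndex;
        }
    }

    /** Assigns every payload the entry's position and its own index within the batch. */
    static List<Record> split(long ledgerId, long entryId, List<byte[]> batchedData) {
        List<Record> records = new ArrayList<>();
        for (int i = 0; i < batchedData.size(); i++) {
            records.add(new Record(batchedData.get(i), ledgerId, entryId, batchedData.size(), i));
        }
        return records;
    }
}
```
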
> > ## Changes
> >
> > ### Protocol Changes
> > New protobuf record to buffer requests.
> >
> > BatchedTransactionMetadataEntry
> > ```
> > message BatchedTransactionMetadataEntry{
> >   // Array for buffer transaction log data.
> >   repeated TransactionMetadataEntry transaction_log = 12;
> > }
> > ```
> >
> > BatchedPendingAckMetadataEntry
> > ```
> > message BatchedPendingAckMetadataEntry{
> >   // Array for buffer pending ack data.
> >   repeated PendingAckMetadataEntry pending_ack_log=6;
> > }
> > ```
> >
> > Note: To ensure forward compatibility, we need to distinguish the old
> > TransactionMetadataEntry/PendingAckMetadataEntry data from the new
> > BatchedTransactionData data, so we add a magic number in front of the
> > bytes that proto serializes:
> > ```
> > [Magic Num] [BatchedPendingAckMetadataEntry proto bytes]  ==>  [Entry]
> > ```
> > Read Entry:
> > ```
> >                            /-- true --> [BatchedTransactionMetadataEntry]
> > [Entry] --> has Magic Num ?
> >                            \-- false --> [TransactionMetadataEntry]
> > ```
> >
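
A minimal sketch of the magic-number check described above, using `java.nio.ByteBuffer`. The two-byte value `0x0e01` is an assumption made for this example only (the text above does not fix a value), and `MagicPrefixSketch` is a hypothetical name. The first byte `0x0e` is convenient because, read as a protobuf tag, it decodes to wire type 6, which protobuf does not define, so an unbatched proto entry should never start with it.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Illustrative only: hypothetical class, magic value assumed for the example. */
final class MagicPrefixSketch {
    /** Assumed 2-byte marker written in front of batched entries. */
    static final short MAGIC = 0x0e01;

    /** Prepends the marker to the serialized batched record. */
    static byte[] wrapBatched(byte[] batchedProtoBytes) {
        return ByteBuffer.allocate(Short.BYTES + batchedProtoBytes.length)
                .putShort(MAGIC)
                .put(batchedProtoBytes)
                .array();
    }

    /** True when the entry starts with the marker, i.e. it holds a batched record. */
    static boolean isBatched(byte[] entry) {
        return entry.length >= Short.BYTES && ByteBuffer.wrap(entry).getShort() == MAGIC;
    }

    /** Returns the proto bytes, stripping the marker when it is present. */
    static byte[] payload(byte[] entry) {
        return isBatched(entry) ? Arrays.copyOfRange(entry, Short.BYTES, entry.length) : entry;
    }
}
```

A reader would call `isBatched` first and then parse `payload(entry)` as either the batched or the unbatched message type.
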
> >
> > ### API Changes
> >
> > BatchAddDataCallback
> > The Transaction Coordinator does not operate on the Managed Ledger
> > directly; it uses the Transaction Log Store to do so. The Managed
> > Ledger write API provides a callback class, AddEntryCallback. Likewise,
> > the Transaction Log Store, which provides batched writes, provides a
> > callback class, BatchAddDataCallback. See
> > <a href="#BatchedAddDataCallbackExplains">why we need
> > BatchAddDataCallback</a>.
> >
> > ![WechatIMG7](
> >
> https://user-images.githubusercontent.com/25195800/173034341-8d44a8b1-9dde-45ee-8525-b72365def640.jpeg
> > )
> > Figure.BatchAddDataCallback in Write Flow
> >
> > ```java
> > interface BatchAddDataCallback {
> >     /**
> >      * Success callback for “add data asynchronously”.
> >      *
> >      * @param position A Position is a pointer to a specific entry in the
> >      *                 managed ledger.
> >      * @param byteBuf The raw data that was added.
> >      * @param batchIndex The index of this raw data in the batch.
> >      * @param batchSize The raw data count of the whole Batched Entry.
> >      * @param ctx opaque context
> >      */
> >     void addComplete(Position position, ByteBuf byteBuf, int batchIndex,
> >                      int batchSize, Object ctx);
> >     /**
> >      * Failure callback for “add data asynchronously”.
> >      *
> >      * @param exception the cause of the failure
> >      * @param ctx opaque context
> >      */
> >     void addFailed(ManagedLedgerException exception, Object ctx);
> > }
> > ```
> >
> > ### Configuration Changes
> > Add batch threshold parameters to control the flush frequency.
> >
> > broker.conf
> > ```
> > transactionLogBatchedWriteEnabled=false
> > transactionLogBatchedWriteMaxRecords=512
> > # 1024 * 1024 * 4 bytes
> > transactionLogBatchedWriteMaxSize=4194304
> > transactionLogBatchedWriteMaxDelayInMillis=1
> >
> > pendingAckBatchedWriteEnabled=false
> > pendingAckBatchedWriteMaxRecords=512
> > # 1024 * 1024 * 4 bytes
> > pendingAckBatchedWriteMaxSize=4194304
> > pendingAckBatchedWriteMaxDelayInMillis=1
> > ```
> >
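
One way to read the four settings above is as a single flush decision. The sketch below is an assumption about the semantics, not the PIP's definition: `FlushPolicySketch` is a hypothetical name, thresholds are treated as inclusive (`>=`), and a disabled switch is taken to mean that every record is written immediately.

```java
/** Illustrative only: hypothetical class; threshold semantics are assumed. */
final class FlushPolicySketch {
    private final boolean enabled;
    private final int maxRecords;
    private final long maxSizeBytes;
    private final long maxDelayMillis;

    FlushPolicySketch(boolean enabled, int maxRecords, long maxSizeBytes, long maxDelayMillis) {
        this.enabled = enabled;
        this.maxRecords = maxRecords;
        this.maxSizeBytes = maxSizeBytes;
        this.maxDelayMillis = maxDelayMillis;
    }

    /**
     * Decides whether the buffer should be written now.
     *
     * @param bufferedRecords number of records currently buffered
     * @param bufferedBytes total payload size currently buffered
     * @param oldestRecordAgeMillis how long the oldest buffered record has waited
     */
    boolean shouldFlush(int bufferedRecords, long bufferedBytes, long oldestRecordAgeMillis) {
        if (bufferedRecords == 0) {
            return false; // nothing to write
        }
        if (!enabled) {
            return true; // batching off: write each record as it arrives
        }
        return bufferedRecords >= maxRecords
                || bufferedBytes >= maxSizeBytes
                || oldestRecordAgeMillis >= maxDelayMillis;
    }
}
```

With the values listed above, a buffer is flushed once it holds 512 records or 4194304 bytes, or once its oldest record has waited 1 ms, whichever happens first.
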
> > ### Compatibility
> > After the batch feature is enabled, users can only downgrade to a version
> > no older than the “first version that supports BatchedTransactionMeta
> > reading” to consume data. A broker of a lower version cannot parse the
> > data, resulting in data loss. We also provide
> > <a href="#librarySupports">Library support for Compatibility with older
> > versions Broker</a>. If the user uses this library on an older version
> > Broker<sup>[0]</sup>, all new data can be processed correctly and none of
> > the data will be lost.
> >
> > ----
> > **[0]old version Broker**: Not less than 2.9.2 and 2.10
> >
> > ### Observability
> > When using the batch feature, users will adjust the flush frequency
> > to achieve optimal performance. We provide two observable
> > metrics for users' reference:
> >
> > ```
> > BatchedDataStoreMXBeanImpl {
> >     /** The number of logs in each batch. **/
> >     Rate batchRecordCount;
> >     /** The size of each batch. **/
> >     Rate batchSizeCount;
> > }
> > ```
> >
> > ## Test plan
> > The test should cover the following cases:
> >
> > - The batch mechanism abides by the total count, total size, and max
> > delay limits.
> > - The returned position for writing data is correct.
> > - The managedCursor can delete and mark-delete the BatchedTransactionMeta.
> > - Performance tests that compare before and after the improvement.
> >
> > ## The appendix
> >
> > ### <p id="BatchedAddDataCallbackExplains"> Why we need BatchAddDataCallback </p>
> > After all produced messages and acknowledgements to all partitions are
> > committed or aborted, the TC writes the final COMMITTED or ABORTED
> > transaction status message to its transaction log, indicating that the
> > transaction is complete (shown as 3.3a in the diagram). At this point,
> all
> > the messages pertaining to the transaction in its transaction log can
> > safely be removed.
> > e.g. There are two transactions:
> > ![截屏2022-06-10 11 56 49](
> >
> https://user-images.githubusercontent.com/25195800/172987382-fc4ddf9a-e21c-437f-900b-cd681d8d9364.png
> > )
> > Transaction Log Write:
> > ```
> > transaction_1: start transaction
> > transaction_2: start transaction
> > transaction_1: add partition to tx
> > transaction_1: add subscription to tx
> > transaction_2: add partition to tx
> > transaction_1: commit
> > ```
> > Bookie Write:
> > ```
> > [Entry]{ BatchedTransactionData={LogRecordSize=6} }
> > ```
> > Bookie Response:
> > ```
> > {ledgerId=2, entryId=3}
> > ```
> > Transaction Log callback:
> > ```
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=0,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=1,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=2,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=3,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=4,
> ctx}
> > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=5,
> ctx}
> > ```
> > Entry (2,3) actually contains 6 transaction log records: transaction_1
> > relates to [0,2,3,5] and transaction_2 relates to [1,4].
> > After transaction_1 is committed, the records [0,2,3,5] of Entry (2,3) are
> > no longer needed because the transaction has already completed, but we
> > cannot delete Entry (2,3) yet, because the records [1,4] are still in use:
> > transaction_2 is not finished and we still need them for the recovery
> > operation.
> > The batchIndex and batchSize clearly indicate the location of each
> > transaction log record in the ledger. When a transaction log record is no
> > longer used, users can accurately delete it by position and batchIndex.
> >
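
The example above can be replayed with a small sketch that tracks which batch indexes of one entry have been deleted. `BatchDeletionSketch` is a hypothetical name and the bookkeeping is an assumption chosen for illustration; it is not how the cursor stores this state.

```java
import java.util.BitSet;

/** Illustrative only: tracks deleted batch indexes of a single batched entry. */
final class BatchDeletionSketch {
    private final int batchSize;
    private final BitSet deleted;

    BatchDeletionSketch(int batchSize) {
        this.batchSize = batchSize;
        this.deleted = new BitSet(batchSize);
    }

    /** Marks one record of the batch as no longer needed. */
    void delete(int batchIndex) {
        if (batchIndex < 0 || batchIndex >= batchSize) {
            throw new IndexOutOfBoundsException("batchIndex " + batchIndex);
        }
        deleted.set(batchIndex);
    }

    /** The whole entry can be removed only after every record in it was deleted. */
    boolean entryDeletable() {
        return deleted.cardinality() == batchSize;
    }
}
```

For Entry (2,3) with batchSize=6, deleting indexes 0, 2, 3 and 5 after transaction_1 commits leaves `entryDeletable()` false; it becomes true only after indexes 1 and 4 are deleted once transaction_2 finishes.
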
> > ### <p id="librarySupports">Library support for Compatibility with older versions Broker</p>
> > In broker.conf we can configure
> > [transactionMetadataStoreProviderClassName](
> >
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2535
> > )
> > to replace the implementation of TransactionLog. We can also configure
> > [transactionPendingAckStoreProviderClassName](
> >
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2549
> > )
> > to replace the implementation of PendingAckStore.
> > We will provide a library containing classes that can read batched
> > transaction logs and pending ack logs:
> >
> > #### TransactionLogBatchEnryReadableImpl
> > ```java
> > public class TransactionLogBatchEnryReadableImpl extends MLTransactionLogImpl {
> >     /**
> >      * Unlike the parent class, when this method reads an Entry, it can
> >      * identify whether the Entry is a transaction log or a batched
> >      * transaction log. If the Entry is a transaction log, it keeps the same
> >      * logic as the parent class. If the transaction log is batched, it is
> >      * split into individual transaction logs, which are processed according
> >      * to the original logic.
> >      */
> >     @Override
> >     void replayAsync(TransactionLogReplayCallback transactionLogReplayCallback);
> > }
> > ```
> >
> > #### PendingAckStoreBatchEntryReadableImpl
> > ```java
> > public class PendingAckStoreBatchEntryReadableImpl extends MLPendingAckStore {
> >     /**
> >      * Unlike the parent class, when this method reads an Entry, it can
> >      * identify whether the Entry is a pending ack log or a batched pending
> >      * ack log. If the Entry is a pending ack log, it keeps the same logic as
> >      * the parent class. If the pending ack log is batched, it is split into
> >      * individual pending ack logs, which are processed according to the
> >      * original logic.
> >      */
> >     void replayAsync(pendingAckHandle, executorService);
> > }
> > ```
> >
> > How to use this library
> > 1. Copy pulsar-transaction-logs-batch-support.jar to ${PULSAR_HOME}/lib.
> > 2. Edit broker.conf. Set transactionMetadataStoreProviderClassName to
> >
> “org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider”
> > and set transactionPendingAckStoreProviderClassName to
> >
> “org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider”.
> > 3. Restart broker.
>
