Hi XiangYing
>  I noticed you write conf transactionMetadataStoreProviderClassName is
configured in broker.conf. Should this be added to `Configuration Changes`?

I have rewritten the design, in the new design I removed the section which
has "transactionMetadataStoreProviderClassName", instead it, I write
new sections: "Upgrade" and "Downgrade".

> This proposal will add a batch mechanism for pending ack, but this
proposal is no detailed description for pending ack similar to the
description of metadata log batch.

In the new design, I have appended "detail of pending ack" at section
"Acknowledge entry with the aggregated records".


On Fri, Jun 10, 2022 at 9:19 PM Xiangying Meng <xiangy...@apache.org> wrote:

> I think this is a nice optimization of transaction internal components.
> But I have two little questions:
> 1. I noticed you write conf transactionMetadataStoreProviderClassName is
> configured in broker.conf. Should this be added to `Configuration Changes`?
> 2. This proposal will add a batch mechanism for pending ack, but this
> proposal is no detailed description for pending ack similar to the
> description of metadata log batch.
>
> On Fri, Jun 10, 2022 at 7:15 PM Enrico Olivelli <eolive...@gmail.com>
> wrote:
>
> > I have read the PIP, and overall I agree with the design.
> > Good work !
> >
> > I am not sure I understand the part of making it configurable via a
> > classname.
> > I believe it is better to simply have a flag
> > "transactionEnableBatchWrites".
> > Otherwise the matrix of possible implementations will grow without
> limits.
> >
> > Enrico
> >
> > Il giorno ven 10 giu 2022 alle ore 11:35 Yubiao Feng
> > <yubiao.f...@streamnative.io.invalid> ha scritto:
> > >
> > > Hi Pulsar community:
> > >
> > > I open a pip to discuss "Batch writing ledger for transaction
> operation"
> > >
> > > Proposal Link: https://github.com/apache/pulsar/issues/15370
> > >
> > > ## Motivation
> > >
> > > Before reading the background, I suggest you read section “Transaction
> > > Flow” of [PIP-31: Transactional Streaming](
> > >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > > )
> > >
> > > ### <p id="normalFlowVsTransaction"> Normal Flow vs. Transaction Flow
> > </p>
> > > ![MG3](
> > >
> >
> https://user-images.githubusercontent.com/25195800/172985866-25e496a4-ea93-42ec-aa0d-e6a02aa0635e.jpeg
> > > )
> > > In *Figure 1. Normal Flow vs. Transaction Flow*:
> > > - The gray square boxes represent logical components.
> > > - All the blue boxes represent logs. The logs are usually Managed
> ledger
> > > - Each arrow represents the request flow or message flow. These
> > operations
> > > occur in sequence indicated by the numbers next to each arrow.
> > > - The black arrows indicate those shared by transaction and normal
> flow.
> > > - The blue arrows represent normal-message-specific flow.
> > > - The orange arrows represent transaction-message-specific flow.
> > > - The sections below are numbered to match the operations showed in the
> > > diagram(differ from [PIP-31: Transactional Streaming](
> > >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > > ))
> > >
> > >
> > > #### 2.4a Write logs to ledger which Acknowledgement State is
> PENDING_ACK
> > > [Acknowledgement State Machine](
> > >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.4bikq6sjiy8u
> > )
> > > tells about the changes of the Acknowledge State and why we need
> > persistent
> > > “The Log which the Acknowledgement State is PENDING_ACK”.
> > > #### 2.4a’ Mark messages is no longer useful with current subscription
> > > Update `Cursor` to mark the messages as DELETED. So they can be
> deleted.
> > > #### 3.2b Mark messages is no longer useful with current subscription
> > > The implementation here is exactly the same as 2.4a’, except that the
> > > execution is triggered later, after the Transaction has been committed.
> > >
> > >
> > > ### Analyze the performance cost of transaction
> > > As you can see <a href="#normalFlowVsTransaction">Figure 1. Normal Flow
> > vs.
> > > Transaction Flow]</a>: 2.4a 'and 3.2b are exactly the same logic, so
> the
> > > remaining orange arrows are the additional performance overhead of all
> > > transactions.
> > > In terms of whether or not each transaction is executed multiple times,
> > we
> > > can split the flow into two classes(Optimizing a process that is
> executed
> > > multiple times will yield more benefits):
> > > - Executed once each transaction: flow-1.x and flow-3.x
> > > - Executed multiple times each transaction: flow-2.x
> > >
> > > So optimizing the flow 2.x with a lot of execution is a good choice.
> > Let's
> > > split flow-2.x into two groups: those that cost more and those that
> cost
> > > less:
> > > - No disk written: flow-2.1 and fow-2.3
> > > - Disk written: fow-2.1a, fow-2.3a, flow-2.4a
> > >
> > > From the previous analysis, we found that optimizing flow-2.1a,
> > flow-2.3a,
> > > flow-2.4a would bring the most benefits, and batch writes would be an
> > > excellent solution for multiple disk writes. Flow-2.1a and Flow-2.3a
> are
> > > both manipulations written into the transaction log, we can combine
> them
> > in
> > > one batch; 2.4a is the operation of writing pending ACK log, we combine
> > > multiple 2.4a's into one batch for processing.
> > > As we can see from “Transaction Flow” of [PIP-31: Transactional
> > Streaming](
> > >
> >
> https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
> > ),
> > > these instructions are strictly sequential (guaranteed by the client):
> > > - flow-1.x end before flow-2.x start
> > > - flow-2.x end before flow-3.x start
> > > - flow-3.1a end before flow-3.3a start
> > >
> > > Therefore, the broker does not need to worry about the dependency of
> > these
> > > flows, we can also put flow-1a flow-31a and flow-3.3a into The
> > Transaction
> > > Log Batch too.
> > >
> > > ## Goal
> > > Provide a mechanism for Transaction Log Store and Pending Ack Store:
> > accept
> > > multiple write requests, buffer all those records, and persist to a
> > single
> > > BK entry(aka “Batched Entry”). This will improve broker->BK throughput.
> > > - Allow users to specify control of the max size, max record of The
> > Buffer.
> > > - Allow users to specify control max delay time of The Write Request.
> > > - Multiple raw data can be recovered from Batched Entry.
> > > - Configurable “batched implementation” and “common implementation”
> > switch.
> > >
> > > ## Approach
> > > ### Buffer requests and write Bookie
> > > Create a new protobuf record called “Batched Transaction Data” with an
> > > array inside. When receive a request, we put it in the array.
> > >
> > > Request:
> > > ```
> > > [Request 1]{ data, callback }
> > > [Request 2]]{ data, callback }
> > > …
> > > …
> > > [Request N]]{ data, callback }
> > > ```
> > > Buffer:
> > > ```
> > > [BatchedTransactionData]{ list=[Request 1, Request 2 … Request N] }
> > > ```
> > > Write Bookie:
> > > ```
> > > LedgerHandle async write ( BatchedTransactionData to byteBuf )
> > > LedgerHandle callback: ledgerId=1, entryId=1
> > > ```
> > > Request-Callback:
> > > ```
> > > Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > > Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > > …
> > > …
> > > Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > > ```
> > >
> > > ### Delete BatchedTransactionMeta
> > > [PIP 45](
> > >
> >
> https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level
> > )
> > > has supported batch index delete. So the Raw Data added to a batch can
> be
> > > with different batch indexes but with the same ledger ID and entry ID.
> > >
> > > Read:
> > > ```
> > > [BatchedTransactionData]
> > > ```
> > > After split:
> > > ```
> > > {data 1, ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
> > > {data 2, ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
> > > …
> > > {data 3, ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
> > > ```
> > > Users can delete whole of the batched Entry:
> > > ```java
> > > cursor.delete( Position {ledgerId = 1, entryId = 1} )
> > > ```
> > > Users can also delete only part of the batched Entry:
> > > ```java
> > > cursor.delete( Position {ledgerId =1, entryId = 1, batchIndex=1} )
> > > ```
> > >
> > > ## Changes
> > >
> > > ### Protocol Changes
> > > New protobuf record to buffer requests.
> > >
> > > BatchedTransactionMetadataEntry
> > > ```
> > > message BatchedTransactionMetadataEntry{
> > >   // Array for buffer transaction log data.
> > >   repeated TransactionMetadataEntry transaction_log = 12;
> > > }
> > > ```
> > >
> > > BatchedPendingAckMetadataEntry
> > > ```
> > > message BatchedPendingAckMetadataEntry{
> > >   // Array for buffer pending ack data.
> > >   repeated PendingAckMetadataEntry pending_ack_log=6;
> > > }
> > > ```
> > >
> > > Note: To ensure forward compatibility, we need to distinguish the old
> > > TransactionMetadataEntry/PendingAckMetadataEntry data from the new
> > > BatchedTransactionData data, and we add A magic number in front of the
> > > bytes that proto serializes:
> > > ```
> > > [Magic Num] [PendingAckMetadataEntry proto bytes]  ==>  [Enrty]
> > > ```
> > > Read Entry:
> > > ```
> > >                            /-- true -->
> [BatchedTransactionMetadataEntry]
> > > [Entry] --> has Magic Num ?
> > >                            \-- false --> [TransactionMetadataEntry]
> > > ```
> > >
> > >
> > > ### API Changes
> > >
> > > BatchAddDataCallback
> > > The Transaction Coordinator does not directly operate the Managed
> Ledger,
> > > uses the Transaction Log Store to operate on Managed Ledger. The
> Managed
> > > Ledger write API provides a callback class: AddEntryCallback, the same
> > > Transaction Log Store that provides bulk writes, provides a callback
> > class:
> > > BatchAddDataCallback. <a
> href="#BatchedAddDataCallbackExplains">Explains
> > > why do we need BatchAddDataCallback </a>.
> > >
> > > ![WechatIMG7](
> > >
> >
> https://user-images.githubusercontent.com/25195800/173034341-8d44a8b1-9dde-45ee-8525-b72365def640.jpeg
> > > )
> > > Figure.BatchAddDataCallback in Write Flow
> > >
> > > ```java
> > > interface BatchAddDataCallback {
> > >     /**
> > >      * Successed callback function for “add data asynchronously”
> > >      *
> > >      * @param posotion A Position is a pointer to a specific entry into
> > the
> > > managed ledger.
> > >      * @param byteBuf The raw data which added.
> > >      * @param batchIndex Raw data count in The whole Batched Entry.
> > >      * @param batchSize The current raw data index in the batch.
> > >      * @param ctx opaque context
> > >      */
> > >     void addComplete(Position position, ByteBuf byteBuf, int
> batchIndex,
> > > int batchSize, Object context);
> > >     /**
> > >      * Failure callback function for “add data asynchronously”
> > >      *
> > >      * @param ctx opaque context
> > >      */
> > >     void addFailed(ManagedLedgerException exception, Object ctx);
> > > }
> > > ```
> > >
> > > ### Configuration Changes
> > > Add the Batch threshold parameters to control the refresh frequency.
> > >
> > > broker.conf
> > > ```
> > > transactionLogBatchedWriteEnabled = false;
> > > transactionLogBatchedWriteMaxRecords= 512;
> > > transactionLogBatchedWriteMaxSize= 1024 * 1024 * 4;
> > > transactionLogBatchedWriteMaxDelayInMillis= 1;
> > >
> > > pendingAckBatchedWriteEnabled = false;
> > > pendingAckBatchedWriteMaxRecords= 512;
> > > pendingAckBatchedWriteMaxSize= 1024 * 1024 * 4;
> > > pendingAckBatchedWriteMaxDelayInMillis= 1;
> > > ```
> > >
> > > ### Compatibility
> > > After the batch feature is enabled, users can only downgrade to the
> > larger
> > > than “first version that supports BatchedTransactionMeta reading” to
> > > consume data. Data in a lower version broker cannot be parsed,
> resulting
> > in
> > > data loss. We also provide <a href="#librarySupports"> Library support
> > for
> > > Compatibility with older versions Broker</a>, If the user uses this
> > library
> > > on older version Broker<sup>[0]</sup>, all new data results can be
> > > processed correctly and none of the data will be lost.
> > >
> > > ----
> > > **[0]old version Broker**: Not less than 2.9.2 and 2.10
> > >
> > > ### Observability
> > > When using the Batch feature, users will adjust the frequency of disk
> > > brushing to achieve the optimal performance. We provide two observable
> > > indicators for users' reference
> > >
> > > ```
> > > BatchedDataStoreMXBeanImpl {
> > >     /** The number of logs in each batch. **/
> > >     Rate batchRecordCount;
> > >     /** The size of each batch. **/
> > >     Rate batchSizeCount;
> > > }
> > > ```
> > >
> > > ## Test plan
> > > The test should cover the following cases:
> > >
> > > - The batch mechanism works abides by the total count, total size, and
> > max
> > > delay limitation.
> > > - The returned position for writing data is correct.
> > > - The managedCursor can delete and mark delete the
> > BatchedTransactionMeta.
> > > - Performance tests and compare before-after improvement.
> > >
> > > ## The appendix
> > >
> > > ### <p id="BatchedAddDataCallbackExplains"> Explains why do we need
> > > BatchAddDataCallback  </p>
> > > After all produced messages and acknowledgements to all partitions are
> > > committed or aborted, the TC writes the final COMMITTED or ABORTED
> > > transaction status message to its transaction log, indicating that the
> > > transaction is complete (shown as 3.3a in the diagram). At this point,
> > all
> > > the messages pertaining to the transaction in its transaction log can
> > > safely be removed.
> > > e.g. There are two transactions:
> > > ![截屏2022-06-10 11 56 49](
> > >
> >
> https://user-images.githubusercontent.com/25195800/172987382-fc4ddf9a-e21c-437f-900b-cd681d8d9364.png
> > > )
> > > Transaction Log Write:
> > > ```
> > > transaction_1: start transaction
> > > transaction_2: start transaction
> > > transaction_1: add partition to tx
> > > transaction_1: add subscription to tx
> > > transaction_2: add partition to tx
> > > transaction_1: commit
> > > ```
> > > Bookie Write:
> > > ```
> > > [Entry]{ BatchedTransactionData={LogRecordSize=6} }
> > > ```
> > > Bookie Response:
> > > ```
> > > {ledgerId=2, entryId=3}
> > > ```
> > > Transaction Log callback:
> > > ```
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=0,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=1,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=2,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=3,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=4,
> > ctx}
> > > {position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=5,
> > ctx}
> > > ```
> > > The entry(2,3) actually has 6 transaction logs, transaction_1 relations
> > to
> > > [0,2,3,5] and transaction_2 relations to [1,4].
> > > After transaction_1 is committed,the logs [0,2,3,5] of Entry(2,3) are
> not
> > > needed because the transaction has already been completed, but now we
> > could
> > > not delete Entry(2,3), Because the logs [1,4] are still useful that
> > > transaction_2 is not finished and we still need them for the recovery
> > > operation.
> > > The BatchIndex and BatchSize can clearly indicate the location of each
> > > transaction log in the ledger. When the transaction log is no longer
> > used,
> > > Users can accurately delete it according to position and batchIndex.
> > >
> > > ### <p id="librarySupports">Library support for Compatibility with
> older
> > > versions Broker</p>
> > > In broker.conf we can configure the
> > > [transactionMetadataStoreProviderClassName](
> > >
> >
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2535
> > )
> > > to replace the implementation of TransactionLog, we can also configure
> > the
> > > [transactionPendingAckStoreProviderClassName](
> > >
> >
> https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2549
> > )
> > > to replace the implementation of PendingAckStore,
> > > We will provide a library containing the classes which can read batched
> > > transaction logs and pending ack logs:
> > >
> > > #### TransactionLogBatchEnryReadableImpl
> > > ```java
> > > public class TransactionLogBatchEnryReadableImpl extends
> > > MLTransactionLogImpl {
> > >     /**
> > >      * Different from the parent class, when this method reads an
> Entry,
> > it
> > > can identify
> > >      * whether the Entry is transction log or batched transaction log.
> If
> > > the Entry is a
> > >     * transaction log, it maintains the same logic as the parent class.
> > If
> > > the transaction log
> > >     * is batched, it will be split into transaction log and processed
> > > according to the original
> > >     * logic
> > >      */
> > >     @Override
> > >     void replayAsync(TransactionLogReplayCallback
> > > transactionLogReplayCallback);
> > > }
> > > ```
> > >
> > > #### PendingAckStoreBatchEntryReadableImpl
> > > ```java
> > > public class PendingAckStoreBatchEntryReadableImpl extends
> > > MLPendingAckStore {
> > >     /**
> > >      * Different from the parent class, when this method reads an
> Entry,
> > it
> > > can identify
> > >      * whether the Entry is pending ack log or batched pending ack log.
> > If
> > > the Entry is a
> > >     * pending ack log, it maintains the same logic as the parent class.
> > If
> > > the pending ack
> > >     * log is batched, it will be split into pending ack log and
> processed
> > > according to the
> > >     * original logic
> > >      */
> > >     void replayAsync(pendingAckHandle, executorService);
> > > }
> > > ```
> > >
> > > How to use this library
> > > 1. Copy pulsar-transaction-logs-batch-support.jar to ${PULSAR_HOME}/lib
> > > 2. Edit broker.conf. Set transactionMetadataStoreProviderClassName is
> > >
> >
> “org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider”,
> > > set transactionPendingAckStoreProviderClassName is
> > >
> >
> “org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider”.
> > > 3. Restart broker.
> >
>

Reply via email to