Hi Pulsar community:

I am opening a PIP to discuss "Batch writing ledger for transaction operation"

Proposal Link: https://github.com/apache/pulsar/issues/15370

## Motivation

Before reading further, I suggest you read the “Transaction
Flow” section of [PIP-31: Transactional Streaming](
https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
).

### <p id="normalFlowVsTransaction"> Normal Flow vs. Transaction Flow </p>
![MG3](
https://user-images.githubusercontent.com/25195800/172985866-25e496a4-ea93-42ec-aa0d-e6a02aa0635e.jpeg
)
In *Figure 1. Normal Flow vs. Transaction Flow*:
- The gray square boxes represent logical components.
- All the blue boxes represent logs; these logs are usually managed ledgers.
- Each arrow represents a request flow or message flow. The operations
occur in the sequence indicated by the numbers next to each arrow.
- The black arrows indicate flows shared by the transaction flow and the
normal flow.
- The blue arrows represent normal-message-specific flow.
- The orange arrows represent transaction-message-specific flow.
- The sections below are numbered to match the operations shown in the
diagram (the numbering differs from [PIP-31: Transactional Streaming](
https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx
)).


#### 2.4a Write logs whose acknowledgement state is PENDING_ACK to the ledger
[Acknowledgement State Machine](
https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.4bikq6sjiy8u)
describes the transitions of the acknowledgement state and why we need to
persist the log whose acknowledgement state is PENDING_ACK.
#### 2.4a’ Mark messages as no longer useful for the current subscription
Update the `Cursor` to mark the messages as DELETED, so that they can be
deleted.
#### 3.2b Mark messages as no longer useful for the current subscription
The implementation here is exactly the same as 2.4a’, except that the
execution is triggered later, after the Transaction has been committed.


### Analyzing the performance cost of transactions
As you can see in <a href="#normalFlowVsTransaction">Figure 1. Normal Flow
vs. Transaction Flow</a>, 2.4a’ and 3.2b share exactly the same logic, so
the remaining orange arrows represent the additional performance overhead
of transactions.
Based on how many times each flow executes per transaction, we can split
the flows into two classes (optimizing a flow that executes multiple times
yields more benefit):
- Executed once per transaction: flow-1.x and flow-3.x
- Executed multiple times per transaction: flow-2.x

So optimizing flow-2.x, which executes many times, is a good choice. Let's
split flow-2.x into two groups: those that cost more and those that cost
less:
- No disk write: flow-2.1 and flow-2.3
- Disk write: flow-2.1a, flow-2.3a, and flow-2.4a

From the previous analysis, we found that optimizing flow-2.1a, flow-2.3a,
and flow-2.4a would bring the most benefit, and batched writes are an
excellent solution for multiple disk writes. Flow-2.1a and flow-2.3a are
both writes to the transaction log, so we can combine them into one batch;
2.4a is the write of the pending ack log, so we combine multiple 2.4a
operations into one batch for processing.
As we can see from “Transaction Flow” of [PIP-31: Transactional Streaming](
https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#heading=h.bm5ainqxosrx),
these operations are strictly sequential (guaranteed by the client):
- flow-1.x ends before flow-2.x starts
- flow-2.x ends before flow-3.x starts
- flow-3.1a ends before flow-3.3a starts

Therefore, the broker does not need to worry about dependencies between
these flows; we can put flow-1a, flow-3.1a, and flow-3.3a into the
transaction log batch too.

## Goal
Provide a mechanism for the Transaction Log Store and the Pending Ack
Store: accept multiple write requests, buffer all those records, and
persist them to a single BK entry (aka “batched entry”). This will improve
broker-to-BK throughput.
- Allow users to control the max size and max record count of the buffer.
- Allow users to control the max delay time of a write request.
- Multiple raw records can be recovered from a batched entry.
- Provide a configurable switch between the “batched implementation” and
the “common implementation”.

## Approach
### Buffer requests and write Bookie
Create a new protobuf record called “Batched Transaction Data” with an
array inside. When we receive a request, we put it into the array.

Request:
```
[Request 1]{ data, callback }
[Request 2]{ data, callback }
…
…
[Request N]{ data, callback }
```
Buffer:
```
[BatchedTransactionData]{ list=[Request 1, Request 2 … Request N] }
```
Write Bookie:
```
LedgerHandle async write ( BatchedTransactionData to byteBuf )
LedgerHandle callback: ledgerId=1, entryId=1
```
Request-Callback:
```
Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
…
…
Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
```
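
To make the write path concrete, here is a minimal sketch of the buffering
logic, under the assumption that flushes are triggered by record-count,
byte-size, and delay thresholds as proposed in the configuration section
below. All class and method names here are hypothetical:
```java
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the request buffer (all names hypothetical).
// Requests accumulate until a threshold is reached; then the whole batch
// is serialized into a single BatchedTransactionData record and written
// to the ledger as one entry.
class BatchedWriteBuffer {
    static final class BufferedRequest {
        final ByteBuf data;    // raw record to persist
        final Object callback; // completed with (ledgerId, entryId, batchIndex)
        BufferedRequest(ByteBuf data, Object callback) {
            this.data = data;
            this.callback = callback;
        }
    }

    private final int maxRecords;  // e.g. 512
    private final long maxBytes;   // e.g. 4 MB
    private final List<BufferedRequest> pending = new ArrayList<>();
    private long pendingBytes = 0;

    BatchedWriteBuffer(int maxRecords, long maxBytes) {
        this.maxRecords = maxRecords;
        this.maxBytes = maxBytes;
    }

    // Called for every write request. A max-delay timer (not shown) would
    // also call flush() so a sparse stream never waits longer than the
    // configured max delay.
    synchronized void add(BufferedRequest request) {
        pending.add(request);
        pendingBytes += request.data.readableBytes();
        if (pending.size() >= maxRecords || pendingBytes >= maxBytes) {
            flush();
        }
    }

    private void flush() {
        List<BufferedRequest> batch = new ArrayList<>(pending);
        pending.clear();
        pendingBytes = 0;
        // 1. Serialize all records of `batch` into one BatchedTransactionData.
        // 2. Write it with LedgerHandle#asyncAddEntry as a single entry.
        // 3. On success (ledgerId, entryId), complete the i-th request's
        //    callback with batchSize = batch.size() and batchIndex = i.
    }
}
```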

### Delete BatchedTransactionMeta
[PIP-54](
https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level)
added support for batch-index-level deletes. So the raw records added to a
batch can carry different batch indexes while sharing the same ledger ID
and entry ID.

Read:
```
[BatchedTransactionData]
```
After split:
```
{data 1, ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
{data 2, ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
…
{data N, ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}
```
Users can delete the whole batched entry:
```java
cursor.delete( Position {ledgerId = 1, entryId = 1} )
```
Users can also delete only part of the batched entry:
```java
cursor.delete( Position {ledgerId =1, entryId = 1, batchIndex=1} )
```

## Changes

### Protocol Changes
New protobuf records to buffer requests.

BatchedTransactionMetadataEntry
```
message BatchedTransactionMetadataEntry{
  // Array for buffering transaction log data.
  repeated TransactionMetadataEntry transaction_log = 12;
}
```

BatchedPendingAckMetadataEntry
```
message BatchedPendingAckMetadataEntry{
  // Array for buffering pending ack data.
  repeated PendingAckMetadataEntry pending_ack_log=6;
}
```

Note: To ensure forward compatibility, we need to distinguish the old
TransactionMetadataEntry/PendingAckMetadataEntry data from the new batched
data, so we add a magic number in front of the bytes that proto serializes:
```
[Magic Num] [BatchedPendingAckMetadataEntry proto bytes]  ==>  [Entry]
```
Read Entry:
```
                           /-- true --> [BatchedTransactionMetadataEntry]
[Entry] --> has Magic Num ?
                           \-- false --> [TransactionMetadataEntry]
```
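
As an illustration, the framing and detection could look like the
following sketch. The two-byte width and the concrete magic value are
assumptions; the only real requirement is that the magic cannot be
confused with the first bytes of a valid serialized proto record:
```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

// Sketch of the magic-number framing. The value 0xBEEF is illustrative;
// what matters is that the first magic byte never appears as the first
// byte of a valid serialized proto record (which always starts with a
// field tag such as 0x08 or 0x0A).
final class BatchedEntryFormat {
    static final short MAGIC = (short) 0xBEEF;

    // [Magic Num][Batched* proto bytes] ==> [Entry]
    static ByteBuf serialize(byte[] batchedProtoBytes) {
        ByteBuf buf = Unpooled.buffer(Short.BYTES + batchedProtoBytes.length);
        buf.writeShort(MAGIC);
        buf.writeBytes(batchedProtoBytes);
        return buf;
    }

    // Peek at the first two bytes without consuming them: entries written
    // by old brokers start directly with proto bytes and carry no magic.
    static boolean isBatchedEntry(ByteBuf entry) {
        return entry.readableBytes() >= Short.BYTES
                && entry.getShort(entry.readerIndex()) == MAGIC;
    }
}
```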


### API Changes

BatchAddDataCallback
The Transaction Coordinator does not operate the managed ledger directly;
it uses the Transaction Log Store to operate on the managed ledger. The
managed ledger write API provides a callback class, AddEntryCallback;
likewise, the Transaction Log Store, which provides batched writes,
provides a callback class: BatchAddDataCallback. <a
href="#BatchedAddDataCallbackExplains">See the appendix for why we need
BatchAddDataCallback</a>.

![WechatIMG7](
https://user-images.githubusercontent.com/25195800/173034341-8d44a8b1-9dde-45ee-8525-b72365def640.jpeg
)
Figure. BatchAddDataCallback in the Write Flow

```java
interface BatchAddDataCallback {
    /**
     * Success callback function for “add data asynchronously”.
     *
     * @param position a pointer to the specific entry in the managed ledger
     * @param byteBuf the raw data that was added
     * @param batchIndex the index of this raw data record within the batch
     * @param batchSize the total count of raw data records in the batched entry
     * @param ctx opaque context
     */
    void addComplete(Position position, ByteBuf byteBuf, int batchIndex,
                     int batchSize, Object ctx);

    /**
     * Failure callback function for “add data asynchronously”.
     *
     * @param exception the error that caused the failure
     * @param ctx opaque context
     */
    void addFailed(ManagedLedgerException exception, Object ctx);
}
```
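
For illustration, a caller could implement the callback like this (a
sketch; the types come from the interface above):
```java
// Sketch of a caller-side implementation. On success the caller can
// reconstruct the exact batched position of its record from
// (position, batchIndex, batchSize); on failure it fails the pending
// transaction operation that issued the write.
BatchAddDataCallback callback = new BatchAddDataCallback() {
    @Override
    public void addComplete(Position position, ByteBuf byteBuf,
                            int batchIndex, int batchSize, Object ctx) {
        // This record is the batchIndex-th of batchSize records inside the
        // single entry written at `position` (ledgerId, entryId).
    }

    @Override
    public void addFailed(ManagedLedgerException exception, Object ctx) {
        // Propagate the failure to the transaction operation behind `ctx`.
    }
};
```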

### Configuration Changes
Add batch threshold parameters to control the flush frequency: a buffered
batch is flushed as soon as any one of the max-records, max-size, or
max-delay thresholds is reached.

broker.conf
```
transactionLogBatchedWriteEnabled=false
transactionLogBatchedWriteMaxRecords=512
# 4 MB (1024 * 1024 * 4)
transactionLogBatchedWriteMaxSize=4194304
transactionLogBatchedWriteMaxDelayInMillis=1

pendingAckBatchedWriteEnabled=false
pendingAckBatchedWriteMaxRecords=512
# 4 MB (1024 * 1024 * 4)
pendingAckBatchedWriteMaxSize=4194304
pendingAckBatchedWriteMaxDelayInMillis=1
```

### Compatibility
After the batch feature is enabled, users can only downgrade to a version
no older than the first version that supports reading
BatchedTransactionMeta. A broker on a lower version cannot parse the
batched data, which results in data loss. We also provide <a
href="#librarySupports">library support for compatibility with
older-version brokers</a>: if the user installs this library on an
older-version broker<sup>[0]</sup>, all new data can be processed
correctly and none of it will be lost.

----
**[0] old-version broker**: not lower than 2.9.2 or 2.10

### Observability
When using the batch feature, users will tune the flush frequency to
achieve optimal performance. We provide two observable metrics for users'
reference:

```
BatchedDataStoreMXBeanImpl {
    /** The number of records in each batch. **/
    Rate batchRecordCount;
    /** The size in bytes of each batch. **/
    Rate batchSizeCount;
}
```
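
A minimal sketch of how these two indicators could be fed, using plain JDK
counters as a stand-in for the Rate type (the metric semantics, not the
exact API, are the point here):
```java
import java.util.concurrent.atomic.LongAdder;

// Stand-in sketch for the proposed metrics, using plain JDK counters.
// One sample is recorded per flushed batch, so users can see how full
// their batches are and tune the thresholds accordingly.
final class BatchedDataStoreStats {
    private final LongAdder batches = new LongAdder();
    private final LongAdder records = new LongAdder();
    private final LongAdder bytes = new LongAdder();

    // Called once per flush with the batch's record count and byte size.
    void recordFlush(int recordsInBatch, long batchBytes) {
        batches.increment();
        records.add(recordsInBatch);
        bytes.add(batchBytes);
    }

    double avgRecordsPerBatch() {
        long n = batches.sum();
        return n == 0 ? 0.0 : (double) records.sum() / n;
    }

    double avgBatchSizeBytes() {
        long n = batches.sum();
        return n == 0 ? 0.0 : (double) bytes.sum() / n;
    }
}
```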

## Test plan
The test should cover the following cases:

- The batch mechanism abides by the total count, total size, and max delay
limits.
- The returned position for writing data is correct.
- The managedCursor can delete and mark-delete the BatchedTransactionMeta.
- Performance tests comparing the before-and-after improvement.

## The appendix

### <p id="BatchedAddDataCallbackExplains"> Why we need
BatchAddDataCallback </p>
After all produced messages and acknowledgements to all partitions are
committed or aborted, the TC writes the final COMMITTED or ABORTED
transaction status message to its transaction log, indicating that the
transaction is complete (shown as 3.3a in the diagram). At this point, all
the messages pertaining to the transaction in its transaction log can
safely be removed.
For example, there are two transactions:
![截屏2022-06-10 11 56 49](
https://user-images.githubusercontent.com/25195800/172987382-fc4ddf9a-e21c-437f-900b-cd681d8d9364.png
)
Transaction Log Write:
```
transaction_1: start transaction
transaction_2: start transaction
transaction_1: add partition to tx
transaction_1: add subscription to tx
transaction_2: add partition to tx
transaction_1: commit
```
Bookie Write:
```
[Entry]{ BatchedTransactionData={LogRecordSize=6} }
```
Bookie Response:
```
{ledgerId=2, entryId=3}
```
Transaction Log callback:
```
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=0, ctx}
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=1, ctx}
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=2, ctx}
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=3, ctx}
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=4, ctx}
{position={ledgerId=2, entryId=3}, byteBuf, batchSize=6, batchIndex=5, ctx}
```
Entry (2,3) actually contains 6 transaction log records: transaction_1
relates to indexes [0,2,3,5] and transaction_2 relates to indexes [1,4].
After transaction_1 is committed, the records [0,2,3,5] of entry (2,3) are
no longer needed because the transaction has completed. But we cannot
delete entry (2,3) yet, because records [1,4] are still useful:
transaction_2 is not finished, and we still need those records for the
recovery operation.
The batchIndex and batchSize clearly indicate the location of each
transaction log record in the ledger. When a transaction log record is no
longer used, users can accurately delete it by position and batchIndex.
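
The bookkeeping this enables could be sketched as follows; the
batch-index-aware delete mirrors the cursor pseudo-code shown earlier, and
the exact API shape is an assumption:
```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: track which (ledgerId, entryId, batchIndex) slots each
// transaction wrote, and delete exactly those slots when the transaction
// completes.
final class TxnLogIndex {
    record Slot(long ledgerId, long entryId, int batchIndex) {}

    private final Map<Long, List<Slot>> slotsByTxn = new HashMap<>();

    // Invoked from the BatchAddDataCallback of each transaction log write.
    void onLogPersisted(long txnId, long ledgerId, long entryId, int batchIndex) {
        slotsByTxn.computeIfAbsent(txnId, k -> new ArrayList<>())
                  .add(new Slot(ledgerId, entryId, batchIndex));
    }

    // e.g. transaction_1 owns indexes [0,2,3,5] of entry (2,3). Once it
    // commits, only those indexes are deleted; entry (2,3) itself is
    // reclaimed by the cursor after indexes [1,4] of transaction_2 are
    // deleted as well.
    void onTxnCompleted(long txnId) {
        List<Slot> slots = slotsByTxn.remove(txnId);
        if (slots == null) {
            return;
        }
        for (Slot slot : slots) {
            // cursor.delete(Position{slot.ledgerId, slot.entryId, slot.batchIndex})
        }
    }
}
```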

### <p id="librarySupports">Library support for compatibility with older-version brokers</p>
In broker.conf we can configure
[transactionMetadataStoreProviderClassName](
https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2535)
to replace the implementation of TransactionLog, and
[transactionPendingAckStoreProviderClassName](
https://github.com/apache/pulsar/blob/bbc404bf6c8778e1e788b2b48af60de925256587/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2549)
to replace the implementation of PendingAckStore.
We will provide a library containing classes that can read batched
transaction logs and pending ack logs:

#### TransactionLogBatchEntryReadableImpl
```java
public class TransactionLogBatchEntryReadableImpl extends MLTransactionLogImpl {
    /**
     * Different from the parent class: when this method reads an entry, it
     * can identify whether the entry is a plain transaction log or a batched
     * transaction log. If the entry is a plain transaction log, it keeps the
     * same logic as the parent class. If the transaction log is batched, it
     * is split into individual transaction logs, which are then processed
     * according to the original logic.
     */
    @Override
    void replayAsync(TransactionLogReplayCallback transactionLogReplayCallback);
}
```

#### PendingAckStoreBatchEntryReadableImpl
```java
public class PendingAckStoreBatchEntryReadableImpl extends MLPendingAckStore {
    /**
     * Different from the parent class: when this method reads an entry, it
     * can identify whether the entry is a plain pending ack log or a batched
     * pending ack log. If the entry is a plain pending ack log, it keeps the
     * same logic as the parent class. If the pending ack log is batched, it
     * is split into individual pending ack logs, which are then processed
     * according to the original logic.
     */
    @Override
    void replayAsync(PendingAckHandle pendingAckHandle, ExecutorService executorService);
}
```

How to use this library:
1. Copy pulsar-transaction-logs-batch-support.jar to ${PULSAR_HOME}/lib.
2. Edit broker.conf: set transactionMetadataStoreProviderClassName to
“org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider”,
and set transactionPendingAckStoreProviderClassName to
“org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider”.
3. Restart the broker.
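
For reference, step 2 corresponds to the following broker.conf lines:
```
transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.BatchedReadTransactionMetadataStoreProvider
transactionPendingAckStoreProviderClassName=org.apache.pulsar.broker.transaction.pendingack.impl.BatchedPendingAckStoreProvider
```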
