lhotari commented on code in PR #24704:
URL: https://github.com/apache/pulsar/pull/24704#discussion_r2334168814
##########
pip/pip-439.md:
##########
@@ -0,0 +1,410 @@
+# PIP-439: Adding Transaction Support to Pulsar Functions Through Managed
Transaction Wrapping
+
+# Background knowledge
+
+Apache Pulsar transactions enable atomic operations across multiple topics,
allowing producers to send messages and consumers to acknowledge messages as a
single unit
+of work. This provides the foundation for exactly-once processing semantics in
streaming applications.
+
+## Transaction Architecture
+
+Pulsar's transaction system consists of four key components:
+
+1. **Transaction Coordinator (TC)**: A broker module that manages transaction
lifecycles, allocates transaction IDs, and orchestrates the commit/abort
process.
+
+2. **Transaction Log**: A persistent topic storing transaction metadata and
state changes, enabling recovery after failures.
+
+3. **Transaction Buffer**: Temporarily stores messages produced within
transactions, making them visible to consumers only after commit.
+
+4. **Pending Acknowledge State**: Tracks message acknowledgments within
transactions, preventing conflicts between competing transactions.
+
+## Transaction Lifecycle
+
+Transactions follow a defined lifecycle:
+
+1. **OPEN**: Client obtains a transaction ID from the Transaction Coordinator.
+2. **PRODUCING/ACKNOWLEDGING**: Client registers topic
partitions/subscriptions with the TC, then produces/acknowledges messages
within the transaction.
+3. **COMMITTING/ABORTING**: Client requests to end the transaction, and the TC
begins a two-phase commit.
+4. **COMMITTED/ABORTED**: After processing all partitions, TC finalizes the
transaction state.
+5. **TIMED_OUT**: Transactions exceeding their timeout are automatically
aborted.
+
+## Transaction Guarantees
+
+Pulsar transactions provide:
+- Atomic writes across multiple topics
+- Conditional acknowledgment to prevent duplicate processing by "zombie"
instances
+- Visibility control ensuring consumers only see committed transaction messages
+- Support for exactly-once processing in consume-transform-produce patterns
+
+## Pulsar Functions
+
+Pulsar Functions is a lightweight compute framework integrated with Apache
Pulsar that
+enables stream processing without managing infrastructure. Key characteristics
include:
+ - Simple Programming Model: Functions receive messages, process them, and
optionally
+produce output
+ - Processing Patterns: Supports both synchronous and asynchronous message
processing
+ - Context Object: Provides access to message metadata, output production, and
state
+storage
+ - Integration: Natively integrated with Pulsar's pub-sub messaging system
+ - Deployment: Managed by Pulsar with automatic scaling and fault tolerance
+
+Functions operate on a per-message basis, making them ideal for implementing
stream
+processing with exactly-once semantics when combined with transactions.
+
+# Motivation
+
+Currently, Pulsar Functions cannot publish to multiple topics transactionally,
which is a significant limitation for use cases requiring atomic multi-topic
+publishing. For instance, if a function processes an input message and needs
to publish related updates to several output topics, there's no guarantee that
all
+operations will succeed atomically.
+
+This limitation prevents building robust stream processing applications that
require exactly-once semantics across multiple input and output topics. Without
+transaction support in Functions, developers must implement their own error
handling and retry mechanisms, which can be complex and error-prone.
+
+Adding transaction support to Pulsar Functions would make message processing
atomic without custom error handling in each function.
+
+# Goals
+
+## In Scope
+
+1. Enable automatic transaction support for Pulsar Functions through
configuration
+2. Allow Functions to publish messages to multiple topics within a single
transaction
+3. Support transactional acknowledgment of input messages
+4. Ensure transactions are committed only if message processing completes
successfully
+5. Provide transaction timeout configuration for Functions
+6. Add transaction support for async functions
+7. Process multiple messages per transaction (transaction batching) to improve
performance, added in a later phase of implementation
+
+## Out of Scope
+
+1. Exposing explicit transaction management APIs in the Functions interface
+2. Supporting multi-function transactions (transactions spanning multiple
function invocations)
+3. Adding transaction support to Pulsar IO connectors
+4. Changes to the Function interface itself
+
+# High Level Design
+
+The proposed solution introduces managed transaction wrapping for Pulsar
Functions through configuration settings. When enabled, each function execution
will be automatically wrapped in a transaction without requiring code changes
to the function implementation.
+
+The general flow will be:
+1. Function is configured with `transactionMode: MANAGED`
+2. When a message arrives, the function runtime creates a new transaction
+3. The function processes the message with an enhanced Context that uses the
transaction
+4. Any output messages are published using the transaction
+5. Input message acknowledgment is performed within the transaction
+6. If the function completes successfully, the transaction is committed
+7. If the function throws an exception, the transaction is aborted
+
+This approach provides transaction support in a way that is transparent to
function implementers, requiring only configuration changes rather than code
changes.
+
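The flow above can be sketched as follows. This is a minimal, self-contained illustration and not the proposed runtime code: `SketchTransaction` and `ManagedTransactionSketch` are hypothetical stand-ins that only model the commit-on-success and abort-on-exception behaviour, without using the Pulsar client API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a transaction; not the Pulsar client Transaction type.
final class SketchTransaction {
    final List<String> bufferedOutputs = new ArrayList<>();
    boolean inputAcked;
    String state = "OPEN";
}

// Hypothetical runtime wrapper modelling the managed flow for one input message.
final class ManagedTransactionSketch {
    // Outputs visible to downstream consumers, i.e. outputs of committed transactions only.
    final List<String> visibleOutputs = new ArrayList<>();

    SketchTransaction process(String input, Function<String, List<String>> function) {
        SketchTransaction txn = new SketchTransaction();        // step 2: one transaction per message
        try {
            txn.bufferedOutputs.addAll(function.apply(input));  // steps 3-4: outputs stay buffered
            txn.inputAcked = true;                              // step 5: ack inside the transaction
            visibleOutputs.addAll(txn.bufferedOutputs);         // step 6: commit publishes outputs
            txn.state = "COMMITTED";
        } catch (RuntimeException e) {
            txn.bufferedOutputs.clear();                        // step 7: abort discards outputs
            txn.inputAcked = false;                             //         and leaves the input unacked
            txn.state = "ABORTED";
        }
        return txn;
    }
}
```

In this sketch a function that throws leaves no visible output and no acknowledgment, so the input message would be redelivered and processed again.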
+# Detailed Design
+
+## Design & Implementation Details
+
+### Configuration Classes
+
+We will update the FunctionConfig to include transaction-related settings
through a new `TransactionConfig` class:
+
+```java
+public enum TransactionMode {
+ OFF,
+ MANAGED
+}
+
+public class TransactionConfig {
+ private TransactionMode transactionMode = TransactionMode.OFF;
+ private Long transactionTimeoutMs = 60000L;
+ private Integer transactionBatchingMaxEntries = 1;
+ private Long transactionBatchingQuietPeriodMs = 100L;
+
+ // Getters and setters...
+}
+
+public class FunctionConfig {
+ // Existing fields...
+
+ private TransactionConfig transaction = new TransactionConfig();
+
+ // Getter and setter ...
+}
+```
+
+We also need to update the protobuf definition for FunctionDetails to include
these fields:
+
+```protobuf
+message TransactionSpec {
+ enum TransactionMode {
+ OFF = 0;
+ MANAGED = 1;
+ }
+ TransactionMode transactionMode = 1;
+ int64 transactionTimeoutMs = 2;
+ int64 transactionBatchingMaxEntries = 3;
+ int64 transactionBatchingQuietPeriodMs = 4;
+}
+
+message FunctionDetails {
+ // Other existing fields...
+ TransactionSpec transaction = 24;
+}
+```
+
+### Modifications to ContextImpl
+
+
+```java
+class ContextImpl implements Context, SinkContext, SourceContext,
AutoCloseable {
+ // Existing fields...
+
+ // Finds the proper transaction to tie to current function execution
(sync/async)
+ private Transaction getManagedTransaction() {
+
+ // implementation...
+ }
+
+ // Existing methods...
+
+ public void setCurrentTransaction(Transaction transaction) {
+ this.currentTransaction = transaction;
+ }
+
+ @Override
+ public <T> TypedMessageBuilder<T> newOutputMessage(String topicName,
Schema<T> schema)
+ throws PulsarClientException {
+ MessageBuilderImpl<T> messageBuilder = new MessageBuilderImpl<>();
+ TypedMessageBuilder<T> typedMessageBuilder;
+ Producer<T> producer = getProducer(topicName, schema);
+ Transaction managedTransaction = getManagedTransaction();
+
+ if (managedTransaction != null) {
+ if (schema != null) {
+ // Uses the new API that supports both schema and transaction
+ typedMessageBuilder = producer.newMessage(schema,
managedTransaction);
+ } else {
+ typedMessageBuilder = producer.newMessage(managedTransaction);
+ }
+ } else if (schema != null) {
+ typedMessageBuilder = producer.newMessage(schema);
+ } else {
+ typedMessageBuilder = producer.newMessage();
+ }
+
+ messageBuilder.setUnderlyingBuilder(typedMessageBuilder);
+ return messageBuilder;
+ }
+}
+```
+
+## Asynchronous Functions Support
+
+It's important to note that Pulsar Functions supports asynchronous processing,
where functions can return `CompletableFuture` objects. This proposal ensures
that transaction support works seamlessly with both synchronous and
asynchronous functions.
+
+For asynchronous functions:
+1. The transaction is created at the beginning of message processing, just
like for synchronous functions
+2. When the function returns a `CompletableFuture`, the transaction is
maintained until the future completes
+ - Any Context-related operations inside the returned `CompletableFuture`
objects are tied to the correct transaction
+3. When the future completes successfully, the transaction is committed
+4. If the future completes exceptionally, the transaction is aborted
+
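The asynchronous case can be sketched with the standard `CompletableFuture` API. This is an illustration under assumptions, not the proposed implementation: `AsyncTransactionSketch`, `Txn`, and `track` are hypothetical names, and the transaction is reduced to a state holder.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class AsyncTransactionSketch {
    // Hypothetical transaction handle; it only records the final state.
    static final class Txn {
        final AtomicReference<String> state = new AtomicReference<>("OPEN");
    }

    // Keeps the transaction open until the function's future completes,
    // then commits on success or aborts on exceptional completion.
    static <T> CompletableFuture<String> track(Txn txn, CompletableFuture<T> result) {
        return result.handle((value, error) -> {
            txn.state.set(error == null ? "COMMITTED" : "ABORTED");
            return txn.state.get();
        });
    }
}
```

The transaction stays `OPEN` for as long as the returned future is pending, which is why the transaction timeout also bounds how long an async function may take.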
+## Batch Processing of Transactions
+
+To optimize performance and reduce the overhead on the Transaction
Coordinator, this proposal introduces transaction batching.
+Transaction batching allows multiple incoming messages to be processed within
the same transaction, reducing the total number of
+transactions created.
+
+### Transaction Batching Concept
+
+Transaction batching is distinct from Pulsar's message batching. While message
batching combines multiple messages into a single "batch
+message" for efficient network transfer, transaction batching processes
multiple incoming messages (or batch messages) within the scope
+of a single transaction.
+
+Key benefits of transaction batching include:
+1. **Reduced Load on Transaction Coordinator**: Fewer transactions means less
coordination overhead
+2. **Improved Throughput**: Higher message processing capacity with lower
per-message overhead
+3. **Optimized Resource Usage**: Better utilization of transaction resources
+4. **Consistent Performance at Scale**: Maintains performance characteristics
under high load
+
+### Transaction Batching Parameters
+
+Transaction batching is controlled by two main parameters:
+
+1. **`transactionBatchingMaxEntries`**: The maximum number of entries
(incoming messages or batch messages) to process within a single
+transaction before committing it and starting a new one.
+ - An "entry" refers to an incoming batch message which could itself contain
multiple individual messages
+ - Default: 1 (recommended minimum to ensure batch index acknowledgment state
doesn't span transactions)
+ - Setting this to higher values increases throughput but may impact latency
+
+2. **`transactionBatchingQuietPeriodMs`**: The maximum amount of time to wait
for additional messages before committing a transaction if
+`transactionBatchingMaxEntries` is not reached.
+ - Default: 100ms
Review Comment:
```suggestion
- Default: 1ms
```
It's better to have a shorter default. The Pulsar client producer's default
`batchingMaxPublishDelay` is 1ms, so I'd go with 1ms here as well.
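
To make the latency effect concrete, here is a rough sketch of how the two batching parameters might interact. It assumes the quiet period is measured from the most recent entry, which the PIP text does not spell out; `TransactionBatchSketch` and its methods are hypothetical names, not part of the proposal.

```java
// Hypothetical model of the commit decision for one open batch transaction.
final class TransactionBatchSketch {
    private final int maxEntries;
    private final long quietPeriodMs;
    private int entries;
    private long lastEntryAtMs;

    TransactionBatchSketch(int maxEntries, long quietPeriodMs) {
        if (maxEntries < 1) {
            throw new IllegalArgumentException("maxEntries must be at least 1");
        }
        this.maxEntries = maxEntries;
        this.quietPeriodMs = quietPeriodMs;
    }

    // Records one entry; returns true when the batch is full and should be committed.
    boolean addEntry(long nowMs) {
        entries++;
        lastEntryAtMs = nowMs;
        return entries >= maxEntries;
    }

    // True when a non-empty batch has been idle for at least the quiet period.
    boolean quietPeriodElapsed(long nowMs) {
        return entries > 0 && nowMs - lastEntryAtMs >= quietPeriodMs;
    }

    // Called after a commit to start the next batch.
    void reset() {
        entries = 0;
    }
}
```

Under this assumption, a partially filled batch with a 100ms quiet period holds its outputs back for up to 100ms after the last entry, while a 1ms quiet period commits almost immediately.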