lhotari commented on code in PR #24704:
URL: https://github.com/apache/pulsar/pull/24704#discussion_r2333206170


##########
pip/pip-439.md:
##########
@@ -0,0 +1,370 @@
+# PIP-439: Adding Transaction Support to Pulsar Functions Through 
Auto-Transaction Wrapping
+
+# Background knowledge
+
+Apache Pulsar transactions enable atomic operations across multiple topics, 
allowing producers to send messages and consumers to acknowledge messages as a 
single unit
+of work. This provides the foundation for exactly-once processing semantics in 
streaming applications.
+
+## Transaction Architecture
+
+Pulsar's transaction system consists of four key components:
+
+1. **Transaction Coordinator (TC)**: A broker module that manages transaction 
lifecycles, allocates transaction IDs, and orchestrates the commit/abort 
process.
+
+2. **Transaction Log**: A persistent topic storing transaction metadata and 
state changes, enabling recovery after failures.
+
+3. **Transaction Buffer**: Temporarily stores messages produced within 
transactions, making them visible to consumers only after commit.
+
+4. **Pending Acknowledge State**: Tracks message acknowledgments within 
transactions, preventing conflicts between competing transactions.
+
+## Transaction Lifecycle
+
+Transactions follow a defined lifecycle:
+
+1. **OPEN**: Client obtains a transaction ID from the Transaction Coordinator.
+2. **PRODUCING/ACKNOWLEDGING**: Client registers topic 
partitions/subscriptions with the TC, then produces/acknowledges messages 
within the transaction.
+3. **COMMITTING/ABORTING**: Client requests to end the transaction, and the TC 
begins a two-phase commit.
+4. **COMMITTED/ABORTED**: After processing all partitions, TC finalizes the 
transaction state.
+5. **TIMED_OUT**: Transactions exceeding their timeout are automatically 
aborted.
+
+## Transaction Guarantees
+
+Pulsar transactions provide:
+- Atomic writes across multiple topics
+- Conditional acknowledgment to prevent duplicate processing by "zombie" 
instances
+- Visibility control ensuring consumers only see committed transaction messages
+- Support for exactly-once processing in consume-transform-produce patterns
+
+## Pulsar Functions
+
+Pulsar Functions is a lightweight compute framework integrated with Apache 
Pulsar that
+enables stream processing without managing infrastructure. Key characteristics 
include:
+ - Simple Programming Model: Functions receive messages, process them, and 
optionally
+produce output
+ - Processing Patterns: Supports both synchronous and asynchronous message 
processing
+ - Context Object: Provides access to message metadata, output production, and 
state
+storage
+ - Integration: Natively integrated with Pulsar's pub-sub messaging system
+ - Deployment: Managed by Pulsar with automatic scaling and fault tolerance
+
+Functions operate on a per-message basis, making them ideal for implementing 
stream
+processing with exactly-once semantics when combined with transactions.
+
+# Motivation
+
+Currently, Pulsar Functions cannot publish to multiple topics transactionally, 
which is a significant limitation for use cases requiring atomic multi-topic
+publishing. For instance, if a function processes an input message and needs 
to publish related updates to several output topics, there's no guarantee that 
all
+operations will succeed atomically.
+
+This limitation prevents building robust stream processing applications that 
require exactly-once semantics across multiple input and output topics. Without
+transaction support in Functions, developers must implement their own error 
handling and retry mechanisms, which can be complex and error-prone.
+
+Adding transaction support to Pulsar Functions would make message processing 
atomic.
+
+# Goals
+
+## In Scope
+
+1. Enable automatic transaction support for Pulsar Functions through 
configuration
+2. Allow Functions to publish messages to multiple topics within a single 
transaction
+3. Support transactional acknowledgment of input messages
+4. Ensure transactions are committed only if message processing completes 
successfully
+5. Provide transaction timeout configuration for Functions
+
+## Out of Scope
+
+1. Exposing explicit transaction management APIs in the Functions interface
+2. Supporting multi-function transactions (transactions spanning multiple 
function invocations)
+3. Adding transaction support to Pulsar IO connectors
+4. Changes to the Function interface itself
+
+# High Level Design
+
+The proposed solution introduces managed transaction wrapping for Pulsar 
Functions through configuration settings. When enabled, each function execution 
will be automatically wrapped in a transaction without requiring code changes 
to the function implementation.
+
+The general flow will be:
+1. Function is configured with `transactionMode: MANAGED`
+2. When a message arrives, the function runtime creates a new transaction
+3. The function processes the message with an enhanced Context that uses the 
transaction
+4. Any output messages are published using the transaction
+5. Input message acknowledgment is performed within the transaction
+6. If the function completes successfully, the transaction is committed
+7. If the function throws an exception, the transaction is aborted
+
+This approach provides transaction support in a way that is transparent to 
function implementers, requiring only configuration changes rather than code 
changes.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Configuration Classes
+
+We will update the FunctionConfig to include transaction-related settings 
through a new `TransactionConfig` class:
+
+```java
+public enum TransactionMode {
+  OFF,
+  MANAGED
+}
+
+public class TransactionConfig {
+  private TransactionMode transactionMode = TransactionMode.OFF;
+  private Long transactionTimeoutMs = 60000L;
+
+  // Getters and setters...
+}
+
+public class FunctionConfig {
+  // Existing fields...
+
+  private TransactionConfig transaction = new TransactionConfig();
+
+  // Getter and setter ...
+}
+```
+
+We also need to update the protobuf definition for FunctionDetails to include 
these fields:
+
+```protobuf
+message TransactionSpec {
+  enum TransactionMode {
+      OFF = 0;
+      MANAGED = 1;
+  }
+  TransactionMode transactionMode = 1;
+  int64 transactionTimeoutMs = 2;
+}
+
+message FunctionDetails {
+  // Other existing fields...
+  TransactionSpec transaction = 24;
+}
+```
+
+### Modifications to Context Interface
+
+We will update the Context interface to expose the current transaction:
+
+```java
+public interface Context {
+    // Existing methods...
+
+    /**
+     * Returns the current transaction if function is running in managed 
transaction mode.
+     *
+     * <p>IMPORTANT: This method is not async-safe. When writing asynchronous 
functions that
+     * return CompletableFuture and need to use the transaction inside 
callbacks or chained
+     * operations, you must store a reference to the transaction locally 
before returning the future:
+     *
+     * <pre>{@code
+     * public CompletableFuture<String> process(String input, Context context) 
{
+     *     // Store transaction reference locally before async operations
+     *     Transaction txn = context.getCurrentTransaction();
+     *
+     *     return someAsyncOperation()
+     *         .thenApply(result -> {
+     *             // Use the locally stored transaction reference
+     *             // instead of calling context.getCurrentTransaction() here
+     *             return "processed";
+     *         });
+     * }
+     * }</pre>
+     *
+     * @return the current transaction, or null if transactions are disabled
+     */
+    Transaction getCurrentTransaction();
+}
+```
+
+```java
+class ContextImpl implements Context, SinkContext, SourceContext, 
AutoCloseable {
+    // Existing fields...
+
+    private Transaction currentTransaction;
+
+    // Existing methods...
+
+    public void setCurrentTransaction(Transaction transaction) {
+        this.currentTransaction = transaction;
+    }
+
+    @Override
+    public Transaction getCurrentTransaction() {
+        return currentTransaction;
+    }
+
+    @Override
+    public <T> TypedMessageBuilder<T> newOutputMessage(String topicName, 
Schema<T> schema)
+          throws PulsarClientException {
+      MessageBuilderImpl<T> messageBuilder = new MessageBuilderImpl<>();
+      TypedMessageBuilder<T> typedMessageBuilder;
+      Producer<T> producer = getProducer(topicName, schema);
+    
+      if (currentTransaction != null) {
+          if (schema != null) {
+              // Uses the new API that supports both schema and transaction
+              typedMessageBuilder = producer.newMessage(schema, 
currentTransaction);
+          } else {
+              typedMessageBuilder = producer.newMessage(currentTransaction);
+          }
+      } else if (schema != null) {
+          typedMessageBuilder = producer.newMessage(schema);
+      } else {
+          typedMessageBuilder = producer.newMessage();
+      }
+    
+      messageBuilder.setUnderlyingBuilder(typedMessageBuilder);
+      return messageBuilder;
+    }
+}
+```
+
+## Asynchronous Functions Support
+
+It's important to note that Pulsar Functions supports asynchronous processing, 
where functions can return `CompletableFuture` objects. This proposal ensures 
that transaction support works seamlessly with both synchronous and 
asynchronous functions.
+
+For asynchronous functions:
+1. The transaction is created at the beginning of message processing, just 
like for synchronous functions
+2. When the function returns a `CompletableFuture`, the transaction is 
maintained until the future completes
+3. When the future completes successfully, the transaction is committed
+4. If the future completes exceptionally, the transaction is aborted
+
+# Public-facing Changes
+
+## Configuration
+
+Transaction support will be configured using the new TransactionConfig class:
+
+```java
+TransactionConfig txnConfig = new TransactionConfig();
+txnConfig.setTransactionMode(TransactionMode.MANAGED);
+txnConfig.setTransactionTimeoutMs(30000L); // 30 seconds
+
+FunctionConfig functionConfig = new FunctionConfig();
+functionConfig.setTransaction(txnConfig);
+// Other configuration...
+```
+
+## CLI
+
+The Pulsar Admin CLI will be updated to support these new configuration 
options:
+
+```bash
+$ pulsar-admin functions create \
+  --auto-transactions-enabled true \
+  --transaction-timeout-ms 30000 \
+  ... other options ...
+```
+
+Similarly, the CLI will support updating these options with the update command.
+
+# Metrics
+
+The following new metrics will be added to track transaction usage in 
functions:
+
+1. `pulsar_function_txn_created_total`: Counter tracking the total number of 
transactions created by functions
+ - Labels: `tenant`, `namespace`, `name` (function name), `instance_id`, 
`cluster`
+ - Unit: Count
+
+2. `pulsar_function_txn_committed_total`: Counter tracking successfully 
committed transactions
+ - Labels: `tenant`, `namespace`, `name` (function name), `instance_id`, 
`cluster`
+ - Unit: Count
+
+3. `pulsar_function_txn_aborted_total`: Counter tracking aborted transactions
+ - Labels: `tenant`, `namespace`, `name` (function name), `instance_id`, 
`cluster`
+ - Unit: Count
+
+4. `pulsar_function_txn_timeout_total`: Counter tracking transactions that 
timed out
+ - Labels: `tenant`, `namespace`, `name` (function name), `instance_id`, 
`cluster`
+ - Unit: Count
+
+5. `pulsar_function_txn_latency`: Histogram of transaction duration from 
creation to commit/abort
+ - Labels: `tenant`, `namespace`, `name` (function name), `instance_id`, 
`cluster`
+ - Unit: Milliseconds

Review Comment:
   One concern about histograms is that they add a lot of volume to the 
metrics. It would be useful if this metric were configurable in the 
functions worker and turned off by default.
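
   A rough sketch of what that could look like, assuming a hypothetical worker-level flag (named `latencyHistogramEnabled` here; it is not an existing functions worker setting) that defaults to `false`. The counters are always recorded, while the histogram buckets are only allocated and updated when the flag is on. The class name, method names, and bucket bounds are all illustrative and not taken from the PIP:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

/** Hypothetical sketch: function transaction metrics with an opt-in latency histogram. */
final class FunctionTxnMetrics {
    // Illustrative upper bucket bounds in milliseconds; one extra overflow bucket follows them.
    private static final long[] BUCKET_BOUNDS_MS = {5, 10, 50, 100, 500, 1000, 5000};

    private final AtomicLong committedTotal = new AtomicLong();
    private final AtomicLong abortedTotal = new AtomicLong();
    // Null when the histogram is disabled, so no bucket series exist to export.
    private final AtomicLongArray latencyBuckets;

    /** @param latencyHistogramEnabled assumed worker setting, expected to default to false */
    FunctionTxnMetrics(boolean latencyHistogramEnabled) {
        this.latencyBuckets = latencyHistogramEnabled
                ? new AtomicLongArray(BUCKET_BOUNDS_MS.length + 1)
                : null;
    }

    /** Records the end of a transaction; the histogram is updated only if enabled. */
    void recordEnd(boolean committed, long durationMs) {
        (committed ? committedTotal : abortedTotal).incrementAndGet();
        if (latencyBuckets == null) {
            return;
        }
        // Find the first bucket whose upper bound is >= the observed duration.
        int index = 0;
        while (index < BUCKET_BOUNDS_MS.length && durationMs > BUCKET_BOUNDS_MS[index]) {
            index++;
        }
        latencyBuckets.incrementAndGet(index);
    }

    long committed() {
        return committedTotal.get();
    }

    long aborted() {
        return abortedTotal.get();
    }

    boolean histogramEnabled() {
        return latencyBuckets != null;
    }

    /** Returns the count in the given bucket, or 0 when the histogram is disabled. */
    long bucketCount(int index) {
        return latencyBuckets == null ? 0 : latencyBuckets.get(index);
    }
}
```

   With the flag off, `recordEnd` still increments the committed/aborted counters but skips the bucket update, so the per-bucket series are never created.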


