wzhramc commented on code in PR #24704:
URL: https://github.com/apache/pulsar/pull/24704#discussion_r2329816562
##########
pip/pip-439.md:
##########
@@ -0,0 +1,573 @@
+# PIP-439: Adding Transaction Support to Pulsar Functions Through Auto-Transaction Wrapping
+
+# Background knowledge
+
+Apache Pulsar transactions enable atomic operations across multiple topics, allowing producers to send messages and consumers to acknowledge messages as a single unit of work. This provides the foundation for exactly-once processing semantics in streaming applications.
+
+## Transaction Architecture
+
+Pulsar's transaction system consists of four key components:
+
+1. **Transaction Coordinator (TC)**: A broker module that manages transaction lifecycles, allocates transaction IDs, and orchestrates the commit/abort process.
+
+2. **Transaction Log**: A persistent topic storing transaction metadata and state changes, enabling recovery after failures.
+
+3. **Transaction Buffer**: Temporarily stores messages produced within transactions, making them visible to consumers only after commit.
+
+4. **Pending Acknowledge State**: Tracks message acknowledgments within transactions, preventing conflicts between competing transactions.
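The interplay between the Transaction Buffer and the Pending Acknowledge State can be illustrated with a deliberately simplified, single-process model. This is not how the broker implements either component; `ToyTransactionBuffer` and its methods are hypothetical. It captures two rules from the list: messages produced in a transaction stay invisible until commit, and a message already acknowledged by one transaction cannot be acknowledged by another.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-process model of the transaction buffer and pending-ack
// state. It only illustrates the visibility and ack-conflict rules.
final class ToyTransactionBuffer {
    private final List<String> committed = new ArrayList<>();              // visible to consumers
    private final Map<Long, List<String>> pendingWrites = new HashMap<>(); // txnId -> buffered messages
    private final Map<String, Long> pendingAcks = new HashMap<>();         // messageId -> owning txnId

    void produce(long txnId, String message) {
        pendingWrites.computeIfAbsent(txnId, id -> new ArrayList<>()).add(message);
    }

    // Returns false when another transaction already holds an ack on this message.
    boolean acknowledge(long txnId, String messageId) {
        Long owner = pendingAcks.putIfAbsent(messageId, txnId);
        return owner == null || owner == txnId;
    }

    void commit(long txnId) {
        List<String> writes = pendingWrites.remove(txnId);
        if (writes != null) {
            committed.addAll(writes); // buffered messages become visible together
        }
        // Acks owned by txnId stay recorded, so those messages cannot be acked again.
    }

    void abort(long txnId) {
        pendingWrites.remove(txnId);                            // discard buffered messages
        pendingAcks.values().removeIf(owner -> owner == txnId); // release this txn's acks
    }

    List<String> visibleMessages() {
        return List.copyOf(committed);
    }
}
```

In this model an aborted transaction releases its acknowledgments, so a later transaction (for example, a retry of the same input) can acknowledge the message again.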
+
+## Transaction Lifecycle
+
+Transactions follow a defined lifecycle:
+
+1. **OPEN**: Client obtains a transaction ID from the Transaction Coordinator.
+2. **PRODUCING/ACKNOWLEDGING**: Client registers topic partitions/subscriptions with the TC, then produces/acknowledges messages within the transaction.
+3. **COMMITTING/ABORTING**: Client requests to end the transaction, and the TC begins the two-phase commit.
+4. **COMMITTED/ABORTED**: After all registered partitions and subscriptions have been processed, the TC finalizes the transaction state.
+5. **TIMED_OUT**: Transactions exceeding their timeout are automatically aborted.
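The lifecycle above can be sketched as a small state machine. `TxnState` is a hypothetical enum whose names follow this list rather than Pulsar's internal enums; PRODUCING/ACKNOWLEDGING is treated as work done while the transaction is OPEN, and a timed-out transaction is modeled as moving on to ABORTED.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the lifecycle described above; not Pulsar's own enum.
enum TxnState {
    OPEN, COMMITTING, ABORTING, COMMITTED, ABORTED, TIMED_OUT;

    // Legal next states for each state. Producing and acknowledging happen
    // while OPEN, so they are not separate states here.
    private static final Map<TxnState, Set<TxnState>> NEXT = Map.of(
            OPEN, EnumSet.of(COMMITTING, ABORTING, TIMED_OUT),
            COMMITTING, EnumSet.of(COMMITTED),
            ABORTING, EnumSet.of(ABORTED),
            TIMED_OUT, EnumSet.of(ABORTED), // timed-out transactions are aborted
            COMMITTED, EnumSet.noneOf(TxnState.class),
            ABORTED, EnumSet.noneOf(TxnState.class));

    boolean canMoveTo(TxnState target) {
        return NEXT.get(this).contains(target);
    }

    boolean isTerminal() {
        return NEXT.get(this).isEmpty();
    }
}
```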
+
+## Transaction Guarantees
+
+Pulsar transactions provide:
+- Atomic writes across multiple topics
+- Conditional acknowledgment to prevent duplicate processing by "zombie" instances
+- Visibility control ensuring consumers only see committed transaction messages
+- Support for exactly-once processing in consume-transform-produce patterns
+
+# Motivation
+
+Currently, Pulsar Functions cannot publish to multiple topics transactionally, which is a significant limitation for use cases requiring atomic multi-topic publishing. For instance, if a function processes an input message and needs to publish related updates to several output topics, there is no guarantee that either all of these operations succeed or none of them take effect.
+
+This limitation prevents building robust stream processing applications that require exactly-once semantics across multiple input and output topics. Without transaction support in Functions, developers must implement their own error handling and retry mechanisms, which can be complex and error-prone.
+
+Adding transaction support to Pulsar Functions would make the output publishes and the input acknowledgment of each function invocation atomic.
+
+# Goals
+
+## In Scope
+
+1. Enable automatic transaction support for Pulsar Functions through configuration
+2. Allow Functions to publish messages to multiple topics within a single transaction
+3. Support transactional acknowledgment of input messages
+4. Ensure transactions are committed only if message processing completes successfully
+5. Provide transaction timeout configuration for Functions
+
+## Out of Scope
+
+1. Exposing explicit transaction management APIs in the Functions interface
+2. Supporting multi-function transactions (transactions spanning multiple function invocations)
+3. Adding transaction support to Pulsar IO connectors
+4. Changes to the Function interface itself
+
+# High Level Design
+
+The proposed solution introduces automatic transaction wrapping for Pulsar Functions through configuration settings. When enabled, each function execution will be automatically wrapped in a transaction without requiring code changes to the function implementation.
+
+The general flow will be:
+1. Function is configured with `autoTransactionsEnabled: true`
+2. When a message arrives, the function runtime creates a new transaction
+3. The function processes the message with an enhanced Context that uses the transaction
+4. Any output messages are published using the transaction
+5. Input message acknowledgment is performed within the transaction
+6. If the function completes successfully, the transaction is committed
+7. If the function throws an exception, the transaction is aborted
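The flow above reduces to a begin/try/commit-or-abort pattern. The following self-contained sketch models it with a hypothetical `Txn` interface that mirrors the `commit()` and `abort()` methods of Pulsar's `Transaction` (both return `CompletableFuture<Void>`); `AutoTxnWrapper` and `invoke` are illustrative names, not part of the proposal.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical stand-in for Pulsar's Transaction: only commit() and abort()
// are modeled, with the same CompletableFuture<Void> return type.
interface Txn {
    CompletableFuture<Void> commit();

    CompletableFuture<Void> abort();
}

// Illustrative wrapper: one fresh transaction per invocation (steps 2 to 7).
final class AutoTxnWrapper {
    private final Supplier<Txn> txnFactory;

    AutoTxnWrapper(Supplier<Txn> txnFactory) {
        this.txnFactory = txnFactory;
    }

    // Runs the function body with a new transaction. Commits if the body
    // returns normally, aborts if it throws, and reports whether it committed.
    <I> boolean invoke(I input, BiConsumer<I, Txn> body) {
        Txn txn = txnFactory.get();
        try {
            body.accept(input, txn); // publishes and the input ack would use txn
        } catch (RuntimeException e) {
            txn.abort().join();
            return false;
        }
        txn.commit().join(); // a failed commit surfaces as a CompletionException
        return true;
    }
}
```

In the real runtime, the `Supplier<Txn>` would correspond to building a transaction that honors `transactionTimeoutMs`, for example through `PulsarClient#newTransaction()` and `withTransactionTimeout(...)`.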
+
+This approach provides transaction support in a way that is transparent to function implementers, requiring only configuration changes rather than code changes.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Configuration Classes
+
+First, we need to extend `FunctionConfig` to include transaction-related settings:
+
+```java
+public class FunctionConfig {
+    // Whether to automatically wrap each function call in a transaction
+    private boolean autoTransactionsEnabled = false;
+
+    // Default transaction timeout in milliseconds
+    private long transactionTimeoutMs = 60000;
+
+    // Getters and setters
+    public boolean isAutoTransactionsEnabled() {
+        return autoTransactionsEnabled;
+    }
+
+    public void setAutoTransactionsEnabled(boolean autoTransactionsEnabled) {
+        this.autoTransactionsEnabled = autoTransactionsEnabled;
+    }
+
+    public long getTransactionTimeoutMs() {
+        return transactionTimeoutMs;
+    }
+
+    public void setTransactionTimeoutMs(long transactionTimeoutMs) {
+        this.transactionTimeoutMs = transactionTimeoutMs;
+    }
+}
+```
+
+We also need to update the protobuf definition for `FunctionDetails` to include these fields:
+
+```protobuf
+message FunctionDetails {
+    // Other existing fields...
+    // X and Y are placeholders for the next unused field numbers
+
+    // Whether to automatically wrap function execution in a transaction
+    bool autoTransactionsEnabled = X;
+
+    // Default transaction timeout in milliseconds
+    int64 transactionTimeoutMs = Y;
+}
+```
+
+### Modifications to ContextImpl
+
+The ContextImpl class needs to be updated to track the current transaction:
+
+```java
+class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable {
+    // Existing fields...
+
+    private Transaction currentTransaction;
+
+    // Existing methods...
+
+    public void setCurrentTransaction(Transaction transaction) {
+        this.currentTransaction = transaction;
+    }
+
+    public Transaction getCurrentTransaction() {
+        return currentTransaction;
+    }
+
+    @Override
+    public <T> CompletableFuture<Void> publish(String topicName, T object) {
+        if (currentTransaction != null) {
+            try {
+                // newOutputMessage() already binds the builder to currentTransaction,
+                // so the no-argument sendAsync() sends the message transactionally
+                return newOutputMessage(topicName, null)
+                        .value(object)
+                        .sendAsync()
+                        .thenApply(msgId -> null);
+            } catch (PulsarClientException e) {
+                CompletableFuture<Void> future = new CompletableFuture<>();
+                future.completeExceptionally(e);
+                return future;
+            }
+        } else {
+            // Use existing implementation
+            return publish(topicName, object, "");
+        }
+    }
+
+    @Override
+    public <T> TypedMessageBuilder<T> newOutputMessage(String topicName, Schema<T> schema)
+            throws PulsarClientException {
+        MessageBuilderImpl<T> messageBuilder = new MessageBuilderImpl<>();
+        TypedMessageBuilder<T> typedMessageBuilder;
+        Producer<T> producer = getProducer(topicName, schema);
+
+        if (currentTransaction != null) {
+            // TypedMessageBuilder has no transaction setter: the transaction is supplied
+            // when the builder is created via Producer#newMessage(Transaction), which
+            // uses the schema the producer was created with
+            typedMessageBuilder = producer.newMessage(currentTransaction);
+        } else if (schema != null) {
+            typedMessageBuilder = producer.newMessage(schema);
+        } else {
+            typedMessageBuilder = producer.newMessage();
+        }
+
+        messageBuilder.setUnderlyingBuilder(typedMessageBuilder);
+        return messageBuilder;
+    }
+
+    @Override
+    public Record<?> getCurrentRecord() {
+        // Record currently being processed, held in an existing field (name assumed)
+        Record<?> record = this.record;
+        if (record != null && currentTransaction != null) {
+            return new TransactionalRecordWrapper<>(record, currentTransaction);
+        }
+        return record;
+    }
+}
+```
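Since `setCurrentTransaction` mutates state on the context between records, the runtime has to bind a transaction for exactly one invocation and clear it afterwards. A minimal sketch of that scoping, assuming synchronous invocation; `TxnScopedContext` and `runWith` are hypothetical names, and the type parameter `T` stands in for Pulsar's `Transaction`.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical sketch: bind a transaction to the context for exactly one
// synchronous invocation. T stands in for Pulsar's Transaction type.
final class TxnScopedContext<T> {
    private T currentTransaction; // plays the role of ContextImpl.currentTransaction

    T getCurrentTransaction() {
        return currentTransaction;
    }

    <R> R runWith(T transaction, Supplier<R> invocation) {
        Objects.requireNonNull(transaction, "transaction");
        currentTransaction = transaction;
        try {
            return invocation.get();
        } finally {
            // Cleared even when the function throws, so the next record never
            // sees an already committed or aborted transaction.
            currentTransaction = null;
        }
    }
}
```

This assumes synchronous invocation; an asynchronous function would need the transaction to stay associated with its record until the returned future completes, rather than being cleared in `finally`.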
+
+### TransactionalRecordWrapper Implementation
+
+```java
+public class TransactionalRecordWrapper<T> implements Record<T> {
+    private static final Logger log = LoggerFactory.getLogger(TransactionalRecordWrapper.class);
+
+    private final Record<T> delegate;
+    private final Transaction transaction;
+
+    public TransactionalRecordWrapper(Record<T> delegate, Transaction transaction) {
+        this.delegate = delegate;
+        this.transaction = transaction;
+    }
+
+    @Override
+    public void ack() {
+        if (transaction != null && delegate instanceof PulsarRecord) {
+            PulsarRecord<?> pulsarRecord = (PulsarRecord<?>) delegate;
+            // acknowledgeAsync() reports failures through the returned future,
+            // which a try/catch around the call would miss, so handle them there
+            pulsarRecord.getConsumer().ifPresent(consumer ->
+                    consumer.acknowledgeAsync(pulsarRecord.getMessageId(), transaction)
+                            .exceptionally(e -> {
+                                log.error("Failed to transactionally acknowledge message", e);
+                                delegate.fail();
+                                return null;
+                            }));
+        } else {
+            // Fall back to non-transactional ack
+            delegate.ack();
+        }
+    }
+
+    @Override
+    public void fail() {
+        delegate.fail();
+    }
+
+    // Delegate all other methods to the wrapped record
+    @Override
+    public Optional<String> getKey() {
+        return delegate.getKey();
+    }
+
+    @Override
+    public T getValue() {
+        return delegate.getValue();
+    }
+
+    @Override
+    public Optional<Long> getEventTime() {
+        return delegate.getEventTime();
+    }
+
+    @Override
+    public Optional<String> getPartitionId() {
+        return delegate.getPartitionId();
+    }
+
+    @Override
+    public Optional<String> getTopicName() {
+        return delegate.getTopicName();
+    }
+
+    @Override
+    public Schema<T> getSchema() {
+        return delegate.getSchema();
+    }
+
+    @Override
+    public Optional<Long> getRecordSequence() {
+        return delegate.getRecordSequence();
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return delegate.getProperties();
+    }
+}
+```
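The `ack()` override relies on `Consumer#acknowledgeAsync(MessageId, Transaction)`, which returns a `CompletableFuture<Void>`, so an asynchronous failure never reaches a `try/catch` around the call. This standalone sketch shows a pattern that covers both a synchronous throw and exceptional completion; `AsyncAckDemo` and `ackOrFail` are hypothetical names, and the `Supplier` stands in for the real acknowledgment call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class AsyncAckDemo {
    // Hypothetical helper: runs an async ack and invokes onFailure if it fails,
    // whether the call throws synchronously or its future completes exceptionally.
    static CompletableFuture<Void> ackOrFail(Supplier<CompletableFuture<Void>> ack, Runnable onFailure) {
        CompletableFuture<Void> future;
        try {
            future = ack.get();
        } catch (RuntimeException e) {
            future = CompletableFuture.failedFuture(e); // normalize sync errors
        }
        return future.whenComplete((ignored, error) -> {
            if (error != null) {
                onFailure.run(); // e.g. record.fail() in the wrapper
            }
        });
    }
}
```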
+
+### Modifications to JavaInstanceRunnable
+
+The JavaInstanceRunnable class needs to be updated to handle auto transactions:
+
+```java
+public class JavaInstanceRunnable implements AutoCloseable, Runnable {
+    // Existing fields...
+
+    @Override
+    public void run() {
+        try {
+            setup();
+
+            while (true) {
+                currentRecord = readInput();
+
+                // increment number of records received from source
+                stats.incrTotalReceived();
+
+                // If auto transactions are enabled, wrap the function execution in a transaction
+                if (instanceConfig.getFunctionDetails().getAutoTransactionsEnabled()) {
+                    processWithTransaction();
+                } else {
+                    processNormally();
+                }
+
+                if (deathException != null) {
+                    throw deathException;
+                }
+            }
+        } catch (Throwable t) {
+            // Existing error handling...
+        } finally {
+            close();
+        }
+    }
+
+    private void processNormally() {
+        // Existing logic for non-transactional execution
+        Thread currentThread = Thread.currentThread();
+        Consumer<Throwable> asyncErrorHandler = throwable -> currentThread.interrupt();
+
+        // set last invocation time
+        stats.setLastInvocation(System.currentTimeMillis());
+
+        // start time for process latency stat
+        stats.processTimeStart();
+
+        // process the message
+        Thread.currentThread().setContextClassLoader(functionClassLoader);
+        JavaExecutionResult result = javaInstance.handleMessage(
+                currentRecord,
+                currentRecord.getValue(),
+                this::handleResult,
+                asyncErrorHandler);
+        Thread.currentThread().setContextClassLoader(instanceClassLoader);
+
+        // register end time
+        stats.processTimeEnd();
+
+        if (result != null) {
+            // process the synchronous results
+            handleResult(currentRecord, result);
+        }
+    }
+
+    private void processWithTransaction() {
Review Comment:
I've removed that section from the proposal since I wasn't aware of async
support when I wrote the initial draft. The proposal now includes adding
transactional support for async functions as well.