DanielCarter-stack commented on PR #10605:
URL: https://github.com/apache/seatunnel/pull/10605#issuecomment-4069306484

   ### Issue 1: MultiTablePartitionDiscoverer Concurrency Safety Issue
   
   **Location**: 
`seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/MultiTablePartitionDiscoverer.java:42`
   
   ```java
   public class MultiTablePartitionDiscoverer implements PulsarDiscoverer {
       private final List<TableDiscovererPair> discovererPairs;
       // Not thread-safe
       private final Map<TopicPartition, TablePath> partitionToTablePath = new HashMap<>();

       @Override
       public Set<TopicPartition> getSubscribedTopicPartitions(PulsarAdmin admin) {
           Set<TopicPartition> allPartitions = new HashSet<>();
           // ⚠️ Concurrent calls may cause data loss
           partitionToTablePath.clear();

           for (TableDiscovererPair pair : discovererPairs) {
               Set<TopicPartition> partitions =
                       pair.discoverer.getSubscribedTopicPartitions(admin);
               for (TopicPartition tp : partitions) {
                   TablePath existing = partitionToTablePath.putIfAbsent(tp, pair.tablePath);
                   // ...
               }
               allPartitions.addAll(partitions);
           }
           return allPartitions;
       }
   }
   ```
   
   **Related Context**:
   - Caller: `PulsarSplitEnumerator.discoverySplits()` (line ~170)
   - Concurrency control: `PulsarSplitEnumerator` uses `synchronized 
(stateLock)` to protect `discoverySplits()`
   - Scheduled task: `executor.scheduleAtFixedRate(...)` single-threaded 
scheduling
   
   **Problem Description**:
   `discoverySplits()` is currently protected by `synchronized`, and the
   scheduled task runs on a single thread, so the method is safe today. However,
   `partitionToTablePath` is an instance field that is cleared on every call to
   `getSubscribedTopicPartitions()`. If another code path ever calls this method
   concurrently:
   
   1. `partitionToTablePath.clear()` can discard mappings that another call has
   already built
   2. Interleaved `putIfAbsent()` calls can leave the map with an inconsistent
   mix of `tablePath` mappings
   
   **Potential Risks**:
   - **Risk 1**: If `PulsarSplitEnumerator`'s concurrency control changes
   (e.g., `synchronized` is removed), concurrent mutation of the `HashMap` can
   lose or corrupt mappings at runtime
   - **Risk 2**: Any other code that calls
   `MultiTablePartitionDiscoverer.getSubscribedTopicPartitions()` directly will
   race with the scheduled discovery
   
   **Impact Scope**:
   - Direct impact: `MultiTablePartitionDiscoverer`'s multi-table routing logic
   - Indirect impact: All subsequent split allocations that depend on 
`partitionToTablePath`
   - Affected area: Limited to Pulsar Connector, multi-table mode
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   ```java
   // Option 1: Use ConcurrentHashMap + remove stale keys instead of clear()
   private final Map<TopicPartition, TablePath> partitionToTablePath =
           new ConcurrentHashMap<>();
   
   @Override
   public Set<TopicPartition> getSubscribedTopicPartitions(PulsarAdmin admin) {
       Set<TopicPartition> allPartitions = new HashSet<>();
       // Build the new mapping locally so readers never observe an emptied map
       Map<TopicPartition, TablePath> newMapping = new HashMap<>();
       for (TableDiscovererPair pair : discovererPairs) {
           Set<TopicPartition> partitions =
                   pair.discoverer.getSubscribedTopicPartitions(admin);
           for (TopicPartition tp : partitions) {
               newMapping.putIfAbsent(tp, pair.tablePath);
           }
           allPartitions.addAll(partitions);
       }
       // The field is final, so update it in place instead of reassigning it
       partitionToTablePath.keySet().retainAll(newMapping.keySet());  // Drop stale partitions
       partitionToTablePath.putAll(newMapping);  // Add or refresh current partitions
       return allPartitions;
   }
   
   // Option 2: Change partitionToTablePath to local variable (recommended)
   @Override
   public Set<TopicPartition> getSubscribedTopicPartitions(PulsarAdmin admin) {
       Set<TopicPartition> allPartitions = new HashSet<>();
       // Local variable
       Map<TopicPartition, TablePath> partitionToTablePath = new HashMap<>();
       
       for (TableDiscovererPair pair : discovererPairs) {
           // ...
       }
       
       // If getTablePath() needs the mapping, publish an immutable snapshot through
       // a volatile field (it cannot be final because it is reassigned here)
       this.cachedPartitionToTablePath = Collections.unmodifiableMap(partitionToTablePath);
       return allPartitions;
   }
   ```
   
   **Rationale**:
   - Option 1 uses `ConcurrentHashMap` for basic thread safety, but it must
   avoid `clear()`, which would briefly expose an empty map to readers
   - Option 2 is more thorough: `partitionToTablePath` becomes a method-local
   variable, which removes the shared mutable state
   - If `getTablePath()` needs the mapping between calls, publish an immutable
   snapshot through a `volatile` field; a single reference write is atomic, so
   no CAS loop is needed
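
The snapshot idea from Option 2 can be sketched in isolation. This is a minimal illustration, not code from the PR: `SnapshotRoutingTable`, `refresh`, and `tableFor` are hypothetical names, and plain strings stand in for `TopicPartition` and `TablePath`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the discoverer; strings replace TopicPartition and TablePath. */
class SnapshotRoutingTable {
    // Readers always see either the previous or the next complete snapshot.
    private volatile Map<String, String> partitionToTable = Collections.emptyMap();

    /** Rebuilds the mapping from {partition, table} pairs; the first table wins on conflict. */
    void refresh(String[][] pairs) {
        Map<String, String> next = new HashMap<>();
        for (String[] pair : pairs) {
            next.putIfAbsent(pair[0], pair[1]);
        }
        // One volatile write publishes the fully built, read-only map.
        partitionToTable = Collections.unmodifiableMap(next);
    }

    /** Returns the table for a partition, or null if the partition is unknown. */
    String tableFor(String partition) {
        return partitionToTable.get(partition);
    }
}
```

Because the map is never mutated after publication, a reader that calls `tableFor` during a `refresh` cannot observe a partially filled or emptied mapping.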
   
   ---
   
   ### Issue 2: PulsarSourceReader Per-Call Map Creation Overhead
   
   **Location**: 
`seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java:118`
   
   ```java
   @Override
   public void pollNext(Collector<T> output) throws Exception {
       // ⚠️ Creates new Map on every pollNext
       Map<TablePath, Collector<T>> collectorCache = new HashMap<>();
       for (int i = 0; i < batchSize; i++) {
           // ...
           Collector<T> collector = resolveCollector(tablePath, output, collectorCache);
           // ...
       }
   }
   ```
   
   **Related Context**:
   - `pollNext()` is called frequently (the rate is controlled by
   `pollInterval`, whose default may be very short)
   - The default `batchSize` should be checked against the configuration
   (possibly in the 100-1000 range)
   
   **Problem Description**:
   `collectorCache` exists to avoid creating a `TableIdInjectingCollector` for
   every record in the loop, but each `pollNext()` call still allocates a new
   `HashMap`, which adds avoidable GC pressure when the method is called at high
   frequency.
   
   **Potential Risks**:
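   - **Risk 1**: High-frequency GC pressure, affecting throughput
   - **Risk 2**: The map is keyed by `TablePath`, so it grows with the number
   of distinct tables seen in a batch rather than with `batchSize`; with many
   tables, repeated resizing of a fresh map may add latency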
   
   **Impact Scope**:
   - Direct impact: `PulsarSourceReader.pollNext()` performance
   - Indirect impact: Overall job throughput
   - Affected area: All Pulsar Source jobs (single-table and multi-table)
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   ```java
   public class PulsarSourceReader<T> implements SourceReader<T, PulsarPartitionSplit> {
       // Promote collectorCache to instance field (only used in multi-table mode)
       private final Map<TablePath, Collector<T>> collectorCache;
       
       public PulsarSourceReader(...) {
           // ...
           this.collectorCache =
                   injectTableIdForRouting ? new HashMap<>() : Collections.emptyMap();
       }
       
       @Override
       public void pollNext(Collector<T> output) throws Exception {
           if (injectTableIdForRouting) {
               collectorCache.clear();  // Reuse Map instance
           }
           
           for (int i = 0; i < batchSize; i++) {
               // ...
               Collector<T> collector = resolveCollector(tablePath, output, collectorCache);
               // ...
           }
       }
   }
   ```
   
   **Rationale**:
   - In single-table mode (`injectTableIdForRouting = false`),
   `resolveCollector()` returns the original output directly and never touches
   the cache
   - Only multi-table mode needs the cache, so only that mode reuses the `Map`
   instance
   - `clear()` avoids a per-call allocation and keeps the already-sized table,
   although its cost is proportional to the map's capacity
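
To make the reuse pattern concrete, here is a small self-contained sketch. It is not the connector's code: `CachingReaderSketch`, `poll`, and `wrappersCreated` are hypothetical, `Consumer<String>` stands in for `Collector<T>`, and a table name stands in for `TablePath`. It assumes the cached wrappers capture the current call's output, which is why the cache is cleared per call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical reader: Consumer<String> replaces Collector<T>, String replaces TablePath. */
class CachingReaderSketch {
    // Allocated once and reused by every poll.
    private final Map<String, Consumer<String>> collectorCache = new HashMap<>();
    int wrappersCreated = 0;

    /** Routes each {table, value} record through a per-table wrapper around output. */
    void poll(String[][] batch, Consumer<String> output) {
        // Wrappers capture this call's output, so entries must not outlive the call.
        collectorCache.clear();
        for (String[] rec : batch) {
            Consumer<String> collector = collectorCache.computeIfAbsent(rec[0], table -> {
                wrappersCreated++;
                return value -> output.accept(table + ":" + value);
            });
            collector.accept(rec[1]);
        }
    }
}
```

In this sketch a batch of four records spread over two tables creates two wrappers, so the cache size follows the number of tables rather than the batch size.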
   
   ---
   
   ### Issue 4: PulsarSplitEnumerator.hasTopicPattern() Duplicate Type Checking
   
   **Location**: 
`seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java:268`
   
   ```java
   private boolean hasTopicPattern(PulsarDiscoverer discoverer) {
       if (discoverer instanceof TopicPatternDiscoverer) {
           return true;
       }
       return discoverer instanceof MultiTablePartitionDiscoverer
               && ((MultiTablePartitionDiscoverer) discoverer).hasTopicPattern();
   }
   ```
   
   **Related Context**:
   - This method is called in two places: constructor and 
`validateBoundedDiscovery()`
   - Validation logic in the constructor duplicates this method (dev branch's 
`PulsarSource` constructor also has similar logic)
   
   **Problem Description**:
   The logic in `hasTopicPattern()` duplicates the type checks in
   `PulsarSource.validateBoundedDiscovery()`:
   ```java
   // PulsarSource.java:~191
   private void validateBoundedDiscovery() {
       boolean hasTopicPattern;
       if (partitionDiscoverer instanceof TopicPatternDiscoverer) {
           hasTopicPattern = true;
       } else if (partitionDiscoverer instanceof MultiTablePartitionDiscoverer) {
           hasTopicPattern =
                   ((MultiTablePartitionDiscoverer) partitionDiscoverer).hasTopicPattern();
       } else {
           hasTopicPattern = false;
       }
       // ...
   }
   ```
   
   **Potential Risks**:
   - **Risk 1**: If new `PulsarDiscoverer` implementations are added in the 
future, both pieces of logic need to be modified
   - **Risk 2**: Code duplication increases maintenance costs
   
   **Impact Scope**:
   - Direct impact: PulsarSource and PulsarSplitEnumerator
   - Affected area: Pulsar Connector
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   ```java
   // Add default method in PulsarDiscoverer interface
   public interface PulsarDiscoverer extends Serializable {
       Set<TopicPartition> getSubscribedTopicPartitions(PulsarAdmin admin);
       
       default boolean hasTopicPattern() {
           return this instanceof TopicPatternDiscoverer;
       }
   }
   
   // TopicPatternDiscoverer doesn't need to override (returns true)
   // MultiTablePartitionDiscoverer overrides:
   @Override
   public boolean hasTopicPattern() {
       return discovererPairs.stream().anyMatch(TableDiscovererPair::isTopicPattern);
   }
   
   // Other implementations (TopicListDiscoverer) automatically return false
   
   // Simplify caller code
   private void validateBoundedDiscovery() {
       if (partitionDiscoverer.hasTopicPattern()  // Unified call
               && partitionDiscoveryIntervalMs > 0
               && Boundedness.BOUNDED == getBoundedness()) {
           throw new PulsarConnectorException(...);
       }
   }
   ```
   
   **Rationale**:
   - Follows the Open-Closed Principle: a new `PulsarDiscoverer` implementation
   only needs to implement the interface, and callers stay unchanged
   - Eliminates code duplication
   - Type-safe polymorphic calls
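
One possible variation on this suggestion, sketched below under assumptions, keeps the interface free of any reference to a concrete class: the default returns `false` and pattern-based discoverers override it. All type names here (`DiscovererSketch`, `TopicListSketch`, `TopicPatternSketch`, `MultiTableSketch`) are hypothetical stand-ins, not the connector's classes.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for PulsarDiscoverer, reduced to the method under discussion. */
interface DiscovererSketch {
    // Default covers fixed-topic discoverers; pattern-based ones override it.
    default boolean hasTopicPattern() {
        return false;
    }
}

/** Fixed topic list: inherits the default answer. */
class TopicListSketch implements DiscovererSketch {}

/** Pattern-based discovery: always reports a topic pattern. */
class TopicPatternSketch implements DiscovererSketch {
    @Override
    public boolean hasTopicPattern() {
        return true;
    }
}

/** Composite discoverer that delegates the question to its children. */
class MultiTableSketch implements DiscovererSketch {
    private final List<DiscovererSketch> children;

    MultiTableSketch(DiscovererSketch... children) {
        this.children = Arrays.asList(children);
    }

    @Override
    public boolean hasTopicPattern() {
        // True as soon as any wrapped discoverer is pattern-based.
        return children.stream().anyMatch(DiscovererSketch::hasTopicPattern);
    }
}
```

With this shape a caller only ever writes `discoverer.hasTopicPattern()`, and no `instanceof` check remains anywhere.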
   
   ---
   
   ### Issue 5: TableIdInjectingCollector Silent Skipping for Non-SeaTunnelRow Records
   
   **Location**: 
`seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/TableIdInjectingCollector.java:35`
   
   ```java
   @Override
   public void collect(T record) {
       if (record instanceof SeaTunnelRow) {  // ⚠️ Type check + silent skip
           ((SeaTunnelRow) record).setTableId(tableId);
       }
       delegate.collect(record);
   }
   ```
   
   **Related Context**:
   - `PulsarSourceReader` generic type is `<T>`, but fixed to `SeaTunnelRow` in 
actual use
   - `DeserializationSchema<SeaTunnelRow>` is the deserializer type
   
   **Problem Description**:
   Although `T` is always `SeaTunnelRow` in the current implementation, the 
code uses generics and `instanceof` checks, implying theoretically other types 
might exist. If a non-`SeaTunnelRow` type is passed, the code will silently 
skip tableId setting, potentially leading to:
   
   1. In multi-table mode, some records don't have tableId, causing downstream 
routing failures
   2. Problems are difficult to troubleshoot (no logs or exceptions)
   
   **Potential Risks**:
   - **Risk 1**: If other Record types exist in the future (e.g., 
`SchemaChangeEvent`), tableId injection will fail
   - **Risk 2**: No clear error message when types don't match
   
   **Impact Scope**:
   - Direct impact: `TableIdInjectingCollector`
   - Indirect impact: Multi-table mode data routing
   - Affected area: Pulsar multi-table Source
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   ```java
   // Option 1: Remove generics, make type explicit
   public class TableIdInjectingCollector implements Collector<SeaTunnelRow> {  // Remove <T>
       private final Collector<SeaTunnelRow> delegate;  // Explicit type
       private final String tableId;
   
       @Override
       public void collect(SeaTunnelRow record) {  // Remove generics
           record.setTableId(tableId);  // Remove instanceof check
           delegate.collect(record);
       }
   }
   
   // Option 2: Keep generics but add logging
   @Override
   public void collect(T record) {
       if (record instanceof SeaTunnelRow) {
           ((SeaTunnelRow) record).setTableId(tableId);  // Still inject for rows
       } else {
           LOG.warn(
                   "Record is not a SeaTunnelRow, cannot inject tableId. Record type: {}",
                   record != null ? record.getClass().getName() : "null");
       }
       delegate.collect(record);
   }
   ```
   
   **Rationale**:
   - Option 1 is more thorough: current SeaTunnel source connectors only
   process `SeaTunnelRow`, so the generic parameter is unnecessary
   - Option 2 is more compatible: it keeps future extensibility while improving
   observability
   - Option 1 is recommended, because `SchemaChangeEvent` is handled through the
   `collect(SchemaChangeEvent)` overload and needs no tableId injection
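
The effect of Option 1 can be shown with a stripped-down, self-contained sketch. The names are hypothetical: `RowSketch` models only the tableId accessors of `SeaTunnelRow`, and `Consumer<RowSketch>` stands in for `Collector<SeaTunnelRow>`.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for SeaTunnelRow: only the tableId accessors are modelled. */
class RowSketch {
    private String tableId = "";

    void setTableId(String tableId) {
        this.tableId = tableId;
    }

    String getTableId() {
        return tableId;
    }
}

/** Non-generic wrapper: every row is stamped, so no silent skip path exists. */
class TableIdStamper implements Consumer<RowSketch> {
    private final Consumer<RowSketch> delegate;
    private final String tableId;

    TableIdStamper(Consumer<RowSketch> delegate, String tableId) {
        this.delegate = delegate;
        this.tableId = tableId;
    }

    @Override
    public void accept(RowSketch row) {
        row.setTableId(tableId);  // The compiler guarantees the argument is a row
        delegate.accept(row);
    }
}
```

Because the parameter type is fixed, passing anything other than a row becomes a compile-time error instead of a record that reaches the sink without a tableId.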
   
   ---
   
   ### Issue 6: PulsarSource.setJobContext() Batch Mode Validation Timing Issue
   
   **Location**: 
`seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java:183`
   
   ```java
   @Override
   public void setJobContext(JobContext jobContext) {
       if (multiTableConfig.isMultiTable()
               && JobMode.BATCH.equals(jobContext.getJobMode())
               && getBoundedness() == Boundedness.UNBOUNDED) {
           throw new PulsarConnectorException(
                   SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                   "Pulsar source does not support unbounded multi-table configuration in batch mode.");
       }
   }
   ```
   
   **Related Context**:
   - This validation depends on `jobContext.getJobMode()`, which can only be 
obtained at runtime
   - PulsarSourceFactory's `validateSourceOptions()` cannot access JobMode
   
   **Problem Description**:
   Configuration validation is delayed to `setJobContext()` execution, which 
means:
   
   1. Users will only receive errors after submitting the job, not during the 
configuration validation phase
   2. The error message mentions "multi-table configuration" and "unbounded", 
but users may not understand why this combination is not supported
   
   **Potential Risks**:
   - **Risk 1**: Poor user experience (delayed error discovery)
   - **Risk 2**: Semantic issue of using UNBOUNDED source in Batch mode: the 
job may never complete
   
   **Impact Scope**:
   - Direct impact: Batch mode + multi-table + UNBOUNDED configuration 
combination
   - Affected area: Multi-table mode users
   
   **Severity**: MAJOR (but does not block merge)
   
   **Improvement Suggestions**:
   ```java
   // Option 1: Clearly document limitations in documentation
   // docs/en/connectors/source/Pulsar.md
   /**
    * **Note**: In batch mode, all tables must be bounded (cursor.stop.mode != NEVER).
    * If any table uses cursor.stop.mode = NEVER, the source is considered unbounded
    * and the batch job will fail at runtime. Use streaming mode for unbounded sources.
    */
   
   // Option 2: Improve error messages, provide fix guidance
   throw new PulsarConnectorException(
           SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
           String.format(
               "Pulsar source does not support unbounded multi-table configuration in batch mode.%n"
                   + "Current config: multi-table=%s, boundedness=%s, job-mode=%s%n"
                   + "Solution: Either%n"
                   + "  1. Set cursor.stop.mode to LATEST or TIMESTAMP for all tables in tables_configs, or%n"
                   + "  2. Use streaming mode (set job.mode = STREAMING in the env block)",
               multiTableConfig.isMultiTable(), getBoundedness(), jobContext.getJobMode()));
   
   // Option 3: Add early warning in PulsarSourceFactory (if can be inferred)
   // But this requires parsing tables_configs in advance, which is complex
   
   **Rationale**:
   - Option 1 is the minimal change, improving user awareness
   - Option 2 provides a specific solution, reducing user troubleshooting time
   - Option 3 is ideal but complex to implement, not recommended
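
The check and the more descriptive message from Option 2 can be exercised in isolation with a sketch like the one below. Everything in it is hypothetical: `JobModeSketch` and `BoundednessSketch` stand in for the real enums, and `BatchModeGuard.check` returns the message instead of throwing so that the wording is easy to inspect.

```java
/** Hypothetical stand-in for JobMode. */
enum JobModeSketch { BATCH, STREAMING }

/** Hypothetical stand-in for Boundedness. */
enum BoundednessSketch { BOUNDED, UNBOUNDED }

class BatchModeGuard {
    /** Returns an actionable error message, or null when the combination is allowed. */
    static String check(boolean multiTable, JobModeSketch mode, BoundednessSketch bounds) {
        boolean rejected = multiTable
                && mode == JobModeSketch.BATCH
                && bounds == BoundednessSketch.UNBOUNDED;
        if (!rejected) {
            return null;
        }
        // Echo the offending settings so the user can see which one to change.
        return String.format(
                "Unbounded multi-table source in batch mode "
                        + "(multi-table=%s, boundedness=%s, job-mode=%s). "
                        + "Make every table bounded or run the job in streaming mode.",
                multiTable, bounds, mode);
    }
}
```

Only the combination of multi-table, batch, and unbounded is rejected; any other combination yields `null` in this sketch.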
   
   ---

