DanielCarter-stack commented on PR #10605:
URL: https://github.com/apache/seatunnel/pull/10605#issuecomment-4069306484
<!-- code-pr-reviewer -->
<!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10605", "part": 1,
"total": 1} -->
### Issue 1: MultiTablePartitionDiscoverer Concurrency Safety Issue
**Location**:
`seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/MultiTablePartitionDiscoverer.java:42`
```java
public class MultiTablePartitionDiscoverer implements PulsarDiscoverer {
    private final List<TableDiscovererPair> discovererPairs;
    private final Map<TopicPartition, TablePath> partitionToTablePath = new HashMap<>(); // Not thread-safe

    @Override
    public Set<TopicPartition> getSubscribedTopicPartitions(PulsarAdmin admin) {
        Set<TopicPartition> allPartitions = new HashSet<>();
        partitionToTablePath.clear(); // ⚠️ Concurrent calls may cause data loss
        for (TableDiscovererPair pair : discovererPairs) {
            Set<TopicPartition> partitions =
                    pair.discoverer.getSubscribedTopicPartitions(admin);
            for (TopicPartition tp : partitions) {
                TablePath existing = partitionToTablePath.putIfAbsent(tp, pair.tablePath);
                // ...
            }
            allPartitions.addAll(partitions);
        }
        return allPartitions;
    }
}
```
**Related Context**:
- Caller: `PulsarSplitEnumerator.discoverySplits()` (line ~170)
- Concurrency control: `PulsarSplitEnumerator` uses `synchronized (stateLock)` to protect `discoverySplits()`
- Scheduled task: `executor.scheduleAtFixedRate(...)` runs on a single-threaded scheduler
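
To make the current protection concrete, here is a minimal, self-contained sketch of the pattern described above: a discovery method guarded by `synchronized (stateLock)` and driven by `scheduleAtFixedRate(...)` on a single-threaded scheduler. `SerializedDiscovery`, `runScheduled()`, and the in-flight counter are hypothetical names for illustration only; the real `PulsarSplitEnumerator` does more work inside the lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for PulsarSplitEnumerator: only the locking and scheduling shape is modeled.
class SerializedDiscovery {
    private final Object stateLock = new Object();
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();
    private final AtomicInteger runs = new AtomicInteger();

    // Mirrors discoverySplits(): all work happens while holding stateLock.
    void discoverySplits() {
        synchronized (stateLock) {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            // ... partition discovery (e.g. getSubscribedTopicPartitions) would run here ...
            runs.incrementAndGet();
            inFlight.decrementAndGet();
        }
    }

    // Runs discovery on a single-threaded scheduler until at least `times` runs have completed.
    void runScheduled(int times) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(times);
        executor.scheduleAtFixedRate(
                () -> {
                    discoverySplits();
                    done.countDown();
                },
                0, 5, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
    }

    int maxConcurrentCalls() {
        return maxInFlight.get();
    }

    int runs() {
        return runs.get();
    }
}
```

Both guards live in the enumerator, not in `MultiTablePartitionDiscoverer` itself, so a caller that bypasses `stateLock` would not be protected.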
**Problem Description**:
Although `discoverySplits()` is currently protected by `synchronized` and the scheduled task runs on a single thread, `partitionToTablePath` is a shared instance field that is cleared on every call to `getSubscribedTopicPartitions()`. If another code path ever calls this method concurrently, this would lead to:
1. One call's `partitionToTablePath.clear()` discarding mappings that another call has just built
2. Concurrent `putIfAbsent()` calls on a plain `HashMap` producing inconsistent tablePath mappings (`HashMap` is not safe for unsynchronized concurrent modification)
**Potential Risks**:
- **Risk 1**: If `PulsarSplitEnumerator`'s concurrency control changes (e.g., the `synchronized` block is removed), partition-to-table routing may become inconsistent at runtime
- **Risk 2**: Any other code that calls `MultiTablePartitionDiscoverer.getSubscribedTopicPartitions()` directly, outside `stateLock`, is exposed to these race conditions
**Impact Scope**:
- Direct impact: `MultiTablePartitionDiscoverer`'s multi-table routing logic
- Indirect impact: All subsequent split allocations that depend on
`partitionToTablePath`
- Affected area: Limited to Pulsar Connector, multi-table mode
**Severity**: MINOR
**Improvement Suggestions**:
```java
// Option 1: Use ConcurrentHashMap so individual reads/writes are thread-safe
private final Map<TopicPartition, TablePath> partitionToTablePath =
        new ConcurrentHashMap<>();

@Override
public Set<TopicPartition> getSubscribedTopicPartitions(PulsarAdmin admin) {
    Set<TopicPartition> allPartitions = new HashSet<>();
    // Caveat: between clear() and the end of the loop, readers see an incomplete mapping
    partitionToTablePath.clear();
    for (TableDiscovererPair pair : discovererPairs) {
        Set<TopicPartition> partitions =
                pair.discoverer.getSubscribedTopicPartitions(admin);
        for (TopicPartition tp : partitions) {
            partitionToTablePath.putIfAbsent(tp, pair.tablePath); // Atomic per entry
        }
        allPartitions.addAll(partitions);
    }
    return allPartitions;
}

// Option 2 (recommended): Build the mapping in a local variable
private volatile Map<TopicPartition, TablePath> cachedPartitionToTablePath =
        Collections.emptyMap();

@Override
public Set<TopicPartition> getSubscribedTopicPartitions(PulsarAdmin admin) {
    Set<TopicPartition> allPartitions = new HashSet<>();
    Map<TopicPartition, TablePath> partitionToTablePath = new HashMap<>(); // Local variable
    for (TableDiscovererPair pair : discovererPairs) {
        // ... same loop as above, writing into the local map
    }
    // If getTablePath() needs the mapping, publish an immutable snapshot with a single
    // volatile write (atomic replacement): readers see the old or the new map, never a half-built one
    this.cachedPartitionToTablePath = Collections.unmodifiableMap(partitionToTablePath);
    return allPartitions;
}
```
**Rationale**:
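- Option 1 uses `ConcurrentHashMap` so that individual reads and writes are thread-safe, but `clear()` followed by repopulation still leaves a window in which readers see an incomplete mapping
- Option 2 is more thorough: `partitionToTablePath` becomes a method-local variable, which removes the shared mutable state altogether
- If `getTablePath()` needs the mapping between method calls, publish an immutable snapshot through a `volatile` field, or through an `AtomicReference` updated via CAS if several threads may publish concurrently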
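
A self-contained sketch of the snapshot approach, assuming simplified types: partitions and table paths are plain `String`s, and each table's discoverer is modeled as a fixed partition set. `SnapshotPartitionRouter` and `TablePartitions` are hypothetical names, not connector classes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for MultiTablePartitionDiscoverer.
class SnapshotPartitionRouter {
    static final class TablePartitions {
        final String tablePath;
        final Set<String> partitions;

        TablePartitions(String tablePath, Set<String> partitions) {
            this.tablePath = tablePath;
            this.partitions = partitions;
        }
    }

    private final List<TablePartitions> tables;
    // Immutable snapshot, replaced wholesale; volatile makes each new snapshot visible to readers.
    private volatile Map<String, String> partitionToTablePath = Collections.emptyMap();

    SnapshotPartitionRouter(List<TablePartitions> tables) {
        this.tables = tables;
    }

    Set<String> discover() {
        Set<String> allPartitions = new HashSet<>();
        Map<String, String> mapping = new HashMap<>(); // Local: no other thread can see it yet
        for (TablePartitions t : tables) {
            for (String partition : t.partitions) {
                // The first table to claim a partition wins, as with putIfAbsent in the original
                mapping.putIfAbsent(partition, t.tablePath);
            }
            allPartitions.addAll(t.partitions);
        }
        // Single volatile write: readers see the old or the new snapshot, never a half-built map
        partitionToTablePath = Collections.unmodifiableMap(mapping);
        return allPartitions;
    }

    String getTablePath(String partition) {
        return partitionToTablePath.get(partition);
    }
}
```

The conflict handling that the original code performs with the return value of `putIfAbsent()` (elided as `// ...`) is omitted here.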
---
### Issue 2: PulsarSourceReader Per-Poll Map Creation Overhead
**Location**:
`seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java:118`
```java
@Override
public void pollNext(Collector<T> output) throws Exception {
    Map<TablePath, Collector<T>> collectorCache = new HashMap<>(); // ⚠️ Creates a new Map on every pollNext
    for (int i = 0; i < batchSize; i++) {
        // ...
        Collector<T> collector = resolveCollector(tablePath, output, collectorCache);
        // ...
    }
}
```
**Related Context**:
- `pollNext()` is called frequently (driven by `pollInterval`, whose default may be very short)
- The default `batchSize` still needs to be checked in the configuration (it is possibly in the 100-1000 range)
**Problem Description**:
Although `collectorCache` is designed to avoid creating `TableIdInjectingCollector` instances repeatedly within the loop, each `pollNext()` call allocates a new `HashMap`, which adds unnecessary GC pressure when `pollNext()` is called at high frequency.
**Potential Risks**:
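- **Risk 1**: Frequent short-lived allocations add GC pressure, which may reduce throughput
- **Risk 2**: Map resizing may add latency; note, however, that the map is keyed by `TablePath`, so its size is bounded by the number of distinct tables in a batch rather than by `batchSize`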
**Impact Scope**:
- Direct impact: `PulsarSourceReader.pollNext()` performance
- Indirect impact: Overall job throughput
- Affected area: All Pulsar Source jobs (single-table and multi-table)
**Severity**: MINOR
**Improvement Suggestions**:
```java
public class PulsarSourceReader<T> implements SourceReader<T, PulsarPartitionSplit> {
    // Promote collectorCache to an instance field (only used in multi-table mode)
    private final Map<TablePath, Collector<T>> collectorCache;

    public PulsarSourceReader(...) {
        // ...
        // emptyMap() is immutable; this is safe only because resolveCollector()
        // never writes to the cache in single-table mode
        this.collectorCache =
                injectTableIdForRouting ? new HashMap<>() : Collections.emptyMap();
    }

    @Override
    public void pollNext(Collector<T> output) throws Exception {
        if (injectTableIdForRouting) {
            collectorCache.clear(); // Reuse the Map instance
        }
        for (int i = 0; i < batchSize; i++) {
            // ...
            Collector<T> collector = resolveCollector(tablePath, output, collectorCache);
            // ...
        }
    }
}
```
**Rationale**:
- In single-table mode (`injectTableIdForRouting = false`), `resolveCollector()` returns the original `output` directly and never uses the cache
- Only multi-table mode needs the cache, so only that mode allocates a reusable Map
- Reusing the map via `clear()` avoids allocating a new `HashMap` on every poll
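
A minimal sketch of the suggested cache lifecycle, using hypothetical `SimpleCollector` and `CachingReader` types in place of SeaTunnel's `Collector<T>` and `PulsarSourceReader`. Records are modeled as `{tablePath, value}` pairs, and `computeIfAbsent()` is an assumption about how `resolveCollector()` could populate the cache, not the PR's actual implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal stand-in for SeaTunnel's Collector<T>.
interface SimpleCollector<T> {
    void collect(T record);
}

// Hypothetical reader showing only the collectorCache lifecycle.
class CachingReader {
    private final boolean injectTableIdForRouting;
    // One map per reader, reused across polls instead of allocated per poll.
    private final Map<String, SimpleCollector<String>> collectorCache = new HashMap<>();
    private int wrappersCreated; // For illustration: how many per-table wrappers were built

    CachingReader(boolean injectTableIdForRouting) {
        this.injectTableIdForRouting = injectTableIdForRouting;
    }

    void pollNext(List<String[]> batch, SimpleCollector<String> output) {
        if (injectTableIdForRouting) {
            collectorCache.clear(); // Wrappers from the last poll captured the last poll's output
        }
        for (String[] record : batch) {
            resolveCollector(record[0], output).collect(record[1]);
        }
    }

    private SimpleCollector<String> resolveCollector(
            String tablePath, SimpleCollector<String> output) {
        if (!injectTableIdForRouting) {
            return output; // Single-table mode: no wrapper, cache never touched
        }
        return collectorCache.computeIfAbsent(tablePath, table -> {
            wrappersCreated++;
            // Stand-in for TableIdInjectingCollector: prefix each record with its table
            SimpleCollector<String> wrapper = value -> output.collect(table + ":" + value);
            return wrapper;
        });
    }

    int wrappersCreated() {
        return wrappersCreated;
    }
}
```

Clearing per poll matters in this sketch because each wrapper captures the `output` passed to that poll.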
---
### Issue 4: PulsarSplitEnumerator.hasTopicPattern() Duplicate Type Checking
**Location**:
`seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java:268`
```java
private boolean hasTopicPattern(PulsarDiscoverer discoverer) {
    if (discoverer instanceof TopicPatternDiscoverer) {
        return true;
    }
    return discoverer instanceof MultiTablePartitionDiscoverer
            && ((MultiTablePartitionDiscoverer) discoverer).hasTopicPattern();
}
```
**Related Context**:
- This method is called in two places: the constructor and `validateBoundedDiscovery()`
- The validation logic in the constructor duplicates this method (the dev branch's `PulsarSource` constructor has similar logic)
**Problem Description**:
`hasTopicPattern()` duplicates the logic in `PulsarSource.validateBoundedDiscovery()`:
```java
// PulsarSource.java:~191
private void validateBoundedDiscovery() {
    boolean hasTopicPattern;
    if (partitionDiscoverer instanceof TopicPatternDiscoverer) {
        hasTopicPattern = true;
    } else if (partitionDiscoverer instanceof MultiTablePartitionDiscoverer) {
        hasTopicPattern = ((MultiTablePartitionDiscoverer) partitionDiscoverer).hasTopicPattern();
    } else {
        hasTopicPattern = false;
    }
    // ...
}
```
**Potential Risks**:
- **Risk 1**: If new `PulsarDiscoverer` implementations are added in the
future, both pieces of logic need to be modified
- **Risk 2**: Code duplication increases maintenance costs
**Impact Scope**:
- Direct impact: PulsarSource and PulsarSplitEnumerator
- Affected area: Pulsar Connector
**Severity**: MINOR
**Improvement Suggestions**:
```java
// Add a default method to the PulsarDiscoverer interface
public interface PulsarDiscoverer extends Serializable {
    Set<TopicPartition> getSubscribedTopicPartitions(PulsarAdmin admin);

    // Default: no topic pattern, so TopicListDiscoverer needs no override
    default boolean hasTopicPattern() {
        return false;
    }
}

// TopicPatternDiscoverer overrides:
@Override
public boolean hasTopicPattern() {
    return true;
}

// MultiTablePartitionDiscoverer overrides by delegating to each table's discoverer:
@Override
public boolean hasTopicPattern() {
    return discovererPairs.stream().anyMatch(pair -> pair.discoverer.hasTopicPattern());
}

// Simplified caller code
private void validateBoundedDiscovery() {
    if (partitionDiscoverer.hasTopicPattern() // Unified polymorphic call
            && partitionDiscoveryIntervalMs > 0
            && Boundedness.BOUNDED == getBoundedness()) {
        throw new PulsarConnectorException(...);
    }
}
```
**Rationale**:
- Follows the Open-Closed Principle: a new `PulsarDiscoverer` implementation only needs to override `hasTopicPattern()` if it supports topic patterns, and callers need no changes
- Eliminates code duplication
- Replaces `instanceof` checks and casts with type-safe polymorphic calls
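
A self-contained sketch of the polymorphic version, with a simplified hypothetical `Discoverer` hierarchy standing in for `PulsarDiscoverer` (no `PulsarAdmin` or serialization) and `IllegalArgumentException` standing in for `PulsarConnectorException`. The class names and the exception message are illustrative only.

```java
import java.util.List;

// Hypothetical, simplified discoverer hierarchy.
interface Discoverer {
    // Default: a plain topic list has no pattern, so most implementations need no override.
    default boolean hasTopicPattern() {
        return false;
    }
}

class ListDiscoverer implements Discoverer {}

class PatternDiscoverer implements Discoverer {
    @Override
    public boolean hasTopicPattern() {
        return true;
    }
}

class MultiTableDiscoverer implements Discoverer {
    private final List<Discoverer> perTable;

    MultiTableDiscoverer(List<Discoverer> perTable) {
        this.perTable = perTable;
    }

    @Override
    public boolean hasTopicPattern() {
        // Delegates to each table's discoverer: no instanceof checks or casts
        return perTable.stream().anyMatch(Discoverer::hasTopicPattern);
    }
}

final class BoundedDiscoveryValidator {
    // Same condition as the simplified validateBoundedDiscovery() in the review
    static void validate(Discoverer discoverer, long discoveryIntervalMs, boolean bounded) {
        if (discoverer.hasTopicPattern() && discoveryIntervalMs > 0 && bounded) {
            throw new IllegalArgumentException(
                    "Topic pattern discovery with a positive interval is not supported for bounded sources");
        }
    }
}
```

Adding a new discoverer type in this sketch touches only the new class; `BoundedDiscoveryValidator` stays unchanged.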
---
### Issue 5: TableIdInjectingCollector Silent Skipping for Non-SeaTunnelRow Records
**Location**:
`seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/TableIdInjectingCollector.java:35`
```java
@Override
public void collect(T record) {
    if (record instanceof SeaTunnelRow) { // ⚠️ Type check + silent skip
        ((SeaTunnelRow) record).setTableId(tableId);
    }
    delegate.collect(record);
}
```
**Related Context**:
- `PulsarSourceReader` is generic in `<T>`, but in practice `T` is always `SeaTunnelRow`
- The deserializer type is `DeserializationSchema<SeaTunnelRow>`
**Problem Description**:
Although `T` is always `SeaTunnelRow` in the current implementation, the combination of generics and an `instanceof` check suggests that other record types could appear. If a non-`SeaTunnelRow` record is passed, the code silently skips setting the tableId, which could lead to:
1. Records without a tableId in multi-table mode, causing downstream routing failures
2. Problems that are hard to troubleshoot, since neither a log entry nor an exception is produced
**Potential Risks**:
- **Risk 1**: If other Record types exist in the future (e.g.,
`SchemaChangeEvent`), tableId injection will fail
- **Risk 2**: No clear error message when types don't match
**Impact Scope**:
- Direct impact: `TableIdInjectingCollector`
- Indirect impact: Multi-table mode data routing
- Affected area: Pulsar multi-table Source
**Severity**: MINOR
**Improvement Suggestions**:
```java
// Option 1: Remove the type parameter and make the record type explicit
public class TableIdInjectingCollector implements Collector<SeaTunnelRow> { // <T> removed
    private final Collector<SeaTunnelRow> delegate; // Explicit type
    private final String tableId;

    @Override
    public void collect(SeaTunnelRow record) { // No generics
        record.setTableId(tableId); // No instanceof check needed
        delegate.collect(record);
    }
}

// Option 2: Keep the type parameter, but log records that cannot be tagged
@Override
public void collect(T record) {
    if (record instanceof SeaTunnelRow) {
        ((SeaTunnelRow) record).setTableId(tableId);
    } else {
        LOG.warn(
                "Record is not a SeaTunnelRow, cannot inject tableId. Record type: {}",
                record != null ? record.getClass().getName() : "null");
    }
    delegate.collect(record);
}
```
**Rationale**:
- Option 1 is more thorough: SeaTunnel source connectors currently only process `SeaTunnelRow` here, so the type parameter is over-engineering
- Option 2 is more compatible: it leaves room for future extension while improving observability
- Option 1 is recommended, because `SchemaChangeEvent` is handled through the `collect(SchemaChangeEvent)` overload and does not need a tableId injected
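
A self-contained sketch of Option 1's shape, using hypothetical `Row` and `RowCollector` stand-ins for `SeaTunnelRow` and SeaTunnel's `Collector<T>` (the real types have more members):

```java
// Hypothetical stand-in for SeaTunnelRow: only the tableId property is modeled.
class Row {
    private String tableId;

    void setTableId(String tableId) {
        this.tableId = tableId;
    }

    String getTableId() {
        return tableId;
    }
}

// Hypothetical stand-in for SeaTunnel's Collector<T>.
interface RowCollector<T> {
    void collect(T record);
}

// Option 1 shape: the element type is fixed, so there is no instanceof check and no silent skip.
final class TableIdInjectingRowCollector implements RowCollector<Row> {
    private final RowCollector<Row> delegate;
    private final String tableId;

    TableIdInjectingRowCollector(RowCollector<Row> delegate, String tableId) {
        this.delegate = delegate;
        this.tableId = tableId;
    }

    @Override
    public void collect(Row record) {
        record.setTableId(tableId); // Every record is tagged before it is forwarded
        delegate.collect(record);
    }
}
```

Passing any other record type to `TableIdInjectingRowCollector` is a compile error, which replaces the silent runtime skip.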
---
### Issue 6: PulsarSource.setJobContext() Batch Mode Validation Timing Issue
**Location**:
`seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java:183`
```java
@Override
public void setJobContext(JobContext jobContext) {
    if (multiTableConfig.isMultiTable()
            && JobMode.BATCH.equals(jobContext.getJobMode())
            && getBoundedness() == Boundedness.UNBOUNDED) {
        throw new PulsarConnectorException(
                SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                "Pulsar source does not support unbounded multi-table configuration in batch mode.");
    }
}
```
**Related Context**:
- This validation depends on `jobContext.getJobMode()`, which can only be
obtained at runtime
- PulsarSourceFactory's `validateSourceOptions()` cannot access JobMode
**Problem Description**:
Because this validation depends on the job mode, it is deferred until `setJobContext()` runs, which means:
1. Users only receive the error after submitting the job, not during the configuration validation phase
2. The error message mentions "multi-table configuration" and "unbounded", but does not explain why this combination is unsupported or how to fix it
**Potential Risks**:
- **Risk 1**: Poor user experience (delayed error discovery)
- **Risk 2**: Semantic issue of using UNBOUNDED source in Batch mode: the
job may never complete
**Impact Scope**:
- Direct impact: Batch mode + multi-table + UNBOUNDED configuration
combination
- Affected area: Multi-table mode users
**Severity**: MAJOR (but does not block merge)
**Improvement Suggestions**:
```java
// Option 1: Clearly document the limitation
// docs/en/connectors/source/Pulsar.md
/**
 * **Note**: In batch mode, all tables must be bounded (cursor.stop.mode != NEVER).
 * If any table uses cursor.stop.mode = NEVER, the source is considered unbounded
 * and the batch job will fail at runtime. Use streaming mode for unbounded sources.
 */

// Option 2: Improve the error message and provide fix guidance
throw new PulsarConnectorException(
        SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
        String.format(
                "Pulsar source does not support unbounded multi-table configuration in batch mode.%n"
                        + "Current config: multi-table=%s, boundedness=%s, job-mode=%s%n"
                        + "Solution: Either%n"
                        + "  1. Set cursor.stop.mode to LATEST or TIMESTAMP for all tables in tables_configs, or%n"
                        + "  2. Use streaming mode (set job.mode = \"STREAMING\" in the env block)",
                multiTableConfig.isMultiTable(),
                getBoundedness(),
                jobContext.getJobMode()));

// Option 3: Add an early check in PulsarSourceFactory (if the job mode can be inferred)
// This requires parsing tables_configs in advance, which is complex
```
**Rationale**:
- Option 1 is the minimal change and improves user awareness
- Option 2 provides a concrete fix, reducing troubleshooting time
- Option 3 would be ideal but is complex to implement, so it is not recommended
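
Combining the rule stated in Option 1 (any table with `cursor.stop.mode = NEVER` makes the source unbounded) with Option 2's guidance, a check could name the offending tables directly. This is a hypothetical sketch: `StopMode` models only the three modes mentioned in this review, `MultiTableBoundedness` is not an existing class, and `IllegalStateException` stands in for `PulsarConnectorException`.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical: only the stop modes mentioned in this review are modeled.
enum StopMode { LATEST, TIMESTAMP, NEVER }

final class MultiTableBoundedness {
    // Tables whose cursor.stop.mode is NEVER make the whole source unbounded.
    static List<String> unboundedTables(Map<String, StopMode> stopModeByTable) {
        return stopModeByTable.entrySet().stream()
                .filter(e -> e.getValue() == StopMode.NEVER)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    static boolean isBounded(Map<String, StopMode> stopModeByTable) {
        return unboundedTables(stopModeByTable).isEmpty();
    }

    // Batch mode requires a bounded source; the error names the offending tables.
    static void validateForBatch(Map<String, StopMode> stopModeByTable, boolean batchMode) {
        List<String> unbounded = unboundedTables(stopModeByTable);
        if (batchMode && !unbounded.isEmpty()) {
            throw new IllegalStateException(String.format(
                    "Pulsar source does not support unbounded multi-table configuration in batch mode.%n"
                            + "Tables with cursor.stop.mode = NEVER: %s%n"
                            + "Solution: set cursor.stop.mode to LATEST or TIMESTAMP for these tables,"
                            + " or run the job in streaming mode.",
                    unbounded));
        }
    }
}
```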
---
--
This is an automated message from the Apache Git Service.