yzeng1618 commented on code in PR #10557:
URL: https://github.com/apache/seatunnel/pull/10557#discussion_r2928480634
##########
docs/en/connectors/source/Rabbitmq.md:
##########
@@ -173,8 +187,62 @@ source {
}
}
```
+### Multi-table Read Example
-## Changelog
+You can use the `table_configs` option to consume messages from multiple RabbitMQ queues simultaneously within a single job. The connector will automatically assign the correct table identifier to each row based on the queue it originated from, allowing you to route them to different sinks using `plugin_input`.
Review Comment:
The description refers to `table_configs`, which is inconsistent with the
actual parameter name, `tables_configs`.
##########
docs/en/connectors/source/Rabbitmq.md:
##########
@@ -91,17 +92,21 @@ the exchange to publish the message to
#### fields [Config]
-the schema fields of upstream data. For more details, please refer to [Schema Feature](../../introduction/concepts/schema-feature.md).
+the schema fields of upstream data. For more details, please refer to [Schema Feature](../../introduction/concepts/schema-feature.md). *Note: Required if `tables_configs` is not configured.*
+
+### tables_configs [array]
Review Comment:
Please also update the Chinese documentation for RabbitMQ to cover this change.
##########
seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSplitEnumerator.java:
##########
@@ -17,21 +17,132 @@
package org.apache.seatunnel.connectors.seatunnel.rabbitmq.source;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqBaseOptions;
+import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
+import org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplit;
+import org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplitEnumeratorState;
+
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+public class RabbitmqSplitEnumerator
+ implements SourceSplitEnumerator<RabbitmqSplit, RabbitmqSplitEnumeratorState> {
+
+ private final SourceSplitEnumerator.Context<RabbitmqSplit> context;
+ private final Map<String, RabbitmqSplit> pendingSplits = new ConcurrentHashMap<>();
+ private final Object stateLock = new Object();
+ private final Set<Integer> assignedReaders = Collections.synchronizedSet(new HashSet<>());
+
+ /**
+ * Constructor for RabbitmqSplitEnumerator.
+ *
+ * @param context enumerator context
+ * @param rabbitmqConfig rabbitmq config
+ * @param queues list of queue names to consume
+ */
+ public RabbitmqSplitEnumerator(
+ SourceSplitEnumerator.Context<RabbitmqSplit> context,
+ RabbitmqConfig rabbitmqConfig,
+ List<String> queues) {
+ this.context = context;
+ for (String queue : queues) {
+ log.info("Discovered queue for processing: {}", queue);
+ this.pendingSplits.put(queue, new RabbitmqSplit(queue));
+ }
+ }
+
+ private void initializeSplits(ReadonlyConfig pluginConfig, RabbitmqConfig rabbitmqConfig) {
Review Comment:
It seems this method is no longer in use; the logic of initializeSplits has
been taken over by the upper-level RabbitmqSource.
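To illustrate the point, here is a minimal, self-contained sketch of the division of responsibility described above. It is not the PR's actual code: `EnumeratorSketch`, `QueueSplit`, `QueueSplitEnumerator`, and `resolveQueues` are hypothetical stand-ins, and the fallback to a single queue name is an assumption about how the source might behave. The idea is that once the source resolves the queue list and the constructor registers one pending split per queue, a separate `initializeSplits` has nothing left to do.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class EnumeratorSketch {

    // Hypothetical stand-in for RabbitmqSplit: one split per queue.
    static final class QueueSplit {
        private final String queueName;

        QueueSplit(String queueName) {
            this.queueName = queueName;
        }

        String splitId() {
            return queueName;
        }
    }

    // Hypothetical stand-in for the enumerator: the constructor alone
    // registers the pending splits, so no extra init method is needed.
    static final class QueueSplitEnumerator {
        private final Map<String, QueueSplit> pendingSplits = new ConcurrentHashMap<>();

        QueueSplitEnumerator(List<String> queues) {
            for (String queue : queues) {
                pendingSplits.put(queue, new QueueSplit(queue));
            }
        }

        int pendingCount() {
            return pendingSplits.size();
        }

        boolean hasPending(String queue) {
            return pendingSplits.containsKey(queue);
        }
    }

    // Assumed source-side logic: prefer the multi-table queue list,
    // otherwise fall back to the single configured queue.
    static List<String> resolveQueues(List<String> tableQueues, String singleQueue) {
        if (tableQueues != null && !tableQueues.isEmpty()) {
            return new ArrayList<>(tableQueues);
        }
        return Collections.singletonList(singleQueue);
    }

    public static void main(String[] args) {
        List<String> queues = resolveQueues(List.of("orders", "payments"), "default_queue");
        QueueSplitEnumerator enumerator = new QueueSplitEnumerator(queues);
        System.out.println("pending splits: " + enumerator.pendingCount());
    }
}
```

With this shape, the enumerator never needs to read the plugin config itself, which is why the leftover method looks like dead code.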
##########
docs/en/connectors/source/Rabbitmq.md:
##########
@@ -173,8 +187,62 @@ source {
}
}
```
+### Multi-table Read Example
-## Changelog
+You can use the `table_configs` option to consume messages from multiple RabbitMQ queues simultaneously within a single job. The connector will automatically assign the correct table identifier to each row based on the queue it originated from, allowing you to route them to different sinks using `plugin_input`.
-<ChangeLog />
+```hocon
+source {
+ RabbitMQ {
+ host = "localhost"
+ port = 5672
+ username = "guest"
+ password = "guest"
+
+ # Use table_configs to read from multiple queues
Review Comment:
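Same issue here: `table_configs` in this comment is inconsistent with the
actual parameter name, `tables_configs`.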