yzeng1618 commented on code in PR #10557:
URL: https://github.com/apache/seatunnel/pull/10557#discussion_r2928480634


##########
docs/en/connectors/source/Rabbitmq.md:
##########
@@ -173,8 +187,62 @@ source {
     }
 }
 ```
+### Multi-table Read Example
 
-## Changelog
+You can use the `table_configs` option to consume messages from multiple 
RabbitMQ queues simultaneously within a single job. The connector will 
automatically assign the correct table identifier to each row based on the 
queue it originated from, allowing you to route them to different sinks using 
`plugin_input`.

Review Comment:
   The parameter comment for table_configs is inconsistent with the actual 
parameter name.



##########
docs/en/connectors/source/Rabbitmq.md:
##########
@@ -91,17 +92,21 @@ the exchange to publish the message to
 
 #### fields [Config]
 
-the schema fields of upstream data. For more details, please refer to [Schema 
Feature](../../introduction/concepts/schema-feature.md).
+the schema fields of upstream data. For more details, please refer to [Schema 
Feature](../../introduction/concepts/schema-feature.md). *Note: Required if 
`tables_configs` is not configured.*
+
+### tables_configs [array]

Review Comment:
   Please supplement the Chinese documentation for RabbitMQ.



##########
seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSplitEnumerator.java:
##########
@@ -17,21 +17,132 @@
 
 package org.apache.seatunnel.connectors.seatunnel.rabbitmq.source;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqBaseOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
+import org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplitEnumeratorState;
+
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+public class RabbitmqSplitEnumerator
+        implements SourceSplitEnumerator<RabbitmqSplit, 
RabbitmqSplitEnumeratorState> {
+
+    private final SourceSplitEnumerator.Context<RabbitmqSplit> context;
+    private final Map<String, RabbitmqSplit> pendingSplits = new 
ConcurrentHashMap<>();
+    private final Object stateLock = new Object();
+    private final Set<Integer> assignedReaders = 
Collections.synchronizedSet(new HashSet<>());
+
+    /**
+     * Constructor for RabbitmqSplitEnumerator.
+     *
+     * @param context enumerator context
+     * @param rabbitmqConfig rabbitmq config
+     * @param queues list of queue names to consume
+     */
+    public RabbitmqSplitEnumerator(
+            SourceSplitEnumerator.Context<RabbitmqSplit> context,
+            RabbitmqConfig rabbitmqConfig,
+            List<String> queues) {
+        this.context = context;
+        for (String queue : queues) {
+            log.info("Discovered queue for processing: {}", queue);
+            this.pendingSplits.put(queue, new RabbitmqSplit(queue));
+        }
+    }
+
+    private void initializeSplits(ReadonlyConfig pluginConfig, RabbitmqConfig 
rabbitmqConfig) {

Review Comment:
   It seems this method is no longer in use; the logic of initializeSplits has 
been taken over by the upper-level RabbitmqSource.



##########
docs/en/connectors/source/Rabbitmq.md:
##########
@@ -173,8 +187,62 @@ source {
     }
 }
 ```
+### Multi-table Read Example
 
-## Changelog
+You can use the `table_configs` option to consume messages from multiple 
RabbitMQ queues simultaneously within a single job. The connector will 
automatically assign the correct table identifier to each row based on the 
queue it originated from, allowing you to route them to different sinks using 
`plugin_input`.
 
-<ChangeLog />
+```hocon
+source {
+  RabbitMQ {
+    host = "localhost"
+    port = 5672
+    username = "guest"
+    password = "guest"
+    
+    # Use table_configs to read from multiple queues

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to