This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 4eec9be012 [Improve] rabbit mq options (#8740) 4eec9be012 is described below commit 4eec9be012a685fa2be13590c5c967b7cd6eef3b Author: Jarvis <jar...@apache.org> AuthorDate: Wed Feb 19 21:42:57 2025 +0800 [Improve] rabbit mq options (#8740) --- .../seatunnel/api/ConnectorOptionCheckTest.java | 2 - .../rabbitmq/config/RabbitmqBaseOptions.java | 135 ++++++++++ .../seatunnel/rabbitmq/config/RabbitmqConfig.java | 271 ++++----------------- .../rabbitmq/config/RabbitmqSinkOptions.java | 35 +++ .../rabbitmq/config/RabbitmqSourceOptions.java | 63 +++++ .../seatunnel/rabbitmq/sink/RabbitmqSink.java | 59 +---- .../rabbitmq/sink/RabbitmqSinkFactory.java | 54 ++-- .../seatunnel/rabbitmq/source/RabbitmqSource.java | 62 +---- .../rabbitmq/source/RabbitmqSourceFactory.java | 74 +++--- 9 files changed, 371 insertions(+), 384 deletions(-) diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java index 7dce5ee5f5..533c622602 100644 --- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java +++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java @@ -171,7 +171,6 @@ public class ConnectorOptionCheckTest { Set<String> whiteList = new HashSet<>(); whiteList.add("JdbcSinkOptions"); whiteList.add("TypesenseSourceOptions"); - whiteList.add("RabbitmqSourceOptions"); whiteList.add("TypesenseSinkOptions"); whiteList.add("EmailSinkOptions"); whiteList.add("HudiSinkOptions"); @@ -188,7 +187,6 @@ public class ConnectorOptionCheckTest { whiteList.add("MongodbSinkOptions"); whiteList.add("IoTDBSinkOptions"); whiteList.add("EasysearchSourceOptions"); - whiteList.add("RabbitmqSinkOptions"); whiteList.add("IcebergSourceOptions"); whiteList.add("HbaseSourceOptions"); whiteList.add("PaimonSourceOptions"); diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqBaseOptions.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqBaseOptions.java new file mode 100644 index 0000000000..ff71f860e3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqBaseOptions.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.options.ConnectorCommonOptions; + +public class RabbitmqBaseOptions extends ConnectorCommonOptions { + + public static final Option<String> HOST = + Options.key("host") + .stringType() + .noDefaultValue() + .withDescription("the default host to use for connections"); + + public static final Option<Integer> PORT = + Options.key("port") + .intType() + .noDefaultValue() + .withDescription("the default port to use for connections"); + + public static final Option<String> VIRTUAL_HOST = + Options.key("virtual_host") + .stringType() + .noDefaultValue() + .withDescription("the virtual host to use when connecting to the broker"); + + public static final Option<String> QUEUE_NAME = + Options.key("queue_name") + .stringType() + .noDefaultValue() + .withDescription("the queue to write the message to"); + + public static final Option<String> USERNAME = + Options.key("username") + .stringType() + .noDefaultValue() + .withDescription("the AMQP user name to use when connecting to the broker"); + + public static final Option<String> PASSWORD = + Options.key("password") + .stringType() + .noDefaultValue() + .withDescription("the password to use when connecting to the broker"); + + public static final Option<String> URL = + Options.key("url") + .stringType() + .noDefaultValue() + .withDescription( + "convenience method for setting the fields in an AMQP URI: host, port, username, password and virtual host"); + + public static final Option<String> ROUTING_KEY = + Options.key("routing_key") + .stringType() + .noDefaultValue() + .withDescription("the routing key to publish the message to"); + + public static final Option<String> EXCHANGE = + Options.key("exchange") + .stringType() + .noDefaultValue() + .withDescription("the exchange to publish the message to"); + + public static final Option<Integer> NETWORK_RECOVERY_INTERVAL = + Options.key("network_recovery_interval") + .intType() + .noDefaultValue() + .withDescription( + "how long will automatic recovery wait before attempting to reconnect, in ms"); + + public static final Option<Boolean> TOPOLOGY_RECOVERY_ENABLED = + Options.key("topology_recovery_enabled") + .booleanType() + .noDefaultValue() + .withDescription("if true, enables topology recovery"); + + public static final Option<Boolean> AUTOMATIC_RECOVERY_ENABLED = + Options.key("AUTOMATIC_RECOVERY_ENABLED") + .booleanType() + .noDefaultValue() + .withDescription("if true, enables connection recovery"); + + public static final Option<Integer> CONNECTION_TIMEOUT = + Options.key("connection_timeout") + .intType() + .noDefaultValue() + .withDescription("connection TCP establishment timeout in milliseconds"); + + public static final Option<Boolean> FOR_E2E_TESTING = + Options.key("for_e2e_testing") + .booleanType() + .noDefaultValue() + .withDescription("use to recognize E2E mode"); + + public static final Option<Boolean> DURABLE = + Options.key("durable") + .booleanType() + .defaultValue(true) + .withDescription( + "true: The queue will survive a server restart." + + "false: The queue will be deleted on server restart."); + + public static final Option<Boolean> EXCLUSIVE = + Options.key("exclusive") + .booleanType() + .defaultValue(false) + .withDescription( + "true: The queue is used only by the current connection and will be deleted when the connection closes." + + "false: The queue can be used by multiple connections."); + + public static final Option<Boolean> AUTO_DELETE = + Options.key("auto_delete") + .booleanType() + .defaultValue(false) + .withDescription( + "true: The queue will be deleted automatically when the last consumer unsubscribes." + + "false: The queue will not be automatically deleted."); +} diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java index 2872c9af81..ba67ed3c69 100644 --- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java @@ -17,24 +17,20 @@ package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config; -import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.configuration.Option; -import org.apache.seatunnel.api.configuration.Options; -import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.Setter; import java.io.Serializable; -import java.util.Collections; import java.util.HashMap; import java.util.Map; @Setter @Getter +@NoArgsConstructor @AllArgsConstructor public class RabbitmqConfig implements Serializable { private String host; @@ -63,241 +59,70 @@ public class RabbitmqConfig implements Serializable { private boolean forE2ETesting = false; private boolean usesCorrelationId = false; - private final Map<String, Object> sinkOptionProps = new HashMap<>(); - - public static final Option<String> HOST = - Options.key("host") - .stringType() - .noDefaultValue() - .withDescription("the default host to use for connections"); - - public static final Option<Integer> PORT = - Options.key("port") - .intType() - .noDefaultValue() - .withDescription("the default port to use for connections"); - - public static final Option<String> VIRTUAL_HOST = - Options.key("virtual_host") - .stringType() - .noDefaultValue() - .withDescription("the virtual host to use when connecting to the broker"); - - public static final Option<String> USERNAME = - Options.key("username") - .stringType() - .noDefaultValue() - .withDescription("the AMQP user name to use when connecting to the broker"); - - public static final Option<String> PASSWORD = - Options.key("password") - .stringType() - .noDefaultValue() - .withDescription("the password to use when connecting to the broker"); - - public static final Option<String> QUEUE_NAME = - Options.key("queue_name") - .stringType() - .noDefaultValue() - .withDescription("the queue to write the message to"); - - public static final Option<String> URL = - Options.key("url") - .stringType() - .noDefaultValue() - .withDescription( - "convenience method for setting the fields in an AMQP URI: host, port, username, password and virtual host"); - - public static final Option<Integer> NETWORK_RECOVERY_INTERVAL = - Options.key("network_recovery_interval") - .intType() - .noDefaultValue() - .withDescription( - "how long will automatic recovery wait before attempting to reconnect, in ms"); - - public static final Option<Boolean> AUTOMATIC_RECOVERY_ENABLED = - Options.key("AUTOMATIC_RECOVERY_ENABLED") - .booleanType() - .noDefaultValue() - .withDescription("if true, enables connection recovery"); - - public static final Option<Boolean> TOPOLOGY_RECOVERY_ENABLED = - Options.key("topology_recovery_enabled") - .booleanType() - .noDefaultValue() - .withDescription("if true, enables topology recovery"); - - public static final Option<Integer> CONNECTION_TIMEOUT = - Options.key("connection_timeout") - .intType() - .noDefaultValue() - .withDescription("connection TCP establishment timeout in milliseconds"); - - public static final Option<Integer> REQUESTED_CHANNEL_MAX = - Options.key("requested_channel_max") - .intType() - .noDefaultValue() - .withDescription("initially requested maximum channel number"); - - public static final Option<Integer> REQUESTED_FRAME_MAX = - Options.key("requested_frame_max") - .intType() - .noDefaultValue() - .withDescription("the requested maximum frame size"); - - public static final Option<Integer> REQUESTED_HEARTBEAT = - Options.key("requested_heartbeat") - .intType() - .noDefaultValue() - .withDescription("the requested heartbeat timeout"); + private Map<String, String> sinkOptionProps = new HashMap<>(); - public static final Option<Long> PREFETCH_COUNT = - Options.key("prefetch_count") - .longType() - .noDefaultValue() - .withDescription( - "prefetchCount the max number of messages to receive without acknowledgement\n"); - - public static final Option<Integer> DELIVERY_TIMEOUT = - Options.key("delivery_timeout") - .intType() - .noDefaultValue() - .withDescription("deliveryTimeout maximum wait time"); - - public static final Option<String> ROUTING_KEY = - Options.key("routing_key") - .stringType() - .noDefaultValue() - .withDescription("the routing key to publish the message to"); - - public static final Option<String> EXCHANGE = - Options.key("exchange") - .stringType() - .noDefaultValue() - .withDescription("the exchange to publish the message to"); - - public static final Option<Boolean> FOR_E2E_TESTING = - Options.key("for_e2e_testing") - .booleanType() - .noDefaultValue() - .withDescription("use to recognize E2E mode"); - - public static final Option<Map<String, String>> RABBITMQ_CONFIG = - Options.key("rabbitmq.config") - .mapType() - .defaultValue(Collections.emptyMap()) - .withDescription( - "In addition to the above parameters that must be specified by the RabbitMQ client, the user can also specify multiple non-mandatory parameters for the client, " - + "covering [all the parameters specified in the official RabbitMQ document](https://www.rabbitmq.com/configure.html)."); - - public static final Option<Boolean> USE_CORRELATION_ID = - Options.key("use_correlation_id") - .booleanType() - .noDefaultValue() - .withDescription( - "Whether the messages received are supplied with a unique" - + "id to deduplicate messages (in case of failed acknowledgments)."); - - public static final Option<Boolean> DURABLE = - Options.key("durable") - .booleanType() - .defaultValue(true) - .withDescription( - "true: The queue will survive a server restart." - + "false: The queue will be deleted on server restart."); - - public static final Option<Boolean> EXCLUSIVE = - Options.key("exclusive") - .booleanType() - .defaultValue(false) - .withDescription( - "true: The queue is used only by the current connection and will be deleted when the connection closes." - + "false: The queue can be used by multiple connections."); - - public static final Option<Boolean> AUTO_DELETE = - Options.key("auto_delete") - .booleanType() - .defaultValue(false) - .withDescription( - "true: The queue will be deleted automatically when the last consumer unsubscribes." - + "false: The queue will not be automatically deleted."); - - private void parseSinkOptionProperties(Config pluginConfig) { - if (CheckConfigUtil.isValidParam(pluginConfig, RABBITMQ_CONFIG.key())) { - pluginConfig - .getObject(RABBITMQ_CONFIG.key()) - .forEach( - (key, value) -> { - final String configKey = key.toLowerCase(); - this.sinkOptionProps.put(configKey, value.unwrapped()); - }); + public RabbitmqConfig(ReadonlyConfig config) { + this.host = config.get(RabbitmqBaseOptions.HOST); + this.port = config.get(RabbitmqBaseOptions.PORT); + this.queueName = config.get(RabbitmqBaseOptions.QUEUE_NAME); + if (config.getOptional(RabbitmqBaseOptions.USERNAME).isPresent()) { + this.username = config.get(RabbitmqBaseOptions.USERNAME); } - } - - public RabbitmqConfig(Config config) { - this.host = config.getString(HOST.key()); - this.port = config.getInt(PORT.key()); - this.queueName = config.getString(QUEUE_NAME.key()); - if (config.hasPath(USERNAME.key())) { - this.username = config.getString(USERNAME.key()); - } - if (config.hasPath(PASSWORD.key())) { - this.password = config.getString(PASSWORD.key()); + if (config.getOptional(RabbitmqBaseOptions.PASSWORD).isPresent()) { + this.password = config.get(RabbitmqBaseOptions.PASSWORD); } - if (config.hasPath(VIRTUAL_HOST.key())) { - this.virtualHost = config.getString(VIRTUAL_HOST.key()); + if (config.getOptional(RabbitmqBaseOptions.VIRTUAL_HOST).isPresent()) { + this.virtualHost = config.get(RabbitmqBaseOptions.VIRTUAL_HOST); } - if (config.hasPath(NETWORK_RECOVERY_INTERVAL.key())) { - this.networkRecoveryInterval = config.getInt(NETWORK_RECOVERY_INTERVAL.key()); + if (config.getOptional(RabbitmqBaseOptions.NETWORK_RECOVERY_INTERVAL).isPresent()) { + this.networkRecoveryInterval = + config.get(RabbitmqBaseOptions.NETWORK_RECOVERY_INTERVAL); } - if (config.hasPath(AUTOMATIC_RECOVERY_ENABLED.key())) { - this.automaticRecovery = config.getBoolean(AUTOMATIC_RECOVERY_ENABLED.key()); + if (config.getOptional(RabbitmqBaseOptions.AUTOMATIC_RECOVERY_ENABLED).isPresent()) { + this.automaticRecovery = config.get(RabbitmqBaseOptions.AUTOMATIC_RECOVERY_ENABLED); } - if (config.hasPath(TOPOLOGY_RECOVERY_ENABLED.key())) { - this.topologyRecovery = config.getBoolean(TOPOLOGY_RECOVERY_ENABLED.key()); + if (config.getOptional(RabbitmqBaseOptions.TOPOLOGY_RECOVERY_ENABLED).isPresent()) { + this.topologyRecovery = config.get(RabbitmqBaseOptions.TOPOLOGY_RECOVERY_ENABLED); } - if (config.hasPath(CONNECTION_TIMEOUT.key())) { - this.connectionTimeout = config.getInt(CONNECTION_TIMEOUT.key()); + if (config.getOptional(RabbitmqBaseOptions.CONNECTION_TIMEOUT).isPresent()) { + this.connectionTimeout = config.get(RabbitmqBaseOptions.CONNECTION_TIMEOUT); } - if (config.hasPath(REQUESTED_CHANNEL_MAX.key())) { - this.requestedChannelMax = config.getInt(REQUESTED_CHANNEL_MAX.key()); + if (config.getOptional(RabbitmqSourceOptions.REQUESTED_CHANNEL_MAX).isPresent()) { + this.requestedChannelMax = config.get(RabbitmqSourceOptions.REQUESTED_CHANNEL_MAX); } - if (config.hasPath(REQUESTED_FRAME_MAX.key())) { - this.requestedFrameMax = config.getInt(REQUESTED_FRAME_MAX.key()); + if (config.getOptional(RabbitmqSourceOptions.REQUESTED_FRAME_MAX).isPresent()) { + this.requestedFrameMax = config.get(RabbitmqSourceOptions.REQUESTED_FRAME_MAX); } - if (config.hasPath(REQUESTED_HEARTBEAT.key())) { - this.requestedHeartbeat = config.getInt(REQUESTED_HEARTBEAT.key()); + if (config.getOptional(RabbitmqSourceOptions.REQUESTED_HEARTBEAT).isPresent()) { + this.requestedHeartbeat = config.get(RabbitmqSourceOptions.REQUESTED_HEARTBEAT); } - if (config.hasPath(PREFETCH_COUNT.key())) { - this.prefetchCount = config.getInt(PREFETCH_COUNT.key()); + if (config.getOptional(RabbitmqSourceOptions.PREFETCH_COUNT).isPresent()) { + this.prefetchCount = config.get(RabbitmqSourceOptions.PREFETCH_COUNT); } - if (config.hasPath(DELIVERY_TIMEOUT.key())) { - this.deliveryTimeout = config.getInt(DELIVERY_TIMEOUT.key()); + if (config.getOptional(RabbitmqSourceOptions.DELIVERY_TIMEOUT).isPresent()) { + this.deliveryTimeout = config.get(RabbitmqSourceOptions.DELIVERY_TIMEOUT); } - if (config.hasPath(ROUTING_KEY.key())) { - this.routingKey = config.getString(ROUTING_KEY.key()); + if (config.getOptional(RabbitmqBaseOptions.ROUTING_KEY).isPresent()) { + this.routingKey = config.get(RabbitmqBaseOptions.ROUTING_KEY); } - if (config.hasPath(EXCHANGE.key())) { - this.exchange = config.getString(EXCHANGE.key()); + if (config.getOptional(RabbitmqBaseOptions.EXCHANGE).isPresent()) { + this.exchange = config.get(RabbitmqBaseOptions.EXCHANGE); } - if (config.hasPath(FOR_E2E_TESTING.key())) { - this.forE2ETesting = config.getBoolean(FOR_E2E_TESTING.key()); + if (config.getOptional(RabbitmqBaseOptions.FOR_E2E_TESTING).isPresent()) { + this.forE2ETesting = config.get(RabbitmqBaseOptions.FOR_E2E_TESTING); } - if (config.hasPath(USE_CORRELATION_ID.key())) { - this.usesCorrelationId = config.getBoolean(USE_CORRELATION_ID.key()); + if (config.getOptional(RabbitmqSourceOptions.USE_CORRELATION_ID).isPresent()) { + this.usesCorrelationId = config.get(RabbitmqSourceOptions.USE_CORRELATION_ID); } - if (config.hasPath(DURABLE.key())) { - this.durable = config.getBoolean(DURABLE.key()); + if (config.getOptional(RabbitmqBaseOptions.DURABLE).isPresent()) { + this.durable = config.get(RabbitmqBaseOptions.DURABLE); } - if (config.hasPath(EXCLUSIVE.key())) { - this.exclusive = config.getBoolean(EXCLUSIVE.key()); + if (config.getOptional(RabbitmqBaseOptions.EXCLUSIVE).isPresent()) { + this.exclusive = config.get(RabbitmqBaseOptions.EXCLUSIVE); } - if (config.hasPath(AUTO_DELETE.key())) { - this.autoDelete = config.getBoolean(AUTO_DELETE.key()); + if (config.getOptional(RabbitmqBaseOptions.AUTO_DELETE).isPresent()) { + this.autoDelete = config.get(RabbitmqBaseOptions.AUTO_DELETE); } - parseSinkOptionProperties(config); + this.sinkOptionProps = config.get(RabbitmqSinkOptions.RABBITMQ_CONFIG); } - - @VisibleForTesting - public RabbitmqConfig() {} } diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSinkOptions.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSinkOptions.java new file mode 100644 index 0000000000..cfd602596f --- /dev/null +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSinkOptions.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +import java.util.Collections; +import java.util.Map; + +public class RabbitmqSinkOptions extends RabbitmqBaseOptions { + + public static final Option<Map<String, String>> RABBITMQ_CONFIG = + Options.key("rabbitmq.config") + .mapType() + .defaultValue(Collections.emptyMap()) + .withDescription( + "In addition to the above parameters that must be specified by the RabbitMQ client, the user can also specify multiple non-mandatory parameters for the client, " + + "covering [all the parameters specified in the official RabbitMQ document](https://www.rabbitmq.com/configure.html)."); +} diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSourceOptions.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSourceOptions.java new file mode 100644 index 0000000000..dcc72f2067 --- /dev/null +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSourceOptions.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class RabbitmqSourceOptions extends RabbitmqBaseOptions { + + public static final Option<Integer> REQUESTED_CHANNEL_MAX = + Options.key("requested_channel_max") + .intType() + .noDefaultValue() + .withDescription("initially requested maximum channel number"); + + public static final Option<Integer> REQUESTED_FRAME_MAX = + Options.key("requested_frame_max") + .intType() + .noDefaultValue() + .withDescription("the requested maximum frame size"); + + public static final Option<Integer> REQUESTED_HEARTBEAT = + Options.key("requested_heartbeat") + .intType() + .noDefaultValue() + .withDescription("the requested heartbeat timeout"); + + public static final Option<Integer> PREFETCH_COUNT = + Options.key("prefetch_count") + .intType() + .noDefaultValue() + .withDescription( + "prefetchCount the max number of messages to receive without acknowledgement\n"); + + public static final Option<Integer> DELIVERY_TIMEOUT = + Options.key("delivery_timeout") + .intType() + .noDefaultValue() + .withDescription("deliveryTimeout maximum wait time"); + + public static final Option<Boolean> USE_CORRELATION_ID = + Options.key("use_correlation_id") + .booleanType() + .noDefaultValue() + .withDescription( + "Whether the messages received are supplied with a unique" + + "id to deduplicate messages (in case of failed acknowledgments)."); +} diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java index 7d4f26272b..bfcedcc340 100644 --- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java @@ -17,82 +17,39 @@ package org.apache.seatunnel.connectors.seatunnel.rabbitmq.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig; -import org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException; - -import com.google.auto.service.AutoService; import java.io.IOException; import java.util.Optional; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST; - -@AutoService(SeaTunnelSink.class) public class RabbitmqSink extends AbstractSimpleSink<SeaTunnelRow, Void> { - private SeaTunnelRowType seaTunnelRowType; - private Config pluginConfig; - private RabbitmqConfig rabbitMQConfig; + + private final RabbitmqConfig rabbitMQConfig; + private final CatalogTable catalogTable; @Override public String getPluginName() { return "RabbitMQ"; } - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { - this.pluginConfig = pluginConfig; - - CheckResult result = - CheckConfigUtil.checkAllExists( - pluginConfig, - HOST.key(), - PORT.key(), - VIRTUAL_HOST.key(), - USERNAME.key(), - PASSWORD.key(), - QUEUE_NAME.key()); - if (!result.isSuccess()) { - throw new RabbitmqConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SINK, result.getMsg())); - } - rabbitMQConfig = new RabbitmqConfig(pluginConfig); - } - - @Override - public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { - this.seaTunnelRowType = seaTunnelRowType; + public RabbitmqSink(RabbitmqConfig rabbitMQConfig, CatalogTable catalogTable) { + this.rabbitMQConfig = rabbitMQConfig; + this.catalogTable = catalogTable; } @Override public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException { - return new RabbitmqSinkWriter(rabbitMQConfig, seaTunnelRowType); + return new RabbitmqSinkWriter(rabbitMQConfig, catalogTable.getSeaTunnelRowType()); } @Override public Optional<CatalogTable> getWriteCatalogTable() { - return super.getWriteCatalogTable(); + return Optional.of(catalogTable); } } diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java index 4618d351d9..540707d0f0 100644 --- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java @@ -18,26 +18,15 @@ package org.apache.seatunnel.connectors.seatunnel.rabbitmq.sink; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig; +import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqSinkOptions; import com.google.auto.service.AutoService; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.AUTOMATIC_RECOVERY_ENABLED; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.CONNECTION_TIMEOUT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.EXCHANGE; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.NETWORK_RECOVERY_INTERVAL; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.RABBITMQ_CONFIG; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.ROUTING_KEY; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.TOPOLOGY_RECOVERY_ENABLED; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.URL; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST; - @AutoService(Factory.class) public class RabbitmqSinkFactory implements TableSinkFactory { @@ -49,17 +38,32 @@ public class RabbitmqSinkFactory implements TableSinkFactory { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(HOST, PORT, VIRTUAL_HOST, QUEUE_NAME) - .bundled(USERNAME, PASSWORD) + .required( + RabbitmqSinkOptions.HOST, + RabbitmqSinkOptions.PORT, + RabbitmqSinkOptions.VIRTUAL_HOST, + RabbitmqSinkOptions.QUEUE_NAME) + .bundled(RabbitmqSinkOptions.USERNAME, RabbitmqSinkOptions.PASSWORD) .optional( - URL, - ROUTING_KEY, - EXCHANGE, - NETWORK_RECOVERY_INTERVAL, - TOPOLOGY_RECOVERY_ENABLED, - AUTOMATIC_RECOVERY_ENABLED, - CONNECTION_TIMEOUT, - RABBITMQ_CONFIG) + RabbitmqSinkOptions.URL, + RabbitmqSinkOptions.ROUTING_KEY, + RabbitmqSinkOptions.EXCHANGE, + RabbitmqSinkOptions.NETWORK_RECOVERY_INTERVAL, + RabbitmqSinkOptions.TOPOLOGY_RECOVERY_ENABLED, + RabbitmqSinkOptions.AUTOMATIC_RECOVERY_ENABLED, + RabbitmqSinkOptions.CONNECTION_TIMEOUT, + RabbitmqSinkOptions.FOR_E2E_TESTING, + RabbitmqSinkOptions.DURABLE, + RabbitmqSinkOptions.EXCLUSIVE, + RabbitmqSinkOptions.AUTO_DELETE, + RabbitmqSinkOptions.RABBITMQ_CONFIG) .build(); } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + return () -> + new RabbitmqSink( + new RabbitmqConfig(context.getOptions()), context.getCatalogTable()); + } } diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java index 01146f26ac..afe108f229 100644 --- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java @@ -17,12 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.rabbitmq.source; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - import org.apache.seatunnel.api.common.JobContext; -import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.options.ConnectorCommonOptions; import org.apache.seatunnel.api.serialization.DeserializationSchema; import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SeaTunnelSource; @@ -30,11 +26,7 @@ import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.source.SourceSplitEnumerator; import org.apache.seatunnel.api.source.SupportParallelism; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig; @@ -43,23 +35,23 @@ import org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplit; import org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplitEnumeratorState; import org.apache.seatunnel.format.json.JsonDeserializationSchema; -import com.google.auto.service.AutoService; - -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST; +import java.util.Collections; +import java.util.List; -@AutoService(SeaTunnelSource.class) public class RabbitmqSource implements SeaTunnelSource<SeaTunnelRow, RabbitmqSplit, RabbitmqSplitEnumeratorState>, SupportParallelism { private DeserializationSchema<SeaTunnelRow> deserializationSchema; private JobContext jobContext; - private RabbitmqConfig rabbitMQConfig; + private final RabbitmqConfig rabbitMQConfig; + private final CatalogTable catalogTable; + + public RabbitmqSource(RabbitmqConfig rabbitMQConfig, CatalogTable catalogTable) { + this.rabbitMQConfig = rabbitMQConfig; + this.catalogTable = catalogTable; + this.deserializationSchema = new JsonDeserializationSchema(catalogTable, false, false); + } @Override public Boundedness getBoundedness() { @@ -79,31 +71,8 @@ public class RabbitmqSource } @Override - public void prepare(Config config) throws PrepareFailException { - CheckResult result = - CheckConfigUtil.checkAllExists( - config, - HOST.key(), - PORT.key(), - VIRTUAL_HOST.key(), - USERNAME.key(), - PASSWORD.key(), - QUEUE_NAME.key(), - ConnectorCommonOptions.SCHEMA.key()); - if (!result.isSuccess()) { - throw new RabbitmqConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SOURCE, result.getMsg())); - } - this.rabbitMQConfig = new RabbitmqConfig(config); - setDeserialization(config); - } - - @Override - public SeaTunnelDataType getProducedType() { - return deserializationSchema.getProducedType(); + public List<CatalogTable> getProducedCatalogTables() { + return Collections.singletonList(catalogTable); } @Override @@ -130,11 +99,4 @@ public class RabbitmqSource public void setJobContext(JobContext jobContext) { this.jobContext = jobContext; } - - private void setDeserialization(Config config) { - // TODO: format SPI - // only support json deserializationSchema - CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(config); - this.deserializationSchema = new JsonDeserializationSchema(catalogTable, false, false); - } } diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java index 212b194065..a45a2d11a1 100644 --- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java @@ -18,31 +18,20 @@ package org.apache.seatunnel.connectors.seatunnel.rabbitmq.source; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.options.ConnectorCommonOptions; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig; +import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqSinkOptions; +import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqSourceOptions; import com.google.auto.service.AutoService; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.AUTOMATIC_RECOVERY_ENABLED; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.CONNECTION_TIMEOUT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.DELIVERY_TIMEOUT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.EXCHANGE; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.NETWORK_RECOVERY_INTERVAL; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PREFETCH_COUNT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_CHANNEL_MAX; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_FRAME_MAX; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_HEARTBEAT; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.ROUTING_KEY; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.TOPOLOGY_RECOVERY_ENABLED; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.URL; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME; -import static org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST; +import java.io.Serializable; @AutoService(Factory.class) public class RabbitmqSourceFactory implements TableSourceFactory { @@ -54,24 +43,43 @@ public class RabbitmqSourceFactory implements TableSourceFactory { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(HOST, PORT, VIRTUAL_HOST, QUEUE_NAME, ConnectorCommonOptions.SCHEMA) - .bundled(USERNAME, PASSWORD) + .required( + RabbitmqSourceOptions.HOST, + RabbitmqSourceOptions.PORT, + RabbitmqSourceOptions.VIRTUAL_HOST, + RabbitmqSourceOptions.QUEUE_NAME, + RabbitmqSourceOptions.SCHEMA) + .bundled(RabbitmqSourceOptions.USERNAME, RabbitmqSourceOptions.PASSWORD) .optional( - URL, - ROUTING_KEY, - EXCHANGE, - NETWORK_RECOVERY_INTERVAL, - TOPOLOGY_RECOVERY_ENABLED, - AUTOMATIC_RECOVERY_ENABLED, - CONNECTION_TIMEOUT, - REQUESTED_CHANNEL_MAX, - REQUESTED_FRAME_MAX, - REQUESTED_HEARTBEAT, - PREFETCH_COUNT, - DELIVERY_TIMEOUT) + RabbitmqSourceOptions.URL, + RabbitmqSourceOptions.ROUTING_KEY, + RabbitmqSourceOptions.EXCHANGE, + RabbitmqSourceOptions.NETWORK_RECOVERY_INTERVAL, + RabbitmqSourceOptions.TOPOLOGY_RECOVERY_ENABLED, + RabbitmqSourceOptions.AUTOMATIC_RECOVERY_ENABLED, + RabbitmqSourceOptions.CONNECTION_TIMEOUT, + RabbitmqSinkOptions.FOR_E2E_TESTING, + RabbitmqSinkOptions.DURABLE, + RabbitmqSinkOptions.EXCLUSIVE, + RabbitmqSinkOptions.AUTO_DELETE, + RabbitmqSourceOptions.REQUESTED_CHANNEL_MAX, + RabbitmqSourceOptions.REQUESTED_FRAME_MAX, + RabbitmqSourceOptions.REQUESTED_HEARTBEAT, + RabbitmqSourceOptions.PREFETCH_COUNT, + RabbitmqSourceOptions.DELIVERY_TIMEOUT) .build(); } + @Override + public <T, SplitT extends SourceSplit, StateT extends Serializable> + TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) { + return () -> + (SeaTunnelSource<T, SplitT, StateT>) + new RabbitmqSource( + new RabbitmqConfig(context.getOptions()), + CatalogTableUtil.buildWithConfig(context.getOptions())); + } + @Override public Class<? extends SeaTunnelSource> getSourceClass() { return RabbitmqSource.class;