liunaijie commented on code in PR #8699: URL: https://github.com/apache/seatunnel/pull/8699#discussion_r1955434129
########## seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqBaseOptions.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config; + +import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.common.config.CheckConfigUtil; + +import lombok.Getter; +import lombok.Setter; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +@Setter +@Getter +public class RabbitmqBaseOptions implements Serializable { + public static final Option<String> HOST = + Options.key("host") + .stringType() + .noDefaultValue() + .withDescription("the default host to use for connections"); + + public static final Option<Integer> PORT = + Options.key("port") + .intType() + .noDefaultValue() + .withDescription("the default port to use for connections"); + + public static final Option<String> 
VIRTUAL_HOST = + Options.key("virtual_host") + .stringType() + .noDefaultValue() + .withDescription("the virtual host to use when connecting to the broker"); + + public static final Option<String> USERNAME = + Options.key("username") + .stringType() + .noDefaultValue() + .withDescription("the AMQP user name to use when connecting to the broker"); + + public static final Option<String> PASSWORD = + Options.key("password") + .stringType() + .noDefaultValue() + .withDescription("the password to use when connecting to the broker"); + + public static final Option<String> QUEUE_NAME = + Options.key("queue_name") + .stringType() + .noDefaultValue() + .withDescription("the queue to write the message to"); + + public static final Option<String> URL = + Options.key("url") + .stringType() + .noDefaultValue() + .withDescription( + "convenience method for setting the fields in an AMQP URI: host, port, username, password and virtual host"); + + public static final Option<Integer> NETWORK_RECOVERY_INTERVAL = + Options.key("network_recovery_interval") + .intType() + .noDefaultValue() + .withDescription( + "how long will automatic recovery wait before attempting to reconnect, in ms"); + + public static final Option<Boolean> AUTOMATIC_RECOVERY_ENABLED = + Options.key("AUTOMATIC_RECOVERY_ENABLED") + .booleanType() + .noDefaultValue() + .withDescription("if true, enables connection recovery"); + + public static final Option<Boolean> TOPOLOGY_RECOVERY_ENABLED = + Options.key("topology_recovery_enabled") + .booleanType() + .noDefaultValue() + .withDescription("if true, enables topology recovery"); + + public static final Option<Integer> CONNECTION_TIMEOUT = + Options.key("connection_timeout") + .intType() + .noDefaultValue() + .withDescription("connection TCP establishment timeout in milliseconds"); + + public static final Option<String> ROUTING_KEY = + Options.key("routing_key") + .stringType() + .noDefaultValue() + .withDescription("the routing key to publish the message to"); + + 
public static final Option<String> EXCHANGE = + Options.key("exchange") + .stringType() + .noDefaultValue() + .withDescription("the exchange to publish the message to"); + + public static final Option<Boolean> FOR_E2E_TESTING = + Options.key("for_e2e_testing") + .booleanType() + .noDefaultValue() + .withDescription("use to recognize E2E mode"); + + public static final Option<Map<String, String>> RABBITMQ_CONFIG = + Options.key("rabbitmq.config") + .mapType() + .defaultValue(Collections.emptyMap()) + .withDescription( + "In addition to the above parameters that must be specified by the RabbitMQ client, the user can also specify multiple non-mandatory parameters for the client, " + + "covering [all the parameters specified in the official RabbitMQ document](https://www.rabbitmq.com/configure.html)."); + + public static final Option<Boolean> USE_CORRELATION_ID = + Options.key("use_correlation_id") + .booleanType() + .noDefaultValue() + .withDescription( + "Whether the messages received are supplied with a unique" + + " id to deduplicate messages (in case of failed acknowledgments)."); + + public static final Option<Boolean> DURABLE = + Options.key("durable") + .booleanType() + .defaultValue(true) + .withDescription( + "true: The queue will survive a server restart." + + " false: The queue will be deleted on server restart."); + + public static final Option<Boolean> EXCLUSIVE = + Options.key("exclusive") + .booleanType() + .defaultValue(false) + .withDescription( + "true: The queue is used only by the current connection and will be deleted when the connection closes." + + " false: The queue can be used by multiple connections."); + + public static final Option<Boolean> AUTO_DELETE = + Options.key("auto_delete") + .booleanType() + .defaultValue(false) + .withDescription( + "true: The queue will be deleted automatically when the last consumer unsubscribes." 
+ + " false: The queue will not be automatically deleted."); + + private final Map<String, Object> sinkOptionProps = new HashMap<>(); Review Comment: Why do we need this field in the base options class (`RabbitmqBaseOptions`)? ########## seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqBaseOptions.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config; + +import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.common.config.CheckConfigUtil; + +import lombok.Getter; +import lombok.Setter; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +@Setter +@Getter +public class RabbitmqBaseOptions implements Serializable { + public static final Option<String> HOST = + Options.key("host") + .stringType() + .noDefaultValue() + .withDescription("the default host to use for connections"); + + public static final Option<Integer> PORT = + Options.key("port") + .intType() + .noDefaultValue() + .withDescription("the default port to use for connections"); + + public static final Option<String> VIRTUAL_HOST = + Options.key("virtual_host") + .stringType() + .noDefaultValue() + .withDescription("the virtual host to use when connecting to the broker"); + + public static final Option<String> USERNAME = + Options.key("username") + .stringType() + .noDefaultValue() + .withDescription("the AMQP user name to use when connecting to the broker"); + + public static final Option<String> PASSWORD = + Options.key("password") + .stringType() + .noDefaultValue() + .withDescription("the password to use when connecting to the broker"); + + public static final Option<String> QUEUE_NAME = + Options.key("queue_name") + .stringType() + .noDefaultValue() + .withDescription("the queue to write the message to"); + + public static final Option<String> URL = + Options.key("url") + .stringType() + .noDefaultValue() + .withDescription( + "convenience method for setting the fields in an AMQP URI: host, port, username, password and virtual host"); + + public static final Option<Integer> 
NETWORK_RECOVERY_INTERVAL = + Options.key("network_recovery_interval") + .intType() + .noDefaultValue() + .withDescription( + "how long will automatic recovery wait before attempting to reconnect, in ms"); + + public static final Option<Boolean> AUTOMATIC_RECOVERY_ENABLED = + Options.key("AUTOMATIC_RECOVERY_ENABLED") + .booleanType() + .noDefaultValue() + .withDescription("if true, enables connection recovery"); + + public static final Option<Boolean> TOPOLOGY_RECOVERY_ENABLED = + Options.key("topology_recovery_enabled") + .booleanType() + .noDefaultValue() + .withDescription("if true, enables topology recovery"); + + public static final Option<Integer> CONNECTION_TIMEOUT = + Options.key("connection_timeout") + .intType() + .noDefaultValue() + .withDescription("connection TCP establishment timeout in milliseconds"); + + public static final Option<String> ROUTING_KEY = + Options.key("routing_key") + .stringType() + .noDefaultValue() + .withDescription("the routing key to publish the message to"); + + public static final Option<String> EXCHANGE = + Options.key("exchange") + .stringType() + .noDefaultValue() + .withDescription("the exchange to publish the message to"); + + public static final Option<Boolean> FOR_E2E_TESTING = + Options.key("for_e2e_testing") + .booleanType() + .noDefaultValue() + .withDescription("use to recognize E2E mode"); + + public static final Option<Map<String, String>> RABBITMQ_CONFIG = + Options.key("rabbitmq.config") + .mapType() + .defaultValue(Collections.emptyMap()) + .withDescription( + "In addition to the above parameters that must be specified by the RabbitMQ client, the user can also specify multiple non-mandatory parameters for the client, " + + "covering [all the parameters specified in the official RabbitMQ document](https://www.rabbitmq.com/configure.html)."); + + public static final Option<Boolean> USE_CORRELATION_ID = + Options.key("use_correlation_id") + .booleanType() + .noDefaultValue() + .withDescription( + "Whether the 
messages received are supplied with a unique" + + " id to deduplicate messages (in case of failed acknowledgments)."); + + public static final Option<Boolean> DURABLE = + Options.key("durable") + .booleanType() + .defaultValue(true) + .withDescription( + "true: The queue will survive a server restart." + + " false: The queue will be deleted on server restart."); + + public static final Option<Boolean> EXCLUSIVE = + Options.key("exclusive") + .booleanType() + .defaultValue(false) + .withDescription( + "true: The queue is used only by the current connection and will be deleted when the connection closes." + + " false: The queue can be used by multiple connections."); + + public static final Option<Boolean> AUTO_DELETE = Review Comment: Every `Option` parameter should be described in the Factory's `optionRule` method: whether it is required or optional, whether it is bound to other parameters, etc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org