This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 7b3f42f216 [INLONG-11446][TubeMQ] Remove legacy codes for flink-tube connector (#11448) 7b3f42f216 is described below commit 7b3f42f21685a8053dd92f4e9dd472a621b1e2f9 Author: AloysZhang <aloyszh...@apache.org> AuthorDate: Fri Nov 1 16:43:42 2024 +0800 [INLONG-11446][TubeMQ] Remove legacy codes for flink-tube connector (#11448) --- inlong-tubemq/tubemq-connectors/pom.xml | 1 - .../tubemq-connector-flink/pom.xml | 106 ------ .../org/apache/flink/connectors/tubemq/Tubemq.java | 162 ---------- .../flink/connectors/tubemq/TubemqOptions.java | 61 ---- .../connectors/tubemq/TubemqSinkFunction.java | 182 ----------- .../connectors/tubemq/TubemqSourceFunction.java | 358 --------------------- .../flink/connectors/tubemq/TubemqTableSink.java | 130 -------- .../flink/connectors/tubemq/TubemqTableSource.java | 235 -------------- .../tubemq/TubemqTableSourceSinkFactory.java | 244 -------------- .../flink/connectors/tubemq/TubemqValidator.java | 77 ----- .../org.apache.flink.table.factories.TableFactory | 16 - .../apache/flink/connectors/tubemq/TubemqTest.java | 101 ------ .../src/test/resources/log4j2.properties | 21 -- 13 files changed, 1694 deletions(-) diff --git a/inlong-tubemq/tubemq-connectors/pom.xml b/inlong-tubemq/tubemq-connectors/pom.xml index e9c4b1b147..1fc71477c3 100644 --- a/inlong-tubemq/tubemq-connectors/pom.xml +++ b/inlong-tubemq/tubemq-connectors/pom.xml @@ -29,7 +29,6 @@ <name>Apache InLong - TubeMQ Connectors</name> <modules> - <module>tubemq-connector-flink</module> <module>tubemq-connector-flume</module> <module>tubemq-connector-spark</module> </modules> diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/pom.xml b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/pom.xml deleted file mode 100644 index 47bb6ccce7..0000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/pom.xml +++ /dev/null @@ -1,106 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> 
-<!-- -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.inlong</groupId> - <artifactId>tubemq-connectors</artifactId> - <version>2.1.0-SNAPSHOT</version> - </parent> - - <artifactId>tubemq-connector-flink</artifactId> - <name>Apache InLong - TubeMQ Connectors-flink</name> - - <properties> - <inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir> - <flink.version>1.13.5</flink.version> - <scala.binary.version>2.11</scala.binary.version> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.inlong</groupId> - <artifactId>tubemq-client</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.inlong</groupId> - <artifactId>tubemq-core</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <scope>provided</scope> - </dependency> - - <dependency> - 
<groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_${scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-common</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - <optional>true</optional> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - <optional>true</optional> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-common</artifactId> - <version>${flink.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>${junit.version}</version> - <scope>test</scope> - </dependency> - </dependencies> -</project> diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java deleted file mode 100644 index 2e34c13330..0000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.flink.table.descriptors.ConnectorDescriptor; -import org.apache.flink.table.descriptors.DescriptorProperties; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import java.util.HashMap; -import java.util.Map; - -import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_GROUP; -import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_MASTER; -import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_PROPERTIES; -import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_STREAMIDS; -import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_TOPIC; -import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_TYPE_VALUE_TUBEMQ; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * The {@link ConnectorDescriptor} for tubemq sources and sinks. 
- */ -public class Tubemq extends ConnectorDescriptor { - - @Nullable - private boolean consumerRole = true; - - @Nullable - private String topic; - - @Nullable - private String master; - - @Nullable - private String group; - - @Nullable - private String streamIds; - - @Nonnull - private Map<String, String> properties; - - public Tubemq() { - super(CONNECTOR_TYPE_VALUE_TUBEMQ, 1, true); - - this.properties = new HashMap<>(); - } - - /** - * Sets the tubemq topic to be used. - * - * @param topic The topic name. - */ - public Tubemq topic(String topic) { - checkNotNull(topic); - - this.topic = topic; - return this; - } - - /** - * Sets the client role to be used. - * - * @param isConsumer The client role if consumer. - */ - public Tubemq asConsumer(boolean isConsumer) { - this.consumerRole = isConsumer; - return this; - } - - /** - * Sets the address of tubemq master to connect. - * - * @param master The address of tubemq master. - */ - public Tubemq master(String master) { - checkNotNull(master); - - this.master = master; - return this; - } - - /** - * Sets the tubemq (consumer or producer) group to be used. - * - * @param group The group name. - */ - public Tubemq group(String group) { - checkNotNull(group); - - this.group = group; - return this; - } - - /** - * The tubemq consumers use these streamIds to filter records reading from server. - * - * @param streamIds The filter for consume record from server. - */ - public Tubemq streamIds(String streamIds) { - - this.streamIds = streamIds; - return this; - } - - /** - * Sets the tubemq property. - * - * @param key The key of the property. - * @param value The value of the property. 
- */ - public Tubemq property(String key, String value) { - checkNotNull(key); - checkNotNull(value); - - properties.put(key, value); - return this; - } - - @Override - protected Map<String, String> toConnectorProperties() { - DescriptorProperties descriptorProperties = new DescriptorProperties(); - - if (topic != null) { - descriptorProperties.putString(CONNECTOR_TOPIC, topic); - } - - if (master != null) { - descriptorProperties.putString(CONNECTOR_MASTER, master); - } - if (consumerRole) { - if (group != null) { - descriptorProperties.putString(CONNECTOR_GROUP, group); - } - - if (streamIds != null) { - descriptorProperties.putString(CONNECTOR_STREAMIDS, streamIds); - } - } - - descriptorProperties.putPropertiesWithPrefix(CONNECTOR_PROPERTIES, properties); - - return descriptorProperties.asMap(); - } -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java deleted file mode 100644 index e94fbf86c5..0000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; - -/** - * The configuration options for tubemq sources and sink. - */ -public class TubemqOptions { - - public static final ConfigOption<String> SESSION_KEY = - ConfigOptions.key("session.key") - .noDefaultValue() - .withDescription("The session key for this consumer group at startup."); - - public static final ConfigOption<String> STREAM_ID = - ConfigOptions.key("topic.streamId") - .noDefaultValue() - .withDescription("The streamId owned this topic."); - - public static final ConfigOption<Integer> MAX_RETRIES = - ConfigOptions.key("max.retries") - .defaultValue(5) - .withDescription("The maximum number of retries when an " - + "exception is caught."); - - public static final ConfigOption<Boolean> BOOTSTRAP_FROM_MAX = - ConfigOptions.key("bootstrap.from.max") - .defaultValue(false) - .withDescription("True if consuming from the most recent " - + "position when the tubemq source starts.. 
It only takes " - + "effect when the tubemq source does not recover from " - + "checkpoints."); - - public static final ConfigOption<String> SOURCE_MAX_IDLE_TIME = - ConfigOptions.key("source.task.max.idle.time") - .defaultValue("5min") - .withDescription("The max time of the source marked as temporarily idle."); - - public static final ConfigOption<String> MESSAGE_NOT_FOUND_WAIT_PERIOD = - ConfigOptions.key("message.not.found.wait.period") - .defaultValue("350ms") - .withDescription("The time of waiting period if tubemq broker return message not found."); -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java deleted file mode 100644 index fabd43af6e..0000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.inlong.tubemq.client.config.TubeClientConfig; -import org.apache.inlong.tubemq.client.factory.MessageSessionFactory; -import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory; -import org.apache.inlong.tubemq.client.producer.MessageProducer; -import org.apache.inlong.tubemq.client.producer.MessageSentResult; -import org.apache.inlong.tubemq.corebase.Message; - -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashSet; - -import static org.apache.flink.connectors.tubemq.TubemqOptions.MAX_RETRIES; - -public class TubemqSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction { - - private static final Logger LOG = LoggerFactory.getLogger(TubemqSinkFunction.class); - - private static final String SYSTEM_HEADER_TIME_FORMAT = "yyyyMMddHHmm"; - - /** - * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081. - */ - private final String masterAddress; - - /** - * The topic name. - */ - private final String topic; - - /** - * The streamId of this topic - */ - private final String streamId; - /** - * The serializer for the records sent to pulsar. - */ - private final SerializationSchema<T> serializationSchema; - - /** - * The tubemq producer. 
- */ - private transient MessageProducer producer; - - /** - * The tubemq session factory. - */ - private transient MessageSessionFactory sessionFactory; - - /** - * The maximum number of retries. - */ - private final int maxRetries; - - public TubemqSinkFunction(String topic, - String masterAddress, - SerializationSchema<T> serializationSchema, - Configuration configuration) { - Preconditions.checkNotNull(topic, - "The topic must not be null."); - Preconditions.checkNotNull(masterAddress, - "The master address must not be null."); - Preconditions.checkNotNull(serializationSchema, - "The serialization schema must not be null."); - Preconditions.checkNotNull(configuration, - "The configuration must not be null."); - - this.topic = topic; - this.masterAddress = masterAddress; - this.serializationSchema = serializationSchema; - this.streamId = configuration.getString(TubemqOptions.STREAM_ID); - this.maxRetries = configuration.getInteger(MAX_RETRIES); - } - - @Override - public void snapshotState(FunctionSnapshotContext functionSnapshotContext) { - // Nothing to do. - } - - @Override - public void initializeState(FunctionInitializationContext functionInitializationContext) { - // Nothing to do. 
- } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - - TubeClientConfig tubeClientConfig = new TubeClientConfig(masterAddress); - this.sessionFactory = new TubeSingleSessionFactory(tubeClientConfig); - this.producer = sessionFactory.createProducer(); - HashSet<String> hashSet = new HashSet<>(); - hashSet.add(topic); - producer.publish(hashSet); - } - - @Override - public void invoke(T in, Context context) throws Exception { - - int retries = 0; - Exception exception = null; - - while (maxRetries <= 0 || retries < maxRetries) { - - try { - byte[] body = serializationSchema.serialize(in); - Message message = new Message(topic, body); - if (StringUtils.isNotBlank(streamId)) { - SimpleDateFormat sdf = new SimpleDateFormat(SYSTEM_HEADER_TIME_FORMAT); - long currTimeMillis = System.currentTimeMillis(); - message.putSystemHeader(streamId, sdf.format(new Date(currTimeMillis))); - } - - MessageSentResult sendResult = producer.sendMessage(message); - if (sendResult.isSuccess()) { - return; - } else { - LOG.warn("Send msg fail, error code: {}, error message: {}", - sendResult.getErrCode(), sendResult.getErrMsg()); - } - } catch (Exception e) { - LOG.warn("Could not properly send the message to hippo " - + "(retries: {}).", retries, e); - - retries++; - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - } - - throw new IOException("Could not properly send the message to hippo.", exception); - } - - @Override - public void close() throws Exception { - - try { - if (producer != null) { - producer.shutdown(); - producer = null; - } - if (sessionFactory != null) { - sessionFactory.shutdown(); - sessionFactory = null; - } - } catch (Throwable e) { - LOG.error("Shutdown producer error", e); - } finally { - super.close(); - } - } -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java 
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java deleted file mode 100644 index 4f9fc050a9..0000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.inlong.tubemq.client.config.ConsumerConfig; -import org.apache.inlong.tubemq.client.consumer.ConsumePosition; -import org.apache.inlong.tubemq.client.consumer.ConsumerResult; -import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer; -import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory; -import org.apache.inlong.tubemq.corebase.Message; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.OperatorStateStore; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeSet; - -import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO; -import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.TimeUtils.parseDuration; - -/** - * The Flink TubeMQ Consumer. 
- * - * @param <T> The type of records produced by this data source - */ -public class TubemqSourceFunction<T> - extends - RichParallelSourceFunction<T> - implements - CheckpointedFunction { - - private static final Logger LOG = - LoggerFactory.getLogger(TubemqSourceFunction.class); - - private static final String TUBE_OFFSET_STATE = "tube-offset-state"; - - private static final String SPLIT_COMMA = ","; - private static final String SPLIT_COLON = ":"; - - /** - * The address of TubeMQ master, format eg: 127.0.0.1:8080,127.0.0.2:8081. - */ - private final String masterAddress; - - /** - * The topic name. - */ - private final String topic; - - /** - * The tubemq consumers use this streamId set to filter records reading from server. - */ - private final TreeSet<String> streamIdSet; - - /** - * The consumer group name. - */ - private final String consumerGroup; - - /** - * The deserializer for records. - */ - private final DeserializationSchema<T> deserializationSchema; - - /** - * The random key for TubeMQ consumer group when startup. - */ - private final String sessionKey; - - /** - * True if consuming message from max offset. - */ - private final boolean consumeFromMax; - - /** - * The time to wait if tubemq broker returns message not found. - */ - private final Duration messageNotFoundWaitPeriod; - - /** - * The max time to marked source idle. - */ - private final Duration maxIdleTime; - - /** - * Flag indicating whether the consumer is still running. - **/ - private volatile boolean running; - - /** - * The state for the offsets of queues. - */ - private transient ListState<Tuple2<String, Long>> offsetsState; - - /** - * The current offsets of partitions which are stored in {@link #offsetsState} - * once a checkpoint is triggered. - * - *NOTE: The offsets are populated in the main thread and saved in the - * checkpoint thread. 
Its usage must be guarded by the checkpoint lock.</p> - */ - private transient Map<String, Long> currentOffsets; - - /** - * The TubeMQ session factory. - */ - private transient TubeSingleSessionFactory messageSessionFactory; - - /** - * The TubeMQ pull consumer. - */ - private transient PullMessageConsumer messagePullConsumer; - - /** - * Build a TubeMQ source function - * - * @param masterAddress the master address of TubeMQ - * @param topic the topic name - * @param streamIdSet the topic's filter condition items - * @param consumerGroup the consumer group name - * @param deserializationSchema the deserialize schema - * @param configuration the configure - */ - public TubemqSourceFunction( - String masterAddress, - String topic, - TreeSet<String> streamIdSet, - String consumerGroup, - DeserializationSchema<T> deserializationSchema, - Configuration configuration) { - checkNotNull(masterAddress, - "The master address must not be null."); - checkNotNull(topic, - "The topic must not be null."); - checkNotNull(streamIdSet, - "The streamId set must not be null."); - checkNotNull(consumerGroup, - "The consumer group must not be null."); - checkNotNull(deserializationSchema, - "The deserialization schema must not be null."); - checkNotNull(configuration, - "The configuration must not be null."); - - this.masterAddress = masterAddress; - this.topic = topic; - this.streamIdSet = streamIdSet; - this.consumerGroup = consumerGroup; - this.deserializationSchema = deserializationSchema; - - this.sessionKey = - configuration.getString(TubemqOptions.SESSION_KEY); - this.consumeFromMax = - configuration.getBoolean(TubemqOptions.BOOTSTRAP_FROM_MAX); - this.messageNotFoundWaitPeriod = - parseDuration( - configuration.getString( - TubemqOptions.MESSAGE_NOT_FOUND_WAIT_PERIOD)); - this.maxIdleTime = - parseDuration( - configuration.getString( - TubemqOptions.SOURCE_MAX_IDLE_TIME)); - } - - @Override - public void initializeState( - FunctionInitializationContext context) throws 
Exception { - - TypeInformation<Tuple2<String, Long>> typeInformation = - new TupleTypeInfo<>(STRING_TYPE_INFO, LONG_TYPE_INFO); - ListStateDescriptor<Tuple2<String, Long>> stateDescriptor = - new ListStateDescriptor<>(TUBE_OFFSET_STATE, typeInformation); - - OperatorStateStore stateStore = context.getOperatorStateStore(); - offsetsState = stateStore.getListState(stateDescriptor); - - currentOffsets = new HashMap<>(); - if (context.isRestored()) { - for (Tuple2<String, Long> tubeOffset : offsetsState.get()) { - currentOffsets.put(tubeOffset.f0, tubeOffset.f1); - } - - LOG.info("Successfully restore the offsets {}.", currentOffsets); - } else { - LOG.info("No restore offsets."); - } - } - - @Override - public void open(Configuration parameters) throws Exception { - ConsumerConfig consumerConfig = - new ConsumerConfig(masterAddress, consumerGroup); - consumerConfig.setConsumePosition(consumeFromMax - ? ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS - : ConsumePosition.CONSUMER_FROM_LATEST_OFFSET); - consumerConfig.setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis()); - - final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks(); - - messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); - - messagePullConsumer = - messageSessionFactory.createPullConsumer(consumerConfig); - messagePullConsumer - .subscribe(topic, streamIdSet); - messagePullConsumer - .completeSubscribe(sessionKey, numTasks, true, currentOffsets); - - running = true; - } - - @Override - public void run(SourceContext<T> ctx) throws Exception { - - Instant lastConsumeInstant = Instant.now(); - - while (running) { - - ConsumerResult consumeResult = messagePullConsumer.getMessage(); - if (!consumeResult.isSuccess()) { - if (!(consumeResult.getErrCode() == 400 - || consumeResult.getErrCode() == 404 - || consumeResult.getErrCode() == 405 - || consumeResult.getErrCode() == 406 - || consumeResult.getErrCode() == 407 - || consumeResult.getErrCode() == 408)) { - 
LOG.info("Could not consume messages from tubemq (errcode: {}, " - + "errmsg: {}).", consumeResult.getErrCode(), - consumeResult.getErrMsg()); - } - - Duration idleTime = - Duration.between(lastConsumeInstant, Instant.now()); - if (idleTime.compareTo(maxIdleTime) > 0) { - ctx.markAsTemporarilyIdle(); - } - - continue; - } - - List<Message> messageList = consumeResult.getMessageList(); - - List<T> records = new ArrayList<>(); - if (messageList != null) { - lastConsumeInstant = Instant.now(); - - for (Message message : messageList) { - T record = - deserializationSchema.deserialize(message.getData()); - records.add(record); - } - } - - synchronized (ctx.getCheckpointLock()) { - - for (T record : records) { - ctx.collect(record); - } - - currentOffsets.put( - consumeResult.getPartitionKey(), - consumeResult.getCurrOffset()); - } - - ConsumerResult confirmResult = - messagePullConsumer - .confirmConsume(consumeResult.getConfirmContext(), true); - if (!confirmResult.isSuccess()) { - if (!(confirmResult.getErrCode() == 400 - || confirmResult.getErrCode() == 404 - || confirmResult.getErrCode() == 405 - || confirmResult.getErrCode() == 406 - || confirmResult.getErrCode() == 407 - || confirmResult.getErrCode() == 408)) { - LOG.warn("Could not confirm messages to tubemq (errcode: {}, " - + "errmsg: {}).", confirmResult.getErrCode(), - confirmResult.getErrMsg()); - } - } - } - } - - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - - offsetsState.clear(); - for (Map.Entry<String, Long> entry : currentOffsets.entrySet()) { - offsetsState.add(new Tuple2<>(entry.getKey(), entry.getValue())); - } - - LOG.info("Successfully save the offsets in checkpoint {}: {}.", - context.getCheckpointId(), currentOffsets); - } - - @Override - public void cancel() { - running = false; - } - - @Override - public void close() throws Exception { - - cancel(); - - if (messagePullConsumer != null) { - try { - messagePullConsumer.shutdown(); - } catch 
(Throwable t) { - LOG.warn("Could not properly shutdown the tubemq pull consumer.", t); - } - } - - if (messageSessionFactory != null) { - try { - messageSessionFactory.shutdown(); - } catch (Throwable t) { - LOG.warn("Could not properly shutdown the tubemq session factory.", t); - } - } - - super.close(); - - LOG.info("Closed the tubemq source."); - } -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java deleted file mode 100644 index b9f7e5f86d..0000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.sinks.AppendStreamTableSink; -import org.apache.flink.table.utils.TableConnectorUtils; -import org.apache.flink.types.Row; - -import java.util.Arrays; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Tubemq {@link org.apache.flink.table.sinks.StreamTableSink} that hands the {@link Row} records of a stream - * to a {@link TubemqSinkFunction} created for the configured topic and master address. - */ -public class TubemqTableSink implements AppendStreamTableSink<Row> { - - /** - * Serialization schema for records to tubemq. - */ - private final SerializationSchema<Row> serializationSchema; - - /** - * The schema of the table. - */ - private final TableSchema schema; - - /** - * The tubemq topic name. - */ - private final String topic; - - /** - * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081 . - */ - private final String masterAddress; - - /** - * The parameters collection for tubemq producer.
- */ - private final Configuration configuration; - - /** - * Build tubemq table sink. Every argument is checked with {@code checkNotNull}. - * - * @param serializationSchema the schema used to serialize rows - * @param schema the schema of the table - * @param topic the tubemq topic name - * @param masterAddress the address of tubemq master - * @param configuration the parameters collection for tubemq producer - */ - public TubemqTableSink( - SerializationSchema<Row> serializationSchema, - TableSchema schema, - String topic, - String masterAddress, - Configuration configuration) { - this.serializationSchema = checkNotNull(serializationSchema, - "The serialization schema must not be null."); - this.schema = checkNotNull(schema, - "The schema must not be null."); - this.topic = checkNotNull(topic, - "Topic must not be null."); - this.masterAddress = checkNotNull(masterAddress, - "Master address must not be null."); - this.configuration = checkNotNull(configuration, - "The configuration must not be null."); - } - - /** - * Adds a {@link TubemqSinkFunction} built from the topic, master address, serialization schema and - * configuration of this sink to the given stream, and names the resulting sink after this class and its field names. - */ - @Override - public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) { - - final SinkFunction<Row> tubemqSinkFunction = - new TubemqSinkFunction<>( - topic, - masterAddress, - serializationSchema, - configuration); - - return dataStream - .addSink(tubemqSinkFunction) - .name( - TableConnectorUtils.generateRuntimeName( - getClass(), - getFieldNames())); - } - - @Override - public TypeInformation<Row> getOutputType() { - return schema.toRowType(); - } - - @Override - public String[] getFieldNames() { - return schema.getFieldNames(); - } - - @Override - public TypeInformation<?>[] getFieldTypes() { - return schema.getFieldTypes(); - } - - /** - * Returns this sink unchanged when the given field names and types equal the ones of the table schema, - * and throws a {@link ValidationException} otherwise. - */ - @Override - public TubemqTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { - if (!Arrays.equals(getFieldNames(), fieldNames) - || !Arrays.equals(getFieldTypes(), fieldTypes)) { - throw new ValidationException("Reconfiguration with different fields is not allowed. " - + "Expected: " + Arrays.toString(getFieldNames()) - + " / " + Arrays.toString(getFieldTypes()) + ".
" - + "But was: " + Arrays.toString(fieldNames) + " / " - + Arrays.toString(fieldTypes)); - } - - return this; - } -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java deleted file mode 100644 index 934a441687..0000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.Types; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.sources.DefinedFieldMapping; -import org.apache.flink.table.sources.DefinedProctimeAttribute; -import org.apache.flink.table.sources.DefinedRowtimeAttributes; -import org.apache.flink.table.sources.RowtimeAttributeDescriptor; -import org.apache.flink.table.sources.StreamTableSource; -import org.apache.flink.types.Row; - -import javax.annotation.Nullable; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.TreeSet; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * TubeMQ {@link StreamTableSource} that reads {@link Row} records through a {@link TubemqSourceFunction}. - */ -public class TubemqTableSource - implements - StreamTableSource<Row>, - DefinedProctimeAttribute, - DefinedRowtimeAttributes, - DefinedFieldMapping { - - /** - * Deserialization schema for records from TubeMQ. - */ - private final DeserializationSchema<Row> deserializationSchema; - - /** - * The schema of the table. - */ - private final TableSchema schema; - - /** - * Field name of the processing time attribute, empty if no processing time - * field is defined. - */ - private final Optional<String> proctimeAttribute; - - /** - * Descriptors for rowtime attributes. - */ - private final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors; - - /** - * Mapping for the fields of the table schema to fields of the physical - * returned type.
- */ - private final Map<String, String> fieldMapping; - - /** - * The address of TubeMQ master, format eg: 127.0.0.1:8080,127.0.0.2:8081 . - */ - private final String masterAddress; - - /** - * The TubeMQ topic name. - */ - private final String topic; - - /** - * The TubeMQ streamId filter collection. - */ - private final TreeSet<String> streamIdSet; - - /** - * The TubeMQ consumer group name. - */ - private final String consumerGroup; - - /** - * The parameters collection for TubeMQ consumer. - */ - private final Configuration configuration; - - /** - * Build TubeMQ table source. All arguments except the two time attribute arguments are checked with - * {@code checkNotNull} here; the time attributes are checked by the private validate methods. - * - * @param deserializationSchema the deserialization schema - * @param schema the table schema - * @param proctimeAttribute the field name of the processing time attribute, empty if there is none - * @param rowtimeAttributeDescriptors the descriptors for rowtime attributes - * @param fieldMapping the mapping of table schema fields to fields of the physical returned type - * @param masterAddress the master address - * @param topic the topic name - * @param streamIdSet the streamId filter collection of the topic - * @param consumerGroup the consumer group - * @param configuration the parameters collection for TubeMQ consumer - */ - public TubemqTableSource( - DeserializationSchema<Row> deserializationSchema, - TableSchema schema, - Optional<String> proctimeAttribute, - List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, - Map<String, String> fieldMapping, - String masterAddress, - String topic, - TreeSet<String> streamIdSet, - String consumerGroup, - Configuration configuration) { - checkNotNull(deserializationSchema, - "The deserialization schema must not be null."); - checkNotNull(schema, - "The schema must not be null."); - checkNotNull(fieldMapping, - "The field mapping must not be null."); - checkNotNull(masterAddress, - "The master address must not be null."); - checkNotNull(topic, - "The topic must not be null."); - checkNotNull(streamIdSet, - "The streamId set must not be null."); - checkNotNull(consumerGroup, - "The consumer group must not be null."); - checkNotNull(configuration, - "The configuration must not be null.");
- - this.deserializationSchema = deserializationSchema; - this.schema = schema; - this.fieldMapping = fieldMapping; - this.masterAddress = masterAddress; - this.topic = topic; - this.streamIdSet = streamIdSet; - this.consumerGroup = consumerGroup; - this.configuration = configuration; - - this.proctimeAttribute = - validateProcTimeAttribute(proctimeAttribute); - this.rowtimeAttributeDescriptors = - validateRowTimeAttributeDescriptors(rowtimeAttributeDescriptors); - } - - @Override - public TableSchema getTableSchema() { - return schema; - } - - /** - * Returns the field name of the processing time attribute, or null when none is defined. - */ - @Nullable - @Override - public String getProctimeAttribute() { - return proctimeAttribute.orElse(null); - } - - @Override - public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() { - return rowtimeAttributeDescriptors; - } - - @Override - public Map<String, String> getFieldMapping() { - return fieldMapping; - } - - /** - * Adds a {@link TubemqSourceFunction} built from the fields of this source to the given environment - * and names the resulting stream with {@code explainSource()}. - */ - @Override - public DataStream<Row> getDataStream( - StreamExecutionEnvironment streamExecutionEnvironment) { - SourceFunction<Row> sourceFunction = - new TubemqSourceFunction<>( - masterAddress, - topic, - streamIdSet, - consumerGroup, - deserializationSchema, - configuration); - - return streamExecutionEnvironment - .addSource(sourceFunction) - .name(explainSource()); - } - - /** - * Checks that the processing time attribute, when present, names a field of the table schema whose type is - * {@code Types.SQL_TIMESTAMP()}, and throws a {@link ValidationException} otherwise. - * NOTE(review): the type is compared by reference ({@code !=}), not with {@code equals}; confirm that the - * schema always holds the same instance that {@code Types.SQL_TIMESTAMP()} returns. - */ - private Optional<String> validateProcTimeAttribute( - Optional<String> proctimeAttribute) { - return proctimeAttribute.map((attribute) -> { - Optional<TypeInformation<?>> tpe = schema.getFieldType(attribute); - if (!tpe.isPresent()) { - throw new ValidationException("Proc time attribute '" - + attribute + "' isn't present in TableSchema."); - } else if (tpe.get() != Types.SQL_TIMESTAMP()) { - throw new ValidationException("Proc time attribute '" - + attribute + "' isn't of type SQL_TIMESTAMP."); - } - return attribute; - }); - } - - /** - * Checks that every rowtime attribute names a field of the table schema whose type is - * {@code Types.SQL_TIMESTAMP()} (same reference comparison as for the processing time attribute), - * and throws a {@link ValidationException} otherwise. The descriptor list itself must not be null. - */ - private List<RowtimeAttributeDescriptor> validateRowTimeAttributeDescriptors( - List<RowtimeAttributeDescriptor> attributeDescriptors) { - checkNotNull(attributeDescriptors); - - for
(RowtimeAttributeDescriptor desc : attributeDescriptors) { - String name = desc.getAttributeName(); - Optional<TypeInformation<?>> tpe = schema.getFieldType(name); - if (!tpe.isPresent()) { - throw new ValidationException("Row time attribute '" - + name + "' is not present."); - } else if (tpe.get() != Types.SQL_TIMESTAMP()) { - throw new ValidationException("Row time attribute '" - + name + "' is not of type SQL_TIMESTAMP."); - } - } - - return attributeDescriptors; - } -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java deleted file mode 100644 index e28e85c1b7..0000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.SchemaValidator; -import org.apache.flink.table.factories.DeserializationSchemaFactory; -import org.apache.flink.table.factories.SerializationSchemaFactory; -import org.apache.flink.table.factories.StreamTableSinkFactory; -import org.apache.flink.table.factories.StreamTableSourceFactory; -import org.apache.flink.table.factories.TableFactoryService; -import org.apache.flink.table.sinks.StreamTableSink; -import org.apache.flink.table.sources.RowtimeAttributeDescriptor; -import org.apache.flink.table.sources.StreamTableSource; -import org.apache.flink.types.Row; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.TreeSet; - -import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; -import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; -import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY; -import static
org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE; -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND; - -/** - * Factory for creating configured instances of {@link TubemqTableSource} and {@link TubemqTableSink}. - */ -public class TubemqTableSourceSinkFactory - implements - StreamTableSourceFactory<Row>, - StreamTableSinkFactory<Row> { - - /** - * Separator used to split the value of the streamIds property. - */ - private static final String SPLIT_COMMA = ","; - - public TubemqTableSourceSinkFactory() { - } - - @Override - public Map<String, String> requiredContext() { - - Map<String, String> context = new HashMap<>(); - context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND); - context.put(CONNECTOR_TYPE, TubemqValidator.CONNECTOR_TYPE_VALUE_TUBEMQ); - context.put(CONNECTOR_PROPERTY_VERSION, "1"); - - return context; - } - - @Override - public List<String> supportedProperties() { - List<String> properties = new ArrayList<>(); - - // tubemq - properties.add(TubemqValidator.CONNECTOR_TOPIC); - properties.add(TubemqValidator.CONNECTOR_MASTER); - properties.add(TubemqValidator.CONNECTOR_GROUP); - properties.add(TubemqValidator.CONNECTOR_STREAMIDS); - properties.add(TubemqValidator.CONNECTOR_PROPERTIES + ".*"); - - // schema - properties.add(SCHEMA + ".#." + SCHEMA_TYPE); - properties.add(SCHEMA + ".#." + SCHEMA_NAME); - properties.add(SCHEMA + ".#." + SCHEMA_FROM); - - // time attributes - properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME); - properties.add(SCHEMA + ".#."
+ ROWTIME_TIMESTAMPS_TYPE); - properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM); - properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS); - properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED); - properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE); - properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS); - properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED); - properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY); - - // format wildcard - properties.add(FORMAT + ".*"); - - return properties; - } - - /** - * Creates a {@link TubemqTableSource} from the given properties after validating them. - */ - @Override - public StreamTableSource<Row> createStreamTableSource( - Map<String, String> properties) { - final DeserializationSchema<Row> deserializationSchema = - getDeserializationSchema(properties); - - final DescriptorProperties descriptorProperties = - new DescriptorProperties(true); - descriptorProperties.putProperties(properties); - - validateProperties(descriptorProperties); - - final TableSchema schema = - descriptorProperties.getTableSchema(SCHEMA); - final Optional<String> proctimeAttribute = - SchemaValidator.deriveProctimeAttribute(descriptorProperties); - final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = - SchemaValidator.deriveRowtimeAttributes(descriptorProperties); - final Map<String, String> fieldMapping = - SchemaValidator.deriveFieldMapping( - descriptorProperties, - Optional.of(deserializationSchema.getProducedType())); - final String topic = - descriptorProperties.getString(TubemqValidator.CONNECTOR_TOPIC); - final String masterAddress = - descriptorProperties.getString(TubemqValidator.CONNECTOR_MASTER); - final String consumerGroup = - descriptorProperties.getString(TubemqValidator.CONNECTOR_GROUP); - final String streamIds = - descriptorProperties - .getOptionalString(TubemqValidator.CONNECTOR_STREAMIDS) - .orElse(null); - final Configuration configuration = - getConfiguration(descriptorProperties); - - // The streamIds value is split on commas and the items are added as they are (no trimming here); - // the set stays empty when the property is absent. - TreeSet<String> streamIdSet = new TreeSet<>(); - if (streamIds != null) { -
streamIdSet.addAll(Arrays.asList(streamIds.split(SPLIT_COMMA))); - } - - return new TubemqTableSource( - deserializationSchema, - schema, - proctimeAttribute, - rowtimeAttributeDescriptors, - fieldMapping, - masterAddress, - topic, - streamIdSet, - consumerGroup, - configuration); - } - - /** - * Creates a {@link TubemqTableSink} from the given properties after validating them. - */ - @Override - public StreamTableSink<Row> createStreamTableSink( - Map<String, String> properties) { - final SerializationSchema<Row> serializationSchema = - getSerializationSchema(properties); - - final DescriptorProperties descriptorProperties = - new DescriptorProperties(true); - descriptorProperties.putProperties(properties); - - validateProperties(descriptorProperties); - - final TableSchema tableSchema = - descriptorProperties.getTableSchema(SCHEMA); - final String topic = - descriptorProperties.getString(TubemqValidator.CONNECTOR_TOPIC); - final String masterAddress = - descriptorProperties.getString(TubemqValidator.CONNECTOR_MASTER); - - final Configuration configuration = - getConfiguration(descriptorProperties); - - return new TubemqTableSink( - serializationSchema, - tableSchema, - topic, - masterAddress, - configuration); - } - - private SerializationSchema<Row> getSerializationSchema( - Map<String, String> properties) { - @SuppressWarnings("unchecked") - final SerializationSchemaFactory<Row> formatFactory = - TableFactoryService.find( - SerializationSchemaFactory.class, - properties, - this.getClass().getClassLoader()); - - return formatFactory.createSerializationSchema(properties); - } - - /** - * Runs a {@link SchemaValidator} and then a {@link TubemqValidator} on the given properties. - */ - private void validateProperties(DescriptorProperties descriptorProperties) { - new SchemaValidator(true, false, false).validate(descriptorProperties); - new TubemqValidator().validate(descriptorProperties); - } - - private DeserializationSchema<Row> getDeserializationSchema( - Map<String, String> properties) { - @SuppressWarnings("unchecked") - final DeserializationSchemaFactory<Row> formatFactory = - TableFactoryService.find( - DeserializationSchemaFactory.class, - properties, -
this.getClass().getClassLoader()); - - return formatFactory.createDeserializationSchema(properties); - } - - /** - * Copies the properties found under the {@code TubemqValidator.CONNECTOR_PROPERTIES} prefix into a new - * {@link Configuration} as string entries. - */ - private Configuration getConfiguration( - DescriptorProperties descriptorProperties) { - Map<String, String> properties = - descriptorProperties.getPropertiesWithPrefix(TubemqValidator.CONNECTOR_PROPERTIES); - - Configuration configuration = new Configuration(); - for (Map.Entry<String, String> property : properties.entrySet()) { - configuration.setString(property.getKey(), property.getValue()); - } - - return configuration; - } -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java deleted file mode 100644 index bddaec4eea..0000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.flink.table.descriptors.ConnectorDescriptorValidator; -import org.apache.flink.table.descriptors.DescriptorProperties; - -/** - * The validator for {@link Tubemq}. - */ -public class TubemqValidator extends ConnectorDescriptorValidator { - - /** - * The type of connector. - */ - public static final String CONNECTOR_TYPE_VALUE_TUBEMQ = "tubemq"; - - /** - * The address of tubemq master. - */ - public static final String CONNECTOR_MASTER = "connector.master"; - - /** - * The tubemq topic name. - */ - public static final String CONNECTOR_TOPIC = "connector.topic"; - - /** - * The tubemq (consumer or producer) group name. - */ - public static final String CONNECTOR_GROUP = "connector.group"; - - /** - * The streamIds that tubemq consumers use to filter the records read from the server. - */ - public static final String CONNECTOR_STREAMIDS = "connector.stream-ids"; - - /** - * The prefix of tubemq properties (optional). - */ - public static final String CONNECTOR_PROPERTIES = "connector.properties"; - - /** - * Validates the connector type and the topic, master, group and streamIds properties, after the checks of the super class. - */ - @Override - public void validate(DescriptorProperties properties) { - super.validate(properties); - - // Validates that the connector type is tubemq. - properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_TUBEMQ, false); - - // Validate that the topic name is set. - properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE); - - // Validate that the master address is set. - properties.validateString(CONNECTOR_MASTER, false, 1, Integer.MAX_VALUE); - - // Validate that the group name is set. - properties.validateString(CONNECTOR_GROUP, false, 1, Integer.MAX_VALUE); - - // Validate the streamIds; unlike the keys above the flag passed here is true, i.e. presumably the key is optional (confirm against DescriptorProperties#validateString).
- properties.validateString(CONNECTOR_STREAMIDS, true, 1, Integer.MAX_VALUE); - } -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory deleted file mode 100644 index 30831743b7..0000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License.
- -org.apache.flink.connectors.tubemq.TubemqTableSourceSinkFactory diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java deleted file mode 100644 index 7f0e68def3..0000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.flink.table.descriptors.Descriptor; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.DescriptorTestBase; -import org.apache.flink.table.descriptors.DescriptorValidator; -import org.junit.Test; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Unit tests for the {@link Tubemq} descriptor and the {@link TubemqValidator}.
- */ -public class TubemqTest extends DescriptorTestBase { - - @Override - protected List<Descriptor> descriptors() { - final Descriptor descriptor1 = - new Tubemq() - .topic("test-topic-1") - .master("localhost:9001") - .group("test-group-1"); - - final Descriptor descriptor2 = - new Tubemq() - .topic("test-topic-2") - .master("localhost:9001") - .group("test-group-2") - .property("bootstrap.from.max", "true"); - - final Descriptor descriptor3 = - new Tubemq() - .topic("test-topic-3") - .master("localhost:9001") - .group("test-group-3") - .streamIds("test-streamId-1,test-streamId-2"); - - return Arrays.asList(descriptor1, descriptor2, descriptor3); - } - - @Override - protected List<Map<String, String>> properties() { - final Map<String, String> props1 = new HashMap<>(); - props1.put("connector.property-version", "1"); - props1.put("connector.type", "tubemq"); - props1.put("connector.master", "localhost:9001"); - props1.put("connector.topic", "test-topic-1"); - props1.put("connector.group", "test-group-1"); - - final Map<String, String> props2 = new HashMap<>(); - props2.put("connector.property-version", "1"); - props2.put("connector.type", "tubemq"); - props2.put("connector.master", "localhost:9001"); - props2.put("connector.topic", "test-topic-2"); - props2.put("connector.group", "test-group-2"); - props2.put("connector.properties.bootstrap.from.max", "true"); - - final Map<String, String> props3 = new HashMap<>(); - props3.put("connector.property-version", "1"); - props3.put("connector.type", "tubemq"); - props3.put("connector.master", "localhost:9001"); - props3.put("connector.topic", "test-topic-3"); - props3.put("connector.stream-ids", "test-streamId-1,test-streamId-2"); - props3.put("connector.group", "test-group-3"); - - return Arrays.asList(props1, props2, props3); - } - - @Override - protected DescriptorValidator validator() { - return new TubemqValidator(); - } - - /** - * Runs the validator returned by {@code validator()} on the first property map of {@code properties()}, - * which has no streamIds entry; the test passes when no exception is thrown. - */ - @Test - public void testTubePropertiesValidator() { - DescriptorValidator validator =
this.validator(); - DescriptorProperties descriptorProperties = new DescriptorProperties(); - descriptorProperties.putProperties(properties().get(0)); - validator.validate(descriptorProperties); - } -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/resources/log4j2.properties b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/resources/log4j2.properties deleted file mode 100644 index 1da9b515e3..0000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/resources/log4j2.properties +++ /dev/null @@ -1,21 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -rootLogger=info, A1 -# A1 is set to be a ConsoleAppender. -appender.A1.name=A1 -appender.A1.type=Console