This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 3318c1c30 [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init (#5927)
3318c1c30 is described below

commit 3318c1c306e72e0074b071f37937745bddfb4db5
Author: Xin Gong <genzhedangd...@gmail.com>
AuthorDate: Mon Sep 19 11:37:57 2022 +0800

    [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init (#5927)
---
 .../org/apache/inlong/sort/base/Constants.java | 4 +
 .../inlong/sort/base/metric/MetricState.java | 8 +
 .../inlong/sort/base/metric/SinkMetricData.java | 82 +-
 .../inlong/sort/base/util/MetricStateUtils.java | 24 +
 inlong-sort/sort-connectors/kafka/pom.xml | 6 +
 .../inlong/sort/kafka/FlinkKafkaConsumer.java | 352 +++++
 .../inlong/sort/kafka/FlinkKafkaConsumerBase.java | 1350 ++++++++++++++++++++
 .../inlong/sort/kafka/FlinkKafkaProducer.java | 69 +-
 .../table/DynamicKafkaDeserializationSchema.java | 61 +-
 .../sort/kafka/table/KafkaDynamicSource.java | 82 +-
 .../sort/cdc/debezium/DebeziumSourceFunction.java | 2 +
 licenses/inlong-sort-connectors/LICENSE | 12 +
 12 files changed, 1933 insertions(+), 119 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 9daed86e0..93951770b 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -37,6 +37,10 @@ public final class Constants {

     public static final String NUM_RECORDS_OUT = "numRecordsOut";

+    public static final String NUM_BYTES_OUT_FOR_METER = "numBytesOutForMeter";
+
+    public static final String NUM_RECORDS_OUT_FOR_METER = "numRecordsOutForMeter";
+
     public static final String NUM_BYTES_OUT_PER_SECOND = "numBytesOutPerSecond";

     public static final String NUM_RECORDS_OUT_PER_SECOND = "numRecordsOutPerSecond";

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
index 9240c0c8a..604800ccf 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
@@ -62,4 +62,12 @@ public class MetricState implements Serializable {
         }
         return 0L;
     }
+
+    @Override
+    public String toString() {
+        return "MetricState{"
+                + "subtaskIndex=" + subtaskIndex
+                + ", metrics=" + metrics.toString()
+                + '}';
+    }
 }

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 67b47657e..4073ddd44 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -34,8 +34,10 @@ import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
 import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_FOR_METER;
 import static
org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_FOR_METER; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND; /** @@ -50,6 +52,8 @@ public class SinkMetricData implements MetricData { private AuditImp auditImp; private Counter numRecordsOut; private Counter numBytesOut; + private Counter numRecordsOutForMeter; + private Counter numBytesOutForMeter; private Counter dirtyRecords; private Counter dirtyBytes; private Meter numRecordsOutPerSecond; @@ -76,6 +80,43 @@ public class SinkMetricData implements MetricData { } } + /** + * Default counter is {@link SimpleCounter} + * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter + * prometheus + */ + public void registerMetricsForNumRecordsOutForMeter() { + registerMetricsForNumRecordsOutForMeter(new SimpleCounter()); + } + + /** + * User can use custom counter that extends from {@link Counter} + * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter + * prometheus + */ + public void registerMetricsForNumRecordsOutForMeter(Counter counter) { + numRecordsOutForMeter = registerCounter(NUM_RECORDS_OUT_FOR_METER, counter); + } + + /** + * Default counter is {@link SimpleCounter} + * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter + * prometheus + */ + public void registerMetricsForNumBytesOutForMeter() { + registerMetricsForNumBytesOutForMeter(new SimpleCounter()); + + } + + /** + * User can use custom counter that extends from {@link Counter} + * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter + * prometheus + */ + public void registerMetricsForNumBytesOutForMeter(Counter counter) { + numBytesOutForMeter = registerCounter(NUM_BYTES_OUT_FOR_METER, counter); + } + /** * Default counter is {@link SimpleCounter} * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter @@ -114,11 +155,11 @@ public class SinkMetricData implements MetricData { } public void registerMetricsForNumRecordsOutPerSecond() { - numRecordsOutPerSecond = registerMeter(NUM_RECORDS_OUT_PER_SECOND, this.numRecordsOut); + numRecordsOutPerSecond = registerMeter(NUM_RECORDS_OUT_PER_SECOND, this.numRecordsOutForMeter); } public void registerMetricsForNumBytesOutPerSecond() { - numBytesOutPerSecond = registerMeter(NUM_BYTES_OUT_PER_SECOND, this.numBytesOut); + numBytesOutPerSecond = registerMeter(NUM_BYTES_OUT_PER_SECOND, this.numBytesOutForMeter); } public void registerMetricsForDirtyRecords() { @@ -191,10 +232,20 @@ public class SinkMetricData implements MetricData { return nodeId; } + public Counter getNumRecordsOutForMeter() { + return numRecordsOutForMeter; + } + + public Counter getNumBytesOutForMeter() { + return numBytesOutForMeter; + } + public void invokeWithEstimate(Object o) { long size = o.toString().getBytes(StandardCharsets.UTF_8).length; - getNumRecordsOut().inc(); - getNumBytesOut().inc(size); + this.numRecordsOut.inc(); + this.numBytesOut.inc(size); + this.numRecordsOutForMeter.inc(); + this.numBytesOutForMeter.inc(size); if (auditImp != null) { auditImp.add( Constants.AUDIT_SORT_OUTPUT, @@ -207,8 +258,10 @@ public class SinkMetricData implements MetricData { } public void invoke(long rowCount, long rowSize) { 
- getNumRecordsOut().inc(rowCount); - getNumBytesOut().inc(rowSize); + this.numRecordsOut.inc(rowCount); + this.numBytesOut.inc(rowSize); + this.numRecordsOutForMeter.inc(rowCount); + this.numBytesOutForMeter.inc(rowSize); if (auditImp != null) { auditImp.add( Constants.AUDIT_SORT_OUTPUT, @@ -219,4 +272,21 @@ public class SinkMetricData implements MetricData { rowSize); } } + + @Override + public String toString() { + return "SinkMetricData{" + + "groupId='" + groupId + '\'' + + ", streamId='" + streamId + '\'' + + ", nodeId='" + nodeId + '\'' + + ", numRecordsOut=" + numRecordsOut.getCount() + + ", numBytesOut=" + numBytesOut.getCount() + + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount() + + ", numBytesOutForMeter=" + numBytesOutForMeter.getCount() + + ", dirtyRecords=" + dirtyRecords.getCount() + + ", dirtyBytes=" + dirtyBytes.getCount() + + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate() + + ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate() + + '}'; + } } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java index d878381ba..416c8b719 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java @@ -21,6 +21,7 @@ package org.apache.inlong.sort.base.util; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.state.ListState; import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.metric.SinkMetricData; import org.apache.inlong.sort.base.metric.SourceMetricData; import java.util.ArrayList; @@ -29,7 +30,9 @@ import java.util.List; import java.util.Map; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; /** * metric state for {@link MetricState} supporting snapshot and restore @@ -125,4 +128,25 @@ public class MetricStateUtils { metricStateListState.add(metricState); } + /** + * + * Snapshot metric state data for {@link SinkMetricData} + * @param metricStateListState state data list + * @param sinkMetricData {@link SinkMetricData} A collection class for handling metrics + * @param subtaskIndex subtask index + * @throws Exception throw exception when add metric state + */ + public static void snapshotMetricStateForSinkMetricData(ListState<MetricState> metricStateListState, + SinkMetricData sinkMetricData, Integer subtaskIndex) + throws Exception { + log.info("snapshotMetricStateForSinkMetricData:{}, sinkMetricData:{}, subtaskIndex:{}", + metricStateListState, sinkMetricData, subtaskIndex); + metricStateListState.clear(); + Map<String, Long> metricDataMap = new HashMap<>(); + metricDataMap.put(NUM_RECORDS_OUT, sinkMetricData.getNumRecordsOut().getCount()); + metricDataMap.put(NUM_BYTES_OUT, sinkMetricData.getNumBytesOut().getCount()); + MetricState metricState = new MetricState(subtaskIndex, metricDataMap); + metricStateListState.add(metricState); + } + } diff --git a/inlong-sort/sort-connectors/kafka/pom.xml b/inlong-sort/sort-connectors/kafka/pom.xml index f6ace504d..13a1c2098 100644 --- a/inlong-sort/sort-connectors/kafka/pom.xml +++ 
b/inlong-sort/sort-connectors/kafka/pom.xml @@ -92,6 +92,12 @@ </filter> </filters> <relocations> + <relocation> + <pattern>org.apache.inlong.sort.base</pattern> + <shadedPattern> + org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.base + </shadedPattern> + </relocation> <relocation> <pattern>org.apache.kafka</pattern> <shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern> diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java new file mode 100644 index 000000000..924944188 --- /dev/null +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.kafka; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode; +import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; +import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionDiscoverer; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor; +import org.apache.flink.util.PropertiesUtil; +import org.apache.flink.util.SerializedValue; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Pattern; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.PropertiesUtil.getBoolean; +import static org.apache.flink.util.PropertiesUtil.getLong; + +/** + * Copy from 
org.apache.flink:flink-connector-kafka_2.11:1.13.5 + * + * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from Apache + * Kafka. The consumer can run in multiple parallel instances, each of which will pull data from one + * or more Kafka partitions. + * + * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost + * during a failure, and that the computation processes elements "exactly once". (Note: These + * guarantees naturally assume that Kafka itself does not loose any data.) + * + * <p>Please note that Flink snapshots the offsets internally as part of its distributed + * checkpoints. The offsets committed to Kafka are only to bring the outside view of progress in + * sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of how + * far the Flink Kafka consumer has consumed a topic. + * + * <p>Please refer to Kafka's documentation for the available configuration properties: + * http://kafka.apache.org/documentation.html#newconsumerconfigs + */ +@PublicEvolving +public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> { + + private static final long serialVersionUID = 1L; + + /** + * Configuration key to change the polling timeout. * + */ + public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout"; + + /** + * From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not + * available. If 0, returns immediately with any records that are available now. + */ + public static final long DEFAULT_POLL_TIMEOUT = 100L; + + // ------------------------------------------------------------------------ + + /** + * User-supplied properties for Kafka. * + */ + protected final Properties properties; + + /** + * From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not + * available. If 0, returns immediately with any records that are available now + */ + protected final long pollTimeout; + + // ------------------------------------------------------------------------ + + /** + * Creates a new Kafka streaming source consumer. + * + * @param topic The name of the topic that should be consumed. + * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and + * Flink's objects. + * @param props + */ + public FlinkKafkaConsumer( + String topic, DeserializationSchema<T> valueDeserializer, Properties props, String inlongMetric, + String auditHostAndPorts) { + this(Collections.singletonList(topic), valueDeserializer, props, inlongMetric, auditHostAndPorts); + } + + /** + * Creates a new Kafka streaming source consumer. + * + * <p>This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value + * pairs, offsets, and topic names from Kafka. + * + * @param topic The name of the topic that should be consumed. + * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages + * and Flink's objects. + * @param props + */ + public FlinkKafkaConsumer( + String topic, KafkaDeserializationSchema<T> deserializer, Properties props, String inlongMetric, + String auditHostAndPorts) { + this(Collections.singletonList(topic), deserializer, props, inlongMetric, auditHostAndPorts); + } + + /** + * Creates a new Kafka streaming source consumer. + * + * <p>This constructor allows passing multiple topics to the consumer. + * + * @param topics The Kafka topics to read from. 
+ * @param deserializer The de-/serializer used to convert between Kafka's byte messages and + * Flink's objects. + * @param props + */ + public FlinkKafkaConsumer( + List<String> topics, DeserializationSchema<T> deserializer, Properties props, String inlongMetric, + String auditHostAndPorts) { + this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props, inlongMetric, auditHostAndPorts); + } + + /** + * Creates a new Kafka streaming source consumer. + * + * <p>This constructor allows passing multiple topics and a key/value deserialization schema. + * + * @param topics The Kafka topics to read from. + * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages + * and Flink's objects. + * @param props + */ + public FlinkKafkaConsumer( + List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props, String inlongMetric, + String auditHostAndPorts) { + this(topics, null, deserializer, props, inlongMetric, auditHostAndPorts); + } + + /** + * Creates a new Kafka streaming source consumer. Use this constructor to subscribe to multiple + * topics based on a regular expression pattern. + * + * <p>If partition discovery is enabled (by setting a non-negative value for {@link + * FlinkKafkaConsumer#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics with + * names matching the pattern will also be subscribed to as they are created on the fly. + * + * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe + * to. + * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and + * Flink's objects. + * @param props + */ + public FlinkKafkaConsumer( + Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, + Properties props, String inlongMetric, String auditHostAndPorts) { + this( + null, + subscriptionPattern, + new KafkaDeserializationSchemaWrapper<>(valueDeserializer), + props, inlongMetric, auditHostAndPorts); + } + + /** + * Creates a new Kafka streaming source consumer. Use this constructor to subscribe to multiple + * topics based on a regular expression pattern. + * + * <p>If partition discovery is enabled (by setting a non-negative value for {@link + * FlinkKafkaConsumer#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics with + * names matching the pattern will also be subscribed to as they are created on the fly. + * + * <p>This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value + * pairs, offsets, and topic names from Kafka. + * + * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe + * to. + * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages + * and Flink's objects. 
+ * @param props + */ + public FlinkKafkaConsumer( + Pattern subscriptionPattern, + KafkaDeserializationSchema<T> deserializer, + Properties props, String inlongMetric, String auditHostAndPorts) { + this(null, subscriptionPattern, deserializer, props, inlongMetric, auditHostAndPorts); + } + + private FlinkKafkaConsumer( + List<String> topics, + Pattern subscriptionPattern, + KafkaDeserializationSchema<T> deserializer, + Properties props, String inlongMetric, + String auditHostAndPorts) { + + super( + topics, + subscriptionPattern, + deserializer, + getLong( + checkNotNull(props, "props"), + KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, + PARTITION_DISCOVERY_DISABLED), + !getBoolean(props, KEY_DISABLE_METRICS, false), inlongMetric, auditHostAndPorts); + + this.properties = props; + setDeserializer(this.properties); + + // configure the polling timeout + try { + if (properties.containsKey(KEY_POLL_TIMEOUT)) { + this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT)); + } else { + this.pollTimeout = DEFAULT_POLL_TIMEOUT; + } + } catch (Exception e) { + throw new IllegalArgumentException( + "Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e); + } + } + + @Override + protected AbstractFetcher<T, ?> createFetcher( + SourceContext<T> sourceContext, + Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, + SerializedValue<WatermarkStrategy<T>> watermarkStrategy, + StreamingRuntimeContext runtimeContext, + OffsetCommitMode offsetCommitMode, + MetricGroup consumerMetricGroup, + boolean useMetrics) + throws Exception { + + // make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS; + // this overwrites whatever setting the user configured in the properties + adjustAutoCommitConfig(properties, offsetCommitMode); + + return new KafkaFetcher<>( + sourceContext, + assignedPartitionsWithInitialOffsets, + watermarkStrategy, + runtimeContext.getProcessingTimeService(), + runtimeContext.getExecutionConfig().getAutoWatermarkInterval(), + runtimeContext.getUserCodeClassLoader(), + runtimeContext.getTaskNameWithSubtasks(), + deserializer, + properties, + pollTimeout, + runtimeContext.getMetricGroup(), + consumerMetricGroup, + useMetrics); + } + + @Override + protected AbstractPartitionDiscoverer createPartitionDiscoverer( + KafkaTopicsDescriptor topicsDescriptor, + int indexOfThisSubtask, + int numParallelSubtasks) { + + return new KafkaPartitionDiscoverer( + topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, properties); + } + + @Override + protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp( + Collection<KafkaTopicPartition> partitions, long timestamp) { + + Map<TopicPartition, Long> partitionOffsetsRequest = new HashMap<>(partitions.size()); + for (KafkaTopicPartition partition : partitions) { + partitionOffsetsRequest.put( + new TopicPartition(partition.getTopic(), partition.getPartition()), timestamp); + } + + final Map<KafkaTopicPartition, Long> result = new HashMap<>(partitions.size()); + // use a short-lived consumer to fetch the offsets; + // this is ok because this is a one-time operation that happens only on startup + try (KafkaConsumer<?, ?> consumer = new KafkaConsumer(properties)) { + for (Map.Entry<TopicPartition, OffsetAndTimestamp> partitionToOffset : + consumer.offsetsForTimes(partitionOffsetsRequest).entrySet()) { + + result.put( + new KafkaTopicPartition( + partitionToOffset.getKey().topic(), + partitionToOffset.getKey().partition()), + (partitionToOffset.getValue() == null) + ? 
null + : partitionToOffset.getValue().offset()); + } + } + return result; + } + + @Override + protected boolean getIsAutoCommitEnabled() { + return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) + && PropertiesUtil.getLong( + properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) + > 0; + } + + /** + * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties. + * + * @param props The Kafka properties to register the serializer in. + */ + private static void setDeserializer(Properties props) { + final String deSerName = ByteArrayDeserializer.class.getName(); + + Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); + Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); + + if (keyDeSer != null && !keyDeSer.equals(deSerName)) { + LOG.warn( + "Ignoring configured key DeSerializer ({})", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); + } + if (valDeSer != null && !valDeSer.equals(deSerName)) { + LOG.warn( + "Ignoring configured value DeSerializer ({})", + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); + } + + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName); + } +} diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java new file mode 100644 index 000000000..0d0ab4544 --- /dev/null +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java @@ -0,0 +1,1350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.kafka; + +import org.apache.commons.collections.map.LinkedMap; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode; +import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; +import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor; +import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter; +import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.SerializedValue; +import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.metric.SourceMetricData; +import org.apache.inlong.sort.base.metric.ThreadSafeCounter; +import org.apache.inlong.sort.base.util.MetricStateUtils; +import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.COMMITS_FAILED_METRICS_COUNTER; +import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.COMMITS_SUCCEEDED_METRICS_COUNTER; +import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.KAFKA_CONSUMER_METRICS_GROUP; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.inlong.sort.base.Constants.DELIMITER; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; + +/** + * Copy from org.apache.flink:flink-connector-kafka_2.11:1.13.5 + * + * Base class of all Flink Kafka Consumer data sources. This implements the common behavior across + * all Kafka versions. + * + * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the {@link + * AbstractFetcher}. + * + * @param <T> The type of records produced by this data source + */ +@Internal +public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> + implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction { + + private static final long serialVersionUID = -6272159445203409112L; + + protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class); + + /** + * The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. + */ + public static final int MAX_NUM_PENDING_CHECKPOINTS = 100; + + /** + * The default interval to execute partition discovery, in milliseconds ({@code Long.MIN_VALUE}, + * i.e. disabled by default). + */ + public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE; + + /** + * Boolean configuration key to disable metrics tracking. * + */ + public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; + + /** + * Configuration key to define the consumer's partition discovery interval, in milliseconds. + */ + public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = + "flink.partition-discovery.interval-millis"; + + /** + * State name of the consumer's partition offset states. + */ + private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states"; + + // ------------------------------------------------------------------------ + // configuration state, set on the client relevant for all subtasks + // ------------------------------------------------------------------------ + + /** + * Describes whether we are discovering partitions for fixed topics or a topic pattern. + */ + private final KafkaTopicsDescriptor topicsDescriptor; + + /** + * The schema to convert between Kafka's byte messages, and Flink's objects. + */ + protected final KafkaDeserializationSchema<T> deserializer; + + /** + * The set of topic partitions that the source will read, with their initial offsets to start + * reading from. 
+ */ + private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets; + + /** + * Optional watermark strategy that will be run per Kafka partition, to exploit per-partition + * timestamp characteristics. The watermark strategy is kept in serialized form, to deserialize + * it into multiple copies. + */ + private SerializedValue<WatermarkStrategy<T>> watermarkStrategy; + + /** + * User-set flag determining whether or not to commit on checkpoints. Note: this flag does not + * represent the final offset commit mode. + */ + private boolean enableCommitOnCheckpoints = true; + + /** + * User-set flag to disable filtering restored partitions with current topics descriptor. + */ + private boolean filterRestoredPartitionsWithCurrentTopicsDescriptor = true; + + /** + * The offset commit mode for the consumer. The value of this can only be determined in {@link + * FlinkKafkaConsumerBase#open(Configuration)} since it depends on whether or not checkpointing + * is enabled for the job. + */ + private OffsetCommitMode offsetCommitMode; + + /** + * User configured value for discovery interval, in milliseconds. + */ + private final long discoveryIntervalMillis; + + /** + * The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}). + */ + private StartupMode startupMode = StartupMode.GROUP_OFFSETS; + + /** + * Specific startup offsets; only relevant when startup mode is {@link + * StartupMode#SPECIFIC_OFFSETS}. + */ + private Map<KafkaTopicPartition, Long> specificStartupOffsets; + + /** + * Timestamp to determine startup offsets; only relevant when startup mode is {@link + * StartupMode#TIMESTAMP}. + */ + private Long startupOffsetsTimestamp; + + // ------------------------------------------------------------------------ + // runtime state (used individually by each parallel subtask) + // ------------------------------------------------------------------------ + + /** + * Data for pending but uncommitted offsets. + */ + private final LinkedMap pendingOffsetsToCommit = new LinkedMap(); + + /** + * The fetcher implements the connections to the Kafka brokers. + */ + private transient volatile AbstractFetcher<T, ?> kafkaFetcher; + + /** + * The partition discoverer, used to find new partitions. + */ + private transient volatile AbstractPartitionDiscoverer partitionDiscoverer; + + /** + * The offsets to restore to, if the consumer restores state from a checkpoint. + * + * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} + * method. + * + * <p>Using a sorted map as the ordering is important when using restored state to seed the + * partition discoverer. + */ + private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState; + + /** + * Accessor for state in the operator state backend. + */ + private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates; + + /** + * Discovery loop, executed in a separate thread. + */ + private transient volatile Thread discoveryLoopThread; + + /** + * Flag indicating whether the consumer is still running. + */ + private volatile boolean running = true; + + // ------------------------------------------------------------------------ + // internal metrics + // ------------------------------------------------------------------------ + + /** + * Flag indicating whether or not metrics should be exposed. If {@code true}, offset metrics + * (e.g. current offset, committed offset) and Kafka-shipped metrics will be registered. 
+ */ + private final boolean useMetrics; + + /** + * Counter for successful Kafka offset commits. + */ + private transient Counter successfulCommits; + + /** + * Counter for failed Kafka offset commits. + */ + private transient Counter failedCommits; + + /** + * Callback interface that will be invoked upon async Kafka commit completion. Please be aware + * that default callback implementation in base class does not provide any guarantees on + * thread-safety. This is sufficient for now because current supported Kafka connectors + * guarantee no more than 1 concurrent async pending offset commit. + */ + private transient KafkaCommitCallback offsetCommitCallback; + + private transient ListState<MetricState> metricStateListState; + + private MetricState metricState; + + /** + * Metric for InLong + */ + private String inlongMetric; + /** + * audit host and ports + */ + private String inlongAudit; + + private SourceMetricData sourceMetricData; + + // ------------------------------------------------------------------------ + + /** + * Base constructor. + * + * @param topics fixed list of topics to subscribe to (null, if using topic pattern) + * @param topicPattern the topic pattern to subscribe to (null, if using fixed topics) + * @param deserializer The deserializer to turn raw byte messages into Java/Scala objects. + * @param discoveryIntervalMillis the topic / partition discovery interval, in milliseconds (0 + * if discovery is disabled). + */ + public FlinkKafkaConsumerBase( + List<String> topics, + Pattern topicPattern, + KafkaDeserializationSchema<T> deserializer, + long discoveryIntervalMillis, + boolean useMetrics, String inlongMetric, String auditHostAndPorts) { + this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern); + this.deserializer = checkNotNull(deserializer, "valueDeserializer"); + + checkArgument( + discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED + || discoveryIntervalMillis >= 0, + "Cannot define a negative value for the topic / partition discovery interval."); + this.discoveryIntervalMillis = discoveryIntervalMillis; + + this.useMetrics = useMetrics; + this.inlongMetric = inlongMetric; + this.inlongAudit = auditHostAndPorts; + } + + /** + * Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS. This + * overwrites whatever setting the user configured in the properties. + * + * @param properties - Kafka configuration properties to be adjusted + * @param offsetCommitMode offset commit mode + */ + protected static void adjustAutoCommitConfig( + Properties properties, OffsetCommitMode offsetCommitMode) { + if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS + || offsetCommitMode == OffsetCommitMode.DISABLED) { + properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + } + } + // ------------------------------------------------------------------------ + // Configuration + // ------------------------------------------------------------------------ + + /** + * Sets the given {@link WatermarkStrategy} on this consumer. These will be used to assign + * timestamps to records and generates watermarks to signal event time progress. + * + * <p>Running timestamp extractors / watermark generators directly inside the Kafka source + * (which you can do by using this method), per Kafka partition, allows users to let them + * exploit the per-partition characteristics. 
+ * + * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions, the streams + * from the partitions are unioned in a "first come first serve" fashion. Per-partition + * characteristics are usually lost that way. For example, if the timestamps are strictly + * ascending per Kafka partition, they will not be strictly ascending in the resulting Flink + * DataStream, if the parallel source subtask reads more than one partition. + * + * <p>Common watermark generation patterns can be found as static methods in the {@link + * WatermarkStrategy} class. + * + * @return The consumer object, to allow function chaining. + */ + public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks( + WatermarkStrategy<T> watermarkStrategy) { + checkNotNull(watermarkStrategy); + + try { + ClosureCleaner.clean( + watermarkStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + this.watermarkStrategy = new SerializedValue<>(watermarkStrategy); + } catch (Exception e) { + throw new IllegalArgumentException( + "The given WatermarkStrategy is not serializable", e); + } + + return this; + } + + /** + * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated + * manner. The watermark extractor will run per Kafka partition, watermarks will be merged + * across partitions in the same way as in the Flink runtime, when streams are merged. + * + * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions, the streams + * from the partitions are unioned in a "first come first serve" fashion. Per-partition + * characteristics are usually lost that way. For example, if the timestamps are strictly + * ascending per Kafka partition, they will not be strictly ascending in the resulting Flink + * DataStream, if the parallel source subtask reads more than one partition. + * + * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per + * Kafka partition, allows users to let them exploit the per-partition characteristics. + * + * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an {@link + * AssignerWithPeriodicWatermarks}, not both at the same time. + * + * <p>This method uses the deprecated watermark generator interfaces. Please switch to {@link + * #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the new interfaces instead. The new + * interfaces support watermark idleness and no longer need to differentiate between "periodic" + * and "punctuated" watermarks. + * + * @param assigner The timestamp assigner / watermark generator to use. + * @return The consumer object, to allow function chaining. + * @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead. + */ + @Deprecated + public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks( + AssignerWithPunctuatedWatermarks<T> assigner) { + checkNotNull(assigner); + + if (this.watermarkStrategy != null) { + throw new IllegalStateException("Some watermark strategy has already been set."); + } + + try { + ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + final WatermarkStrategy<T> wms = + new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(assigner); + + return assignTimestampsAndWatermarks(wms); + } catch (Exception e) { + throw new IllegalArgumentException("The given assigner is not serializable", e); + } + } + + /** + * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated + * manner. 
The watermark extractor will run per Kafka partition, watermarks will be merged + * across partitions in the same way as in the Flink runtime, when streams are merged. + * + * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions, the streams + * from the partitions are unioned in a "first come first serve" fashion. Per-partition + * characteristics are usually lost that way. For example, if the timestamps are strictly + * ascending per Kafka partition, they will not be strictly ascending in the resulting Flink + * DataStream, if the parallel source subtask reads more that one partition. + * + * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per + * Kafka partition, allows users to let them exploit the per-partition characteristics. + * + * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an {@link + * AssignerWithPeriodicWatermarks}, not both at the same time. + * + * <p>This method uses the deprecated watermark generator interfaces. Please switch to {@link + * #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the new interfaces instead. The new + * interfaces support watermark idleness and no longer need to differentiate between "periodic" + * and "punctuated" watermarks. + * + * @param assigner The timestamp assigner / watermark generator to use. + * @return The consumer object, to allow function chaining. + * @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead. + */ + @Deprecated + public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks( + AssignerWithPeriodicWatermarks<T> assigner) { + checkNotNull(assigner); + + if (this.watermarkStrategy != null) { + throw new IllegalStateException("Some watermark strategy has already been set."); + } + + try { + ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + final WatermarkStrategy<T> wms = + new AssignerWithPeriodicWatermarksAdapter.Strategy<>(assigner); + + return assignTimestampsAndWatermarks(wms); + } catch (Exception e) { + throw new IllegalArgumentException("The given assigner is not serializable", e); + } + } + + /** + * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints. + * + * <p>This setting will only have effect if checkpointing is enabled for the job. If + * checkpointing isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" + * (for 0.9+) property settings will be used. + * + * @return The consumer object, to allow function chaining. + */ + public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) { + this.enableCommitOnCheckpoints = commitOnCheckpoints; + return this; + } + + /** + * Specifies the consumer to start reading from the earliest offset for all partitions. This + * lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers. + * + * <p>This method does not affect where partitions are read from when the consumer is restored + * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint, + * only the offsets in the restored state will be used. + * + * @return The consumer object, to allow function chaining. 
+ */ + public FlinkKafkaConsumerBase<T> setStartFromEarliest() { + this.startupMode = StartupMode.EARLIEST; + this.startupOffsetsTimestamp = null; + this.specificStartupOffsets = null; + return this; + } + + /** + * Specifies the consumer to start reading from the latest offset for all partitions. This lets + * the consumer ignore any committed group offsets in Zookeeper / Kafka brokers. + * + * <p>This method does not affect where partitions are read from when the consumer is restored + * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint, + * only the offsets in the restored state will be used. + * + * @return The consumer object, to allow function chaining. + */ + public FlinkKafkaConsumerBase<T> setStartFromLatest() { + this.startupMode = StartupMode.LATEST; + this.startupOffsetsTimestamp = null; + this.specificStartupOffsets = null; + return this; + } + + /** + * Specifies the consumer to start reading partitions from a specified timestamp. The specified + * timestamp must be before the current timestamp. This lets the consumer ignore any committed + * group offsets in Zookeeper / Kafka brokers. + * + * <p>The consumer will look up the earliest offset whose timestamp is greater than or equal to + * the specific timestamp from Kafka. If there's no such offset, the consumer will use the + * latest offset to read data from kafka. + * + * <p>This method does not affect where partitions are read from when the consumer is restored + * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint, + * only the offsets in the restored state will be used. + * + * @param startupOffsetsTimestamp timestamp for the startup offsets, as milliseconds from epoch. + * @return The consumer object, to allow function chaining. + */ + public FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) { + checkArgument( + startupOffsetsTimestamp >= 0, + "The provided value for the startup offsets timestamp is invalid."); + + long currentTimestamp = System.currentTimeMillis(); + checkArgument( + startupOffsetsTimestamp <= currentTimestamp, + "Startup time[%s] must be before current time[%s].", + startupOffsetsTimestamp, + currentTimestamp); + + this.startupMode = StartupMode.TIMESTAMP; + this.startupOffsetsTimestamp = startupOffsetsTimestamp; + this.specificStartupOffsets = null; + return this; + } + + /** + * Specifies the consumer to start reading from any committed group offsets found in Zookeeper / + * Kafka brokers. The "group.id" property must be set in the configuration properties. If no + * offset can be found for a partition, the behaviour in "auto.offset.reset" set in the + * configuration properties will be used for the partition. + * + * <p>This method does not affect where partitions are read from when the consumer is restored + * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint, + * only the offsets in the restored state will be used. + * + * @return The consumer object, to allow function chaining. + */ + public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() { + this.startupMode = StartupMode.GROUP_OFFSETS; + this.startupOffsetsTimestamp = null; + this.specificStartupOffsets = null; + return this; + } + + /** + * Specifies the consumer to start reading partitions from specific offsets, set independently + * for each partition. The specified offset should be the offset of the next record that will be + * read from partitions. 
This lets the consumer ignore any committed group offsets in Zookeeper + * / Kafka brokers. + * + * <p>If the provided map of offsets contains entries whose {@link KafkaTopicPartition} is not + * subscribed by the consumer, the entry will be ignored. If the consumer subscribes to a + * partition that does not exist in the provided map of offsets, the consumer will fallback to + * the default group offset behaviour (see {@link + * FlinkKafkaConsumerBase#setStartFromGroupOffsets()}) for that particular partition. + * + * <p>If the specified offset for a partition is invalid, or the behaviour for that partition is + * defaulted to group offsets but still no group offset could be found for it, then the + * "auto.offset.reset" behaviour set in the configuration properties will be used for the + * partition + * + * <p>This method does not affect where partitions are read from when the consumer is restored + * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint, + * only the offsets in the restored state will be used. + * + * @return The consumer object, to allow function chaining. + */ + public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets( + Map<KafkaTopicPartition, Long> specificStartupOffsets) { + this.startupMode = StartupMode.SPECIFIC_OFFSETS; + this.startupOffsetsTimestamp = null; + this.specificStartupOffsets = checkNotNull(specificStartupOffsets); + return this; + } + + /** + * By default, when restoring from a checkpoint / savepoint, the consumer always ignores + * restored partitions that are no longer associated with the current specified topics or topic + * pattern to subscribe to. + * + * <p>This method configures the consumer to not filter the restored partitions, therefore + * always attempting to consume whatever partition was present in the previous execution + * regardless of the specified topics to subscribe to in the current execution. + * + * @return The consumer object, to allow function chaining. 
+ */ + public FlinkKafkaConsumerBase<T> disableFilterRestoredPartitionsWithSubscribedTopics() { + this.filterRestoredPartitionsWithCurrentTopicsDescriptor = false; + return this; + } + + // ------------------------------------------------------------------------ + // Work methods + // ------------------------------------------------------------------------ + + @Override + public void open(Configuration configuration) throws Exception { + // determine the offset commit mode + this.offsetCommitMode = + OffsetCommitModes.fromConfiguration( + getIsAutoCommitEnabled(), + enableCommitOnCheckpoints, + ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()); + + // create the partition discoverer + this.partitionDiscoverer = + createPartitionDiscoverer( + topicsDescriptor, + getRuntimeContext().getIndexOfThisSubtask(), + getRuntimeContext().getNumberOfParallelSubtasks()); + this.partitionDiscoverer.open(); + + subscribedPartitionsToStartOffsets = new HashMap<>(); + final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions(); + if (restoredState != null) { + for (KafkaTopicPartition partition : allPartitions) { + if (!restoredState.containsKey(partition)) { + restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET); + } + } + + for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : + restoredState.entrySet()) { + // seed the partition discoverer with the union state while filtering out + // restored partitions that should not be subscribed by this subtask + if (KafkaTopicPartitionAssigner.assign( + restoredStateEntry.getKey(), + getRuntimeContext().getNumberOfParallelSubtasks()) + == getRuntimeContext().getIndexOfThisSubtask()) { + subscribedPartitionsToStartOffsets.put( + restoredStateEntry.getKey(), restoredStateEntry.getValue()); + } + } + + if (filterRestoredPartitionsWithCurrentTopicsDescriptor) { + subscribedPartitionsToStartOffsets + .entrySet() + .removeIf( + entry -> { + if (!topicsDescriptor.isMatchingTopic( + entry.getKey().getTopic())) { + LOG.warn( + "{} is removed from subscribed partitions since it is no longer " + + "associated with topics descriptor of current execution.", + entry.getKey()); + return true; + } + return false; + }); + } + + LOG.info( + "Consumer subtask {} will start reading {} partitions with offsets in restored state: {}", + getRuntimeContext().getIndexOfThisSubtask(), + subscribedPartitionsToStartOffsets.size(), + subscribedPartitionsToStartOffsets); + } else { + // use the partition discoverer to fetch the initial seed partitions, + // and set their initial offsets depending on the startup mode. + // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now; + // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily + // determined + // when the partition is actually read. 
+ switch (startupMode) { + case SPECIFIC_OFFSETS: + if (specificStartupOffsets == null) { + throw new IllegalStateException( + "Startup mode for the consumer set to " + + StartupMode.SPECIFIC_OFFSETS + + ", but no specific offsets were specified."); + } + + for (KafkaTopicPartition seedPartition : allPartitions) { + Long specificOffset = specificStartupOffsets.get(seedPartition); + if (specificOffset != null) { + // since the specified offsets represent the next record to read, we + // subtract + // it by one so that the initial state of the consumer will be correct + subscribedPartitionsToStartOffsets.put( + seedPartition, specificOffset - 1); + } else { + // default to group offset behaviour if the user-provided specific + // offsets + // do not contain a value for this partition + subscribedPartitionsToStartOffsets.put( + seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + } + } + + break; + case TIMESTAMP: + if (startupOffsetsTimestamp == null) { + throw new IllegalStateException( + "Startup mode for the consumer set to " + + StartupMode.TIMESTAMP + + ", but no startup timestamp was specified."); + } + + for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset : + fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp) + .entrySet()) { + subscribedPartitionsToStartOffsets.put( + partitionToOffset.getKey(), + (partitionToOffset.getValue() == null) + // if an offset cannot be retrieved for a partition with the + // given timestamp, + // we default to using the latest offset for the partition + ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET + // since the specified offsets represent the next record to + // read, we subtract + // it by one so that the initial state of the consumer will + // be correct + : partitionToOffset.getValue() - 1); + } + + break; + default: + for (KafkaTopicPartition seedPartition : allPartitions) { + subscribedPartitionsToStartOffsets.put( + seedPartition, startupMode.getStateSentinel()); + } + } + + if (!subscribedPartitionsToStartOffsets.isEmpty()) { + switch (startupMode) { + case EARLIEST: + LOG.info( + "Consumer subtask {} will start reading the following {} partitions from the earliest" + + " offsets: {}", + getRuntimeContext().getIndexOfThisSubtask(), + subscribedPartitionsToStartOffsets.size(), + subscribedPartitionsToStartOffsets.keySet()); + break; + case LATEST: + LOG.info( + "Consumer subtask {} will start reading the following {} partitions from the latest " + + "offsets: {}", + getRuntimeContext().getIndexOfThisSubtask(), + subscribedPartitionsToStartOffsets.size(), + subscribedPartitionsToStartOffsets.keySet()); + break; + case TIMESTAMP: + LOG.info( + "Consumer subtask {} will start reading the following {} partitions from timestamp " + + "{}: {}", + getRuntimeContext().getIndexOfThisSubtask(), + subscribedPartitionsToStartOffsets.size(), + startupOffsetsTimestamp, + subscribedPartitionsToStartOffsets.keySet()); + break; + case SPECIFIC_OFFSETS: + LOG.info( + "Consumer subtask {} will start reading the following {} partitions from the " + + "specified startup offsets {}: {}", + getRuntimeContext().getIndexOfThisSubtask(), + subscribedPartitionsToStartOffsets.size(), + specificStartupOffsets, + subscribedPartitionsToStartOffsets.keySet()); + + List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = + new ArrayList<>(subscribedPartitionsToStartOffsets.size()); + for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : + subscribedPartitionsToStartOffsets.entrySet()) { + if (subscribedPartition.getValue() + == 
KafkaTopicPartitionStateSentinel.GROUP_OFFSET) { + partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey()); + } + } + + if (partitionsDefaultedToGroupOffsets.size() > 0) { + LOG.warn( + "Consumer subtask {} cannot find offsets for the following {} partitions in the " + + "specified startup offsets: {}" + + "; their startup offsets will be defaulted to their committed group " + + "offsets in Kafka.", + getRuntimeContext().getIndexOfThisSubtask(), + partitionsDefaultedToGroupOffsets.size(), + partitionsDefaultedToGroupOffsets); + } + break; + case GROUP_OFFSETS: + LOG.info( + "Consumer subtask {} will start reading the following {} partitions from the " + + "committed group offsets in Kafka: {}", + getRuntimeContext().getIndexOfThisSubtask(), + subscribedPartitionsToStartOffsets.size(), + subscribedPartitionsToStartOffsets.keySet()); + } + } else { + LOG.info( + "Consumer subtask {} initially has no partitions to read from.", + getRuntimeContext().getIndexOfThisSubtask()); + } + } + + this.deserializer.open( + RuntimeContextInitializationContextAdapters.deserializationAdapter( + getRuntimeContext(), metricGroup -> metricGroup.addGroup("user"))); + } + + @Override + public void run(SourceContext<T> sourceContext) throws Exception { + + if (StringUtils.isNotEmpty(this.inlongMetric)) { + String[] inlongMetricArray = inlongMetric.split(DELIMITER); + String groupId = inlongMetricArray[0]; + String streamId = inlongMetricArray[1]; + String nodeId = inlongMetricArray[2]; + AuditImp auditImp = null; + if (inlongAudit != null) { + AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER)))); + auditImp = AuditImp.getInstance(); + } + sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup(), + auditImp); + ThreadSafeCounter recordsInCounter = new ThreadSafeCounter(); + ThreadSafeCounter bytesInCounter = new ThreadSafeCounter(); + if (metricState != null) { + recordsInCounter.inc(metricState.getMetricValue(NUM_RECORDS_IN)); + bytesInCounter.inc(metricState.getMetricValue(NUM_BYTES_IN)); + } + sourceMetricData.registerMetricsForNumRecordsIn(recordsInCounter); + sourceMetricData.registerMetricsForNumBytesIn(bytesInCounter); + sourceMetricData.registerMetricsForNumRecordsInForMeter(new ThreadSafeCounter()); + sourceMetricData.registerMetricsForNumBytesInForMeter(new ThreadSafeCounter()); + sourceMetricData.registerMetricsForNumBytesInPerSecond(); + sourceMetricData.registerMetricsForNumRecordsInPerSecond(); + if (this.deserializer instanceof DynamicKafkaDeserializationSchema) { + DynamicKafkaDeserializationSchema dynamicKafkaDeserializationSchema = + (DynamicKafkaDeserializationSchema) deserializer; + dynamicKafkaDeserializationSchema.setMetricData(sourceMetricData); + } + } + + if (subscribedPartitionsToStartOffsets == null) { + throw new Exception("The partitions were not set for the consumer"); + } + + // initialize commit metrics and default offset callback method + this.successfulCommits = + this.getRuntimeContext() + .getMetricGroup() + .counter(COMMITS_SUCCEEDED_METRICS_COUNTER); + this.failedCommits = + this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER); + final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask(); + + this.offsetCommitCallback = + new KafkaCommitCallback() { + @Override + public void onSuccess() { + successfulCommits.inc(); + } + + @Override + public void onException(Throwable cause) { + LOG.warn( + String.format( + "Consumer subtask %d 
failed async Kafka commit.", + subtaskIndex), + cause); + failedCommits.inc(); + } + }; + + // mark the subtask as temporarily idle if there are no initial seed partitions; + // once this subtask discovers some partitions and starts collecting records, the subtask's + // status will automatically be triggered back to be active. + if (subscribedPartitionsToStartOffsets.isEmpty()) { + sourceContext.markAsTemporarilyIdle(); + } + + LOG.info( + "Consumer subtask {} creating fetcher with offsets {}.", + getRuntimeContext().getIndexOfThisSubtask(), + subscribedPartitionsToStartOffsets); + // from this point forward: + // - 'snapshotState' will draw offsets from the fetcher, + // instead of being built from `subscribedPartitionsToStartOffsets` + // - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to + // Kafka through the fetcher, if configured to do so) + this.kafkaFetcher = + createFetcher( + sourceContext, + subscribedPartitionsToStartOffsets, + watermarkStrategy, + (StreamingRuntimeContext) getRuntimeContext(), + offsetCommitMode, + getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP), + useMetrics); + + if (!running) { + return; + } + + // depending on whether we were restored with the current state version (1.3), + // remaining logic branches off into 2 paths: + // 1) New state - partition discovery loop executed as separate thread, with this + // thread running the main fetcher loop + // 2) Old state - partition discovery is disabled and only the main fetcher loop is + // executed + if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) { + kafkaFetcher.runFetchLoop(); + } else { + runWithPartitionDiscovery(); + } + } + + private void runWithPartitionDiscovery() throws Exception { + final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>(); + createAndStartDiscoveryLoop(discoveryLoopErrorRef); + + kafkaFetcher.runFetchLoop(); + + // make sure that the partition discoverer is waked up so that + // the discoveryLoopThread exits + partitionDiscoverer.wakeup(); + joinDiscoveryLoopThread(); + + // rethrow any fetcher errors + final Exception discoveryLoopError = discoveryLoopErrorRef.get(); + if (discoveryLoopError != null) { + throw new RuntimeException(discoveryLoopError); + } + } + + @VisibleForTesting + void joinDiscoveryLoopThread() throws InterruptedException { + if (discoveryLoopThread != null) { + discoveryLoopThread.join(); + } + } + + private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) { + discoveryLoopThread = + new Thread( + () -> { + try { + // --------------------- partition discovery loop + // --------------------- + + // throughout the loop, we always eagerly check if we are still + // running before + // performing the next operation, so that we can escape the loop as + // soon as possible + + while (running) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Consumer subtask {} is trying to discover new partitions ...", + getRuntimeContext().getIndexOfThisSubtask()); + } + + final List<KafkaTopicPartition> discoveredPartitions; + try { + discoveredPartitions = + partitionDiscoverer.discoverPartitions(); + } catch (AbstractPartitionDiscoverer.WakeupException + | AbstractPartitionDiscoverer.ClosedException e) { + // the partition discoverer may have been closed or woken up + // before or during the discovery; + // this would only happen if the consumer was canceled; + // simply escape the loop + break; + } + + // no need to add the discovered partitions if we were 
closed + // during the meantime + if (running && !discoveredPartitions.isEmpty()) { + kafkaFetcher.addDiscoveredPartitions(discoveredPartitions); + } + + // do not waste any time sleeping if we're not running anymore + if (running && discoveryIntervalMillis != 0) { + try { + Thread.sleep(discoveryIntervalMillis); + } catch (InterruptedException iex) { + // may be interrupted if the consumer was canceled + // midway; simply escape the loop + break; + } + } + } + } catch (Exception e) { + discoveryLoopErrorRef.set(e); + } finally { + // calling cancel will also let the fetcher loop escape + // (if not running, cancel() was already called) + if (running) { + cancel(); + } + } + }, + "Kafka Partition Discovery for " + + getRuntimeContext().getTaskNameWithSubtasks()); + + discoveryLoopThread.start(); + } + + @Override + public void cancel() { + // set ourselves as not running; + // this would let the main discovery loop escape as soon as possible + running = false; + + if (discoveryLoopThread != null) { + + if (partitionDiscoverer != null) { + // we cannot close the discoverer here, as it is error-prone to concurrent access; + // only wakeup the discoverer, the discovery loop will clean itself up after it + // escapes + partitionDiscoverer.wakeup(); + } + + // the discovery loop may currently be sleeping in-between + // consecutive discoveries; interrupt to shutdown faster + discoveryLoopThread.interrupt(); + } + + // abort the fetcher, if there is one + if (kafkaFetcher != null) { + kafkaFetcher.cancel(); + } + } + + @Override + public void close() throws Exception { + cancel(); + + joinDiscoveryLoopThread(); + + Exception exception = null; + if (partitionDiscoverer != null) { + try { + partitionDiscoverer.close(); + } catch (Exception e) { + exception = e; + } + } + + try { + super.close(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + if (exception != null) { + throw exception; + } + } + + // ------------------------------------------------------------------------ + // Checkpoint and restore + // ------------------------------------------------------------------------ + + @Override + public final void initializeState(FunctionInitializationContext context) throws Exception { + + OperatorStateStore stateStore = context.getOperatorStateStore(); + + this.unionOffsetStates = + stateStore.getUnionListState( + new ListStateDescriptor<>( + OFFSETS_STATE_NAME, + createStateSerializer(getRuntimeContext().getExecutionConfig()))); + + if (this.inlongMetric != null) { + this.metricStateListState = + stateStore.getUnionListState( + new ListStateDescriptor<>( + INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { + }))); + } + + if (context.isRestored()) { + restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator()); + // populate actual holder for restored state + for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) { + restoredState.put(kafkaOffset.f0, kafkaOffset.f1); + } + + metricState = MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); + + LOG.info( + "Consumer subtask {} restored state: {}.", + getRuntimeContext().getIndexOfThisSubtask(), + restoredState); + } else { + LOG.info( + "Consumer subtask {} has no restore state.", + getRuntimeContext().getIndexOfThisSubtask()); + } + } + + @Override + public final void snapshotState(FunctionSnapshotContext context) throws Exception { + if 
(!running) { + LOG.debug("snapshotState() called on closed source"); + } else { + unionOffsetStates.clear(); + + final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher; + if (fetcher == null) { + // the fetcher has not yet been initialized, which means we need to return the + // originally restored offsets or the assigned partitions + for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : + subscribedPartitionsToStartOffsets.entrySet()) { + unionOffsetStates.add( + Tuple2.of( + subscribedPartition.getKey(), subscribedPartition.getValue())); + } + + if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) { + // the map cannot be asynchronously updated, because only one checkpoint call + // can happen + // on this function at a time: either snapshotState() or + // notifyCheckpointComplete() + pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState); + } + } else { + HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState(); + + if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) { + // the map cannot be asynchronously updated, because only one checkpoint call + // can happen + // on this function at a time: either snapshotState() or + // notifyCheckpointComplete() + pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets); + } + + for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : + currentOffsets.entrySet()) { + unionOffsetStates.add( + Tuple2.of( + kafkaTopicPartitionLongEntry.getKey(), + kafkaTopicPartitionLongEntry.getValue())); + } + } + + if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) { + // truncate the map of pending offsets to commit, to prevent infinite growth + while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) { + pendingOffsetsToCommit.remove(0); + } + } + if (sourceMetricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState, sourceMetricData, + getRuntimeContext().getIndexOfThisSubtask()); + } + } + } + + @Override + public final void notifyCheckpointComplete(long checkpointId) throws Exception { + if (!running) { + LOG.debug("notifyCheckpointComplete() called on closed source"); + return; + } + + final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher; + if (fetcher == null) { + LOG.debug("notifyCheckpointComplete() called on uninitialized source"); + return; + } + + if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) { + // only one commit operation must be in progress + if (LOG.isDebugEnabled()) { + LOG.debug( + "Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.", + getRuntimeContext().getIndexOfThisSubtask(), + checkpointId); + } + + try { + final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId); + if (posInMap == -1) { + LOG.warn( + "Consumer subtask {} received confirmation for unknown checkpoint id {}", + getRuntimeContext().getIndexOfThisSubtask(), + checkpointId); + return; + } + + @SuppressWarnings("unchecked") + Map<KafkaTopicPartition, Long> offsets = + (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap); + + // remove older checkpoints in map + for (int i = 0; i < posInMap; i++) { + pendingOffsetsToCommit.remove(0); + } + + if (offsets == null || offsets.size() == 0) { + LOG.debug( + "Consumer subtask {} has empty checkpoint state.", + getRuntimeContext().getIndexOfThisSubtask()); + return; + } + + fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback); + } catch (Exception e) { + if (running) { + throw 
e; + } + // else ignore exception if we are no longer running + } + } + } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + } + + // ------------------------------------------------------------------------ + // Kafka Consumer specific methods + // ------------------------------------------------------------------------ + + /** + * Creates the fetcher that connect to the Kafka brokers, pulls data, deserialized the data, and + * emits it into the data streams. + * + * @param sourceContext The source context to emit data to. + * @param subscribedPartitionsToStartOffsets The set of partitions that this subtask should + * handle, with their start offsets. + * @param watermarkStrategy Optional, a serialized WatermarkStrategy. + * @param runtimeContext The task's runtime context. + * @return The instantiated fetcher + * @throws Exception The method should forward exceptions + */ + protected abstract AbstractFetcher<T, ?> createFetcher( + SourceContext<T> sourceContext, + Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets, + SerializedValue<WatermarkStrategy<T>> watermarkStrategy, + StreamingRuntimeContext runtimeContext, + OffsetCommitMode offsetCommitMode, + MetricGroup kafkaMetricGroup, + boolean useMetrics) + throws Exception; + + /** + * Creates the partition discoverer that is used to find new partitions for this subtask. + * + * @param topicsDescriptor Descriptor that describes whether we are discovering partitions for + * fixed topics or a topic pattern. + * @param indexOfThisSubtask The index of this consumer subtask. + * @param numParallelSubtasks The total number of parallel consumer subtasks. + * @return The instantiated partition discoverer + */ + protected abstract AbstractPartitionDiscoverer createPartitionDiscoverer( + KafkaTopicsDescriptor topicsDescriptor, + int indexOfThisSubtask, + int numParallelSubtasks); + + protected abstract boolean getIsAutoCommitEnabled(); + + protected abstract Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp( + Collection<KafkaTopicPartition> partitions, long timestamp); + + // ------------------------------------------------------------------------ + // ResultTypeQueryable methods + // ------------------------------------------------------------------------ + + @Override + public TypeInformation<T> getProducedType() { + return deserializer.getProducedType(); + } + + // ------------------------------------------------------------------------ + // Test utilities + // ------------------------------------------------------------------------ + + @VisibleForTesting + Map<KafkaTopicPartition, Long> getSubscribedPartitionsToStartOffsets() { + return subscribedPartitionsToStartOffsets; + } + + @VisibleForTesting + TreeMap<KafkaTopicPartition, Long> getRestoredState() { + return restoredState; + } + + @VisibleForTesting + OffsetCommitMode getOffsetCommitMode() { + return offsetCommitMode; + } + + @VisibleForTesting + LinkedMap getPendingOffsetsToCommit() { + return pendingOffsetsToCommit; + } + + @VisibleForTesting + public boolean getEnableCommitOnCheckpoints() { + return enableCommitOnCheckpoints; + } + + /** + * Creates state serializer for kafka topic partition to offset tuple. Using of the explicit + * state serializer with KryoSerializer is needed because otherwise users cannot use + * 'disableGenericTypes' properties with KafkaConsumer. 
+ */ + @VisibleForTesting + static TupleSerializer<Tuple2<KafkaTopicPartition, Long>> createStateSerializer( + ExecutionConfig executionConfig) { + // explicit serializer will keep the compatibility with GenericTypeInformation and allow to + // disableGenericTypes for users + TypeSerializer<?>[] fieldSerializers = + new TypeSerializer<?>[]{ + new KryoSerializer<>(KafkaTopicPartition.class, executionConfig), + LongSerializer.INSTANCE + }; + @SuppressWarnings("unchecked") + Class<Tuple2<KafkaTopicPartition, Long>> tupleClass = + (Class<Tuple2<KafkaTopicPartition, Long>>) (Class<?>) Tuple2.class; + return new TupleSerializer<>(tupleClass, fieldSerializers); + } +} diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java index b2efd2c3e..3f0902c0c 100644 --- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; @@ -57,9 +58,10 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TemporaryClassLoaderContext; import org.apache.inlong.audit.AuditImp; -import org.apache.inlong.sort.base.Constants; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; import org.apache.inlong.sort.base.metric.ThreadSafeCounter; +import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -78,7 +80,6 @@ import javax.annotation.Nullable; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -93,9 +94,13 @@ import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; + import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; import static org.apache.inlong.sort.base.Constants.DELIMITER; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; /** * Copy from org.apache.flink:flink-connector-kafka_2.11:1.13.5 @@ -256,6 +261,10 @@ public class FlinkKafkaProducer<IN> private SinkMetricData metricData; private Long dataSize = 0L; private Long rowSize = 0L; + + private transient ListState<MetricState> metricStateListState; + + private MetricState metricState; /** * State for nextTransactionalIdHint. 
*/ @@ -910,27 +919,27 @@ public class FlinkKafkaProducer<IN> inlongGroupId = inlongMetricArray[0]; inlongStreamId = inlongMetricArray[1]; String nodeId = inlongMetricArray[2]; - metricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, ctx.getMetricGroup()); + metricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, ctx.getMetricGroup(), + auditHostAndPorts); metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter()); metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter()); metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter()); metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter()); + metricData.registerMetricsForNumBytesOutForMeter(new ThreadSafeCounter()); + metricData.registerMetricsForNumRecordsOutForMeter(new ThreadSafeCounter()); metricData.registerMetricsForNumBytesOutPerSecond(); metricData.registerMetricsForNumRecordsOutPerSecond(); } - - if (auditHostAndPorts != null) { - AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER)))); - auditImp = AuditImp.getInstance(); + if (metricState != null && metricData != null) { + metricData.getNumBytesOut().inc(metricState.getMetricValue(NUM_BYTES_OUT)); + metricData.getNumRecordsOut().inc(metricState.getMetricValue(NUM_RECORDS_OUT)); } - super.open(configuration); } private void sendOutMetrics(Long rowSize, Long dataSize) { if (metricData != null) { - metricData.getNumRecordsOut().inc(rowSize); - metricData.getNumBytesOut().inc(dataSize); + metricData.invoke(rowSize, dataSize); } } @@ -941,23 +950,6 @@ public class FlinkKafkaProducer<IN> } } - private void outputMetricForAudit(ProducerRecord<byte[], byte[]> record) { - if (auditImp != null) { - auditImp.add( - Constants.AUDIT_SORT_OUTPUT, - inlongGroupId, - inlongStreamId, - System.currentTimeMillis(), - 1, - record.value().length); - } - } - - private void resetMetricSize() { - dataSize = 0L; - rowSize = 0L; - } - // ------------------- Logic for handling checkpoint flushing -------------------------- // @Override @@ -965,7 +957,6 @@ public class FlinkKafkaProducer<IN> FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException { checkErroneous(); - resetMetricSize(); ProducerRecord<byte[], byte[]> record; if (keyedSchema != null) { @@ -1029,10 +1020,7 @@ public class FlinkKafkaProducer<IN> + "is a bug."); } - rowSize++; - dataSize = dataSize + record.value().length; - sendOutMetrics(rowSize, dataSize); - outputMetricForAudit(record); + sendOutMetrics(1L, (long) record.value().length); pendingRecords.incrementAndGet(); transaction.producer.send(record, callback); @@ -1247,6 +1235,10 @@ public class FlinkKafkaProducer<IN> getRuntimeContext().getNumberOfParallelSubtasks(), nextFreeTransactionalId)); } + if (metricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData, + getRuntimeContext().getIndexOfThisSubtask()); + } } @Override @@ -1260,6 +1252,14 @@ public class FlinkKafkaProducer<IN> semantic = FlinkKafkaProducer.Semantic.NONE; } + if (this.inlongMetric != null) { + this.metricStateListState = + context.getOperatorStateStore().getUnionListState( + new ListStateDescriptor<>( + INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { + }))); + } + nextTransactionalIdHintState = context.getOperatorStateStore() .getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2); @@ -1313,6 +1313,11 @@ public class FlinkKafkaProducer<IN> } } 
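Both connectors touched by this commit follow the same restore pattern: the per-subtask MetricState lives in a union list state, is read back in initializeState(), and pre-seeds the counters before any new records are counted (the recordsInCounter.inc(...) and getNumRecordsOut().inc(...) calls above). A minimal sketch of that idea is shown below; the MetricState fields and the getMetricValue() lookup mirror this patch, but the class names, the restoreMetric() helper, and the simplified merge logic are illustrative assumptions only, not the committed MetricStateUtils implementation, and the sketch avoids Flink's state API so it can run standalone.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /** Illustration only: how restored metric state can pre-seed counters on recovery. */
    public class MetricStateRestoreSketch {

        /** Simplified stand-in for org.apache.inlong.sort.base.metric.MetricState. */
        static final class MetricState {
            final int subtaskIndex;
            final Map<String, Long> metrics;

            MetricState(int subtaskIndex, Map<String, Long> metrics) {
                this.subtaskIndex = subtaskIndex;
                this.metrics = metrics;
            }

            long getMetricValue(String name) {
                return metrics.getOrDefault(name, 0L);
            }
        }

        /** Keep only the state entries that belong to this subtask and sum the requested metric. */
        static long restoreMetric(List<MetricState> restoredStates, int subtaskIndex, String metricName) {
            return restoredStates.stream()
                    .filter(state -> state.subtaskIndex == subtaskIndex)
                    .mapToLong(state -> state.getMetricValue(metricName))
                    .sum();
        }

        public static void main(String[] args) {
            Map<String, Long> metrics = new HashMap<>();
            metrics.put("numRecordsOut", 42L);
            metrics.put("numBytesOut", 4096L);
            List<MetricState> restored = Arrays.asList(new MetricState(0, metrics));

            // Counters are seeded with the restored totals before new records are counted,
            // mirroring the inc(metricState.getMetricValue(...)) calls added in this commit.
            long recordsOut = restoreMetric(restored, 0, "numRecordsOut");
            long bytesOut = restoreMetric(restored, 0, "numBytesOut");
            System.out.println("restored numRecordsOut=" + recordsOut + ", numBytesOut=" + bytesOut);
        }
    }

In the actual patch the lookup is performed by MetricStateUtils.restoreMetricState(...) against the restored union list state and also has to cope with changes in parallelism, which the sketch above does not attempt.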
+ if (context.isRestored()) { + metricState = MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); + } + super.initializeState(context); } diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java index 17e92abda..c6b5c11a9 100644 --- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java @@ -28,23 +28,18 @@ import org.apache.flink.types.DeserializationException; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; -import org.apache.inlong.audit.AuditImp; -import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.kafka.clients.consumer.ConsumerRecord; import javax.annotation.Nullable; import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; import java.util.List; -import static org.apache.inlong.sort.base.Constants.DELIMITER; /** * deserialization schema for {@link KafkaDynamicSource}. */ -class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> { +public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> { private static final long serialVersionUID = 1L; @@ -63,18 +58,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro private final boolean upsertMode; - private final String inlongMetric; - private SourceMetricData metricData; - private String inlongGroupId; - - private String auditHostAndPorts; - - private String inlongStreamId; - - private transient AuditImp auditImp; - DynamicKafkaDeserializationSchema( int physicalArity, @Nullable DeserializationSchema<RowData> keyDeserialization, @@ -84,9 +69,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro boolean hasMetadata, MetadataConverter[] metadataConverters, TypeInformation<RowData> producedTypeInfo, - boolean upsertMode, - String inlongMetric, - String auditHostAndPorts) { + boolean upsertMode) { if (upsertMode) { Preconditions.checkArgument( keyDeserialization != null && keyProjection.length > 0, @@ -105,9 +88,10 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro upsertMode); this.producedTypeInfo = producedTypeInfo; this.upsertMode = upsertMode; - this.inlongMetric = inlongMetric; - this.auditHostAndPorts = auditHostAndPorts; + } + public void setMetricData(SourceMetricData metricData) { + this.metricData = metricData; } @Override @@ -116,21 +100,6 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro keyDeserialization.open(context); } valueDeserialization.open(context); - if (inlongMetric != null && !inlongMetric.isEmpty()) { - String[] inlongMetricArray = inlongMetric.split(DELIMITER); - inlongGroupId = inlongMetricArray[0]; - inlongStreamId = inlongMetricArray[1]; - String nodeId = inlongMetricArray[2]; - metricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, context.getMetricGroup()); - metricData.registerMetricsForNumBytesIn(); - 
metricData.registerMetricsForNumBytesInPerSecond(); - metricData.registerMetricsForNumRecordsIn(); - metricData.registerMetricsForNumRecordsInPerSecond(); - } - if (auditHostAndPorts != null) { - AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER)))); - auditImp = AuditImp.getInstance(); - } } @Override @@ -178,26 +147,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro } private void outputMetrics(ConsumerRecord<byte[], byte[]> record) { - outputMetricForFlink(record); - outputMetricForAudit(record); - } - - private void outputMetricForAudit(ConsumerRecord<byte[], byte[]> record) { - if (auditImp != null) { - auditImp.add( - Constants.AUDIT_SORT_INPUT, - inlongGroupId, - inlongStreamId, - System.currentTimeMillis(), - 1, - record.value().length); - } - } - - private void outputMetricForFlink(ConsumerRecord<byte[], byte[]> record) { if (metricData != null) { - metricData.getNumBytesIn().inc(record.value().length); - metricData.getNumRecordsIn().inc(1); + metricData.outputMetrics(1, record.value().length); } } diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java index f3580a8f1..af784aad4 100644 --- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; @@ -41,13 +40,12 @@ import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.util.Preconditions; - +import org.apache.inlong.sort.kafka.FlinkKafkaConsumer; import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -74,14 +72,21 @@ public class KafkaDynamicSource // Mutable attributes // -------------------------------------------------------------------------------------------- - /** Data type that describes the final output of the source. */ + /** + * Data type that describes the final output of the source. + */ protected DataType producedDataType; - /** Metadata that is appended at the end of a physical source row. */ + /** + * Metadata that is appended at the end of a physical source row. + */ protected List<String> metadataKeys; - /** Watermark strategy that is used to generate per-partition watermark. */ - protected @Nullable WatermarkStrategy<RowData> watermarkStrategy; + /** + * Watermark strategy that is used to generate per-partition watermark. 
+ */ + protected @Nullable + WatermarkStrategy<RowData> watermarkStrategy; // -------------------------------------------------------------------------------------------- // Format attributes @@ -89,35 +94,55 @@ public class KafkaDynamicSource private static final String VALUE_METADATA_PREFIX = "value."; - /** Data type to configure the formats. */ + /** + * Data type to configure the formats. + */ protected final DataType physicalDataType; - /** Optional format for decoding keys from Kafka. */ - protected final @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat; + /** + * Optional format for decoding keys from Kafka. + */ + protected final @Nullable + DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat; - /** Format for decoding values from Kafka. */ + /** + * Format for decoding values from Kafka. + */ protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat; - /** Indices that determine the key fields and the target position in the produced row. */ + /** + * Indices that determine the key fields and the target position in the produced row. + */ protected final int[] keyProjection; - /** Indices that determine the value fields and the target position in the produced row. */ + /** + * Indices that determine the value fields and the target position in the produced row. + */ protected final int[] valueProjection; - /** Prefix that needs to be removed from fields when constructing the physical data type. */ - protected final @Nullable String keyPrefix; + /** + * Prefix that needs to be removed from fields when constructing the physical data type. + */ + protected final @Nullable + String keyPrefix; // -------------------------------------------------------------------------------------------- // Kafka-specific attributes // -------------------------------------------------------------------------------------------- - /** The Kafka topics to consume. */ + /** + * The Kafka topics to consume. + */ protected final List<String> topics; - /** The Kafka topic pattern to consume. */ + /** + * The Kafka topic pattern to consume. + */ protected final Pattern topicPattern; - /** Properties for the Kafka consumer. */ + /** + * Properties for the Kafka consumer. + */ protected final Properties properties; /** @@ -137,7 +162,9 @@ public class KafkaDynamicSource */ protected final long startupTimestampMillis; - /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. * */ + /** + * Flag to determine source mode. In upsert mode, it will keep the tombstone message. 
* + */ protected final boolean upsertMode; protected final String inlongMetric; @@ -214,7 +241,7 @@ public class KafkaDynamicSource final FlinkKafkaConsumer<RowData> kafkaConsumer = createKafkaConsumer(keyDeserialization, valueDeserialization, - producedTypeInfo, inlongMetric, auditHostAndPorts); + producedTypeInfo, inlongMetric, auditHostAndPorts); return SourceFunctionProvider.of(kafkaConsumer, false); } @@ -350,8 +377,8 @@ public class KafkaDynamicSource DeserializationSchema<RowData> keyDeserialization, DeserializationSchema<RowData> valueDeserialization, TypeInformation<RowData> producedTypeInfo, - String inlongMetric, - String auditHostAndPorts) { + String inlongMetric, + String auditHostAndPorts) { final MetadataConverter[] metadataConverters = metadataKeys.stream() @@ -390,13 +417,15 @@ public class KafkaDynamicSource hasMetadata, metadataConverters, producedTypeInfo, - upsertMode, inlongMetric, auditHostAndPorts); + upsertMode); final FlinkKafkaConsumer<RowData> kafkaConsumer; if (topics != null) { - kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties); + kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties, inlongMetric, + auditHostAndPorts); } else { - kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties); + kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties, inlongMetric, + auditHostAndPorts); } switch (startupMode) { @@ -425,7 +454,8 @@ public class KafkaDynamicSource return kafkaConsumer; } - private @Nullable DeserializationSchema<RowData> createDeserialization( + private @Nullable + DeserializationSchema<RowData> createDeserialization( DynamicTableSource.Context context, @Nullable DecodingFormat<DeserializationSchema<RowData>> format, int[] projection, diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java index a7eebdbcd..e7084d2fa 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java @@ -429,6 +429,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp); sourceMetricData.registerMetricsForNumRecordsIn(); sourceMetricData.registerMetricsForNumBytesIn(); + sourceMetricData.registerMetricsForNumBytesInForMeter(); + sourceMetricData.registerMetricsForNumRecordsInForMeter(); sourceMetricData.registerMetricsForNumBytesInPerSecond(); sourceMetricData.registerMetricsForNumRecordsInPerSecond(); } diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index 5e0bb25d3..1e8b13780 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -570,6 +570,18 @@ Source : flink-table-runtime-blink_2.11-13.2-rc2 2.2.1 (Please note that the software have been modified.) 
License : https://github.com/apache/flink/blob/release-1.13.2-rc2/LICENSE + 1.3.11 inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java + inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java + inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java + inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java + inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java + inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java + inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java + inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java + inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java + Source : org.apache.flink:flink-connector-kafka_2.11:1.13.5 (Please note that the software have been modified.) + License : https://github.com/apache/flink/blob/master/LICENSE + ======================================================================= Apache InLong Subcomponents:
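On the checkpoint side, the commit writes the current counter values back into the same union list state from snapshotState() via MetricStateUtils.snapshotMetricStateForSinkMetricData and snapshotMetricStateForSourceMetricData. A rough, standalone sketch of that direction follows; the stand-in types and the snapshotMetricState() helper are illustrative assumptions, not the committed implementation.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /** Illustration only: writing current counter values back into checkpointed metric state. */
    public class MetricStateSnapshotSketch {

        static final class MetricState {
            final int subtaskIndex;
            final Map<String, Long> metrics;

            MetricState(int subtaskIndex, Map<String, Long> metrics) {
                this.subtaskIndex = subtaskIndex;
                this.metrics = metrics;
            }

            @Override
            public String toString() {
                return "MetricState{subtaskIndex=" + subtaskIndex + ", metrics=" + metrics + '}';
            }
        }

        /**
         * On snapshotState(), drop the old entry for this subtask and store the latest counter
         * values; on restore they are read back as shown in the earlier sketch.
         */
        static void snapshotMetricState(List<MetricState> metricStateListState, int subtaskIndex,
                long numRecordsOut, long numBytesOut) {
            metricStateListState.removeIf(state -> state.subtaskIndex == subtaskIndex);
            Map<String, Long> metrics = new HashMap<>();
            metrics.put("numRecordsOut", numRecordsOut);
            metrics.put("numBytesOut", numBytesOut);
            metricStateListState.add(new MetricState(subtaskIndex, metrics));
        }

        public static void main(String[] args) {
            List<MetricState> checkpointedState = new ArrayList<>();
            snapshotMetricState(checkpointedState, 0, 42L, 4096L);
            System.out.println(checkpointedState);
        }
    }

Replacing only this subtask's entry is one simple way to keep the checkpointed list bounded to one MetricState per parallel instance; the committed MetricStateUtils may organize this differently.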