This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 57985782d0 [INLONG-10317][Sort] Make Kafka source support report audit information exactly once (#10550)
57985782d0 is described below

commit 57985782d0a074ad869d8061c3e870b6b18d3928
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Wed Jul 3 10:10:55 2024 +0800

    [INLONG-10317][Sort] Make Kafka source support report audit information exactly once (#10550)
---
 .../sort-flink-v1.15/sort-connectors/kafka/pom.xml |   1 +
 .../inlong/sort/kafka/source/KafkaSource.java      | 239 +++++++++
 .../sort/kafka/source/KafkaSourceBuilder.java      | 534 +++++++++++++++++++++
 .../kafka/source/reader/KafkaSourceReader.java     | 217 +++++++++
 .../table/DynamicKafkaDeserializationSchema.java   |  81 ++--
 .../sort/kafka/table/KafkaDynamicSource.java       |  11 +-
 licenses/inlong-sort-connectors/LICENSE            |   3 +
 7 files changed, 1047 insertions(+), 39 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
index 019b0f456c..2ecb22476e 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
@@ -84,6 +84,7 @@
                         <include>org.apache.httpcomponents:*</include>
                         <include>org.apache.commons:commons-lang3</include>
                         <include>com.google.protobuf:*</include>
+                        <include>com.google.guava:*</include>
                         <include>joda-time:*</include>
                         <include>com.fasterxml.jackson.core:*</include>
                         <include>com.amazonaws:*</include>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
new file mode 100644
index 0000000000..5004ec34a3
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.inlong.sort.kafka.source; + +import org.apache.inlong.sort.kafka.source.reader.KafkaSourceReader; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState; +import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer; +import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber; +import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics; +import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader; +import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager; +import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; +import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.UserCodeClassLoader; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Collection; +import java.util.Properties; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * The Source implementation of Kafka. Please use a {@link KafkaSourceBuilder} to construct a {@link + * KafkaSource}. The following example shows how to create a KafkaSource emitting records of <code> + * String</code> type. + * + * <pre>{@code + * KafkaSource<String> source = KafkaSource + * .<String>builder() + * .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) + * .setGroupId("MyGroup") + * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + * .setDeserializer(new TestingKafkaRecordDeserializationSchema()) + * .setStartingOffsets(OffsetsInitializer.earliest()) + * .build(); + * }</pre> + * + * <p>See {@link KafkaSourceBuilder} for more details. + * + * @param <OUT> the output type of the source. 
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4 + * Add a variable metricSchema to report audit information + */ +@PublicEvolving +public class KafkaSource<OUT> + implements + Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>, + ResultTypeQueryable<OUT> { + + private static final long serialVersionUID = -8755372893283732098L; + // Users can choose only one of the following ways to specify the topics to consume from. + private final KafkaSubscriber subscriber; + // Users can specify the starting / stopping offset initializer. + private final OffsetsInitializer startingOffsetsInitializer; + private final OffsetsInitializer stoppingOffsetsInitializer; + // Boundedness + private final Boundedness boundedness; + private final KafkaRecordDeserializationSchema<OUT> deserializationSchema; + private final KafkaDeserializationSchema<RowData> metricSchema; + // The configurations. + private final Properties props; + + KafkaSource( + KafkaSubscriber subscriber, + OffsetsInitializer startingOffsetsInitializer, + @Nullable OffsetsInitializer stoppingOffsetsInitializer, + Boundedness boundedness, + KafkaRecordDeserializationSchema<OUT> deserializationSchema, + KafkaDeserializationSchema<RowData> metricSchema, + Properties props) { + this.subscriber = subscriber; + this.startingOffsetsInitializer = startingOffsetsInitializer; + this.stoppingOffsetsInitializer = stoppingOffsetsInitializer; + this.boundedness = boundedness; + this.deserializationSchema = deserializationSchema; + this.metricSchema = metricSchema; + this.props = props; + } + + /** + * Get a kafkaSourceBuilder to build a {@link KafkaSource}. + * + * @return a Kafka source builder. + */ + public static <OUT> KafkaSourceBuilder<OUT> builder() { + return new KafkaSourceBuilder<>(); + } + + @Override + public Boundedness getBoundedness() { + return this.boundedness; + } + + @Internal + @Override + public SourceReader<OUT, KafkaPartitionSplit> createReader(SourceReaderContext readerContext) + throws Exception { + return createReader(readerContext, (ignore) -> { + }); + } + + @VisibleForTesting + SourceReader<OUT, KafkaPartitionSplit> createReader( + SourceReaderContext readerContext, Consumer<Collection<String>> splitFinishedHook) + throws Exception { + FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue = + new FutureCompletingBlockingQueue<>(); + deserializationSchema.open( + new DeserializationSchema.InitializationContext() { + + @Override + public MetricGroup getMetricGroup() { + return readerContext.metricGroup().addGroup("deserializer"); + } + + @Override + public UserCodeClassLoader getUserCodeClassLoader() { + return readerContext.getUserCodeClassLoader(); + } + }); + final KafkaSourceReaderMetrics kafkaSourceReaderMetrics = + new KafkaSourceReaderMetrics(readerContext.metricGroup()); + + Supplier<KafkaPartitionSplitReader> splitReaderSupplier = + () -> new KafkaPartitionSplitReader(props, readerContext, kafkaSourceReaderMetrics); + KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema); + + return new KafkaSourceReader<>( + elementsQueue, + new KafkaSourceFetcherManager( + elementsQueue, splitReaderSupplier::get, splitFinishedHook), + recordEmitter, + toConfiguration(props), + readerContext, + kafkaSourceReaderMetrics, + metricSchema); + } + + @Internal + @Override + public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> createEnumerator( + SplitEnumeratorContext<KafkaPartitionSplit> enumContext) { + return new KafkaSourceEnumerator( + 
subscriber, + startingOffsetsInitializer, + stoppingOffsetsInitializer, + props, + enumContext, + boundedness); + } + + @Internal + @Override + public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> restoreEnumerator( + SplitEnumeratorContext<KafkaPartitionSplit> enumContext, + KafkaSourceEnumState checkpoint) + throws IOException { + return new KafkaSourceEnumerator( + subscriber, + startingOffsetsInitializer, + stoppingOffsetsInitializer, + props, + enumContext, + boundedness, + checkpoint.assignedPartitions()); + } + + @Internal + @Override + public SimpleVersionedSerializer<KafkaPartitionSplit> getSplitSerializer() { + return new KafkaPartitionSplitSerializer(); + } + + @Internal + @Override + public SimpleVersionedSerializer<KafkaSourceEnumState> getEnumeratorCheckpointSerializer() { + return new KafkaSourceEnumStateSerializer(); + } + + @Override + public TypeInformation<OUT> getProducedType() { + return deserializationSchema.getProducedType(); + } + + // ----------- private helper methods --------------- + + private Configuration toConfiguration(Properties props) { + Configuration config = new Configuration(); + props.stringPropertyNames().forEach(key -> config.setString(key, props.getProperty(key))); + return config; + } + + @VisibleForTesting + Configuration getConfiguration() { + return toConfiguration(props); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java new file mode 100644 index 0000000000..58bb651b24 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.kafka.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.kafka.source.KafkaSourceOptions; +import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator; +import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import org.apache.flink.table.data.RowData; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.regex.Pattern; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The @builder class for {@link KafkaSource} to make it easier for the users to construct a {@link + * KafkaSource}. + * + * <p>The following example shows the minimum setup to create a KafkaSource that reads the String + * values from a Kafka topic. + * + * <pre>{@code + * KafkaSource<String> source = KafkaSource + * .<String>builder() + * .setBootstrapServers(MY_BOOTSTRAP_SERVERS) + * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + * .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) + * .build(); + * }</pre> + * + * <p>The bootstrap servers, topics/partitions to consume, and the record deserializer are required + * fields that must be set. + * + * <p>To specify the starting offsets of the KafkaSource, one can call {@link + * #setStartingOffsets(OffsetsInitializer)}. + * + * <p>By default the KafkaSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never + * stops until the Flink job is canceled or fails. To let the KafkaSource run in {@link + * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can call {@link + * #setUnbounded(OffsetsInitializer)}. For example the following KafkaSource stops after it consumes + * up to the latest partition offsets at the point when the Flink started. + * + * <pre>{@code + * KafkaSource<String> source = KafkaSource + * .<String>builder() + * .setBootstrapServers(MY_BOOTSTRAP_SERVERS) + * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + * .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) + * .setUnbounded(OffsetsInitializer.latest()) + * .build(); + * }</pre> + * + * <p>Check the Java docs of each individual methods to learn more about the settings to build a + * KafkaSource. 
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4 + * Add a variable metricSchema to report audit information + */ +@PublicEvolving +public class KafkaSourceBuilder<OUT> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceBuilder.class); + private static final String[] REQUIRED_CONFIGS = {ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}; + // The subscriber specifies the partitions to subscribe to. + private KafkaSubscriber subscriber; + // Users can specify the starting / stopping offset initializer. + private OffsetsInitializer startingOffsetsInitializer; + private OffsetsInitializer stoppingOffsetsInitializer; + // Boundedness + private Boundedness boundedness; + private KafkaRecordDeserializationSchema<OUT> deserializationSchema; + private KafkaDeserializationSchema<RowData> metricSchema; + // The configurations. + protected Properties props; + + KafkaSourceBuilder() { + this.subscriber = null; + this.startingOffsetsInitializer = OffsetsInitializer.earliest(); + this.stoppingOffsetsInitializer = new NoStoppingOffsetsInitializer(); + this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED; + this.deserializationSchema = null; + this.metricSchema = null; + this.props = new Properties(); + } + + /** + * Sets the bootstrap servers for the KafkaConsumer of the KafkaSource. + * + * @param bootstrapServers the bootstrap servers of the Kafka cluster. + * @return this KafkaSourceBuilder. + */ + public KafkaSourceBuilder<OUT> setBootstrapServers(String bootstrapServers) { + return setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } + + /** + * Sets the consumer group id of the KafkaSource. + * + * @param groupId the group id of the KafkaSource. + * @return this KafkaSourceBuilder. + */ + public KafkaSourceBuilder<OUT> setGroupId(String groupId) { + return setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + } + + /** + * Set a list of topics the KafkaSource should consume from. All the topics in the list should + * have existed in the Kafka cluster. Otherwise an exception will be thrown. To allow some of + * the topics to be created lazily, please use {@link #setTopicPattern(Pattern)} instead. + * + * @param topics the list of topics to consume from. + * @return this KafkaSourceBuilder. + * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection) + */ + public KafkaSourceBuilder<OUT> setTopics(List<String> topics) { + ensureSubscriberIsNull("topics"); + subscriber = KafkaSubscriber.getTopicListSubscriber(topics); + return this; + } + + /** + * Set a list of topics the KafkaSource should consume from. All the topics in the list should + * have existed in the Kafka cluster. Otherwise an exception will be thrown. To allow some of + * the topics to be created lazily, please use {@link #setTopicPattern(Pattern)} instead. + * + * @param topics the list of topics to consume from. + * @return this KafkaSourceBuilder. + * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection) + */ + public KafkaSourceBuilder<OUT> setTopics(String... topics) { + return setTopics(Arrays.asList(topics)); + } + + /** + * Set a topic pattern to consume from use the java {@link Pattern}. + * + * @param topicPattern the pattern of the topic name to consume from. + * @return this KafkaSourceBuilder. 
+ * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Pattern) + */ + public KafkaSourceBuilder<OUT> setTopicPattern(Pattern topicPattern) { + ensureSubscriberIsNull("topic pattern"); + subscriber = KafkaSubscriber.getTopicPatternSubscriber(topicPattern); + return this; + } + + /** + * Set a set of partitions to consume from. + * + * @param partitions the set of partitions to consume from. + * @return this KafkaSourceBuilder. + * @see org.apache.kafka.clients.consumer.KafkaConsumer#assign(Collection) + */ + public KafkaSourceBuilder<OUT> setPartitions(Set<TopicPartition> partitions) { + ensureSubscriberIsNull("partitions"); + subscriber = KafkaSubscriber.getPartitionSetSubscriber(partitions); + return this; + } + + /** + * Specify from which offsets the KafkaSource should start consume from by providing an {@link + * OffsetsInitializer}. + * + * <p>The following {@link OffsetsInitializer}s are commonly used and provided out of the box. + * Users can also implement their own {@link OffsetsInitializer} for custom behaviors. + * + * <ul> + * <li>{@link OffsetsInitializer#earliest()} - starting from the earliest offsets. This is + * also the default {@link OffsetsInitializer} of the KafkaSource for starting offsets. + * <li>{@link OffsetsInitializer#latest()} - starting from the latest offsets. + * <li>{@link OffsetsInitializer#committedOffsets()} - starting from the committed offsets of + * the consumer group. + * <li>{@link + * OffsetsInitializer#committedOffsets(org.apache.kafka.clients.consumer.OffsetResetStrategy)} + * - starting from the committed offsets of the consumer group. If there is no committed + * offsets, starting from the offsets specified by the {@link + * org.apache.kafka.clients.consumer.OffsetResetStrategy OffsetResetStrategy}. + * <li>{@link OffsetsInitializer#offsets(Map)} - starting from the specified offsets for each + * partition. + * <li>{@link OffsetsInitializer#timestamp(long)} - starting from the specified timestamp for + * each partition. Note that the guarantee here is that all the records in Kafka whose + * {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater than + * the given starting timestamp will be consumed. However, it is possible that some + * consumer records whose timestamp is smaller than the given starting timestamp are also + * consumed. + * </ul> + * + * @param startingOffsetsInitializer the {@link OffsetsInitializer} setting the starting offsets + * for the Source. + * @return this KafkaSourceBuilder. + */ + public KafkaSourceBuilder<OUT> setStartingOffsets( + OffsetsInitializer startingOffsetsInitializer) { + this.startingOffsetsInitializer = startingOffsetsInitializer; + return this; + } + + /** + * By default the KafkaSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner + * and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run as + * a streaming source but still stops at some point, one can set an {@link OffsetsInitializer} + * to specify the stopping offsets for each partition. When all the partitions have reached + * their stopping offsets, the KafkaSource will then exit. + * + * <p>This method is different from {@link #setBounded(OffsetsInitializer)} that after setting + * the stopping offsets with this method, {@link KafkaSource#getBoundedness()} will still return + * {@link Boundedness#CONTINUOUS_UNBOUNDED} even though it will stop at the stopping offsets + * specified by the stopping offsets {@link OffsetsInitializer}. 
+ * + * <p>The following {@link OffsetsInitializer} are commonly used and provided out of the box. + * Users can also implement their own {@link OffsetsInitializer} for custom behaviors. + * + * <ul> + * <li>{@link OffsetsInitializer#latest()} - stop at the latest offsets of the partitions when + * the KafkaSource starts to run. + * <li>{@link OffsetsInitializer#committedOffsets()} - stops at the committed offsets of the + * consumer group. + * <li>{@link OffsetsInitializer#offsets(Map)} - stops at the specified offsets for each + * partition. + * <li>{@link OffsetsInitializer#timestamp(long)} - stops at the specified timestamp for each + * partition. The guarantee of setting the stopping timestamp is that no Kafka records + * whose {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater + * than the given stopping timestamp will be consumed. However, it is possible that some + * records whose timestamp is smaller than the specified stopping timestamp are not + * consumed. + * </ul> + * + * @param stoppingOffsetsInitializer The {@link OffsetsInitializer} to specify the stopping + * offset. + * @return this KafkaSourceBuilder. + * @see #setBounded(OffsetsInitializer) + */ + public KafkaSourceBuilder<OUT> setUnbounded(OffsetsInitializer stoppingOffsetsInitializer) { + this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED; + this.stoppingOffsetsInitializer = stoppingOffsetsInitializer; + return this; + } + + /** + * By default the KafkaSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner + * and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run in + * {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link + * OffsetsInitializer} to specify the stopping offsets for each partition. When all the + * partitions have reached their stopping offsets, the KafkaSource will then exit. + * + * <p>This method is different from {@link #setUnbounded(OffsetsInitializer)} that after setting + * the stopping offsets with this method, {@link KafkaSource#getBoundedness()} will return + * {@link Boundedness#BOUNDED} instead of {@link Boundedness#CONTINUOUS_UNBOUNDED}. + * + * <p>The following {@link OffsetsInitializer} are commonly used and provided out of the box. + * Users can also implement their own {@link OffsetsInitializer} for custom behaviors. + * + * <ul> + * <li>{@link OffsetsInitializer#latest()} - stop at the latest offsets of the partitions when + * the KafkaSource starts to run. + * <li>{@link OffsetsInitializer#committedOffsets()} - stops at the committed offsets of the + * consumer group. + * <li>{@link OffsetsInitializer#offsets(Map)} - stops at the specified offsets for each + * partition. + * <li>{@link OffsetsInitializer#timestamp(long)} - stops at the specified timestamp for each + * partition. The guarantee of setting the stopping timestamp is that no Kafka records + * whose {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater + * than the given stopping timestamp will be consumed. However, it is possible that some + * records whose timestamp is smaller than the specified stopping timestamp are not + * consumed. + * </ul> + * + * @param stoppingOffsetsInitializer the {@link OffsetsInitializer} to specify the stopping + * offsets. + * @return this KafkaSourceBuilder. 
+ * @see #setUnbounded(OffsetsInitializer) + */ + public KafkaSourceBuilder<OUT> setBounded(OffsetsInitializer stoppingOffsetsInitializer) { + this.boundedness = Boundedness.BOUNDED; + this.stoppingOffsetsInitializer = stoppingOffsetsInitializer; + return this; + } + + /** + * Sets the {@link KafkaRecordDeserializationSchema deserializer} of the {@link + * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource. + * + * @param recordDeserializer the deserializer for Kafka {@link + * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}. + * @return this KafkaSourceBuilder. + */ + public KafkaSourceBuilder<OUT> setDeserializer( + KafkaRecordDeserializationSchema<OUT> recordDeserializer) { + this.deserializationSchema = recordDeserializer; + return this; + } + + /** + * Sets the {@link KafkaRecordDeserializationSchema deserializer} of the {@link + * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource. The given + * {@link DeserializationSchema} will be used to deserialize the value of ConsumerRecord. The + * other information (e.g. key) in a ConsumerRecord will be ignored. + * + * @param deserializationSchema the {@link DeserializationSchema} to use for deserialization. + * @return this KafkaSourceBuilder. + */ + public KafkaSourceBuilder<OUT> setValueOnlyDeserializer( + DeserializationSchema<OUT> deserializationSchema) { + this.deserializationSchema = + KafkaRecordDeserializationSchema.valueOnly(deserializationSchema); + return this; + } + + /** + * Sets the client id prefix of this KafkaSource. + * + * @param prefix the client id prefix to use for this KafkaSource. + * @return this KafkaSourceBuilder. + */ + public KafkaSourceBuilder<OUT> setClientIdPrefix(String prefix) { + return setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), prefix); + } + + /** + * Set an arbitrary property for the KafkaSource and KafkaConsumer. The valid keys can be found + * in {@link ConsumerConfig} and {@link KafkaSourceOptions}. + * + * <p>Note that the following keys will be overridden by the builder when the KafkaSource is + * created. + * + * <ul> + * <li><code>key.deserializer</code> is always set to {@link ByteArrayDeserializer}. + * <li><code>value.deserializer</code> is always set to {@link ByteArrayDeserializer}. + * <li><code>auto.offset.reset.strategy</code> is overridden by {@link + * OffsetsInitializer#getAutoOffsetResetStrategy()} for the starting offsets, which is by + * default {@link OffsetsInitializer#earliest()}. + * <li><code>partition.discovery.interval.ms</code> is overridden to -1 when {@link + * #setBounded(OffsetsInitializer)} has been invoked. + * </ul> + * + * @param key the key of the property. + * @param value the value of the property. + * @return this KafkaSourceBuilder. + */ + public KafkaSourceBuilder<OUT> setProperty(String key, String value) { + props.setProperty(key, value); + return this; + } + + /** + * Set arbitrary properties for the KafkaSource and KafkaConsumer. The valid keys can be found + * in {@link ConsumerConfig} and {@link KafkaSourceOptions}. + * + * <p>Note that the following keys will be overridden by the builder when the KafkaSource is + * created. + * + * <ul> + * <li><code>key.deserializer</code> is always set to {@link ByteArrayDeserializer}. + * <li><code>value.deserializer</code> is always set to {@link ByteArrayDeserializer}. 
+ * <li><code>auto.offset.reset.strategy</code> is overridden by {@link + * OffsetsInitializer#getAutoOffsetResetStrategy()} for the starting offsets, which is by + * default {@link OffsetsInitializer#earliest()}. + * <li><code>partition.discovery.interval.ms</code> is overridden to -1 when {@link + * #setBounded(OffsetsInitializer)} has been invoked. + * <li><code>client.id</code> is overridden to the "client.id.prefix-RANDOM_LONG", or + * "group.id-RANDOM_LONG" if the client id prefix is not set. + * </ul> + * + * @param props the properties to set for the KafkaSource. + * @return this KafkaSourceBuilder. + */ + public KafkaSourceBuilder<OUT> setProperties(Properties props) { + this.props.putAll(props); + return this; + } + + public KafkaSourceBuilder<OUT> setMetricSchema(KafkaDeserializationSchema<RowData> metricSchema) { + this.metricSchema = metricSchema; + return this; + } + + /** + * Build the {@link KafkaSource}. + * + * @return a KafkaSource with the settings made for this builder. + */ + public KafkaSource<OUT> build() { + sanityCheck(); + parseAndSetRequiredProperties(); + return new KafkaSource<>( + subscriber, + startingOffsetsInitializer, + stoppingOffsetsInitializer, + boundedness, + deserializationSchema, + metricSchema, + props); + } + + // ------------- private helpers -------------- + + private void ensureSubscriberIsNull(String attemptingSubscribeMode) { + if (subscriber != null) { + throw new IllegalStateException( + String.format( + "Cannot use %s for consumption because a %s is already set for consumption.", + attemptingSubscribeMode, subscriber.getClass().getSimpleName())); + } + } + + private void parseAndSetRequiredProperties() { + maybeOverride( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + ByteArrayDeserializer.class.getName(), + true); + maybeOverride( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + ByteArrayDeserializer.class.getName(), + true); + if (!props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { + LOG.warn( + "Offset commit on checkpoint is disabled because {} is not specified", + ConsumerConfig.GROUP_ID_CONFIG); + maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", false); + } + maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false); + maybeOverride( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(), + true); + + // If the source is bounded, do not run periodic partition discovery. + maybeOverride( + KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), + "-1", + boundedness == Boundedness.BOUNDED); + + // If the client id prefix is not set, reuse the consumer group id as the client id prefix, + // or generate a random string if consumer group id is not specified. + maybeOverride( + KafkaSourceOptions.CLIENT_ID_PREFIX.key(), + props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) + ? props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) + : "KafkaSource-" + new Random().nextLong(), + false); + } + + private boolean maybeOverride(String key, String value, boolean override) { + boolean overridden = false; + String userValue = props.getProperty(key); + if (userValue != null) { + if (override) { + LOG.warn( + String.format( + "Property %s is provided but will be overridden from %s to %s", + key, userValue, value)); + props.setProperty(key, value); + overridden = true; + } + } else { + props.setProperty(key, value); + } + return overridden; + } + + private void sanityCheck() { + // Check required configs. 
+ for (String requiredConfig : REQUIRED_CONFIGS) { + checkNotNull( + props.getProperty(requiredConfig), + String.format("Property %s is required but not provided", requiredConfig)); + } + // Check required settings. + checkNotNull( + subscriber, + "No subscribe mode is specified, " + + "should be one of topics, topic pattern and partition set."); + checkNotNull(deserializationSchema, "Deserialization schema is required but not provided."); + // Check consumer group ID + checkState( + props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) || !offsetCommitEnabledManually(), + String.format( + "Property %s is required when offset commit is enabled", + ConsumerConfig.GROUP_ID_CONFIG)); + // Check offsets initializers + if (startingOffsetsInitializer instanceof OffsetsInitializerValidator) { + ((OffsetsInitializerValidator) startingOffsetsInitializer).validate(props); + } + if (stoppingOffsetsInitializer instanceof OffsetsInitializerValidator) { + ((OffsetsInitializerValidator) stoppingOffsetsInitializer).validate(props); + } + } + + private boolean offsetCommitEnabledManually() { + boolean autoCommit = + props.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) + && Boolean.parseBoolean( + props.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); + boolean commitOnCheckpoint = + props.containsKey(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key()) + && Boolean.parseBoolean( + props.getProperty( + KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key())); + return autoCommit || commitOnCheckpoint; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java new file mode 100644 index 0000000000..4643887c49 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.kafka.source.reader; + +import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.kafka.source.KafkaSourceOptions; +import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics; +import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager; +import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; +import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import org.apache.flink.table.data.RowData; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** The source reader for Kafka partitions. + * Copy from org.apache.flink:flink-connector-kafka:1.15.4 + * Add some method to make report audit information exactly once + * */ +@Internal +public class KafkaSourceReader<T> + extends + SingleThreadMultiplexSourceReaderBase<ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplit, KafkaPartitionSplitState> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceReader.class); + // These maps need to be concurrent because it will be accessed by both the main thread + // and the split fetcher thread in the callback. 
+ private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit; + private final ConcurrentMap<TopicPartition, OffsetAndMetadata> offsetsOfFinishedSplits; + private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics; + private final boolean commitOffsetsOnCheckpoint; + private final KafkaDeserializationSchema<RowData> metricSchema; + + public KafkaSourceReader( + FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue, + KafkaSourceFetcherManager kafkaSourceFetcherManager, + RecordEmitter<ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplitState> recordEmitter, + Configuration config, + SourceReaderContext context, + KafkaSourceReaderMetrics kafkaSourceReaderMetrics, + KafkaDeserializationSchema<RowData> metricSchema) { + super(elementsQueue, kafkaSourceFetcherManager, recordEmitter, config, context); + this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>()); + this.offsetsOfFinishedSplits = new ConcurrentHashMap<>(); + this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics; + this.commitOffsetsOnCheckpoint = + config.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT); + this.metricSchema = metricSchema; + if (!commitOffsetsOnCheckpoint) { + LOG.warn( + "Offset commit on checkpoint is disabled. " + + "Consuming offset will not be reported back to Kafka cluster."); + } + } + + @Override + protected void onSplitFinished(Map<String, KafkaPartitionSplitState> finishedSplitIds) { + finishedSplitIds.forEach( + (ignored, splitState) -> { + if (splitState.getCurrentOffset() >= 0) { + offsetsOfFinishedSplits.put( + splitState.getTopicPartition(), + new OffsetAndMetadata(splitState.getCurrentOffset())); + } + }); + } + + @Override + public List<KafkaPartitionSplit> snapshotState(long checkpointId) { + if (metricSchema instanceof DynamicKafkaDeserializationSchema) { + ((DynamicKafkaDeserializationSchema) metricSchema).updateCurrentCheckpointId(checkpointId); + } + List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId); + if (!commitOffsetsOnCheckpoint) { + return splits; + } + + if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) { + offsetsToCommit.put(checkpointId, Collections.emptyMap()); + } else { + Map<TopicPartition, OffsetAndMetadata> offsetsMap = + offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); + // Put the offsets of the active splits. + for (KafkaPartitionSplit split : splits) { + // If the checkpoint is triggered before the partition starting offsets + // is retrieved, do not commit the offsets for those partitions. + if (split.getStartingOffset() >= 0) { + offsetsMap.put( + split.getTopicPartition(), + new OffsetAndMetadata(split.getStartingOffset())); + } + } + // Put offsets of all the finished splits. 
+ offsetsMap.putAll(offsetsOfFinishedSplits); + } + return splits; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + LOG.debug("Committing offsets for checkpoint {}", checkpointId); + if (!commitOffsetsOnCheckpoint) { + flushAudit(checkpointId); + return; + } + + Map<TopicPartition, OffsetAndMetadata> committedPartitions = + offsetsToCommit.get(checkpointId); + if (committedPartitions == null) { + LOG.debug( + "Offsets for checkpoint {} either do not exist or have already been committed.", + checkpointId); + flushAudit(checkpointId); + return; + } + + ((KafkaSourceFetcherManager) splitFetcherManager) + .commitOffsets( + committedPartitions, + (ignored, e) -> { + // The offset commit here is needed by the external monitoring. It won't + // break Flink job's correctness if we fail to commit the offset here. + if (e != null) { + kafkaSourceReaderMetrics.recordFailedCommit(); + LOG.warn( + "Failed to commit consumer offsets for checkpoint {}", + checkpointId, + e); + } else { + LOG.debug( + "Successfully committed offsets for checkpoint {}", + checkpointId); + kafkaSourceReaderMetrics.recordSucceededCommit(); + // If the finished topic partition has been committed, we remove it + // from the offsets of the finished splits map. + committedPartitions.forEach( + (tp, offset) -> kafkaSourceReaderMetrics.recordCommittedOffset( + tp, offset.offset())); + offsetsOfFinishedSplits + .entrySet() + .removeIf( + entry -> committedPartitions.containsKey( + entry.getKey())); + while (!offsetsToCommit.isEmpty() + && offsetsToCommit.firstKey() <= checkpointId) { + offsetsToCommit.remove(offsetsToCommit.firstKey()); + } + } + }); + flushAudit(checkpointId); + } + + private void flushAudit(long checkpointId) { + if (metricSchema instanceof DynamicKafkaDeserializationSchema) { + DynamicKafkaDeserializationSchema schema = (DynamicKafkaDeserializationSchema) metricSchema; + schema.flushAudit(); + schema.updateLastCheckpointId(checkpointId); + } + } + @Override + protected KafkaPartitionSplitState initializedState(KafkaPartitionSplit split) { + return new KafkaPartitionSplitState(split); + } + + @Override + protected KafkaPartitionSplit toSplitType(String splitId, KafkaPartitionSplitState splitState) { + return splitState.toKafkaPartitionSplit(); + } + + // ------------------------ + + @VisibleForTesting + SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> getOffsetsToCommit() { + return offsetsToCommit; + } + + @VisibleForTesting + int getNumAliveFetchers() { + return splitFetcherManager.getNumAliveFetchers(); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java index 4406189081..28de46fb08 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java @@ -19,7 +19,7 @@ package org.apache.inlong.sort.kafka.table; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricsCollector; -import org.apache.inlong.sort.base.metric.SourceMetricData; +import 
org.apache.inlong.sort.base.metric.SourceExactlyMetric; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -38,11 +38,13 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import static org.apache.inlong.sort.kafka.table.KafkaDynamicSource.ReadableMetadata.CONSUME_TIME; + /** A specific {KafkaSerializationSchema} for {KafkaDynamicSource}. * <p> * Copy from org.apache.flink:flink-connector-kafka:1.15.4 * */ -class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> { +public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> { private static final long serialVersionUID = 1L; @@ -62,9 +64,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro private final MetricOption metricOption; - private SourceMetricData sourceMetricData; - - private int consumeTimeIndex = -1; + private SourceExactlyMetric sourceExactlyMetric; DynamicKafkaDeserializationSchema( int physicalArity, @@ -76,7 +76,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro MetadataConverter[] metadataConverters, TypeInformation<RowData> producedTypeInfo, boolean upsertMode, - MetricOption metricOption) { + MetricOption metricOption, + List<String> metadataKeys) { if (upsertMode) { Preconditions.checkArgument( keyDeserialization != null && keyProjection.length > 0, @@ -92,7 +93,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro keyProjection, valueProjection, metadataConverters, - upsertMode); + upsertMode, + metadataKeys); this.producedTypeInfo = producedTypeInfo; this.upsertMode = upsertMode; this.metricOption = metricOption; @@ -105,13 +107,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro } valueDeserialization.open(context); if (metricOption != null) { - sourceMetricData = new SourceMetricData(metricOption); - } - for (int i = 0; i < outputCollector.metadataConverters.length; i++) { - if (outputCollector.metadataConverters[i] - .equals(KafkaDynamicSource.ReadableMetadata.CONSUME_TIME.converter)) { - consumeTimeIndex = i; - } + sourceExactlyMetric = new SourceExactlyMetric(metricOption); } } @@ -121,7 +117,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro } @Override - public RowData deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception { + public RowData deserialize(ConsumerRecord<byte[], byte[]> record) { throw new IllegalStateException("A collector is required for deserializing."); } @@ -132,10 +128,9 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro // also not for a cartesian product with the keys if (keyDeserialization == null && !hasMetadata) { valueDeserialization.deserialize(record.value(), - sourceMetricData == null ? collector : new MetricsCollector<>(collector, sourceMetricData)); + sourceExactlyMetric == null ? 
collector : new MetricsCollector<>(collector, sourceExactlyMetric)); return; } - // buffer key(s) if (keyDeserialization != null) { keyDeserialization.deserialize(record.key(), keyCollector); @@ -143,10 +138,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro // project output while emitting values outputCollector.inputRecord = record; outputCollector.physicalKeyRows = keyCollector.buffer; - if (sourceMetricData != null) { - MetricsCollector<RowData> metricsCollector = new MetricsCollector<>(collector, sourceMetricData); - metricsCollector.resetTimestamp(getRecordTime(outputCollector.metadataConverters, record)); - outputCollector.outputCollector = metricsCollector; + if (sourceExactlyMetric != null) { + outputCollector.outputCollector = new MetricsCollector<>(collector, sourceExactlyMetric); } else { outputCollector.outputCollector = collector; } @@ -159,14 +152,6 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro keyCollector.buffer.clear(); } - private Long getRecordTime(MetadataConverter[] metadataConverters, - ConsumerRecord<byte[], byte[]> record) { - if (consumeTimeIndex == -1) { - return System.currentTimeMillis(); - } - return (Long) metadataConverters[consumeTimeIndex].read(record); - } - @Override public TypeInformation<RowData> getProducedType() { return producedTypeInfo; @@ -198,6 +183,24 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro } } + public void flushAudit() { + if (sourceExactlyMetric != null) { + sourceExactlyMetric.flushAudit(); + } + } + + public void updateCurrentCheckpointId(long checkpointId) { + if (sourceExactlyMetric != null) { + sourceExactlyMetric.updateCurrentCheckpointId(checkpointId); + } + } + + public void updateLastCheckpointId(long checkpointId) { + if (sourceExactlyMetric != null) { + sourceExactlyMetric.updateLastCheckpointId(checkpointId); + } + } + // -------------------------------------------------------------------------------------------- /** @@ -236,30 +239,34 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro private transient Collector<RowData> outputCollector; + private final List<String> metadataKeys; + OutputProjectionCollector( int physicalArity, int[] keyProjection, int[] valueProjection, MetadataConverter[] metadataConverters, - boolean upsertMode) { + boolean upsertMode, + List<String> metadataKeys) { this.physicalArity = physicalArity; this.keyProjection = keyProjection; this.valueProjection = valueProjection; this.metadataConverters = metadataConverters; this.upsertMode = upsertMode; + this.metadataKeys = metadataKeys; } @Override public void collect(RowData physicalValueRow) { // no key defined if (keyProjection.length == 0) { - emitRow(null, (GenericRowData) physicalValueRow); + emitRow(null, (GenericRowData) physicalValueRow, metadataKeys); return; } // otherwise emit a value for each key for (RowData physicalKeyRow : physicalKeyRows) { - emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow); + emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow, metadataKeys); } } @@ -270,7 +277,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro private void emitRow( @Nullable GenericRowData physicalKeyRow, - @Nullable GenericRowData physicalValueRow) { + @Nullable GenericRowData physicalValueRow, + List<String> metadataKeys) { final RowKind rowKind; if (physicalValueRow == null) { if (upsertMode) { @@ -300,9 +308,14 @@ class 
DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro } for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) { + Object metadata = metadataConverters[metadataPos].read(inputRecord); producedRow.setField( physicalArity + metadataPos, - metadataConverters[metadataPos].read(inputRecord)); + metadata); + if (CONSUME_TIME.key.equals(metadataKeys.get(metadataPos)) && + outputCollector instanceof MetricsCollector) { + ((MetricsCollector<RowData>) outputCollector).resetTimestamp((Long) metadata); + } } outputCollector.collect(producedRow); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java index 322e6fc758..9b0b0aff64 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java @@ -18,6 +18,8 @@ package org.apache.inlong.sort.kafka.table; import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.kafka.source.KafkaSource; +import org.apache.inlong.sort.kafka.source.KafkaSourceBuilder; import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter; import org.apache.inlong.sort.protocol.node.ExtractNode; @@ -26,8 +28,6 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; -import org.apache.flink.connector.kafka.source.KafkaSource; -import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.streaming.api.datastream.DataStream; @@ -442,8 +442,8 @@ public class KafkaDynamicSource } kafkaSourceBuilder .setProperties(properties) - .setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer)); - + .setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer)) + .setMetricSchema(kafkaDeserializer); return kafkaSourceBuilder.build(); } @@ -502,7 +502,8 @@ public class KafkaDynamicSource metadataConverters, producedTypeInfo, upsertMode, - metricOption); + metricOption, + metadataKeys); } private @Nullable DeserializationSchema<RowData> createDeserialization( diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index e9b2f0b4cd..e4c4590fd4 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -831,6 +831,9 @@ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertSink.java inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertWriter.java inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java + 
 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java

 Source  : org.apache.flink:flink-connector-kafka:1.15.4 (Please note that the software have been modified.)
 License : https://github.com/apache/flink/blob/master/LICENSE
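
For reference, the sketch below is not part of the commit; it only illustrates how the new builder hook is meant to be wired. The same KafkaDeserializationSchema<RowData> that KafkaDynamicSource builds for row deserialization is also handed to setMetricSchema(), so the copied KafkaSourceReader can flush audit data and advance the checkpoint id once per completed checkpoint. The bootstrap servers, topic, group id, and wrapper class name are placeholders, and the deserializer argument is assumed to be the DynamicKafkaDeserializationSchema created in KafkaDynamicSource#createKafkaSource.

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.table.data.RowData;

import org.apache.inlong.sort.kafka.source.KafkaSource;

// Hypothetical wrapper class, used only to show the builder call chain.
public class AuditedKafkaSourceSketch {

    // Builds the InLong copy of KafkaSource with the audit hook wired in.
    public static KafkaSource<RowData> build(KafkaDeserializationSchema<RowData> kafkaDeserializer) {
        return KafkaSource.<RowData>builder()
                .setBootstrapServers("localhost:9092")                  // placeholder address
                .setTopics("demo_topic")                                // placeholder topic
                .setGroupId("inlong_audit_demo")                        // placeholder group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                // row deserialization, as wired by KafkaDynamicSource#createKafkaSource
                .setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer))
                // the added hook: KafkaSourceReader flushes audit data through this schema
                // in notifyCheckpointComplete, so audit figures are reported exactly once
                .setMetricSchema(kafkaDeserializer)
                .build();
    }
}

In the patch itself this wiring happens inside KafkaDynamicSource, which now calls setMetricSchema(kafkaDeserializer) right after setDeserializer(...), and KafkaSourceReader drives the schema's updateCurrentCheckpointId/flushAudit/updateLastCheckpointId methods during the checkpoint cycle.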