Hi Caizhi,

Thank you for the response. Below is the relevant code for the pipeline as requested, along with the Kafka properties we set for both the FlinkKafkaProducer and the FlinkKafkaConsumer. The operators that suffer from the data skew are the sinks.
import Models.BlueprintCacheDataType;
import PipelineBlocks.*;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.*;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.jetbrains.annotations.Nullable;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class IngestionStreamer {

    private StreamExecutionEnvironment env;
    private KafkaConfig config;
    private String commanderUrl;
    private static final Logger LOG = LoggerFactory.getLogger(IngestionStreamer.class);

    public IngestionStreamer(StreamExecutionEnvironment env, KafkaConfig config, String commanderUrl) {
        this.env = env;
        this.config = config;
        this.commanderUrl = commanderUrl;
    }

    public void StartStreamer(String topic) {
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), config.kafkaPropertiesConsumer);
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);

        // Map to a keyed representation, then enrich each record with blueprint
        // info fetched from the Redis-backed service.
        DataStream<BlueprintCacheDataType> result = kafkaStream
                .map(new KeySorter()).uid("Mapper")
                .process(new FetchBlueprintInfoKeyed(commanderUrl)).uid("BP_Fetch");

        DataStream<ProducerRecord<byte[], byte[]>> blueprints =
                result.process(new MapToBlueprints()).uid("Blueprint map");
        DataStream<ProducerRecord<byte[], byte[]>> swamp =
                result.process(new ToSwamp()).uid("Swamp map");

        // Both sinks write with EXACTLY_ONCE semantics, so Kafka transaction
        // commits are tied to checkpoint completion.
        blueprints.addSink(new FlinkKafkaProducer<>(
                "default-blueprint",
                new KafkaSerializationSchema<ProducerRecord<byte[], byte[]>>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(
                            ProducerRecord<byte[], byte[]> producerRecord, @Nullable Long aLong) {
                        return new ProducerRecord<>(producerRecord.topic(), producerRecord.value());
                    }
                },
                config.kafkaPropertiesProducer,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        )).name("Blueprint sink").uid("BP_Sink");

        swamp.addSink(new FlinkKafkaProducer<>(
                "default-swamp",
                new KafkaSerializationSchema<ProducerRecord<byte[], byte[]>>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(
                            ProducerRecord<byte[], byte[]> producerRecord, @Nullable Long aLong) {
                        return new ProducerRecord<>(producerRecord.topic(), producerRecord.value());
                    }
                },
                config.kafkaPropertiesProducer,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        )).name("Swamp sink").uid("Swamp_Sink");
    }
}

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaConfig {

    public Properties kafkaPropertiesConsumer;
    public Properties kafkaPropertiesProducer;

    public KafkaConfig(Main.Environment env) {
        kafkaPropertiesConsumer = addKafkaConsumerProperties(KafkaProperties(env));
        kafkaPropertiesProducer = addKafkaProducerProperties(KafkaProperties(env));
    }

    public Properties KafkaProperties(Main.Environment env) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers",
                "kafka-0-external:9094,kafka-1-external:9094,kafka-2-external:9094,kafka-3-external:9094,kafka-4-external:9094");
        properties.setProperty("group.id", System.getenv("KAFKA_GROUP_ID"));
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("sasl.mechanism", "PLAIN");
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("session.timeout.ms", "300000");
        properties.setProperty("default.api.timeout.ms", "300000");
        properties.setProperty("request.timeout.ms", "300000");
        properties.setProperty("flink.partition-discovery.interval-millis", "50000");
        return properties;
    }

    public Properties addKafkaConsumerProperties(Properties properties) {
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());
        properties.setProperty("max.poll.interval.ms", "300000");
        return properties;
    }

    public Properties addKafkaProducerProperties(Properties properties) {
        properties.setProperty("transaction.timeout.ms", "900000");
        properties.setProperty("max.block.ms", "600000");
        properties.setProperty("delivery.timeout.ms", "300000");
        return properties;
    }
}
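To check that I understand the key-salting you suggested, is it roughly the sketch below? The Tuple2<String, String> (key, payload) element type, the SaltedKeying class name, and the SALT_BUCKETS value are assumptions I made purely for illustration; in our job the element type actually comes out of KeySorter.

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class SaltedKeying {

    // Assumption: number of salt buckets, e.g. roughly the sink parallelism.
    private static final int SALT_BUCKETS = 24;

    public static KeyedStream<Tuple2<String, String>, String> saltAndKey(
            DataStream<Tuple2<String, String>> input) {
        return input
                // Append a random bucket id to the key so records that share a
                // hot key are spread over up to SALT_BUCKETS downstream subtasks.
                .map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(Tuple2<String, String> record) {
                        int bucket = ThreadLocalRandom.current().nextInt(SALT_BUCKETS);
                        return Tuple2.of(record.f0 + "#" + bucket, record.f1);
                    }
                })
                // Key by the salted key; the original key can be recovered later
                // by stripping everything after '#'.
                .keyBy(new KeySelector<Tuple2<String, String>, String>() {
                    @Override
                    public String getKey(Tuple2<String, String> record) {
                        return record.f0;
                    }
                });
    }
}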
properties.setProperty("security.protocol", "SASL_PLAINTEXT"); properties.setProperty("auto.commit.interval.ms", "1000"); properties.setProperty("sasl.mechanism", "PLAIN"); properties.setProperty("auto.offset.reset", "earliest"); properties.setProperty("session.timeout.ms", "300000"); properties.setProperty("default.api.timeout.ms", "300000"); properties.setProperty("request.timeout.ms", "300000"); properties.setProperty("flink.partition-discovery.interval-millis", "50000" ); return properties; } public Properties addKafkaConsumerProperties(Properties properties) { properties.setProperty("key.deserializer", StringDeserializer.class.getName ()); properties.setProperty("value.deserializer", StringDeserializer.class. getName()); properties.setProperty("max.poll.interval.ms", "300000"); return properties; } public Properties addKafkaProducerProperties(Properties properties) { properties.setProperty("transaction.timeout.ms", "900000"); properties.setProperty("max.block.ms", "600000"); properties.setProperty("delivery.timeout.ms", "300000"); return properties; } } Best regards Terry ᐧ On Wed, Dec 22, 2021 at 3:50 AM Caizhi Weng <tsreape...@gmail.com> wrote: > Hi! > > From your description this is due to data skew. The common solution to > data skew is to add a random value to your partition keys so that data can > be distributed evenly into downstream operators. > > Could you provide more information about your job (preferably user code or > SQL code), especially the operator that suffers from data skew? > > Terry Heathcote <terry.heathcot...@gmail.com> 于2021年12月22日周三 02:53写道: > >> Hi >> >> We are having trouble with record throughput that we believe to be a >> result of slow checkpoint durations. The job uses Kafka as both a source >> and sink as well as a Redis-backed service within the same cluster, used to >> enrich the data in a transformation, before writing records back to Kafka. >> Below is a description of the job: >> >> - Flink Version 12.5 >> - Source topic = 24 partitions. >> - Multiple sink topics. >> - Parallelism set to 24. >> - Operators applied are a map function and process function to fetch >> the Redis data. >> - EXACTLY_ONCE processing is required. >> >> We have switched between aligned and unaligned checkpoints but with no >> improvement in performance. What we have witnessed is that on average the >> majority of operators and their respective subtasks acknowledge checkpoints >> within milliseconds but 1 or 2 subtasks wait 2 to 4 mins before >> acknowledging the checkpoint. Also, the subtask load seems skewed after >> applying transformations prior to the sinks (tried to rebalance and shuffle >> here but with no improvement). Checkpoint duration can vary between 5s and >> 7 minutes. >> >> We believe this is slowing our overall job throughput as Kafka >> transaction commits are delayed by slower checkpointing, creating upstream >> backpressure, and a buildup on the source Kafka topic offsets. We would >> ideally like to decrease the checkpoint interval once durations are low and >> stable. >> >> Any help on this would be greatly appreciated. >> >> Best regards >> Terry >> ᐧ >> >