Hi! I see that there is no keyBy in your user code. Is it the case that some Kafka partitions contain a lot more data than others? If so, you can try datastream.rebalance() [1] to redistribute the records evenly across the parallel subtasks and reduce the impact of the data skew; see the sketch below.
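For illustration, here is a rough, untested sketch against the IngestionStreamer code you posted (quoted below); only the rebalance() call is new, everything else is taken from your snippet:

    // Only change: insert rebalance() between the Kafka source and the map/process chain.
    // rebalance() redistributes records round-robin across all parallel subtasks,
    // so a hot Kafka partition no longer pins its data to a single subtask.
    DataStream<String> kafkaStream = env
            .addSource(kafkaConsumer)
            .rebalance();

    DataStream<BlueprintCacheDataType> result = kafkaStream
            .map(new KeySorter()).uid("Mapper")
            .process(new FetchBlueprintInfoKeyed(commanderUrl)).uid("BP_Fetch");

If the skew only shows up after FetchBlueprintInfoKeyed, you can instead call result.rebalance() right before building the two sink streams.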
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#rebalance--

Terry Heathcote <terry.heathcot...@gmail.com> wrote on Wed, Dec 22, 2021 at 17:00:

> Hi Caizhi
>
> Thank you for the response. Below is the relevant code for the pipeline as
> requested, along with the Kafka properties we set for both the
> FlinkKafkaProducer and Consumer. The operators that suffer from the data
> skew are the sinks.
>
> import Models.BlueprintCacheDataType;
> import PipelineBlocks.*;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.*;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.jetbrains.annotations.Nullable;
> import org.json.JSONObject;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.nio.charset.StandardCharsets;
> import java.time.Duration;
> import java.util.ArrayList;
> import java.util.List;
>
> public class IngestionStreamer {
>
>     private StreamExecutionEnvironment env;
>     private KafkaConfig config;
>     private String commanderUrl;
>     private static final Logger LOG = LoggerFactory.getLogger(IngestionStreamer.class);
>
>     public IngestionStreamer(StreamExecutionEnvironment env, KafkaConfig config, String commanderUrl) {
>         this.env = env;
>         this.config = config;
>         this.commanderUrl = commanderUrl;
>     }
>
>     public void StartStreamer(String topic) {
>
>         FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
>                 topic, new SimpleStringSchema(), config.kafkaPropertiesConsumer);
>         DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
>
>         DataStream<BlueprintCacheDataType> result = kafkaStream
>                 .map(new KeySorter()).uid("Mapper")
>                 .process(new FetchBlueprintInfoKeyed(commanderUrl)).uid("BP_Fetch");
>
>         DataStream<ProducerRecord<byte[], byte[]>> blueprints =
>                 result.process(new MapToBlueprints()).uid("Blueprint map");
>         DataStream<ProducerRecord<byte[], byte[]>> swamp =
>                 result.process(new ToSwamp()).uid("Swamp map");
>
>         blueprints.addSink(new FlinkKafkaProducer<>(
>                 "default-blueprint",
>                 new KafkaSerializationSchema<ProducerRecord<byte[], byte[]>>() {
>                     @Override
>                     public ProducerRecord<byte[], byte[]> serialize(
>                             ProducerRecord<byte[], byte[]> producerRecord, @Nullable Long aLong) {
>                         return new ProducerRecord<>(producerRecord.topic(), producerRecord.value());
>                     }
>                 },
>                 config.kafkaPropertiesProducer,
>                 FlinkKafkaProducer.Semantic.EXACTLY_ONCE
>         )).name("Blueprint sink").uid("BP_Sink");
>
>         swamp.addSink(new FlinkKafkaProducer<>(
>                 "default-swamp",
>                 new KafkaSerializationSchema<ProducerRecord<byte[], byte[]>>() {
>                     @Override
>                     public ProducerRecord<byte[], byte[]> serialize(
>                             ProducerRecord<byte[], byte[]> producerRecord, @Nullable Long aLong) {
>                         return new ProducerRecord<>(producerRecord.topic(), producerRecord.value());
>                     }
>                 },
>                 config.kafkaPropertiesProducer,
>                 FlinkKafkaProducer.Semantic.EXACTLY_ONCE
>         )).name("Swamp sink").uid("Swamp_Sink");
>     }
> }
>
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> import java.util.Properties;
>
> public class KafkaConfig {
>
>     public Properties kafkaPropertiesConsumer;
>     public Properties kafkaPropertiesProducer;
>
>     public KafkaConfig(Main.Environment env) {
>         kafkaPropertiesConsumer = addKafkaConsumerProperties(KafkaProperties(env));
>         kafkaPropertiesProducer = addKafkaProducerProperties(KafkaProperties(env));
>     }
>
>     public Properties KafkaProperties(Main.Environment env) {
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers",
>                 "kafka-0-external:9094,kafka-1-external:9094,kafka-2-external:9094,kafka-3-external:9094,kafka-4-external:9094");
>         properties.setProperty("group.id", System.getenv("KAFKA_GROUP_ID"));
>         properties.setProperty("security.protocol", "SASL_PLAINTEXT");
>         properties.setProperty("auto.commit.interval.ms", "1000");
>         properties.setProperty("sasl.mechanism", "PLAIN");
>         properties.setProperty("auto.offset.reset", "earliest");
>         properties.setProperty("session.timeout.ms", "300000");
>         properties.setProperty("default.api.timeout.ms", "300000");
>         properties.setProperty("request.timeout.ms", "300000");
>         properties.setProperty("flink.partition-discovery.interval-millis", "50000");
>         return properties;
>     }
>
>     public Properties addKafkaConsumerProperties(Properties properties) {
>         properties.setProperty("key.deserializer", StringDeserializer.class.getName());
>         properties.setProperty("value.deserializer", StringDeserializer.class.getName());
>         properties.setProperty("max.poll.interval.ms", "300000");
>         return properties;
>     }
>
>     public Properties addKafkaProducerProperties(Properties properties) {
>         properties.setProperty("transaction.timeout.ms", "900000");
>         properties.setProperty("max.block.ms", "600000");
>         properties.setProperty("delivery.timeout.ms", "300000");
>         return properties;
>     }
> }
>
> Best regards
> Terry
>
> On Wed, Dec 22, 2021 at 3:50 AM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> From your description this is due to data skew. The common solution to
>> data skew is to add a random value to your partition keys so that data can
>> be distributed evenly into downstream operators.
>>
>> Could you provide more information about your job (preferably user code
>> or SQL code), especially the operator that suffers from data skew?
>>
>> Terry Heathcote <terry.heathcot...@gmail.com> wrote on Wed, Dec 22, 2021 at 02:53:
>>
>>> Hi
>>>
>>> We are having trouble with record throughput that we believe to be a
>>> result of slow checkpoint durations. The job uses Kafka as both a source
>>> and sink, as well as a Redis-backed service within the same cluster, used
>>> to enrich the data in a transformation before writing records back to
>>> Kafka. Below is a description of the job:
>>>
>>> - Flink version 1.12.5
>>> - Source topic = 24 partitions.
>>> - Multiple sink topics.
>>> - Parallelism set to 24.
>>> - Operators applied are a map function and a process function to fetch
>>>   the Redis data.
>>> - EXACTLY_ONCE processing is required.
>>>
>>> We have switched between aligned and unaligned checkpoints but with no
>>> improvement in performance. What we have witnessed is that, on average,
>>> the majority of operators and their respective subtasks acknowledge
>>> checkpoints within milliseconds, but 1 or 2 subtasks wait 2 to 4 minutes
>>> before acknowledging the checkpoint. Also, the subtask load seems skewed
>>> after applying the transformations prior to the sinks (we tried to
>>> rebalance and shuffle here but with no improvement). Checkpoint duration
>>> can vary between 5 s and 7 minutes.
>>>
>>> We believe this is slowing our overall job throughput, as Kafka
>>> transaction commits are delayed by the slow checkpoints, creating upstream
>>> backpressure and a buildup of unconsumed offsets on the source Kafka topic.
>>> We would ideally like to decrease the checkpoint interval once durations
>>> are low and stable.
>>>
>>> Any help on this would be greatly appreciated.
>>>
>>> Best regards
>>> Terry
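For reference, the checkpointing knobs discussed above (interval, exactly-once mode, aligned vs. unaligned checkpoints, timeout) map onto the following StreamExecutionEnvironment calls. This is a minimal sketch only; the class name and the numeric values are illustrative placeholders, not the settings used in this job:

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSettingsSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Trigger a checkpoint every 60 s with exactly-once guarantees (placeholder interval).
            env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

            // Toggle unaligned checkpoints: barriers can overtake buffered records,
            // so backpressured channels do not stall barrier alignment.
            env.getCheckpointConfig().enableUnalignedCheckpoints(true);

            // Give slow checkpoints room before they time out (placeholder value).
            env.getCheckpointConfig().setCheckpointTimeout(600_000L);

            // Leave a pause between checkpoints so normal processing can catch up (placeholder value).
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000L);
        }
    }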