Hi!

I see that there is no keyBy in your user code. Is it the case that some
Kafka partitions contain a lot more data than others? If so, you can try
datastream.rebalance() [1] to redistribute records evenly across the
parallel subtasks and reduce the impact of data skew.
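
For example, a rough sketch against your IngestionStreamer (it reuses the
variable and class names from your code, so treat it as untested):

DataStream<String> kafkaStream = env.addSource(kafkaConsumer)
        // round-robin the records so every parallel subtask gets a similar
        // share of the load, regardless of how full each Kafka partition is
        .rebalance();

DataStream<BlueprintCacheDataType> result = kafkaStream
        .map(new KeySorter()).uid("Mapper")
        .process(new FetchBlueprintInfoKeyed(commanderUrl)).uid("BP_Fetch");

If the skew only shows up at the sinks, you can instead call rebalance() on the
blueprints and swamp streams right before addSink. Note that rebalance()
introduces a network shuffle, so it adds some serialization overhead.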

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#rebalance--

On Wed, Dec 22, 2021 at 5:00 PM Terry Heathcote <terry.heathcot...@gmail.com> wrote:

> Hi Caizhi
>
> Thank you for the response. Below is relevant code for the pipeline as
> requested, along with the Kafka properties we set for both the
> FlinkKafkaProducer and Consumer. The operators that suffer from the data skew
> are the sinks.
>
> import Models.BlueprintCacheDataType;
> import PipelineBlocks.*;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.*;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.jetbrains.annotations.Nullable;
> import org.json.JSONObject;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import java.time.Duration;
>
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>
> import java.nio.charset.StandardCharsets;
> import java.util.ArrayList;
> import java.util.List;
>
>
> public class IngestionStreamer {
> private StreamExecutionEnvironment env;
> private KafkaConfig config;
> private String commanderUrl;
> private static final Logger LOG = LoggerFactory.getLogger(IngestionStreamer.class);
>
> public IngestionStreamer(StreamExecutionEnvironment env, KafkaConfig config, String commanderUrl) {
> this.env = env;
> this.config = config;
> this.commanderUrl = commanderUrl;
> }
>
> public void StartStreamer(String topic) {
>
> FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
> new SimpleStringSchema(), config.kafkaPropertiesConsumer);
> DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
>
> DataStream<BlueprintCacheDataType> result = kafkaStream
> .map(new KeySorter()).uid("Mapper")
> .process(new FetchBlueprintInfoKeyed(commanderUrl)).uid("BP_Fetch");
>
> DataStream<ProducerRecord<byte[], byte[]>> blueprints = result.process(new
> MapToBlueprints()).uid("Blueprint map");
> DataStream<ProducerRecord<byte[], byte[]>> swamp = result.process(new
> ToSwamp()).uid("Swamp map");
>
> blueprints.addSink(new FlinkKafkaProducer<>(
> "default-blueprint",
> new KafkaSerializationSchema<ProducerRecord<byte[], byte[]>>() {
> @Override
> public ProducerRecord<byte[], byte[]> serialize(ProducerRecord<byte[],
> byte[]> producerRecord, @Nullable Long aLong) {
> return new ProducerRecord<>(producerRecord.topic(), producerRecord.value
> ());
> }
> },
> config.kafkaPropertiesProducer,
> FlinkKafkaProducer.Semantic.EXACTLY_ONCE
> )).name("Blueprint sink").uid("BP_Sink");
>
> swamp.addSink(new FlinkKafkaProducer<>(
> "default-swamp",
> new KafkaSerializationSchema<ProducerRecord<byte[], byte[]>>() {
> @Override
> public ProducerRecord<byte[], byte[]> serialize(ProducerRecord<byte[],
> byte[]> producerRecord, @Nullable Long aLong) {
> return new ProducerRecord<>(producerRecord.topic(), producerRecord.value
> ());
> }
> },
> config.kafkaPropertiesProducer,
> FlinkKafkaProducer.Semantic.EXACTLY_ONCE
> )).name("Swamp sink").uid("Swamp_Sink");
>
>
> }
>
> }
>
>
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> import java.util.Properties;
>
> public class KafkaConfig {
>
> public Properties kafkaPropertiesConsumer;
> public Properties kafkaPropertiesProducer;
>
> public KafkaConfig(Main.Environment env) {
> kafkaPropertiesConsumer = addKafkaConsumerProperties(KafkaProperties(env));
> kafkaPropertiesProducer = addKafkaProducerProperties(KafkaProperties(env));
> }
>
> public Properties KafkaProperties(Main.Environment env) {
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers",
> "kafka-0-external:9094,kafka-1-external:9094,kafka-2-external:9094,kafka-3-external:9094,kafka-4-external:9094"
> );
> properties.setProperty("group.id", System.getenv("KAFKA_GROUP_ID"));
> properties.setProperty("security.protocol", "SASL_PLAINTEXT");
> properties.setProperty("auto.commit.interval.ms", "1000");
> properties.setProperty("sasl.mechanism", "PLAIN");
> properties.setProperty("auto.offset.reset", "earliest");
> properties.setProperty("session.timeout.ms", "300000");
> properties.setProperty("default.api.timeout.ms", "300000");
> properties.setProperty("request.timeout.ms", "300000");
> properties.setProperty("flink.partition-discovery.interval-millis",
> "50000");
> return properties;
> }
>
> public Properties addKafkaConsumerProperties(Properties properties) {
> properties.setProperty("key.deserializer", StringDeserializer.class.
> getName());
> properties.setProperty("value.deserializer", StringDeserializer.class.
> getName());
> properties.setProperty("max.poll.interval.ms", "300000");
> return properties;
> }
>
> public Properties addKafkaProducerProperties(Properties properties) {
> properties.setProperty("transaction.timeout.ms", "900000");
> properties.setProperty("max.block.ms", "600000");
> properties.setProperty("delivery.timeout.ms", "300000");
> return properties;
> }
> }
>
>
> Best regards
> Terry
>
> On Wed, Dec 22, 2021 at 3:50 AM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> From your description this looks like data skew. The common solution to
>> data skew is to add a random value to your partition keys so that records
>> can be distributed more evenly across the downstream operators.
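>>
>> A minimal sketch of the idea, i.e. salt the key, pre-aggregate per salted
>> key, then strip the salt and merge per original key (the Tuple2<String,
>> Long> element type, the input stream name and the salt range of 8 are made
>> up for illustration; adapt them to your actual job):
>>
>> import java.util.concurrent.ThreadLocalRandom;
>> import org.apache.flink.api.common.typeinfo.Types;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>>
>> // input: a stream of (key, count) pairs where a few keys are very hot
>> DataStream<Tuple2<String, Long>> preAggregated = input
>> // append a random salt so one hot key is spread over up to 8 subtasks
>> .map(t -> Tuple2.of(t.f0 + "#" + ThreadLocalRandom.current().nextInt(8), t.f1))
>> .returns(Types.TUPLE(Types.STRING, Types.LONG))
>> .keyBy(t -> t.f0)
>> .sum(1);
>>
>> DataStream<Tuple2<String, Long>> merged = preAggregated
>> // strip the salt again and merge the partial results per original key
>> .map(t -> Tuple2.of(t.f0.substring(0, t.f0.lastIndexOf('#')), t.f1))
>> .returns(Types.TUPLE(Types.STRING, Types.LONG))
>> .keyBy(t -> t.f0)
>> .sum(1);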
>>
>> Could you provide more information about your job (preferably user code
>> or SQL code), especially the operator that suffers from data skew?
>>
>> On Wed, Dec 22, 2021 at 2:53 AM Terry Heathcote <terry.heathcot...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> We are having trouble with record throughput that we believe to be the
>>> result of long checkpoint durations. The job uses Kafka as both source and
>>> sink, and calls a Redis-backed service in the same cluster to enrich the data
>>> in a transformation before writing records back to Kafka. Below is a
>>> description of the job:
>>>
>>>    - Flink version 1.12.5
>>>    - Source topic = 24 partitions.
>>>    - Multiple sink topics.
>>>    - Parallelism set to 24.
>>>    - Operators applied are a map function and process function to fetch
>>>    the Redis data.
>>>    - EXACTLY_ONCE processing is required.
>>>
>>> We have switched between aligned and unaligned checkpoints but with no
>>> improvement in performance. On average, most operators and their subtasks
>>> acknowledge checkpoints within milliseconds, but 1 or 2 subtasks take 2 to 4
>>> minutes to acknowledge. Also, the subtask load looks skewed after the
>>> transformations just before the sinks (we tried rebalance and shuffle here,
>>> with no improvement). Checkpoint duration varies between 5 seconds and 7
>>> minutes.
>>>
>>> We believe this is slowing our overall job throughput: Kafka transaction
>>> commits are delayed by the slow checkpoints, which creates upstream
>>> backpressure and a growing consumer lag on the source Kafka topic. We would
>>> ideally like to decrease the checkpoint interval once durations are low and
>>> stable.
>>>
>>> Any help on this would be greatly appreciated.
>>>
>>> Best regards
>>> Terry
>>>
>>
