Hang Ruan created FLINK-23854:
---------------------------------
Summary: KafkaSink error when restarting from a checkpoint with a
lower parallelism
Key: FLINK-23854
URL: https://issues.apache.org/jira/browse/FLINK-23854
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.14.0
Reporter: Hang Ruan
The KafkaSink throws an exception when the job is restarted from a checkpoint
with a lower parallelism. The exception is as follows.
{code:java}
// code placeholder
java.lang.IllegalStateException: Internal error: It is expected that state from previous executions is distributed to the same subtask id.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
    at org.apache.flink.connector.kafka.sink.KafkaWriter.recoverAndInitializeState(KafkaWriter.java:178)
    at org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:130)
    at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:99)
    at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.initializeState(SinkOperator.java:134)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:690)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:666)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:785)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:638)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: java.lang.NullPointerException
        at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.close(SinkOperator.java:195)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1028)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1014)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:927)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:797)
        ... 4 more
{code}
I started a Kafka cluster (kafka_2.13-2.8.0) and a Flink cluster on my own
Mac. I changed the parallelism from 4 to 2 and restarted the job from a
completed checkpoint; the error then occurred.
The CLI command and the code are as follows.
{code:java}
// cli command
./bin/flink run -d -c com.test.KafkaExactlyOnceScaleDownTest \
  -s /Users/test/checkpointDir/ExactlyOnceTest1/67105fcc1724e147fc6208af0dd90618/chk-1 \
  /Users/test/project/self/target/test.jar
{code}
{code:java}
/**
 * Streaming job that reads strings from one Kafka topic, keeps the non-null ones longer than two
 * characters, and writes them to another Kafka topic through a {@code KafkaSink} configured with
 * {@code DeliveryGuarantee.EXACTLY_ONCE}.
 *
 * <p>Checkpointing is enabled in exactly-once mode, externalized checkpoints are retained on
 * cancellation, and the environment parallelism is set to 4.
 */
public class KafkaExactlyOnceScaleDownTest {

    public static void main(String[] args) throws Exception {
        final String sourceTopic = "flinkSourceTest";
        final String sinkTopic = "flinkSinkExactlyTest1";
        final String consumerGroup = "ExactlyOnceTest1";
        final String bootstrapServers = "localhost:9092";
        // Checkpoints are stored in a per-group directory on the local file system.
        final String checkpointDir = "file:///Users/hangruan/checkpointDir/" + consumerGroup;

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing: exactly-once mode, externalized checkpoints kept after cancellation.
        env.enableCheckpointing(60000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
        env.setParallelism(4);

        // Source: string values only, starting from the earliest offset when none is committed.
        KafkaSource<String> kafkaSource =
                KafkaSource.<String>builder()
                        .setBootstrapServers(bootstrapServers)
                        .setTopics(sourceTopic)
                        .setGroupId(consumerGroup)
                        .setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        DataStream<String> input =
                env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
        DataStream<String> filtered = input.filter(line -> line != null && line.length() > 2);

        Properties producerProps = new Properties();
        producerProps.setProperty("transaction.timeout.ms", "900000");

        // Sink: transactional producer with the "tp-test-" transactional id prefix.
        filtered.sinkTo(
                KafkaSink.<String>builder()
                        .setBootstrapServers(bootstrapServers)
                        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                        .setTransactionalIdPrefix("tp-test-")
                        .setKafkaProducerConfig(producerProps)
                        .setRecordSerializer(
                                new SelfSerializationSchema(sinkTopic, new SimpleStringSchema()))
                        .build());

        // Prints a marker line when the job is submitted and when it has executed.
        env.registerJobListener(
                new JobListener() {
                    @Override
                    public void onJobSubmitted(
                            @Nullable JobClient jobClient, @Nullable Throwable throwable) {
                        System.out.println("======submitted=======");
                    }

                    @Override
                    public void onJobExecuted(
                            @Nullable JobExecutionResult jobExecutionResult,
                            @Nullable Throwable throwable) {
                        System.out.println("======executed=======");
                    }
                });

        env.execute("ScaleDownTest");
    }

    /**
     * Record serializer that sends every element to a single fixed topic, using the wrapped
     * {@code SerializationSchema} to produce the record value. Records are created without a key.
     */
    static class SelfSerializationSchema implements KafkaRecordSerializationSchema<String> {

        private final SerializationSchema<String> valueSerialization;
        private String topic;

        SelfSerializationSchema(String topic, SerializationSchema<String> valueSerialization) {
            this.valueSerialization = valueSerialization;
            this.topic = topic;
        }

        /** Delegates to the interface's default {@code open} implementation. */
        @Override
        public void open(
                SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
                throws Exception {
            KafkaRecordSerializationSchema.super.open(context, sinkContext);
        }

        /** Builds a value-only record for the configured topic; the timestamp is not used. */
        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                String element, KafkaSinkContext sinkContext, Long timestamp) {
            final byte[] value = valueSerialization.serialize(element);
            return new ProducerRecord<>(topic, value);
        }
    }
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)