[ https://issues.apache.org/jira/browse/FLINK-23854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401021#comment-17401021 ]
Hang Ruan edited comment on FLINK-23854 at 8/18/21, 12:41 PM:
--------------------------------------------------------------

After adding a log in the StatefulSinkWriterStateHandler class, the distribution of the operator state changes as follows.

||parallelism 4 (before)||parallelism 2 (after)||
|subTask0 -> state0
subTask1 -> state1
subTask2 -> state2
subTask3 -> state3|subTask0 -> state0 & state1
subTask1 -> state2 & state3|

Here are the added log statement and the result after changing the parallelism to 2.

||Log added in the initializeState method of StatefulSinkWriterStateHandler||the log of subtask0||the log of subtask1||
|{code:java}
public List<WriterStateT> initializeState(StateInitializationContext context) throws Exception {
    final ListState<byte[]> rawState =
            context.getOperatorStateStore().getListState(WRITER_RAW_STATES_DESC);
    writerState = new SimpleVersionedListState<>(rawState, writerStateSimpleVersionedSerializer);
    final List<WriterStateT> writerStates = CollectionUtil.iterableToList(writerState.get());
    final List<WriterStateT> states = new ArrayList<>(writerStates);
    // added log: print every writer state recovered by this subtask
    for (WriterStateT stateT : states) {
        LOG.info("Stateful Sink Writer state handler:" + stateT.toString());
    }
    for (String previousSinkStateName : previousSinkStateNames) {
        ......
    }
    return states;
}
{code}|2021-08-18 20:02:04,214 INFO org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler [] - Stateful Sink Writer state handler:KafkaWriterState\{subtaskId=1, transactionalIdOffset=8, transactionalIdPrefix='tp-test-'}
2021-08-18 20:02:04,214 INFO org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler [] - Stateful Sink Writer state handler:KafkaWriterState\{subtaskId=0, transactionalIdOffset=8, transactionalIdPrefix='tp-test-'}|2021-08-18 20:02:04,220 INFO org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler [] - Stateful Sink Writer state handler:KafkaWriterState\{subtaskId=2, transactionalIdOffset=8, transactionalIdPrefix='tp-test-'}
2021-08-18 20:02:04,220 INFO org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler [] - Stateful Sink Writer state handler:KafkaWriterState\{subtaskId=3, transactionalIdOffset=8, transactionalIdPrefix='tp-test-'}|
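The grouping in the table can be reproduced with a minimal, self-contained sketch of an even split of the old state entries across the new subtasks. This is my own illustration of the observed result, not Flink source; the actual redistribution is done by Flink's operator state repartitioner, and the splitStates helper and state labels here are made up:

{code:java}
import java.util.ArrayList;
import java.util.List;

public class RescaleRedistributionSketch {

    // Evenly split the union of old state entries across the new subtasks in
    // contiguous chunks: with 4 states and 2 subtasks, subtask 0 receives
    // state0 & state1 and subtask 1 receives state2 & state3.
    static List<List<String>> splitStates(List<String> oldStates, int newParallelism) {
        List<List<String>> assignment = new ArrayList<>();
        int size = oldStates.size();
        for (int subtask = 0; subtask < newParallelism; subtask++) {
            int from = subtask * size / newParallelism;
            int to = (subtask + 1) * size / newParallelism;
            assignment.add(new ArrayList<>(oldStates.subList(from, to)));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> states = List.of("state0", "state1", "state2", "state3");
        List<List<String>> assignment = splitStates(states, 2);
        for (int i = 0; i < assignment.size(); i++) {
            System.out.println("subTask" + i + " -> " + assignment.get(i));
        }
        // prints:
        // subTask0 -> [state0, state1]
        // subTask1 -> [state2, state3]
    }
}
{code}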
> KafkaSink error when restart from the checkpoint with a lower parallelism by exactly-once guarantee
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-23854
>                 URL: https://issues.apache.org/jira/browse/FLINK-23854
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Hang Ruan
>            Priority: Blocker
>              Labels: release-testing
>
> The KafkaSink throws an exception when restarted with a lower parallelism under the exactly-once guarantee. The exception is as follows.
> {code:java}
> java.lang.IllegalStateException: Internal error: It is expected that state from previous executions is distributed to the same subtask id.
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.recoverAndInitializeState(KafkaWriter.java:178)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:130)
>     at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:99)
>     at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.initializeState(SinkOperator.java:134)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:690)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:666)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:785)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:638)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)
>     at java.lang.Thread.run(Thread.java:748)
>     Suppressed: java.lang.NullPointerException
>         at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.close(SinkOperator.java:195)
>         at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)
>         at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1028)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1014)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:927)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:797)
>         ... 4 more
> {code}
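> Judging from the message, the failing check in KafkaWriter.recoverAndInitializeState apparently requires every recovered state entry to carry the current subtask id, which cannot hold after scaling down, because one subtask now receives entries written by several old subtasks (see the logs in the comment above). A minimal, self-contained sketch of a precondition of that shape (my own hypothetical reconstruction for illustration, not the actual Flink source; RecoveredState is made up):
> {code:java}
> import java.util.List;
>
> public class SubtaskIdCheckSketch {
>
>     // Hypothetical stand-in for a recovered KafkaWriterState entry.
>     static class RecoveredState {
>         final int subtaskId;
>
>         RecoveredState(int subtaskId) {
>             this.subtaskId = subtaskId;
>         }
>     }
>
>     // A check of this shape passes on a 4 -> 4 restart but fails on 4 -> 2,
>     // where subtask 0 also receives the entry saved by old subtask 1.
>     static void recoverAndInitializeState(int currentSubtaskId, List<RecoveredState> states) {
>         for (RecoveredState state : states) {
>             if (state.subtaskId != currentSubtaskId) {
>                 throw new IllegalStateException(
>                         "Internal error: It is expected that state from previous "
>                                 + "executions is distributed to the same subtask id.");
>             }
>         }
>     }
>
>     public static void main(String[] args) {
>         // Subtask 0 after scaling down from 4 to 2 owns state0 and state1.
>         recoverAndInitializeState(0, List.of(new RecoveredState(0), new RecoveredState(1)));
>         // -> IllegalStateException thrown for the second entry (subtaskId=1)
>     }
> }
> {code}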
> I start the Kafka cluster (kafka_2.13-2.8.0) and the Flink cluster on my own Mac. I change the parallelism from 4 to 2 and restart the job from a completed checkpoint. Then the error occurs.
> The CLI command and the code are as follows.
> {code:bash}
> ./bin/flink run -d -c com.test.KafkaExactlyOnceScaleDownTest -s /Users/test/checkpointDir/ExactlyOnceTest1/67105fcc1724e147fc6208af0dd90618/chk-1 /Users/test/project/self/target/test.jar
> {code}
> {code:java}
> public class KafkaExactlyOnceScaleDownTest {
>
>     public static void main(String[] args) throws Exception {
>         final String kafkaSourceTopic = "flinkSourceTest";
>         final String kafkaSinkTopic = "flinkSinkExactlyTest1";
>         final String groupId = "ExactlyOnceTest1";
>         final String brokers = "localhost:9092";
>         final String ckDir = "file:///Users/test/checkpointDir/" + groupId;
>
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(60000);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>         env.getCheckpointConfig().setCheckpointStorage(ckDir);
>         env.setParallelism(4);
>
>         KafkaSource<String> source = KafkaSource.<String>builder()
>                 .setBootstrapServers(brokers)
>                 .setTopics(kafkaSourceTopic)
>                 .setGroupId(groupId)
>                 .setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>                 .setValueOnlyDeserializer(new SimpleStringSchema())
>                 .build();
>         DataStream<String> flintstones = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
>         DataStream<String> adults = flintstones.filter(s -> s != null && s.length() > 2);
>
>         Properties props = new Properties();
>         props.setProperty("transaction.timeout.ms", "900000");
>         adults.sinkTo(KafkaSink.<String>builder()
>                 .setBootstrapServers(brokers)
>                 .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>                 .setTransactionalIdPrefix("tp-test-")
>                 .setKafkaProducerConfig(props)
>                 .setRecordSerializer(new SelfSerializationSchema(kafkaSinkTopic, new SimpleStringSchema()))
>                 .build());
>
>         env.execute("ScaleDownTest");
>     }
>
>     static class SelfSerializationSchema implements KafkaRecordSerializationSchema<String> {
>
>         private final SerializationSchema<String> valueSerialization;
>         private final String topic;
>
>         SelfSerializationSchema(String topic, SerializationSchema<String> valueSerialization) {
>             this.valueSerialization = valueSerialization;
>             this.topic = topic;
>         }
>
>         @Override
>         public void open(SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext) throws Exception {
>             KafkaRecordSerializationSchema.super.open(context, sinkContext);
>         }
>
>         @Override
>         public ProducerRecord<byte[], byte[]> serialize(String s, KafkaSinkContext kafkaSinkContext, Long aLong) {
>             final byte[] valueSerialized = valueSerialization.serialize(s);
>             return new ProducerRecord<>(topic, valueSerialized);
>         }
>     }
> }
> {code}
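> Since the job above hard-codes env.setParallelism(4), the restarted run presumably needs the lower parallelism set before resuming from chk-1, either by changing that line and rebuilding the jar, or by passing the -p flag to ./bin/flink run:
> {code:java}
> // For the restarted run, scale the job down from 4 to 2
> // (equivalent to passing "-p 2" on the flink run command line):
> env.setParallelism(2);
> {code}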