[ https://issues.apache.org/jira/browse/FLINK-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Arnaud Linz closed FLINK-16509.
-------------------------------
Release Note: It was a side effect. When everything goes right, we do have a context here.
Resolution: Not A Problem

> FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-16509
> URL: https://issues.apache.org/jira/browse/FLINK-16509
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.10.0
> Environment: Unit test on local cluster, calling a unit test local Kafka server.
> Reporter: Arnaud Linz
> Priority: Major
>
> The new code of FlinkKafkaConsumerBase#initializeState() logs the restored state with:
> {code:java}
> (...)
>     LOG.info("Consumer subtask {} restored state: {}.",
>         getRuntimeContext().getIndexOfThisSubtask(), restoredState);
> }
> else {
>     LOG.info("Consumer subtask {} has no restore state.",
>         getRuntimeContext().getIndexOfThisSubtask());
> }
> {code}
>
> whereas the old (1.8.0) class logged without calling getRuntimeContext:
>
> {code:java}
> (...)
>     LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
> }
> else {
>     LOG.info("No restore state for FlinkKafkaConsumer.");
> }{code}
>
> This causes a regression in my Kafka source unit test, with the exception:
> {code:java}
> java.lang.IllegalStateException: The runtime context has not been initialized.
>     at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>
> This happens because the context is not always available at that point (initializeState being called before open, I guess).
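
For readers following the report, the failure mode can be reproduced in isolation with a minimal sketch. The classes below (FakeRuntimeContext, FakeRichFunction, RuntimeContextLifecycleSketch) are hypothetical stand-ins written for illustration, not Flink classes. They only mimic the guard visible in the stack trace: asking for the runtime context before anything has set it throws IllegalStateException. This is consistent with the resolution, since the logging call works once a context has been set.

```java
public class RuntimeContextLifecycleSketch {

    /** Hypothetical stand-in for a runtime context; NOT a Flink class. */
    static final class FakeRuntimeContext {
        private final int subtaskIndex;

        FakeRuntimeContext(int subtaskIndex) {
            this.subtaskIndex = subtaskIndex;
        }

        int getIndexOfThisSubtask() {
            return subtaskIndex;
        }
    }

    /** Hypothetical stand-in for a rich function; NOT a Flink class. */
    static class FakeRichFunction {
        private FakeRuntimeContext runtimeContext;

        void setRuntimeContext(FakeRuntimeContext context) {
            this.runtimeContext = context;
        }

        // Same guard and message as reported in the stack trace.
        FakeRuntimeContext getRuntimeContext() {
            if (runtimeContext == null) {
                throw new IllegalStateException("The runtime context has not been initialized.");
            }
            return runtimeContext;
        }

        // Builds the 1.10.0-style log message, which needs the context.
        String initializeState() {
            return "Consumer subtask " + getRuntimeContext().getIndexOfThisSubtask()
                    + " has no restore state.";
        }
    }

    /** Runs initializeState() and reports the failure instead of propagating it. */
    static String tryInitialize(FakeRichFunction function) {
        try {
            return function.initializeState();
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // A test that never sets the context hits the exception.
        FakeRichFunction withoutContext = new FakeRichFunction();
        System.out.println(tryInitialize(withoutContext));

        // Setting the context first lets the logging call succeed.
        FakeRichFunction withContext = new FakeRichFunction();
        withContext.setRuntimeContext(new FakeRuntimeContext(0));
        System.out.println(tryInitialize(withContext));
    }
}
```

In a test that drives the function by hand, the equivalent fix would be to make sure the runtime context is set before initializeState is invoked. How the real test setup does that is not shown in the report, so that step is an assumption here.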