[ 
https://issues.apache.org/jira/browse/FLINK-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz closed FLINK-16509.
-------------------------------
    Release Note: It was a side effect. When everything goes right, we do have 
a context here.
      Resolution: Not A Problem

> FlinkKafkaConsumerBase tries to log a context that may not have been 
> initialized and fails
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16509
>                 URL: https://issues.apache.org/jira/browse/FLINK-16509
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.10.0
>         Environment: Unit test on local cluster, calling a unit test local 
> kafka server.
>            Reporter: Arnaud Linz
>            Priority: Major
>
> The new code of FlinkKafkaConsumerBase#initializeState() logs the restored 
> state with:
> {code:java}
> (...)
>   LOG.info("Consumer subtask {} restored state: {}.",
>       getRuntimeContext().getIndexOfThisSubtask(), restoredState);
> } else {
>   LOG.info("Consumer subtask {} has no restore state.",
>       getRuntimeContext().getIndexOfThisSubtask());
> }
>   {code}
>  
> whereas the old (1.8.0) class logged without calling getRuntimeContext():
>  
> {code:java}
> (...)
>   LOG.info("Setting restore state in the FlinkKafkaConsumer: {}",
>       restoredState);
> } else {
>   LOG.info("No restore state for FlinkKafkaConsumer.");
> }
> {code}
>  
> This causes a regression in my Kafka source unit test, which now fails with 
> the exception:
> {code:java}
> java.lang.IllegalStateException: The runtime context has not been initialized.
>     at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> The context is not always available at that point (initializeState() being 
> called before open(), I guess).
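
For illustration only, here is a minimal, Flink-free sketch of the ordering issue described above. All class and method names (SketchRichFunction, SketchSource, ContextOrderingSketch, setSubtaskIndex) are hypothetical stand-ins, not Flink APIs. It assumes, in line with the resolution note, that the runtime normally injects the context before initializeState() runs, so a test that skips that step hits the exception.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a rich function: the context is injected later, not by the constructor. */
abstract class SketchRichFunction {
    private Integer subtaskIndex; // null until the runtime (or the test) injects it

    void setSubtaskIndex(int index) {
        this.subtaskIndex = index;
    }

    int getSubtaskIndex() {
        if (subtaskIndex == null) {
            // Same message as in the reported stack trace.
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return subtaskIndex;
    }
}

/** Hypothetical source that reads the subtask index while logging in initializeState(). */
class SketchSource extends SketchRichFunction {
    final List<String> log = new ArrayList<>();

    void initializeState(String restoredState) {
        if (restoredState != null) {
            log.add("Consumer subtask " + getSubtaskIndex() + " restored state: " + restoredState + ".");
        } else {
            log.add("Consumer subtask " + getSubtaskIndex() + " has no restore state.");
        }
    }
}

class ContextOrderingSketch {
    /** Returns true if initializeState() fails because no context was injected first. */
    static boolean failsWithoutContext() {
        SketchSource source = new SketchSource();
        try {
            source.initializeState(null);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Injects the context first, then restores; returns the resulting log line. */
    static String logsWithContext() {
        SketchSource source = new SketchSource();
        source.setSubtaskIndex(0);
        source.initializeState(null);
        return source.log.get(0);
    }

    public static void main(String[] args) {
        System.out.println(failsWithoutContext());
        System.out.println(logsWithContext());
    }
}
```

Under these assumptions, a unit test that drives the source by hand would need to set up the context before calling initializeState(), which matches the "Not A Problem" resolution.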



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
