Hi,

it seems from the stack trace, that you are calling the restoreState()
method yourself in ReplayTest.getKafkaSource(ReplayTest.java:50):

org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.
restoreState(FlinkKafkaConsumerBase.java:388)
        at
in.dailyhunt.cis.enrichment.pings.simulate.ReplayTest.
getKafkaSource(ReplayTest.java:50)


This method is actually an internal method called by the system on the task
managers when restoring state.
You are calling the method on the client when submitting the Flink job.
That's why "The runtime context has not been initialized.".


On Wed, Aug 23, 2017 at 7:34 AM, sohimankotia <sohimanko...@gmail.com>
wrote:

> Hi,
>
> I am trying to replay kafka logs from specific offset . But I am not able
> to
> make it work .
>
>
> Using Ref :
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/dev/connectors/kafka.html#kafka-consumers-
> start-position-configuration
>
> My Code :
>
>
>
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
> import
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>
> import java.util.*;
>
>
> public class ReplayTest {
>
>         public static void main(String[] args) throws Exception {
>
>                 StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>                 env.setParallelism(1);
>
>                 final FlinkKafkaConsumer010<String> kafkaSource =
> getKafkaSource();
>                 final DataStreamSource<String> in =
> env.addSource(kafkaSource);
>
>                 in.addSink(new PrintSinkFunction<>());
>                 in.addSink(getKafkaSink());
>
>                 env.execute();
>
>
>         }
>
>         private static FlinkKafkaConsumer010<String> getKafkaSource() {
>                 Properties properties = new Properties();
>                 properties.setProperty("bootstrap.servers",
> "localhost:9092");
>                 properties.setProperty("zookeeper.connect",
> "localhost:8081");
>                 properties.setProperty("group.id", "test11");
>                 final FlinkKafkaConsumer010<String> consumer = new
> FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), properties);
>                 HashMap<KafkaTopicPartition, Long> specificStartOffsets =
> new HashMap<>();
>                 specificStartOffsets.put(new KafkaTopicPartition("test",
> 0), 2L);
>                 consumer.restoreState(specificStartOffsets);
>                 return consumer;
>         }
>
>
>         private static FlinkKafkaProducer010<String> getKafkaSink() {
>                 Properties properties = new Properties();
>                 properties.setProperty("bootstrap.servers",
> "localhost:9092");
>                 return new FlinkKafkaProducer010<>("test2", new
> SimpleStringSchema(),
> properties);
>         }
>
>
> }
>
>
> I am using <flink.version>1.2.1</flink.version> for all flink
> dependencies .
>
> When I am running code on IDE or local flink set up , I am getting
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method
> caused an error.
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:545)
>         at
> org.apache.flink.client.program.PackagedProgram.
> invokeInteractiveModeForExecution(PackagedProgram.java:419)
>         at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>         at org.apache.flink.client.CliFrontend.executeProgram(
> CliFrontend.java:831)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>         at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>         at org.apache.flink.client.CliFrontend$2.call(
> CliFrontend.java:1120)
>         at org.apache.flink.client.CliFrontend$2.call(
> CliFrontend.java:1117)
>         at
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(
> HadoopSecurityContext.java:43)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1548)
>         at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(
> HadoopSecurityContext.java:40)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
> Caused by: java.lang.IllegalStateException: The runtime context has not
> been
> initialized.
>         at
> org.apache.flink.api.common.functions.AbstractRichFunction.
> getRuntimeContext(AbstractRichFunction.java:53)
>         at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.
> restoreState(FlinkKafkaConsumerBase.java:388)
>         at
> in.dailyhunt.cis.enrichment.pings.simulate.ReplayTest.
> getKafkaSource(ReplayTest.java:50)
>         at
> in.dailyhunt.cis.enrichment.pings.simulate.ReplayTest.
> main(ReplayTest.java:27)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:
> 62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:528)
>         ... 13 more
>
>
> Thanks and Regards
> Sohanvir
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Reset-Kafka-
> Consumer-using-Flink-Consumer-10-API-tp15077.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Reply via email to