Hi Timo, I figured it out, thanks a lot for your help. Are there any articles detailing the pre-flight and cluster phases? I couldn't find anything on ci.apache.org/projects/flink, and I think this behaviour should be documented as a warning/note.
On Thu, Oct 22, 2020 at 6:44 PM Timo Walther <twal...@apache.org> wrote:
> Hi Manas,
>
> you can use a static variable, but you need to make sure that the logic
> to fill the static variable is accessible and executed in all JVMs.
>
> I assume `pipeline.properties` is in the JAR that you submit to the
> cluster, right? Then you should be able to access it through a
> singleton pattern instead of a static variable access.
>
> Regards,
> Timo
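For reference, a minimal sketch of the singleton pattern Timo describes, assuming `pipeline.properties` is bundled in the job JAR; the class name, keys, and the double-checked-locking structure are illustrative, not taken from the thread:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.UncheckedIOException;
    import java.util.Properties;

    // The properties file is loaded lazily inside whichever JVM calls
    // getInstance(), so each task manager reads its own copy from the JAR
    // instead of relying on a static field populated only in the client.
    public final class PipelineConfig {

        private static volatile PipelineConfig instance;

        private final Properties props = new Properties();

        private PipelineConfig() {
            try (InputStream is = PipelineConfig.class.getClassLoader()
                    .getResourceAsStream("pipeline.properties")) {
                if (is == null) {
                    throw new IllegalStateException("pipeline.properties not on classpath");
                }
                props.load(is);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        public static PipelineConfig getInstance() {
            if (instance == null) {
                synchronized (PipelineConfig.class) {
                    if (instance == null) {
                        instance = new PipelineConfig();
                    }
                }
            }
            return instance;
        }

        public String get(String key) {
            return props.getProperty(key);
        }
    }

With this, a call like PipelineConfig.getInstance().get("CONFIG_TOPIC") made inside a function running on the cluster loads the file in that JVM on first use, so it returns the same value as in the client.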
> On 22.10.20 14:17, Manas Kale wrote:
> > Sorry, I messed up the code snippet in the earlier mail. The correct
> > one is:
> >
> > public static void main(String[] args) {
> >     Properties prop = new Properties();
> >
> >     InputStream is = Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
> >     prop.load(is);
> >
> >     HashMap<String, String> strMap = new HashMap<>();
> >
> >     strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
> >
> >     new Config(strMap);
> >
> >     ...
> > }
> >
> > public class Config {
> >
> >     public static String CONFIG_TOPIC;
> >
> >     public Config(HashMap<String, String> s) {
> >         CONFIG_TOPIC = s.get("CONFIG_TOPIC");
> >     }
> > }
> >
> > The value of CONFIG_TOPIC is properly loaded in a minicluster but is
> > null when run on a cluster.
> >
> > On Thu, Oct 22, 2020 at 5:42 PM Manas Kale <manaskal...@gmail.com> wrote:
> >
> > Hi Timo,
> > Thank you for the explanation, I can start to see why I was getting
> > an exception.
> > Are you saying that I cannot use static variables at all when trying
> > to deploy to a cluster? I would like the variables to remain static
> > and not be instance-bound, as they are accessed from multiple classes.
> > Based on my understanding of what you said, I implemented the
> > following pattern:
> >
> > public static void main(String[] args) {
> >     Properties prop = new Properties();
> >
> >     InputStream is = Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
> >     prop.load(is);
> >
> >     strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
> >
> >     new Config(strMap, longMap);
> >
> >     ...
> > }
> >
> > public class Config {
> >
> >     public static String CONFIG_TOPIC;
> >     public static String CONFIG_KAFKA;
> >
> >     public Config(HashMap<String, String> s) {
> >         CONFIG_TOPIC = s.get("CONFIG_TOPIC");
> >         CONFIG_KAFKA = s.get("CONFIG_KAFKA");
> >     }
> > }
> >
> > This produces the same issue. With the easier solution that you
> > listed, are you implying I should use multiple instances or a
> > singleton pattern of some sort?
> >
> > On Thu, Oct 22, 2020 at 1:23 PM Timo Walther <twal...@apache.org> wrote:
> >
> > Hi Manas,
> >
> > you need to make sure to differentiate between what Flink calls the
> > "pre-flight phase" and the "cluster phase".
> >
> > The pre-flight phase is where the pipeline is constructed and all
> > functions are instantiated. They are later serialized and sent to
> > the cluster.
> >
> > If you read your properties file in the `main()` method and store
> > something in static variables, the content is available locally where
> > the pipeline is constructed (e.g. in the client), but not where the
> > function instances are executed: once they are sent to the cluster,
> > those static variables are fresh (and thus empty) in the cluster
> > JVMs. You need to either make sure that the properties file is read
> > on each task manager again, or easier: pass the parameters as
> > constructor parameters into the instances such that they are shipped
> > together with the function itself.
> >
> > I hope this helps.
> >
> > Regards,
> > Timo
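For illustration, a sketch of the "easier" option Timo mentions, with a hypothetical class and field name: the value read in main() is passed as a constructor parameter and captured in an instance field, so it is serialized together with the function and shipped to the cluster.

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public class AlarmTopicTagger extends ProcessFunction<String, String> {

        // Captured during the pre-flight phase and serialized with the
        // function, so it is available in the cluster JVMs.
        private final String alarmsTopic;

        public AlarmTopicTagger(String alarmsTopic) {
            this.alarmsTopic = alarmsTopic;
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            // alarmsTopic still holds the value read from
            // pipeline.properties in main(), even on the cluster.
            out.collect(alarmsTopic + ":" + value);
        }
    }

In main() this would be wired up as stream.process(new AlarmTopicTagger(prop.getProperty("CONFIG_TOPIC"))), so no static state is involved.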
> > On 22.10.20 09:24, Manas Kale wrote:
> > > Hi,
> > > I am trying to write some data to a kafka topic and I have the
> > > following situation:
> > >
> > > monitorStateStream
> > >     .process(new IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
> > >     ... // Stream that outputs elements of type IDAP2Alarm
> > >     .addSink(getFlinkKafkaProducer(ALARMS_KAFKA, Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);
> > >
> > > private static <T extends IDAP2JSONOutput> FlinkKafkaProducer<T> getFlinkKafkaProducer(String servers, String topic) {
> > >     Properties properties = new Properties();
> > >     properties.setProperty("bootstrap.servers", servers);
> > >     return new FlinkKafkaProducer<T>(topic,
> > >         (element, timestamp) -> element.serializeForKafka(),
> > >         properties,
> > >         FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
> > > }
> > >
> > > /*
> > >  This interface is used to indicate that a class may be output to
> > >  Kafka. Since Kafka treats all data as bytes, classes that implement
> > >  this interface have to provide an implementation for the
> > >  serializeForKafka() method.
> > > */
> > > public interface IDAP2JSONOutput {
> > >
> > >     // Implement serialization logic in this method.
> > >     ProducerRecord<byte[], byte[]> serializeForKafka();
> > > }
> > >
> > > public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {
> > >
> > >     private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);
> > >
> > >     @Override
> > >     public ProducerRecord<byte[], byte[]> serializeForKafka() {
> > >         byte[] rawValue;
> > >         byte[] rawKey;
> > >         String k = getMonitorFeatureKey().getMonitorName();
> > >         ...
> > >
> > >         rawValue = val.getBytes();
> > >
> > >         LOGGER.info("value of alarms topic from idap2 alarm : " + Config.ALARMS_TOPIC);
> > >
> > >         return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); // Line 95
> > >     }
> > > }
> > >
> > > Config.ALARMS_TOPIC is a static string that is read from a
> > > properties file. When I run this code on my IDE minicluster, it runs
> > > great with no problems. But when I submit it as a jar to the
> > > cluster, I get the following error:
> > >
> > > Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
> > >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:71) ~[flink_POC-0.1.jar:?]
> > >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:133) ~[flink_POC-0.1.jar:?]
> > >     *at flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95) ~[flink_POC-0.1.jar:?]*
> > >     at flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(StreamingJob.java:62) ~[flink_POC-0.1.jar:?]
> > >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:854) ~[flink_POC-0.1.jar:?]
> > >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) ~[flink_POC-0.1.jar:?]
> > >     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     *at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:69) ~[flink_POC-0.1.jar:?]*
> > >     *at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:25) ~[flink_POC-0.1.jar:?]*
> > >     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > >     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]
> > >
> > > Apparently Config.ALARMS_TOPIC is being evaluated as null. Also, the
> > > LOGGER statement in IDAP2Alarm above is never printed when running
> > > on the Flink cluster. In order to verify whether the correct value
> > > of Config.ALARMS_TOPIC is read from the configuration file, I
> > > printed it from the Config class, and it prints correctly. So my
> > > questions are:
> > >
> > > * Why does this work on a minicluster but not when submitted as a
> > >   jar to a normal cluster? I am using Flink v1.11.0 in both my POM
> > >   file and the cluster runtime.
> > > * Why does the LOGGER line not get printed even though execution
> > >   definitely reached it (as seen from the stacktrace)?
> > >
> > > Thank you,
> > > Manas Kale
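For completeness, one sketch of how Timo's constructor-parameter suggestion could be applied to the producer above. The original crash happens inside serializeForKafka(), where new ProducerRecord<>(Config.ALARMS_TOPIC, ...) sees the null static field, so the topic has to reach that method without static state. Changing the interface to take the topic as an argument is an assumption made for illustration; TopicAwareSchema is a hypothetical name, while KafkaSerializationSchema is the Flink 1.11 connector interface that the lambda in getFlinkKafkaProducer() already implements.

    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical variant of the thread's interface: the topic is supplied
    // by the caller instead of being read from a static Config field.
    public interface IDAP2JSONOutput {
        ProducerRecord<byte[], byte[]> serializeForKafka(String topic);
    }

    public class TopicAwareSchema<T extends IDAP2JSONOutput>
            implements KafkaSerializationSchema<T> {

        // Set in the client during the pre-flight phase and serialized
        // with the sink, so it is never null on the cluster.
        private final String topic;

        public TopicAwareSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(T element, Long timestamp) {
            // Hand the topic to the element instead of letting it consult
            // a static variable that is empty in the task manager JVM.
            return element.serializeForKafka(topic);
        }
    }

getFlinkKafkaProducer() could then build the sink as new FlinkKafkaProducer<T>(topic, new TopicAwareSchema<T>(topic), properties, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE), using the same four-argument constructor as the snippet above, and IDAP2Alarm.serializeForKafka(String topic) would return new ProducerRecord<>(topic, rawKey, rawValue).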