Hi Timo,

Sure, I have opened an issue for this on Jira:
<https://issues.apache.org/jira/browse/FLINK-19807>
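
For anyone who finds this thread later, here is a minimal sketch of why the static field was null on the cluster. It uses only plain Java serialization, not Flink itself, so it is an illustration of the principle rather than of Flink's actual shipping mechanism. All class names below (StaticVsInstanceConfig, StaticTopicFunction, InstanceTopicFunction) are hypothetical, and resetting the static field to null is only a stand-in for a task manager JVM in which main() never ran.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticVsInstanceConfig {

    // Hypothetical stand-in for the Config class discussed in this thread.
    static class Config {
        // Filled only by main(), i.e. only in the JVM that builds the pipeline.
        static String ALARMS_TOPIC;
    }

    // A function that looks up the static field when it is invoked.
    static class StaticTopicFunction implements Serializable {
        private static final long serialVersionUID = 1L;

        String topic() {
            return Config.ALARMS_TOPIC;
        }
    }

    // The same function, but the topic is passed in through the constructor
    // and kept in an instance field, so it is part of the serialized object.
    static class InstanceTopicFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String topic;

        InstanceTopicFunction(String topic) {
            this.topic = topic;
        }

        String topic() {
            return topic;
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // "Pre-flight": configuration is loaded and functions are instantiated.
        Config.ALARMS_TOPIC = "alarms";
        byte[] staticFn = serialize(new StaticTopicFunction());
        byte[] instanceFn = serialize(new InstanceTopicFunction(Config.ALARMS_TOPIC));

        // Simulate a fresh JVM on the cluster: static state was never set there.
        Config.ALARMS_TOPIC = null;

        StaticTopicFunction a = (StaticTopicFunction) deserialize(staticFn);
        InstanceTopicFunction b = (InstanceTopicFunction) deserialize(instanceFn);

        System.out.println("static field:   " + a.topic()); // prints "static field:   null"
        System.out.println("instance field: " + b.topic()); // prints "instance field: alarms"
    }
}
```

Static fields are not part of a serialized object, so only the value stored in the instance field travels with the function. That matches the second option Timo describes further down in this thread: pass the parameters into the function instances through their constructors.
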
On Fri, Oct 23, 2020 at 4:09 PM Timo Walther <[email protected]> wrote:

> Hi Manas,
>
> that is a good point. Feel free to open an issue for this. It is not the
> first time that your question appeared on the mailing list.
>
> Regards,
> Timo
>
> On 23.10.20 07:22, Manas Kale wrote:
> > Hi Timo,
> > I figured it out, thanks a lot for your help.
> > Are there any articles detailing the pre-flight and cluster phases? I
> > couldn't find anything on ci.apache.org/projects/flink and I think this
> > behaviour should be documented as a warning/note.
> >
> > On Thu, Oct 22, 2020 at 6:44 PM Timo Walther <[email protected]> wrote:
> >
> >     Hi Manas,
> >
> >     you can use a static variable, but you need to make sure that the
> >     logic to fill the static variable is accessible and executed in all
> >     JVMs.
> >
> >     I assume `pipeline.properties` is in the JAR that you submit to the
> >     cluster, right? Then you should be able to access it through a
> >     singleton pattern instead of a static variable access.
> >
> >     Regards,
> >     Timo
> >
> >     On 22.10.20 14:17, Manas Kale wrote:
> > > Sorry, I messed up the code snippet in the earlier mail. The
> > > correct one is:
> > >
> > > public static void main(String[] args) {
> > >     Properties prop = new Properties();
> > >
> > >     InputStream is =
> > >         Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
> > >     prop.load(is);
> > >
> > >     HashMap<String, String> strMap = new HashMap<>();
> > >
> > >     strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
> > >
> > >     new Config(strMap);
> > >
> > >     ...
> > > }
> > >
> > > public class Config {
> > >
> > >     public static String CONFIG_TOPIC;
> > >
> > >     public Config(HashMap<String, String> s) {
> > >         CONFIG_TOPIC = s.get("CONFIG_TOPIC");
> > >     }
> > > }
> > >
> > > The value of CONFIG_TOPIC in a minicluster is properly loaded but
> > > null when run on a cluster.
> > >
> > > On Thu, Oct 22, 2020 at 5:42 PM Manas Kale <[email protected]> wrote:
> > >
> > >     Hi Timo,
> > >     Thank you for the explanation, I can start to see why I was
> > >     getting an exception.
> > >     Are you saying that I cannot use static variables at all when
> > >     trying to deploy to a cluster? I would like the variables to
> > >     remain static and not be instance-bound as they are accessed
> > >     from multiple classes.
> > >     Based on my understanding of what you said, I implemented the
> > >     following pattern:
> > >
> > >     public static void main(String[] args) {
> > >         Properties prop = new Properties();
> > >
> > >         InputStream is =
> > >             Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
> > >         prop.load(is);
> > >
> > >         strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
> > >
> > >         new Config(strMap, longMap);
> > >
> > >         ...
> > >     }
> > >
> > >     public class Config {
> > >
> > >         public static String CONFIG_TOPIC;
> > >         public static String CONFIG_KAFKA;
> > >
> > >         public Config(HashMap<String, String> s) {
> > >             CONFIG_TOPIC = s.get("CONFIG_TOPIC");
> > >             CONFIG_KAFKA = s.get("CONFIG_KAFKA");
> > >         }
> > >     }
> > >
> > >     This produces the same issue. With the easier solution that you
> > >     listed, are you implying I use multiple instances or a singleton
> > >     pattern of some sort?
> > >
> > >     On Thu, Oct 22, 2020 at 1:23 PM Timo Walther <[email protected]> wrote:
> > >
> > >         Hi Manas,
> > >
> > >         you need to make sure to differentiate between what Flink
> > >         calls "pre-flight phase" and "cluster phase".
> > >
> > >         The pre-flight phase is where the pipeline is constructed
> > >         and all functions are instantiated. They are then later
> > >         serialized and sent to the cluster.
> > >
> > >         If you are reading your properties file in the `main()`
> > >         method and store something in static variables, the content
> > >         is available locally where the pipeline is constructed (e.g.
> > >         in the client), but when the function instances are sent to
> > >         the cluster, those static variables are fresh (thus empty)
> > >         in the cluster JVMs. You need to either make sure that the
> > >         properties file is read from each task manager again, or
> > >         easier: pass the parameters as constructor parameters into
> > >         the instances such that they are shipped together with the
> > >         function itself.
> > >
> > >         I hope this helps.
> > >
> > >         Regards,
> > >         Timo
> > >
> > >         On 22.10.20 09:24, Manas Kale wrote:
> > > > Hi,
> > > > I am trying to write some data to a Kafka topic and I have the
> > > > following situation:
> > > >
> > > > monitorStateStream
> > > >     .process(new IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
> > > >     ... // Stream that outputs elements of type IDAP2Alarm
> > > >     .addSink(getFlinkKafkaProducer(ALARMS_KAFKA, Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);
> > > >
> > > > private static <T extends IDAP2JSONOutput> FlinkKafkaProducer<T> getFlinkKafkaProducer(String servers, String topic) {
> > > >     Properties properties = new Properties();
> > > >     properties.setProperty("bootstrap.servers", servers);
> > > >     return new FlinkKafkaProducer<T>(topic,
> > > >             (element, timestamp) -> element.serializeForKafka(),
> > > >             properties,
> > > >             FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
> > > > }
> > > >
> > > > /*
> > > > This interface is used to indicate that a class may be output to Kafka.
> > > > Since Kafka treats all data as bytes, classes that implement this
> > > > interface have to provide an implementation for the
> > > > serializeForKafka() method.
> > > > */
> > > > public interface IDAP2JSONOutput {
> > > >
> > > >     // Implement serialization logic in this method.
> > > >     ProducerRecord<byte[], byte[]> serializeForKafka();
> > > >
> > > > }
> > > >
> > > > public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {
> > > >
> > > >     private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);
> > > >
> > > >     @Override
> > > >     public ProducerRecord<byte[], byte[]> serializeForKafka() {
> > > >         byte[] rawValue;
> > > >         byte[] rawKey;
> > > >         String k = getMonitorFeatureKey().getMonitorName();
> > > >         ...
> > > >
> > > >         rawValue = val.getBytes();
> > > >
> > > >         LOGGER.info("value of alarms topic from idap2 alarm : " + Config.ALARMS_TOPIC);
> > > >
> > > >         return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); // Line 95
> > > >     }
> > > >
> > > > }
> > > >
> > > > Config.ALARMS_TOPIC is a static string that is read from a properties
> > > > file. When I run this code on my IDE minicluster, it runs great with no
> > > > problems. But when I submit it as a jar to the cluster, I get the
> > > > following error:
> > > >
> > > > Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
> > > >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:71) ~[flink_POC-0.1.jar:?]
> > > >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:133) ~[flink_POC-0.1.jar:?]
> > > >     *at flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95) ~[flink_POC-0.1.jar:?]*
> > > >     at flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(StreamingJob.java:62) ~[flink_POC-0.1.jar:?]
> > > >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:854) ~[flink_POC-0.1.jar:?]
> > > >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) ~[flink_POC-0.1.jar:?]
> > > >     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     *at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:69) ~[flink_POC-0.1.jar:?]*
> > > >     *at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:25) ~[flink_POC-0.1.jar:?]*
> > > >     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> > > >     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]
> > > >
> > > > Apparently Config.ALARMS_TOPIC is being evaluated as null. Also, the
> > > > LOGGER statement in IDAP2Alarm above is never printed when running on
> > > > the Flink cluster. In order to verify that the correct value of
> > > > Config.ALARMS_TOPIC is read from the configuration file, I printed it
> > > > from the Config class, and it prints correctly. So my questions are:
> > > >
> > > >   * Why does this work on a minicluster but not when submitted as a jar
> > > >     to a normal cluster? I am using Flink v1.11.0 in both my POM file
> > > >     and the cluster runtime.
> > > >   * Why does the LOGGER line not get printed even though execution
> > > >     definitely reached it (as seen from the stacktrace)?
> > > >
> > > > Thank you,
> > > > Manas Kale
