Hi Timo,
I figured it out, thanks a lot for your help.
Are there any articles detailing the pre-flight and cluster phases? I
couldn't find anything on ci.apache.org/projects/flink, and I think this behaviour should
be documented as a warning/note.
On Thu, Oct 22, 2020 at 6:44 PM Timo Walther <[email protected]> wrote:
Hi Manas,
you can use static variables, but you need to make sure that the logic that
fills them is accessible and executed in every JVM.
I assume `pipeline.properties` is inside the JAR that you submit to the
cluster, right? Then you should be able to access it through a singleton
pattern instead of through a static variable.
Regards,
Timo
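A minimal sketch of the singleton approach, assuming `pipeline.properties` is packaged at the root of the job JAR. The class name `PipelineConfig` and its `require` method are hypothetical. Because the nested holder class is initialized lazily on first use, each JVM (client or TaskManager) reads the file itself instead of relying on a static field that only `main()` filled in.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: loads pipeline.properties from the classpath the first
// time it is used in *each* JVM, so TaskManagers do not depend on main().
public final class PipelineConfig {

    // Assumed to be packaged at the root of the job JAR.
    private static final String RESOURCE = "pipeline.properties";

    private final Properties props;

    private PipelineConfig(Properties props) {
        this.props = props;
    }

    // Initialization-on-demand holder: the JVM runs this static initializer
    // lazily and thread-safely on the first call to get().
    private static final class Holder {
        static final PipelineConfig INSTANCE = new PipelineConfig(loadFromClasspath(RESOURCE));
    }

    public static PipelineConfig get() {
        return Holder.INSTANCE;
    }

    static Properties loadFromClasspath(String resource) {
        ClassLoader cl = PipelineConfig.class.getClassLoader();
        try (InputStream in = cl.getResourceAsStream(resource)) {
            if (in == null) {
                // Fail loudly instead of silently producing null values later.
                throw new IllegalStateException(resource + " not found on the classpath");
            }
            Properties p = new Properties();
            p.load(in);
            return p;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the value for key, or throws if the key is absent.
    public String require(String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalStateException("Missing property: " + key);
        }
        return value;
    }
}
```

Inside a function running on the cluster, a lookup would then read `PipelineConfig.get().require("CONFIG_TOPIC")`. If the file is missing, the first `get()` fails with an `ExceptionInInitializerError` wrapping the `IllegalStateException`, which surfaces the problem at the point of use rather than as a later `null`.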
On 22.10.20 14:17, Manas Kale wrote:
> Sorry, I messed up the code snippet in the earlier mail. The correct one is:
>
> public static void main(String[] args) throws IOException {
>     Properties prop = new Properties();
>
>     InputStream is =
>             Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
>     prop.load(is);
>
>     HashMap<String, String> strMap = new HashMap<>();
>
>     strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
>
>     new Config(strMap);
>
>     ...
>
> }
>
> public class Config {
>
>     public static String CONFIG_TOPIC;
>
>     public Config(HashMap<String, String> s) {
>         CONFIG_TOPIC = s.get("CONFIG_TOPIC");
>     }
>
> }
>
> The value of CONFIG_TOPIC is loaded properly in a MiniCluster but is null
> when the job runs on a cluster.
>
>
> On Thu, Oct 22, 2020 at 5:42 PM Manas Kale <[email protected]> wrote:
>
> Hi Timo,
> Thank you for the explanation; I can start to see why I was getting
> an exception.
> Are you saying that I cannot use static variables at all when deploying
> to a cluster? I would like the variables to remain static rather than
> instance-bound, as they are accessed from multiple classes.
> Based on my understanding of what you said, I implemented the
> following pattern:
>
> public static void main(String[] args) throws IOException {
>     Properties prop = new Properties();
>
>     InputStream is =
>             Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
>     prop.load(is);
>
>     HashMap<String, String> strMap = new HashMap<>();
>     strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
>     strMap.put("CONFIG_KAFKA", prop.getProperty("CONFIG_KAFKA"));
>
>     new Config(strMap);
>
>     ...
>
> }
>
> public class Config {
>
>     public static String CONFIG_TOPIC;
>     public static String CONFIG_KAFKA;
>
>     public Config(HashMap<String, String> s) {
>         CONFIG_TOPIC = s.get("CONFIG_TOPIC");
>         CONFIG_KAFKA = s.get("CONFIG_KAFKA");
>     }
>
> }
>
> This produces the same issue. With the easier solution that you
> listed, are you implying that I should use multiple instances or some
> sort of singleton pattern?
>
> On Thu, Oct 22, 2020 at 1:23 PM Timo Walther <[email protected]> wrote:
>
> Hi Manas,
>
> you need to differentiate between what Flink calls the
> "pre-flight phase" and the "cluster phase".
>
> The pre-flight phase is where the pipeline is constructed and all
> functions are instantiated. They are later serialized and sent to
> the cluster.
>
> If you read your properties file in the `main()` method and store
> something in static variables, the content is available locally where
> the pipeline is constructed (e.g. in the client), but not after the
> function instances are sent to the cluster: those static variables are
> fresh (thus empty) in the cluster JVMs. You need to either make sure
> that the properties file is read again on each TaskManager, or, easier,
> pass the parameters as constructor parameters into the instances so
> that they are shipped together with the function itself.
>
> I hope this helps.
>
> Regards,
> Timo
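To make the pre-flight/cluster distinction concrete, here is a single-JVM simulation in plain Java with no Flink dependency. `StaticVsInstanceDemo` and `AlarmSerializer` are hypothetical names, and resetting the static field to `null` before deserializing stands in for a fresh TaskManager JVM. Flink ships user functions to the cluster using Java serialization, which writes instance fields but never static ones.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticVsInstanceDemo {

    // Stand-in for Config.ALARMS_TOPIC: static state belongs to the JVM,
    // not to any object, so Java serialization never writes it out.
    static String staticTopic;

    // Stand-in for a Flink user function that receives the topic via its constructor.
    static final class AlarmSerializer implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String topic;

        AlarmSerializer(String topic) {
            this.topic = topic;
        }

        String topicFromField() {
            return topic;
        }

        String topicFromStatic() {
            return staticTopic;
        }
    }

    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Pre-flight phase (client JVM): main() sets the static field and
        // also passes the value into the function's constructor.
        staticTopic = "alarms";
        byte[] shipped = serialize(new AlarmSerializer("alarms"));

        // Simulate a fresh TaskManager JVM, where nothing assigned the static field.
        staticTopic = null;

        // Cluster phase: the function is deserialized and used.
        AlarmSerializer onCluster = (AlarmSerializer) deserialize(shipped);
        System.out.println(onCluster.topicFromField());  // alarms
        System.out.println(onCluster.topicFromStatic()); // null
    }
}
```

Running `main` prints `alarms` and then `null`, mirroring the symptom in this thread: the value exists where `main()` ran but is missing on the cluster unless it travels inside the function.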
>
>
> On 22.10.20 09:24, Manas Kale wrote:
> > Hi,
> > I am trying to write some data to a Kafka topic and I have the following
> > situation:
> >
> > monitorStateStream
> >         .process(new IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
> >         ... // Stream that outputs elements of type IDAP2Alarm
> >         .addSink(getFlinkKafkaProducer(ALARMS_KAFKA,
> >                 Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);
> >
> > private static <T extends IDAP2JSONOutput> FlinkKafkaProducer<T> getFlinkKafkaProducer(
> >         String servers, String topic) {
> >     Properties properties = new Properties();
> >     properties.setProperty("bootstrap.servers", servers);
> >     return new FlinkKafkaProducer<T>(topic,
> >             (element, timestamp) -> element.serializeForKafka(),
> >             properties,
> >             FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
> > }
> >
> > /*
> >  * This interface is used to indicate that a class may be output to Kafka.
> >  * Since Kafka treats all data as bytes, classes that implement this
> >  * interface have to provide an implementation for the
> >  * serializeForKafka() method.
> >  */
> > public interface IDAP2JSONOutput {
> >
> >     // Implement serialization logic in this method.
> >     ProducerRecord<byte[], byte[]> serializeForKafka();
> >
> > }
> >
> > public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {
> >
> >     private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);
> >
> >     @Override
> >     public ProducerRecord<byte[], byte[]> serializeForKafka() {
> >         byte[] rawValue;
> >         byte[] rawKey;
> >         String k = getMonitorFeatureKey().getMonitorName();
> >         ...
> >
> >         rawValue = val.getBytes();
> >
> >         LOGGER.info("value of alarms topic from idap2 alarm : " +
> >                 Config.ALARMS_TOPIC);
> >
> >         return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); // Line 95
> >     }
> >
> > }
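Applied to the code above, the constructor-parameter idea amounts to letting the lambda in `getFlinkKafkaProducer` hand its already captured `topic` argument to the element, rather than having `serializeForKafka()` read `Config.ALARMS_TOPIC` on the TaskManager. The sketch below is a plain-Java simulation under stated assumptions: `RecordSerializer` is a hypothetical stand-in for Flink's serializable Kafka serialization schema, `serializeForKafka(String topic)` is a hypothetical signature change to `IDAP2JSONOutput`, and a `Map.Entry` replaces `ProducerRecord` so the example runs without Kafka on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class CapturedTopicDemo {

    // Hypothetical stand-in for a Flink serialization schema: because it extends
    // Serializable, any lambda of this type is serializable as well.
    interface RecordSerializer<T> extends Serializable {
        Map.Entry<String, byte[]> serialize(T element); // (topic, value)
    }

    // Hypothetical variant of IDAP2JSONOutput that receives the topic as an
    // argument instead of reading a static Config field.
    interface JsonOutput {
        Map.Entry<String, byte[]> serializeForKafka(String topic);
    }

    static final class Alarm implements JsonOutput {
        private final String monitorName;

        Alarm(String monitorName) {
            this.monitorName = monitorName;
        }

        @Override
        public Map.Entry<String, byte[]> serializeForKafka(String topic) {
            byte[] value = monitorName.getBytes(StandardCharsets.UTF_8);
            return new SimpleEntry<>(topic, value);
        }
    }

    // Mirrors getFlinkKafkaProducer(servers, topic): the lambda captures `topic`,
    // which was resolved in main() and is serialized together with the lambda.
    static <T extends JsonOutput> RecordSerializer<T> serializerFor(String topic) {
        return element -> element.serializeForKafka(topic);
    }

    // Serialize and deserialize, roughly what happens when a job is shipped to a TaskManager.
    static Object roundTrip(Serializable obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        RecordSerializer<Alarm> schema = serializerFor("alarms");
        RecordSerializer<Alarm> shipped = (RecordSerializer<Alarm>) roundTrip(schema);
        Map.Entry<String, byte[]> record = shipped.serialize(new Alarm("monitor-1"));
        System.out.println(record.getKey()); // alarms
    }
}
```

The design choice here is that the topic is resolved once, during the pre-flight phase, and then carried as captured state inside the serialized function, so no static lookup is needed on the cluster.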
> >
> >
> > Config.ALARMS_TOPIC is a static String that is read from a properties
> > file. When I run this code in the MiniCluster from my IDE, it works with
> > no problems. But when I submit it as a JAR to the cluster, I get the
> > following error:
> >
> > Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
> >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:71) ~[flink_POC-0.1.jar:?]
> >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:133) ~[flink_POC-0.1.jar:?]
> >     at flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95) ~[flink_POC-0.1.jar:?]
> >     at flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(StreamingJob.java:62) ~[flink_POC-0.1.jar:?]
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:854) ~[flink_POC-0.1.jar:?]
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) ~[flink_POC-0.1.jar:?]
> >     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:69) ~[flink_POC-0.1.jar:?]
> >     at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:25) ~[flink_POC-0.1.jar:?]
> >     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]
> >
> > Apparently Config.ALARMS_TOPIC is being evaluated as null. Also, the
> > LOGGER statement in IDAP2Alarm above is never printed when running on
> > the Flink cluster. To verify that the correct value of
> > Config.ALARMS_TOPIC is read from the configuration file, I printed it
> > from the Config class, and it prints correctly. So my questions are:
> >
> >   * Why does this work on a MiniCluster but not when submitted as a JAR
> >     to a normal cluster? I am using Flink v1.11.0 in both my POM file
> >     and the cluster runtime.
> >   * Why does the LOGGER line not get printed even though execution
> >     definitely reached it (as seen from the stack trace)?
> >
> > Thank you,
> > Manas Kale
>