Sorry, I messed up the code snippet in the earlier mail. The correct one is:

public static void main(String[] args) throws IOException {
    Properties prop = new Properties();

    // Load pipeline.properties from the classpath.
    InputStream is = Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
    prop.load(is);

    HashMap<String, String> strMap = new HashMap<>();
    strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));

    // The constructor populates the static fields of Config as a side effect.
    new Config(strMap);

    ...
}

public class Config {

    public static String CONFIG_TOPIC;

    public Config(HashMap<String, String> s) {
        CONFIG_TOPIC = s.get("CONFIG_TOPIC");
    }
}

The value of CONFIG_TOPIC is loaded properly in a minicluster but is null
when the job is run on a cluster.
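
For reference, here is a minimal plain-Java sketch (no Flink dependency) of the
difference Timo describes between static state and values passed through a
constructor. The names StaticVsInstanceDemo, TopicHolder and roundTrip are
hypothetical, and setting the static field back to null only simulates a fresh
cluster JVM in which main() never ran; it is an illustration of the mechanism,
not of how Flink ships functions internally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticVsInstanceDemo {

    // Stand-in for Config.CONFIG_TOPIC: static state is per JVM and is not serialized.
    public static String STATIC_TOPIC;

    // Hypothetical stand-in for a function object that is shipped to the cluster.
    public static class TopicHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String topic; // instance field: travels with the serialized object

        public TopicHolder(String topic) {
            this.topic = topic;
        }

        public String topic() {
            return topic;
        }
    }

    // Serializes and deserializes an object, roughly mimicking client-to-worker shipping.
    public static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        STATIC_TOPIC = "alarms";                            // set in main(), i.e. on the client
        TopicHolder holder = new TopicHolder(STATIC_TOPIC); // value copied into the instance
        TopicHolder onWorker = roundTrip(holder);           // "ship" the instance

        STATIC_TOPIC = null; // simulate a fresh worker JVM where main() never ran

        System.out.println("static field:   " + STATIC_TOPIC);     // prints "static field:   null"
        System.out.println("instance field: " + onWorker.topic()); // prints "instance field: alarms"
    }
}
```

The same idea applied to the job would mean handing the topic to whatever
object builds the ProducerRecord (through its constructor) instead of reading
Config.CONFIG_TOPIC inside it.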


On Thu, Oct 22, 2020 at 5:42 PM Manas Kale <[email protected]> wrote:

> Hi Timo,
> Thank you for the explanation, I can start to see why I was getting an
> exception.
> Are you saying that I cannot use static variables at all when trying to
> deploy to a cluster? I would like the variables to remain static and not be
> instance-bound as they are accessed from multiple classes.
> Based on my understanding of what you said, I implemented the
> following pattern:
>
> public static void main(String[] args) {
>        Properties prop = new Properties();
>
> InputStream is = 
> Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
> prop.load(is);
>
> strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
>
> new Config(strMap, longMap);
>
> ...
>
> }
>
> public class Config {
>
> public static String CONFIG_TOPIC;
> public static String CONFIG_KAFKA;
>
> public Config(HashMap<String, String> s) {
>     CONFIG_TOPIC = s.get("CONFIG_TOPIC");
>     CONFIG_KAFKA = s.get("CONFIG_KAFKA");
>
> }
>
> }
>
> This produces the same issue. With the easier solution that you listed,
> are you implying I use multiple instances or a singleton pattern of some
> sort?
>
> On Thu, Oct 22, 2020 at 1:23 PM Timo Walther <[email protected]> wrote:
>
>> Hi Manas,
>>
>> you need to make sure to differentiate between what Flink calls
>> "pre-flight phase" and "cluster phase".
>>
>> The pre-flight phase is where the pipeline is constructed and all
>> functions are instantiated. They are then later serialized and sent to
>> the cluster.
>>
>> If you are reading your properties file in the `main()` method and storing
>> something in static variables, the content is available locally where
>> the pipeline is constructed (e.g. in the client) but not after the function
>> instances are sent to the cluster. Those static variables are fresh
>> (thus empty) in the cluster JVMs. You need to either make sure that the
>> properties file is read from each task manager again, or easier: pass
>> the parameters as constructor parameters into the instances such that
>> they are shipped together with the function itself.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>>
>> On 22.10.20 09:24, Manas Kale wrote:
>> > Hi,
>> > I am trying to write some data to a Kafka topic and I have the
>> > following situation:
>> >
>> > monitorStateStream
>> >     .process(new IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
>> >     ... // Stream that outputs elements of type IDAP2Alarm
>> >     .addSink(getFlinkKafkaProducer(ALARMS_KAFKA, Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);
>> >
>> > private static <T extends IDAP2JSONOutput> FlinkKafkaProducer<T> getFlinkKafkaProducer(String servers, String topic) {
>> >     Properties properties = new Properties();
>> >     properties.setProperty("bootstrap.servers", servers);
>> >     return new FlinkKafkaProducer<T>(topic,
>> >           (element, timestamp) -> element.serializeForKafka(),
>> >           properties,
>> >           FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
>> > }
>> >
>> > /*
>> > This interface is used to indicate that a class may be output to Kafka.
>> > Since Kafka treats all data as bytes, classes that implement this
>> > interface have to provide an implementation for the
>> > serializeForKafka() method.
>> > */
>> > public interface IDAP2JSONOutput {
>> >
>> >     // Implement serialization logic in this method.
>> >     ProducerRecord<byte[],byte[]> serializeForKafka();
>> >
>> > }
>> >
>> > public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {
>> >
>> >     private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);
>> >
>> >     @Override
>> >     public ProducerRecord<byte[],byte[]> serializeForKafka() {
>> >         byte[] rawValue;
>> >         byte[] rawKey;
>> >         String k = getMonitorFeatureKey().getMonitorName();
>> >         ...
>> >
>> >         rawValue = val.getBytes();
>> >
>> >         LOGGER.info("value of alarms topic from idap2 alarm : " + Config.ALARMS_TOPIC);
>> >
>> >         return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); // Line 95
>> >     }
>> >
>> > }
>> >
>> >
>> > Config.ALARMS_TOPIC is a static string that is read from a properties
>> > file. When I run this code on my IDE minicluster, it runs great with no
>> > problems. But when I submit it as a jar to the cluster, I get the
>> > following error:
>> >
>> > Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
>> >      at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:71) ~[flink_POC-0.1.jar:?]
>> >      at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:133) ~[flink_POC-0.1.jar:?]
>> > *    at flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95) ~[flink_POC-0.1.jar:?]*
>> >      at flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(StreamingJob.java:62) ~[flink_POC-0.1.jar:?]
>> >      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:854) ~[flink_POC-0.1.jar:?]
>> >      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) ~[flink_POC-0.1.jar:?]
>> >      at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> > *    at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:69) ~[flink_POC-0.1.jar:?]*
>> > *    at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:25) ~[flink_POC-0.1.jar:?]*
>> >      at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> >      at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]
>> >
>> > Apparently Config.ALARMS_TOPIC is being evaluated as null. Also, the
>> > LOGGER statement in IDAP2Alarm above is never printed when running on
>> > the Flink cluster. To verify that the correct value of
>> > Config.ALARMS_TOPIC is read from the configuration file, I printed it
>> > from the Config class, and it prints correctly. So my questions are:
>> >
>> >   * Why does this work on a minicluster but not when submitted as a jar
>> >     to a normal cluster? I am using Flink v1.11.0 in both my POM file
>> >     and the cluster runtime.
>> >   * Why does the LOGGER line not get printed even though execution
>> >     definitely reached it (as seen from the stacktrace)?
>> >
>> > Thank you,
>> > Manas Kale
>>
>>
