Hi Sudhansu,

In addition to Caizhi's suggestion, could you also check whether the 
controlStream actually emits records, for example by attaching a sink or 
adding some logging to it?
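
A minimal sketch of such a check follows. It assumes the control stream is a DataStream<String>, as in the snippet quoted below. The class name ControlStreamDebug, the method withDebugOutput, and the operator and sink labels are hypothetical names chosen for illustration and are not part of the original job.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Hypothetical helper for checking whether the control stream emits records. */
public final class ControlStreamDebug {

    private static final Logger LOG = LoggerFactory.getLogger(ControlStreamDebug.class);

    private ControlStreamDebug() {}

    /**
     * Attaches a print sink to the given stream and returns a pass-through
     * stream that logs every record it sees.
     */
    public static DataStream<String> withDebugOutput(DataStream<String> controlStream) {
        // Print sink: each record is written to the TaskManager stdout,
        // prefixed with the identifier "control-stream".
        controlStream.print("control-stream");

        // Logging map: records pass through unchanged and also appear in
        // the TaskManager log files.
        return controlStream
                .map(new LoggingMapper())
                .name("control-stream-logger");
    }

    /** Logs each record and forwards it unmodified. */
    static final class LoggingMapper implements MapFunction<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String map(String record) {
            LOG.info("Control stream emitted: {}", record);
            return record;
        }
    }
}
```

One way to use it would be to wrap the result of buildProducerControlStreamContinuously(...) with ControlStreamDebug.withDebugOutput(...) before broadcasting it. If nothing shows up in the TaskManager stdout or logs after submission, the problem is likely in the file source rather than in the broadcast step.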

Best,
Yun


------------------------------------------------------------------
From:Caizhi Weng <tsreape...@gmail.com>
Send Time:2021 Nov. 23 (Tue.) 10:22
To:sudhansu jena <sudhansu.jena...@gmail.com>
Cc:user <user@flink.apache.org>
Subject:Re: Broadcasting feature not working

Hi!

Could you share your complete user code? How is this data stream used? From 
your description it seems that it is broadcast to a main stream.

sudhansu jena <sudhansu.jena...@gmail.com> wrote on Tue, Nov 23, 2021 at 1:12 AM:

Hi Team,

We have recently been seeing a strange issue with Flink's broadcast state 
feature.

Sometimes the Flink job does not pull the control stream after job 
submission.

The code snippet below is responsible for periodically pulling the configs 
stored in an S3 bucket, indefinitely.


private DataStream<String> buildProducerControlStreamContinuously(
    StreamExecutionEnvironment streamExecutionEnvironment, ParameterTool configParams) {

  // Monitoring interval for the S3 path; readFile interprets it in milliseconds.
  int s3PathMonitoringInterval =
      configParams.getInt(AppConstant.S3_PRODUCER_CONFIG_PATH_MONITORING_INTERVAL);
  String s3ProducerConfigPath = configParams.get(AppConstant.S3_PRODUCER_CONFIG_PATH);

  // Re-scan the path at every interval and emit the file contents line by line.
  return streamExecutionEnvironment.readFile(
      new TextInputFormat(new Path(s3ProducerConfigPath)),
      s3ProducerConfigPath,
      FileProcessingMode.PROCESS_CONTINUOUSLY,
      s3PathMonitoringInterval);
}

Can you please let us know what could be causing this issue?


