Hi Sudhansu,

In addition to Caizhi's suggestion, could you also check whether the controlStream emits records normally, for example by attaching a sink or adding some logging?
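
A minimal sketch of such a check, assuming the control stream is built with the same readFile call as in the original message. The class name ControlStreamCheck, the tag helper, and the two command-line arguments are hypothetical and only serve the illustration:

```java
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ControlStreamCheck {

    // Hypothetical helper: prefixes each record so it is easy to spot in the output.
    static String tag(String record) {
        return "[control-stream] " + record;
    }

    public static void main(String[] args) throws Exception {
        // Assumed arguments: the config path and the monitoring interval in milliseconds.
        String configPath = args[0];
        long intervalMs = Long.parseLong(args[1]);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Same source as in the original snippet: re-scan the path periodically.
        DataStream<String> controlStream = env.readFile(
                new TextInputFormat(new Path(configPath)),
                configPath,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                intervalMs);

        // Debug sink: print() writes every record to the TaskManager's stdout.
        controlStream.map(ControlStreamCheck::tag).print();

        env.execute("control-stream-check");
    }
}
```

If tagged lines show up in the TaskManager output after the job starts, the source is emitting records and the problem is more likely downstream, in how the stream is broadcast or consumed.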

Best,
Yun

------------------------------------------------------------------
From: Caizhi Weng <tsreape...@gmail.com>
Send Time: 2021 Nov. 23 (Tue.) 10:22
To: sudhansu jena <sudhansu.jena...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Broadcasting feature not working

Hi!

Could you share your complete user code? How is this data stream used? From your description it seems that it is broadcast to a main stream.

sudhansu jena <sudhansu.jena...@gmail.com> wrote on Tue, Nov 23, 2021 at 1:12 AM:

Hi Team,

We have recently been experiencing a very weird issue related to the Flink state broadcasting feature. Sometimes the Flink job does not pull the control stream upon job submission.

The code snippet below is responsible for periodically pulling the configs stored in an S3 bucket, forever.

    private DataStream<String> buildProducerControlStreamContinuously(
            StreamExecutionEnvironment streamExecutionEnvironment,
            ParameterTool configParams) {
        int s3PathMonitoringInterval =
                configParams.getInt(AppConstant.S3_PRODUCER_CONFIG_PATH_MONITORING_INTERVAL);
        String s3ProducerConfigPath = configParams.get(AppConstant.S3_PRODUCER_CONFIG_PATH);
        return streamExecutionEnvironment.readFile(
                new TextInputFormat(new Path(s3ProducerConfigPath)),
                s3ProducerConfigPath,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                s3PathMonitoringInterval);
    }

Can you please let us know what could be the cause of this issue?