Hi! Could you share your complete user code? How is this data stream used? From your description, it seems that it is broadcast to a main stream.
sudhansu jena <sudhansu.jena...@gmail.com> wrote on Tue, Nov 23, 2021 at 1:12 AM:

> Hi Team,
>
> We are experiencing a very weird issue recently which relates to the flink
> state broadcasting feature.
>
> The issue is sometimes we see the Flink Job is not pulling the control
> stream upon job submission.
>
> The below code snippet which is responsible to pull the configs stored in
> a S3 bucket periodically forever.
>
>
> private DataStream<String> buildProducerControlStreamContinuously(
>         StreamExecutionEnvironment streamExecutionEnvironment,
>         ParameterTool configParams) {
>
>     int s3PathMonitoringInterval =
>         configParams.getInt(AppConstant.S3_PRODUCER_CONFIG_PATH_MONITORING_INTERVAL);
>     String s3ProducerConfigPath =
>         configParams.get(AppConstant.S3_PRODUCER_CONFIG_PATH);
>
>     return streamExecutionEnvironment.readFile(
>         new TextInputFormat(new Path(s3ProducerConfigPath)),
>         s3ProducerConfigPath,
>         *FileProcessingMode.PROCESS_CONTINUOUSLY,*
>         s3PathMonitoringInterval);
> }
>
> Can you please let us know what could be the cause of this issue ?