Hi!

Could you share your complete user code? How is this data stream used? From
your description it seems that it is broadcasted to a main stream.

sudhansu jena <sudhansu.jena...@gmail.com> 于2021年11月23日周二 上午1:12写道:

> Hi Team,
>
> We are experiencing a very weird issue recently which relates to the flink
> state broadcasting feature.
>
> The issue is sometimes we see the Flink Job is not pulling the control
> stream upon job submission.
>
> The below code snippet which is responsible to pull the configs stored in
> a S3 bucket periodically forever.
>
>
> private DataStream<String> buildProducerControlStreamContinuously(
>       StreamExecutionEnvironment streamExecutionEnvironment, ParameterTool
> configParams) {
>
>     int s3PathMonitoringInterval =
> configParams.getInt(AppConstant.S3_PRODUCER_CONFIG_PATH_MONITORING_INTERVAL);
>     String s3ProducerConfigPath =
> configParams.get(AppConstant.S3_PRODUCER_CONFIG_PATH);
>
>     return streamExecutionEnvironment.readFile(new TextInputFormat(new
> Path(s3ProducerConfigPath)),
>         s3ProducerConfigPath,
>         *FileProcessingMode.PROCESS_CONTINUOUSLY,*
>         s3PathMonitoringInterval);
>   }
>
>  Can you please let us know what could be the cause of this issue ?
>
>
>
>

Reply via email to