I am new to Flink and am building a proof of concept that reads data from a Kafka topic and stores it in files on a server. I am using FileSink to write the files. It creates the date-and-hour directory structure, but no log files are created inside it.
When I run the program, it creates the directory structure below, but no log files are stored in these directories:

    /flink/testlogs/2021-12-08--07
    /flink/testlogs/2021-12-08--06

I want the output to roll over to a new log file every 15 minutes. Below is the code:

    // Read raw lines from the Kafka topic
    DataStream<String> kafkaTopicData =
            env.addSource(new FlinkKafkaConsumer<String>("MyTopic", new SimpleStringSchema(), p));

    // Part file naming: prefix and suffix
    OutputFileConfig config = OutputFileConfig
            .builder()
            .withPartPrefix("prefix")
            .withPartSuffix(".ext")
            .build();

    // Parse each line into a Tuple6
    DataStream<Tuple6<String, String, String, String, String, Integer>> newStream =
            kafkaTopicData.map(new LogParser());

    final FileSink<Tuple6<String, String, String, String, String, Integer>> sink = FileSink
            .forRowFormat(new Path("/flink/testlogs"),
                    new SimpleStringEncoder<Tuple6<String, String, String, String, String, Integer>>("UTF-8"))
            .withRollingPolicy(DefaultRollingPolicy.builder()
                    .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))   // roll every 15 minutes
                    .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))  // or after 5 minutes idle
                    .withMaxPartSize(1024 * 1024 * 1024)                   // or at 1 GiB
                    .build())
            .withOutputFileConfig(config)
            .build();

    newStream.sinkTo(sink);
    env.execute("DataReader");

LogParser returns a Tuple6.

Regards,
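
A note for anyone trying to reproduce this: the Flink documentation states that in STREAMING mode the FileSink finalizes part files only on successful checkpoints, and the code above does not show checkpointing being enabled. Until a part file is finalized, it may exist only as a hidden in-progress file, so a plain `ls` can make the bucket directories look empty. Below is a minimal diagnostic sketch in plain Java (no Flink dependency) that lists every regular file under the sink's base path, hidden ones included. The class name `PartFileCheck`, its helper methods, and the naming pattern it checks for (a leading dot plus `.inprogress.` in the file name) are assumptions made for illustration and are not taken from the code above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink: inspects the sink's output directory.
public class PartFileCheck {

    // Assumption: in-progress part files are dot-prefixed and contain ".inprogress."
    static boolean looksInProgress(String fileName) {
        return fileName.startsWith(".") && fileName.contains(".inprogress.");
    }

    // Recursively collects all regular files under baseDir, including hidden ones.
    static List<Path> listAllFiles(Path baseDir) {
        try (Stream<Path> paths = Files.walk(baseDir)) {
            return paths.filter(Files::isRegularFile)
                        .sorted()
                        .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Defaults to the base path used in the question; pass another path as the first argument.
        Path baseDir = Paths.get(args.length > 0 ? args[0] : "/flink/testlogs");
        for (Path file : listAllFiles(baseDir)) {
            String name = file.getFileName().toString();
            String label = looksInProgress(name) ? "[in-progress] " : "[other]       ";
            System.out.println(label + file);
        }
    }
}
```

If this prints only in-progress entries, the data is being written but never committed, which would point at checkpointing rather than at the rolling policy.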