I am new to Flink and doing a POC with it: reading data from a Kafka
topic and storing it in files on a server. I am using FileSink to write
the files. It creates the date- and hour-wise directory structure, but
no log files are created inside it.

When I run the program, it creates the directory structure shown below,
but no log files are stored there.


/flink/testlogs/2021-12-08--07

/flink/testlogs/2021-12-08--06

I want a new log file to be written every 15 minutes. Below is the code:


DataStream<String> kafkaTopicData = env.addSource(
        new FlinkKafkaConsumer<String>("MyTopic", new SimpleStringSchema(), p)); // p = Kafka consumer Properties

OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartPrefix("prefix")
        .withPartSuffix(".ext")
        .build();

// Parse each Kafka record into a six-field tuple.
DataStream<Tuple6<String, String, String, String, String, Integer>> newStream =
        kafkaTopicData.map(new LogParser());

// Row-format sink: roll to a new part file every 15 minutes, after
// 5 minutes of inactivity, or when a part file reaches 1 GB.
final FileSink<Tuple6<String, String, String, String, String, Integer>> sink =
        FileSink.forRowFormat(
                new Path("/flink/testlogs"),
                new SimpleStringEncoder<Tuple6<String, String, String, String, String, Integer>>("UTF-8"))
        .withRollingPolicy(DefaultRollingPolicy.builder()
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                .withMaxPartSize(1024 * 1024 * 1024)
                .build())
        .withOutputFileConfig(config)
        .build();

newStream.sinkTo(sink);

env.execute("DataReader");
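
One thing I was unsure about: the FileSink documentation says part files
are only finalized when a checkpoint completes, and as far as I can tell
in-progress files are written with a leading dot, so a plain ls would not
show them. A minimal sketch of enabling checkpointing on the same env
(the 10-second interval is arbitrary):

    // Sketch: checkpointing lets the FileSink move part files from
    // in-progress to finished.
    env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10));

Could missing checkpointing be the reason no finished files appear?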



LogParser is a MapFunction that returns a Tuple6.
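
For completeness, a minimal placeholder showing the shape of LogParser
(my real parsing logic is different; the comma split here is only
illustrative):

    // Placeholder only -- my actual LogParser does more involved parsing.
    public static class LogParser
            implements MapFunction<String, Tuple6<String, String, String, String, String, Integer>> {
        @Override
        public Tuple6<String, String, String, String, String, Integer> map(String line) {
            String[] f = line.split(",", 6);
            return Tuple6.of(f[0], f[1], f[2], f[3], f[4], Integer.parseInt(f[5].trim()));
        }
    }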





Regards,

