Hello, I am using Flink version 1.14.2. I have a pipeline in which I want to fetch data from Kafka and write it to S3. The fetch from Kafka needs to be bounded between timestamp t1 and timestamp t1+n. I intend to run this in batch mode, but a streaming pipeline that is started on a trigger and stops once the end offset is reached would be fine as well. The issue I have is this: when I set the bounded offsets to `OffsetsInitializer.latest()`, everything works just fine. But when I try to read between two timestamps as follows:
```
KafkaSource.<T>builder()
    .setBootstrapServers(resolvedBootstrapBroker)
    .setTopics(List.of("INGEST_METERING_2"))
    .setGroupId(consumerGroupId)
    // start reading at the offsets for 10 hours ago
    .setStartingOffsets(
        OffsetsInitializer.timestamp(
            Instant.now().minus(10, ChronoUnit.HOURS).toEpochMilli()))
    .setValueOnlyDeserializer(deserializationSchema)
    // stop reading at the offsets for 10 minutes ago
    .setBounded(
        OffsetsInitializer.timestamp(
            Instant.now().minus(10, ChronoUnit.MINUTES).toEpochMilli()))
    .setProperties(additionalProperties)
    .build();
```

the following happens:

* When I run it in batch mode, the output in S3 is a partial file that is never completed.
* When I run it in streaming mode, the partial file does get completed (because of checkpoints), but the pipeline never terminates.

As I mentioned, with the exact same code as above, if I just set `setBounded` to `OffsetsInitializer.latest()`, everything works fine.

My pipeline is as follows:

```
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);

Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env.setRestartStrategy(RestartStrategies.noRestart());
env.configure(config);

final KafkaSource<MeteringEventsWrapper> kafkaSource = getKafkaSource();
final FileSink<MeteringEvent> fileSink = getFileSink();

env.fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source")
    .sinkTo(fileSink);
```

And my sink is as follows:

```
FileSink
    .forRowFormat(sinkPath, encoder)
    .withOutputFileConfig(
        OutputFileConfig.builder().withPartPrefix(PREFIX).withPartSuffix(SUFFIX).build())
    .withBucketAssigner(bucketAssigner)
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();
```

From the logs, I can see that the SplitFetcher receives a shutdown signal when run with the latest offsets:

```
2022-08-29 14:28:06,036 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 0 because it is idle.
2022-08-29 14:28:06,036 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 0
```

But the same does not happen when I run with offsets that I provide.

Additional info: I have tried both the timestamp-based initializer and manually fetching the end offsets from Kafka and passing them as a TopicPartition-to-Long map (see the sketch after my signature). I have tried everything I know. Any help will be highly appreciated.

Thanks,
Vinod
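P.S. For clarity, this is roughly what I mean by "manually fetching offsets from Kafka and passing them as a map". It is a simplified sketch, not my exact code; the deserializer properties and the helper method name are just stand-ins:

```
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// For every partition of the topic, look up the earliest offset whose
// record timestamp is at or after the desired end timestamp (t1 + n).
static Map<TopicPartition, Long> fetchStoppingOffsets(
        String bootstrapServers, String topic, long endTimestampMillis) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    Map<TopicPartition, Long> stoppingOffsets = new HashMap<>();
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        for (PartitionInfo p : consumer.partitionsFor(topic)) {
            timestampsToSearch.put(
                new TopicPartition(p.topic(), p.partition()), endTimestampMillis);
        }
        // offsetsForTimes maps partitions that have no record at or after
        // the timestamp to null, so those are skipped here.
        consumer.offsetsForTimes(timestampsToSearch).forEach((tp, offsetAndTs) -> {
            if (offsetAndTs != null) {
                stoppingOffsets.put(tp, offsetAndTs.offset());
            }
        });
    }
    return stoppingOffsets;
}
```

I called this with the same end timestamp as above (`Instant.now().minus(10, ChronoUnit.MINUTES).toEpochMilli()`) and passed the result to the builder via `.setBounded(OffsetsInitializer.offsets(stoppingOffsets))` instead of the timestamp initializer. The behaviour was the same with that variant.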