Hello,
I am using Flink version 1.14.2.

I have a pipeline that fetches data from Kafka and writes it to S3.
The fetch from Kafka needs to be bounded from timestamp t1 to timestamp t1+n.
I intend to run this in batch mode, but a streaming pipeline that is
started by a trigger and stops when the end offset is reached would be
fine as well.
The issue is that when I set the bound to `OffsetsInitializer.latest()`,
everything works fine. But when I try to read between timestamps as
follows:

```
KafkaSource.<T>builder()
        .setBootstrapServers(resolvedBootstrapBroker)
        .setTopics(List.of("INGEST_METERING_2"))
        .setGroupId(consumerGroupId)
        .setStartingOffsets(
                OffsetsInitializer.timestamp(
                        Instant.now().minus(10, ChronoUnit.HOURS).toEpochMilli()))
        .setValueOnlyDeserializer(deserializationSchema)
        .setBounded(
                OffsetsInitializer.timestamp(
                        Instant.now().minus(10, ChronoUnit.MINUTES).toEpochMilli()))
        .setProperties(additionalProperties)
        .build();
```

The following is what happens:
* When I run it in batch mode, the output in S3 is a partial file and it is
never completed.
* When I run it in streaming mode, the partial file does complete (because
of checkpoints), but the pipeline does not terminate.

As I mentioned, with the exact same code as above, if I just change
`setBounded` to `OffsetsInitializer.latest()`, everything works fine.
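
As an aside, the snippet above calls `Instant.now()` separately for each bound. A minimal sketch of deriving both timestamps (t1 and t1+n) from one clock reading, using only `java.time`, is below. `ReadWindow`, its fields, and `of` are hypothetical names for illustration; they are not part of Flink, Kafka, or the pipeline in this post.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper (not part of Flink or Kafka): holds both ends of a
// read window, derived from one clock reading.
public final class ReadWindow {
    public final long startMillis; // t1, epoch milliseconds
    public final long endMillis;   // t1 + n, epoch milliseconds

    private ReadWindow(long startMillis, long endMillis) {
        this.startMillis = startMillis;
        this.endMillis = endMillis;
    }

    // Builds the window [now - startAgo, now - endAgo] from a single `now`.
    public static ReadWindow of(Instant now, Duration startAgo, Duration endAgo) {
        if (startAgo.compareTo(endAgo) <= 0) {
            throw new IllegalArgumentException("startAgo must be longer than endAgo");
        }
        return new ReadWindow(
                now.minus(startAgo).toEpochMilli(),
                now.minus(endAgo).toEpochMilli());
    }

    public static void main(String[] args) {
        ReadWindow window =
                ReadWindow.of(Instant.now(), Duration.ofHours(10), Duration.ofMinutes(10));
        System.out.println("start=" + window.startMillis + " end=" + window.endMillis);
    }
}
```

The two values could then be passed to `OffsetsInitializer.timestamp(...)` in `setStartingOffsets` and `setBounded`. This only keeps the two bounds consistent with each other; it is not expected to change the termination behaviour described here.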


My pipeline is as follows:

```
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env.setRestartStrategy(RestartStrategies.noRestart());
env.configure(config);
final KafkaSource<MeteringEventsWrapper> kafkaSource = getKafkaSource();
final FileSink<MeteringEvent> fileSink = getFileSink();
env.fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source")
        .sinkTo(fileSink);
```

And my sink is as follows:

```
FileSink
        .forRowFormat(sinkPath, encoder)
        .withOutputFileConfig(
                OutputFileConfig.builder()
                        .withPartPrefix(PREFIX)
                        .withPartSuffix(SUFFIX)
                        .build())
        .withBucketAssigner(bucketAssigner)
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();
```

From the logs, I can see that the SplitFetcher receives a terminal signal
when run with latest offsets as follows:

```
2022-08-29 14:28:06,036 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 0 because it is idle.
2022-08-29 14:28:06,036 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 0
```

But the same does not happen when I run with a bound that I provide.
Additional info: I have tried both a timestamp bound and manually
fetching the offsets from Kafka and passing them as a
`Map<TopicPartition, Long>`.

I have tried everything I know. Any help will be highly appreciated.

Thanks,
Vinod
