Hi all, I've been using the KafkaSource API as opposed to the classic consumer and things have been going well. I configured my source such that it could be used in either a streaming or bounded mode, with the bounded approach specifically aimed at improving testing (unit/integration).
I've noticed that when I attempt to run a test, the pipeline never seems to acknowledge the "end" of the stream in the bounded context; it just runs forever and never reaches my assertion. Does anything look glaringly wrong with how the source is defined?

object KafkaEventSource {
    fun withParameters(parameters: ParameterTool): KafkaSource<Event> {
        val schemaRegistryUrl = parameters.getRequired("schema.registry.url")
        val builder = KafkaSource.builder<Event>()
            .setBootstrapServers(parameters.getRequired("bootstrap.servers"))
            .setGroupId(parameters.getRequired("group.id"))
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setProperty("schema.registry.url", schemaRegistryUrl)
            .setTopics(parameters.getRequired("topic"))
            .setDeserializer(EventDeserializer(schemaRegistryUrl))

        if (parameters.getBoolean("bounded", false)) {
            builder.setBounded(OffsetsInitializer.latest())
        }

        return builder.build()
    }
}

I can verify that the generated source has its boundedness set properly and that all of the configuration options are correct. My test itself is fairly simple and can be broken down as follows:

1. Inject records into a Kafka topic
2. Initialize my Flink job using all of my testing parameters
3. Apply my assertion (in this case, verifying that a JdbcSink wrote to a specific database)

@Test
fun `Example`() {
    // Arrange
    val events = getTestEvents()
    sendToKafka(events, parameters)

    // Act
    EntityIdentificationJob.run(parameters)

    // Assert
    val users = queryCount("SELECT * FROM users", connection)
    assertEquals(1, users)
}

The job itself reads from the source, runs a process function that splits records into multiple side outputs, and writes each output to a distinct JdbcSink based on its type:

@JvmStatic
fun main(args: Array<String>) {
    val parameters = loadParams(args)
    val stream = StreamExecutionEnvironment.getExecutionEnvironment()

    // Read from Kafka
    val entities = stream
        .fromSource(KafkaEventSource.withParameters(parameters), WatermarkStrategy.noWatermarks(), "kafka")
        .process(IdentifyEntitiesFunction())

    // Write out each tag to its respective sink
    for (entityType in EntityTypes.all) {
        entities
            .getSideOutput(entityType)
            .addSink(PostgresEntitySink.withEntity(entityType.typeInfo, parameters))
    }

    stream.execute(parameters.getRequired("application"))
}

I can verify in the logs that my sink is being executed and writing to the appropriate database; however, the job itself never finishes. I've tried using a single Kafka partition as well as multiple partitions, and I've even commented out the logic related to writing to the database. It still just seems to run ... forever. Any recommendations? Perhaps there's a bad configuration or a setting that isn't being used as intended?

Thanks,

Rion
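P.S. For reference, the stripped-down variant I mentioned (with all of the JdbcSink logic commented out) was roughly along these lines — the print() is just a placeholder I dropped in so the pipeline still has somewhere to send records; everything else matches the job above:

@JvmStatic
fun main(args: Array<String>) {
    val parameters = loadParams(args)
    val stream = StreamExecutionEnvironment.getExecutionEnvironment()

    // Same bounded KafkaSource and process function as above, but no JdbcSinks --
    // just print the processed records
    stream
        .fromSource(KafkaEventSource.withParameters(parameters), WatermarkStrategy.noWatermarks(), "kafka")
        .process(IdentifyEntitiesFunction())
        .print()

    stream.execute(parameters.getRequired("application"))
}

Even this version runs indefinitely when "bounded" is set to true.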