Hi all,

I've been using the KafkaSource API as opposed to the classic consumer and
things have been going well. I configured my source such that it could be
used in either a streaming or bounded mode, with the bounded approach
specifically aimed at improving testing (unit/integration).

I've noticed that when I run a test, the pipeline never seems to
acknowledge the "end" of the stream in the bounded context: it runs
forever and never reaches my assert.

Does anything look glaringly wrong with how the source is being defined?

object KafkaEventSource {

    fun withParameters(parameters: ParameterTool): KafkaSource<Event> {
        val schemaRegistryUrl = parameters.getRequired("schema.registry.url")

        val builder = KafkaSource.builder<Event>()
            .setBootstrapServers(parameters.getRequired("bootstrap.servers"))
            .setGroupId(parameters.getRequired("group.id"))
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setProperty("schema.registry.url", schemaRegistryUrl)
            .setTopics(parameters.getRequired("topic"))
            .setDeserializer(EventDeserializer(schemaRegistryUrl))

        if (parameters.getBoolean("bounded", false)) {
            builder.setBounded(OffsetsInitializer.latest())
        }

        return builder.build()
    }
}

I can verify that the generated source has its boundedness set properly
and all of the configuration options are correct.
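
For reference, the boundedness check can be sketched as follows. This is a
minimal sketch under assumptions, not the exact code from the job: it uses a
plain String source with SimpleStringSchema instead of Event and
EventDeserializer, and the broker address, group id, and topic name are
placeholders. Building the source does not connect to Kafka, so the check
only confirms the configured boundedness, not that the job will terminate.

```kotlin
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.connector.source.Boundedness
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

// Hypothetical stand-in for KafkaEventSource.withParameters, using String values.
fun buildStringSource(bounded: Boolean): KafkaSource<String> {
    val builder = KafkaSource.builder<String>()
        .setBootstrapServers("localhost:9092") // placeholder broker address
        .setGroupId("boundedness-check")       // placeholder group id
        .setTopics("events")                   // placeholder topic
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(SimpleStringSchema())

    // Same toggle as in the original source definition.
    if (bounded) {
        builder.setBounded(OffsetsInitializer.latest())
    }

    return builder.build()
}

fun main() {
    // getBoundedness() comes from the Source interface that KafkaSource implements.
    check(buildStringSource(bounded = true).boundedness == Boundedness.BOUNDED)
    check(buildStringSource(bounded = false).boundedness == Boundedness.CONTINUOUS_UNBOUNDED)
}
```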

My test itself is fairly simple and can be broken down as follows:

   1. Inject records into a Kafka Topic
   2. Initialize my Flink job using all of my testing parameters
   3. Apply my assertion (in this case verifying that a JdbcSink wrote to a
   specific database)

@Test
fun `Example`() {
    // Arrange
    val events = getTestEvents()
    sendToKafka(events, parameters)

    // Act
    EntityIdentificationJob.run(parameters)

    // Assert
    val users = queryCount("SELECT * FROM users", connection)
    assertEquals(1, users)
}
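
Because the hang blocks the assert indefinitely, one option while debugging
is to run the job under a time limit so the test fails instead of running
forever. The sketch below uses only the JDK; runWithTimeout is a
hypothetical helper, not part of Flink or JUnit. Note the assumption that
interrupting the calling thread is enough for the test to move on: it does
not by itself cancel a Flink job that is still running in the background.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Runs `block` on a daemon thread and fails with an AssertionError if it
// does not complete within `timeoutMillis`.
fun <T> runWithTimeout(timeoutMillis: Long, block: () -> T): T {
    val executor = Executors.newSingleThreadExecutor { runnable ->
        Thread(runnable, "job-runner").apply { isDaemon = true }
    }
    try {
        val future = executor.submit(Callable { block() })
        return try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS)
        } catch (e: TimeoutException) {
            // Interrupt the worker thread; the block itself may ignore this.
            future.cancel(true)
            throw AssertionError("Job did not finish within $timeoutMillis ms", e)
        } catch (e: ExecutionException) {
            // Surface the original failure from the block, if there is one.
            throw e.cause ?: e
        }
    } finally {
        executor.shutdownNow()
    }
}
```

In the test above, the "Act" step would then read something like
runWithTimeout(60_000) { EntityIdentificationJob.run(parameters) }, where
the 60 second limit is an arbitrary choice.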

The job itself reads from the source, runs a process function that emits
to multiple side outputs, and writes each side output to a distinct
JdbcSink based on its type:

@JvmStatic
fun main(args: Array<String>) {
    val parameters = loadParams(args)
    val stream = StreamExecutionEnvironment.getExecutionEnvironment()

    // Read from Kafka
    val entities = stream
        .fromSource(
            KafkaEventSource.withParameters(parameters),
            WatermarkStrategy.noWatermarks(),
            "kafka"
        )
        .process(IdentifyEntitiesFunction())

    // Write out each tag to its respective sink
    for (entityType in EntityTypes.all) {
        entities
            .getSideOutput(entityType)
            .addSink(PostgresEntitySink.withEntity(entityType.typeInfo, parameters))
    }

    stream.execute(parameters.getRequired("application"))
}
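
To make the side-output shape concrete without Kafka or Postgres, here is a
minimal sketch assuming the Flink 1.x DataStream API. Everything in it is a
hypothetical stand-in: usersTag and devicesTag replace the entries of
EntityTypes.all, RouteByPrefix replaces IdentifyEntitiesFunction, and
records are plain strings routed by prefix. It reads from an in-memory
collection and collects one side output, which may be useful as a baseline
for comparing against the Kafka-backed pipeline.

```kotlin
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.util.OutputTag

// Hypothetical tags standing in for the entries of EntityTypes.all.
val usersTag = OutputTag("users", Types.STRING)
val devicesTag = OutputTag("devices", Types.STRING)

// Hypothetical stand-in for IdentifyEntitiesFunction: routes by string prefix.
class RouteByPrefix : ProcessFunction<String, String>() {
    override fun processElement(value: String, ctx: Context, out: Collector<String>) {
        when {
            value.startsWith("user:") -> ctx.output(usersTag, value)
            value.startsWith("device:") -> ctx.output(devicesTag, value)
            else -> out.collect(value) // unmatched records stay on the main output
        }
    }
}

// Runs the pipeline over an in-memory collection and returns the "users" side output.
fun collectUsers(inputs: List<String>): List<String> {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.setParallelism(1) // keep record order deterministic for the sketch

    return env.fromCollection(inputs)
        .process(RouteByPrefix())
        .getSideOutput(usersTag)
        .executeAndCollect(100) // collect at most 100 records, then return
}
```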

I can verify in the logs that my sink is executed and writes to the
appropriate database; however, the job itself never finishes. I've tried
it with a single Kafka partition as well as multiple partitions, and I even
commented out the logic that writes to the database. It still just
seems to run ... forever.

Any recommendations? Perhaps there's a bad configuration or setting that
isn't being used as intended?

Thanks,

Rion
