I want to use Helidon SE 4.1.6 to produce data to a specific partition
of an Apache Kafka topic using the producer.

Detail: I have gone through the
https://helidon.io/docs/latest/se/reactive-messaging#_kafka_connector page
and written the code below, which is able to send data to the topic using
a Kafka producer.

*Now I want to send the data to a specific partition of a Kafka topic using
Helidon SE 4.1.6.*

The code below works and can produce data to a topic (without specifying a partition):

public class KafkaProducer {

    public static void init() {

        String kafkaServer = "localhost:9092";
        String topic = "topic2";

        Channel<String> toKafka = Channel.<String>builder()
                .subscriberConfig(KafkaConnector.configBuilder()
                        .bootstrapServers(kafkaServer)
                        .topic(topic)
                        .keySerializer(StringSerializer.class)
                        .valueSerializer(StringSerializer.class)
                        .build())
                .build();

        KafkaConnector kafkaConnector = KafkaConnector.create();

        System.out.println("In producer: ");

        Messaging messaging = Messaging.builder()
                .publisher(toKafka, Multi.just("Test1", "Test2").map(Message::of))
                .connector(kafkaConnector)
                .build()
                .start();

    }
}


I tried the code below, in which I create a ProducerRecord with a
specific partition so that the data goes to that partition. But I
don't see the data being sent to the specified partition.

To validate the data, I am running a consumer on the topic.

public class KafkaProducerPar {

    public static void init() {

        // Replace with your Kafka broker(s)
        String bootstrapServers = "localhost:9092";
        String topic = "topic3";

        // Create a KafkaConnector
        Config config = Config.create();
        KafkaConnector kafkaConnector = KafkaConnector.create(config);

        // Create a Channel for producing messages
        Channel<ProducerRecord<String, String>> toKafka =
                Channel.<ProducerRecord<String, String>>builder()
                .subscriberConfig(KafkaConnector.configBuilder()
                        .bootstrapServers(bootstrapServers)
                        .topic(topic)
                        .keySerializer(StringSerializer.class)
                        .valueSerializer(StringSerializer.class)
                        .build())
                .build();


        // Create a Messaging instance
        Messaging messaging = Messaging.builder()
                .publisher(toKafka, createMessageStream1(topic))
                .connector(kafkaConnector)
                .build()
                .start();

        Messaging messaging2 = Messaging.builder()
                .publisher(toKafka, Multi.just(
                        Message.of(new ProducerRecord<>(topic, 0,
                                "key1", "Message for partition 0")),
                        Message.of(new ProducerRecord<>(topic, 1,
                                "key2", "Message for partition 1"))
                ))
                .connector(kafkaConnector)
                .build()
                .start();

    }
}


Please let me know whether it is possible to implement partition handling
(that is, producing to a specific partition and consuming from a specific
partition) using the Helidon framework.

We already have code in place that uses core Java (non-Helidon) to produce
to and consume from a specific partition. Should we just reuse that if the
Helidon framework does not support this?
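
For reference, the non-Helidon approach mentioned above can be sketched
roughly as follows. This is a minimal illustration and not our exact code:
it assumes the plain kafka-clients library is on the classpath and that the
topic has at least two partitions. The class name PlainPartitionExample and
the helper names recordFor, produce and consume are made up for this sketch.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical example class; names are illustrative only.
public class PlainPartitionExample {

    // Builds a record pinned to an explicit partition. When a partition is
    // given in the ProducerRecord, the producer sends to that partition.
    static ProducerRecord<String, String> recordFor(
            String topic, int partition, String key, String value) {
        return new ProducerRecord<>(topic, partition, key, value);
    }

    // Sends one record to partition 0 and one to partition 1.
    static void produce(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(recordFor(topic, 0, "key1", "Message for partition 0"));
            producer.send(recordFor(topic, 1, "key2", "Message for partition 1"));
            producer.flush();
        }
    }

    // Reads only the given partition by assigning it manually (no consumer
    // group), and prints the partition each record actually came from.
    static void consume(String bootstrapServers, String topic, int partition) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition(topic, partition);
            consumer.assign(List.of(tp));
            consumer.seekToBeginning(List.of(tp));
            for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(5))) {
                System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                        r.partition(), r.offset(), r.key(), r.value());
            }
        }
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("usage: PlainPartitionExample <bootstrapServers> <topic>");
            return;
        }
        produce(args[0], args[1]);
        consume(args[0], args[1], 0);
        consume(args[0], args[1], 1);
    }
}
```

Running it with arguments such as localhost:9092 topic3 should make it easy
to see, from the printed partition numbers, whether each record landed where
it was addressed.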

Thanks,

Subbu
