I have read the Confluent Kafka documentation, but I can't understand the following passage.
"It is important to understand that Kafka Streams is not a resource manager, but a library that “runs” anywhere its stream processing application runs. Multiple instances of the application are executed either on the same machine, or spread across multiple machines and tasks can be distributed automatically by the library <https://docs.confluent.io/current/streams/architecture.html#streams-architecture-threads> to those running application instances". I have tried running the application on the same machine in multiple JVMs, with multiple consumers. Is this the correct way to run several instances on the same machine, or is there another way? I have attached the code below.
package kafka.examples.MultiConsumerMultipartition.taskConfig;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;

/**
 * Created by PravinKumar on 23/10/17.
 *
 * <p>Driver for the streams application in {@link MultiPartitionMultiConsumerUsingStream}:
 * it first publishes {@code MAX_RECORDS} randomly chosen words to the input topic and then
 * prints every record it reads from the output topic until the process is stopped.
 */
public class MultiPartitionMultiConsumerDriver {

    public static final String CONSUMER_GROUP_ID = "multipartitionmulticonsumerdriver2";

    private static final int MAX_RECORDS = 10000;

    /** Pool of words from which each produced record value is drawn at random. */
    private static final String[] SAMPLE_WORDS = {
        "hi", "my", "name", "is", "pravin", "kumar", "studied", "in", "madras", "institute", "of", "technology",
        "hi", "my", "name", "is", "pravin", "kumar", "studied", "in", "good", "shepherd", "school", "properties", "put",
        "ConsumerConfig", "BOOTSTRAP", "SERVERS", "CONFIG", "Single", "Partition", "MultiConsumer", "UsingStream",
        "BOOTSTRAP", "SERVER", "properties", "put", "StreamsConfig", "DEFAULT", "KEY", "SERDE", "CLASS", "CONFIG",
        "Serdes", "String", "getClass", "getName"
    };

    /**
     * Produces the input records, then consumes and prints the output records.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting between polls
     */
    public static void main(String[] args) throws InterruptedException {
        produceInput();
        consumerOutput();
    }

    /**
     * Builds the configuration for the consumer that reads the output topic.
     *
     * @return consumer properties using String keys, Long values, group
     *         {@link #CONSUMER_GROUP_ID}, "earliest" offset reset and at most 10 records per poll
     */
    public static Properties getConsumerProps() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, MultiPartitionMultiConsumerUsingStream.BOOTSTRAP_SERVER);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().deserializer().getClass().getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Serdes.Long().deserializer().getClass().getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        // The client id is hard-coded; earlier revisions of this line used "C1" and "C2",
        // presumably one id per JVM when several drivers were started side by side.
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "C3");
        return properties;
    }

    /**
     * Sends {@code MAX_RECORDS} records to the input topic, each with a {@code null} key and a
     * value picked at random from {@link #SAMPLE_WORDS}, then flushes and closes the producer.
     */
    public static void produceInput() {
        Random random = new Random();

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, MultiPartitionMultiConsumerUsingStream.BOOTSTRAP_SERVER);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass().getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass().getName());

        // try-with-resources so the producer's network threads and buffers are released
        // even if a send fails.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            IntStream.range(0, MAX_RECORDS)
                    .forEach(i -> producer.send(new ProducerRecord<String, String>(
                            MultiPartitionMultiConsumerUsingStream.INPUT_TOPIC,
                            null,
                            SAMPLE_WORDS[random.nextInt(SAMPLE_WORDS.length)])));
            producer.flush();
        }
    }

    /**
     * Subscribes to the output topic and prints partition, key and value of every record.
     *
     * <p>Loops until the thread is interrupted or an exception is thrown; the consumer is
     * closed on the way out.
     *
     * @throws InterruptedException if the thread is interrupted during the pause between polls
     */
    public static void consumerOutput() throws InterruptedException {
        try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(getConsumerProps())) {
            consumer.subscribe(Collections.singleton(MultiPartitionMultiConsumerUsingStream.OUTPUT_TOPIC));
            while (true) {
                Thread.sleep(5_000);
                consumer.poll(Long.MAX_VALUE).forEach(consumed ->
                        System.out.println("Partition :" + consumed.partition()
                                + "Key : " + consumed.key()
                                + "Value : " + consumed.value()));
            }
        }
    }
}
package kafka.examples.MultiConsumerMultipartition.taskConfig;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

/**
 * Created by PravinKumar on 23/10/17.
 *
 * <p>Kafka Streams application that counts how often each value occurs on
 * {@link #INPUT_TOPIC} and writes the running counts to {@link #OUTPUT_TOPIC}.
 */
public class MultiPartitionMultiConsumerUsingStream {

    public static final String APPLICATION_ID = "SingleConsumerMultiConsumerUsingStreamx5";
    public static final String BOOTSTRAP_SERVER = "localhost:9092";
    public static final String INPUT_TOPIC = "inputtopic23";
    public static final String OUTPUT_TOPIC = "outputtopic23";

    /**
     * Builds the topology, wipes local state and starts the streams instance.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        KafkaStreams streams = getStreams();
        // Register the hook before starting so close() still runs if the JVM is asked to
        // shut down while the instance is starting up.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        // NOTE(review): cleanUp() runs on every start; confirm that discarding local state
        // each time is intended outside of development.
        streams.cleanUp();
        streams.start();
    }

    /**
     * Builds the Kafka Streams configuration.
     *
     * @return properties with the application id, bootstrap server, String default serdes,
     *         a fixed state directory and "earliest" offset reset
     */
    public static Properties getProps() {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.STATE_DIR_CONFIG, "/home/admin/Documents/kafka_2.12.1.0.0/kafka-streams");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return properties;
    }

    /**
     * Creates (but does not start) the streams instance.
     *
     * <p>Each input record is re-keyed by its own value, grouped by that key and counted;
     * every count update is written to {@link #OUTPUT_TOPIC} as a (String, Long) pair.
     *
     * @return a new, not yet started {@link KafkaStreams} instance
     */
    public static KafkaStreams getStreams() {
        Properties props = getProps();
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, Long> wordCounts = builder.<String, String>stream(INPUT_TOPIC)
                // The incoming key is ignored; the value becomes the grouping key.
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey()
                .count()
                .toStream();
        wordCounts.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

        return new KafkaStreams(builder.build(), props);
    }
}