Basically, I am trying to avoid writing code like:

    switch (key) {
        case "key.deserializer":         result.put(key, Class.forName(value)); break;
        case "key.serializer":           result.put(key, Class.forName(value)); break;
        case "value.deserializer":       result.put(key, Class.forName(value)); break;
        case "value.serializer":         result.put(key, Class.forName(value)); break;
        case "max.partition.fetch.bytes": result.put(key, Long.valueOf(value)); break;
        case "max.poll.interval.ms":     result.put(key, Long.valueOf(value)); break;
        case "enable.auto.commit":       result.put(key, Boolean.valueOf(value)); break;
        default:                         result.put(key, value); break;
    }

since I would need to go over all possible Kafka properties that are not
expected as a String.

On Wed, Jan 24, 2018 at 12:32 PM, Tecno Brain <cerebrotecnolog...@gmail.com> wrote:

> On page
> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
> there is this Java example:
>
> Map<String, Object> kafkaParams = new HashMap<>();
> kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
> kafkaParams.put("key.deserializer", StringDeserializer.class);
> kafkaParams.put("value.deserializer", StringDeserializer.class);
> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
> kafkaParams.put("auto.offset.reset", "latest");
> kafkaParams.put("enable.auto.commit", false);
>
> Collection<String> topics = Arrays.asList("topicA", "topicB");
>
> JavaInputDStream<ConsumerRecord<String, String>> stream =
>     KafkaUtils.createDirectStream(
>         streamingContext,
>         LocationStrategies.PreferConsistent(),
>         ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
>     );
>
> I would like to configure Kafka from properties loaded from a Properties
> file or a Map<String, String>.
>
> Is there any API to take a Map<String, String> and produce the
> Map<String, Object> required to set the Kafka parameters? Such code would
> convert "true" to a boolean, or a class name to the Class, depending on
> the key.
>
> It seems to me that I would need to know ALL possible Kafka parameters and
> what data type each should be converted to in order to produce the
> Map<String, Object> kafkaParams.
>
> The older API used a Map<String, String> passed to
> KafkaUtils.createDirectStream.
>
> Thanks
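
For illustration only, here is a minimal sketch of the same conversion driven by a lookup table instead of a switch. The class name KafkaParamConverter and its convert method are hypothetical, not part of Spark or Kafka, and the target types are copied from the switch above without being checked against Kafka's own definitions. It still requires listing every non-String key, so it tidies the code without removing the underlying problem.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper: converts String-valued settings using a key-to-converter table. */
final class KafkaParamConverter {

    // Converter for settings whose value is a fully qualified class name.
    private static final Function<String, Object> TO_CLASS = name -> {
        try {
            return Class.forName(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown class: " + name, e);
        }
    };

    // Only keys needing a non-String value are listed. The selection and the
    // target types mirror the switch in this message and are assumptions.
    private static final Map<String, Function<String, Object>> CONVERTERS = new HashMap<>();
    static {
        CONVERTERS.put("key.deserializer", TO_CLASS);
        CONVERTERS.put("key.serializer", TO_CLASS);
        CONVERTERS.put("value.deserializer", TO_CLASS);
        CONVERTERS.put("value.serializer", TO_CLASS);
        CONVERTERS.put("max.partition.fetch.bytes", Long::valueOf);
        CONVERTERS.put("max.poll.interval.ms", Long::valueOf);
        // Note: Boolean.valueOf yields false for anything other than "true" (case-insensitive).
        CONVERTERS.put("enable.auto.commit", Boolean::valueOf);
    }

    private KafkaParamConverter() {
    }

    /** Returns a new map; keys without a registered converter keep their String value. */
    static Map<String, Object> convert(Map<String, String> raw) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, String> entry : raw.entrySet()) {
            Function<String, Object> converter =
                    CONVERTERS.getOrDefault(entry.getKey(), value -> value);
            result.put(entry.getKey(), converter.apply(entry.getValue()));
        }
        return result;
    }
}
```

The result of KafkaParamConverter.convert(stringParams) could then be passed as kafkaParams. Separately, Kafka's own consumer configuration parsing generally accepts String values for typed settings, so it may be worth checking whether passing the strings through unchanged already works with createDirectStream; that is untested here.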