Basically, I am trying to avoid writing code like:

      switch( key ) {
                case "key.deserializer" :  result.put(key ,
Class.forName(value)); break;
                case "key.serializer"   :  result.put(key ,
Class.forName(value)); break;
                case "value.deserializer" :  result.put(key ,
Class.forName(value)); break;
                case "value.serializer"   :  result.put(key ,
Class.forName(value)); break;
                case "max.partition.fetch.bytes" : result.put(key,
Long.valueOf(value)); break;
                case "max.poll.interval.ms" : result.put(key,
Long.valueOf(value)); break;
                case "enable.auto.commit" : result.put(key,
Boolean.valueOf(value)); break;
                default:
                    result.put(key, value);
                    break;
            }

since I would need to go over all possible Kafka properties that are not
expected as a String.

On Wed, Jan 24, 2018 at 12:32 PM, Tecno Brain <cerebrotecnolog...@gmail.com>
wrote:

> On page https://spark.apache.org/docs/latest/streaming-kafka-0-
> 10-integration.html
> there is this Java example:
>
> Map<String, Object> kafkaParams = new 
> HashMap<>();kafkaParams.put("bootstrap.servers", 
> "localhost:9092,anotherhost:9092");kafkaParams.put("key.deserializer", 
> StringDeserializer.class);kafkaParams.put("value.deserializer", 
> StringDeserializer.class);kafkaParams.put("group.id", 
> "use_a_separate_group_id_for_each_stream");kafkaParams.put("auto.offset.reset",
>  "latest");kafkaParams.put("enable.auto.commit", false);
> Collection<String> topics = Arrays.asList("topicA", "topicB");
> JavaInputDStream<ConsumerRecord<String, String>> stream =
>   KafkaUtils.createDirectStream(
>     streamingContext,
>     LocationStrategies.PreferConsistent(),
>     ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
>   );
>
> I would like to configure Kafka from properties loaded from a Properties
> file or a Map<String, String>.
>
> Is there any API to take a Map<String, String> and produce the required
> Map<String, Object> required to set the Kafka parameters ? Such code would
> convert "true" to a boolean, or a class name to the Class depending on the
> key.
>
> Seems to me that I would need to know ALL possible Kafka parameters and
> what data type they should be converted to in order to produce the
> Map<String, Object> kafkaParams.
>
> The older API used a Map<String, String> passed to the
> KafkaUtils.createDirectStream
>
> Thanks
>
>
>
>
>
>
>

Reply via email to