Folks,
I use the following Streaming API from KafkaUtils :
public JavaPairInputDStream<String, String> inputDStream() {
HashSet<String> topicsSet = new
HashSet<String>(Arrays.asList(topics.split(",")));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put(Tokens.KAFKA_BROKER_LIST_TOKEN.getRealTokenName(), brokers);
return KafkaUtils.createDirectStream(
streamingContext,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
}
I catch the messages using :JavaDStream<String> messages = inputDStream.map(new
Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
My problem is, each of these Kafka Topics stream in different message types.
How do I distinguish messages that are of type1, messages that are of type2,
..... ?
I tried the following:
private class ParseEvents<T> implements Function<String, T> {
final Class<T> parameterClass;
private ParseEvents(Class<T> parameterClass) {
this.parameterClass = parameterClass;
}
public T call(String message) throws Exception {
ObjectMapper mapper = new ObjectMapper();
T parsedMessage = null;
try {
parsedMessage = mapper.readValue(message, this.parameterClass);
} catch (Exception e1) {
logger.error("Ignoring Unknown Message %s", message);
}
return parsedMessage;
}
}JavaDStream<Type1> type1Events = messages.map(new
ParseEvents<Type1>(Type1.class));JavaDStream<Type2> type2Events =
messages.map(new ParseEvents<Type2>(Type2.class));JavaDStream<Type3>
type3Events = messages.map(new ParseEvents<Type3>(Type3.class));
But this does not work because type1 catches type2 messages and ignores them.
Is there a clean way of handling this ?