I'm not sure what you're asking.
If you have a deserialization schema that converts the data into a Map,
you're done as far as I understand it. What do you believe is missing?
If, for a given job, the number and types of fields are fixed, you could
look into using Row.
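As for the InvalidTypesException itself, one possible workaround is to key by a concrete type such as String instead of the abstract Tuple. Below is a minimal sketch in plain Java of building such a composite key from the Map. The class name CompositeKeys, the method buildKey, and the "|" separator are hypothetical names chosen for illustration, and the sketch assumes the field values have a stable toString() and never contain the separator. It takes a List rather than a Set so that the field order is the same on every call.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of Flink: builds a String key from selected map fields.
final class CompositeKeys {

    // Assumed separator; pick one that cannot occur inside the field values.
    private static final String SEPARATOR = "|";

    private CompositeKeys() {
    }

    // Joins the values of the group-by fields in the order given.
    // A field that is absent from the record contributes the text "null".
    static String buildKey(Map<String, Object> record, List<String> groupByFields) {
        StringJoiner joiner = new StringJoiner(SEPARATOR);
        for (String field : groupByFields) {
            joiner.add(String.valueOf(record.get(field)));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("region", "us-east");
        record.put("host", "h1");
        record.put("cpu", 0.5);

        // Prints: us-east|h1
        System.out.println(buildKey(record, Arrays.asList("region", "host")));
    }
}
```

Inside the job, a KeySelector<Map<String, Object>, String> could return the result of buildKey from getKey(), so the key type Flink sees is a concrete String rather than Tuple.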
On 01/05/2019 22:40, Vijay Balakrishnan wrote:
Hi,
I asked this question earlier under the topic "Flink - Type Erasure
Exception trying to use Tuple6 instead of Tuple".
I am having issues defining a generic Tuple instead of a specific
Tuple1, Tuple2, etc.
Exception in thread "main"
org.apache.flink.api.common.functions.InvalidTypesException: Usage of
class Tuple as a type is not allowed. Use a concrete subclass (e.g.
Tuple1, Tuple2, etc.) instead.
DataStream<Map<String, Object>> kinesisStream = ...;
// The next statement fails: Flink complains about the abstract Tuple key type
// of monitoringTupleKeyedStream.
KeyedStream<Map<String, Object>, Tuple> monitoringTupleKeyedStream =
        kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));
...
public static class MapTupleKeySelector
        implements KeySelector<Map<String, Object>, Tuple> {

    private final Set<String> groupBySet;

    public MapTupleKeySelector(Set<String> groupBySet) {
        this.groupBySet = groupBySet;
    }

    @Override
    public Tuple getKey(Map<String, Object> inputMap) throws Exception {
        int groupBySetSize = groupBySet.size();
        // Creates a TupleN whose arity matches the number of group-by fields.
        Tuple tuple = Tuple.newInstance(groupBySetSize);
        //Tuple1 tuple = new Tuple1();
        int count = 0;
        for (String groupBy : groupBySet) {
            // Look up the value for this group-by field in the incoming record.
            tuple.setField(inputMap.get(groupBy), count++);
        }
        return tuple;
    }
}
Abhishek replied in that thread as follows (I am posting in that
thread as well as creating a new one):
However, if you are trying to build a generic framework in which
different streams have different fields, you can follow the Map
approach. For the latter approach, you need to write an extra mapper
class that converts all the fields in the stream into a Map-based
stream.
Can I get an example of how to create this extra Mapper class?
Currently, I am using deserialization to convert the incoming byte[]
by implementing KinesisDeserializationSchema<Map<String, Object>> to
convert to a DataStream<Map<String, Object>> kinesisStream.
TIA,