Hi Chesnay,
Sorry for the confusion. I solved the problem by following another person's
recommendation in the other thread to use a wrapper POJO.
I wrapped the Tuple in a MonitoringTuple class, and that solved my problem
with the varying number of fields in the Tuple:
public class MonitoringTuple {
    private Tuple tuple;
    ...
}
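In case it helps anyone else, here is a rough sketch of what the rest of
such a wrapper needs. As far as I understand the Flink docs, a POJO used as
a key must override hashCode() rather than rely on Object.hashCode(), so the
wrapper should define equals() and hashCode() over the wrapped value. This is
a simplified sketch, not the MonitoringTuple code itself: WrapperKey and
WrapperKeyDemo are made-up names, and a List<Object> stands in for Flink's
Tuple so that it compiles without Flink on the classpath.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for MonitoringTuple. A List<Object> replaces
// Flink's Tuple (assumption) so the sketch has no Flink dependency.
final class WrapperKey {
    private final List<Object> fields;

    WrapperKey(List<Object> fields) {
        this.fields = fields;
    }

    List<Object> getFields() {
        return fields;
    }

    // Two keys are equal when their wrapped fields are equal, element by element.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof WrapperKey)) {
            return false;
        }
        return Objects.equals(fields, ((WrapperKey) other).fields);
    }

    // Hash is derived from the wrapped fields, so equal keys hash alike.
    @Override
    public int hashCode() {
        return Objects.hashCode(fields);
    }
}

class WrapperKeyDemo {
    public static void main(String[] args) {
        WrapperKey a = new WrapperKey(Arrays.asList("us-east", "cpu"));
        WrapperKey b = new WrapperKey(Arrays.asList("us-east", "cpu"));
        System.out.println(a.equals(b));                  // prints true
        System.out.println(a.hashCode() == b.hashCode()); // prints true
    }
}
```

In the real class the field would be the Flink Tuple, and as far as I know
Flink's POJO rules (public class, public no-argument constructor, fields
public or reachable through getters and setters) apply on top of this.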
Then, I used it like this:
KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
        kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));
The MapTupleKeySelector is defined below:
public static class MapTupleKeySelector
        implements KeySelector<Map<String, Object>, MonitoringTuple> {
    private final Set<String> groupBySet;

    public MapTupleKeySelector(Set<String> groupBySet) {
        this.groupBySet = groupBySet;
    }

    @Override
    public MonitoringTuple getKey(Map<String, Object> inputMap) {
        int groupBySetSize = groupBySet.size();
        Tuple tuple = Tuple.newInstance(groupBySetSize);
        int count = 0;
        for (String groupBy : groupBySet) {
            count = setTupleField(inputMap, tuple, count, groupBy);
        }
        return new MonitoringTuple(tuple);
    }
}

public static int setTupleField(Map<String, Object> inputMap, Tuple tuple,
        int count, String groupBy) {
    Object groupByValueObj = inputMap.get(groupBy);
    String tupleValue = Utils.convertToString(groupByValueObj);
    tuple.setField(tupleValue, count++);
    return count;
}
} // closes the enclosing class (not shown)
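The key-building logic above can be tried outside Flink. A minimal sketch,
assuming String.valueOf as a stand-in for Utils.convertToString (whose
behavior isn't shown here) and a List<String> in place of the Tuple;
GroupKeySketch and extractKey are made-up names. It uses a LinkedHashSet for
groupBySet because the field positions follow the set's iteration order,
which a plain HashSet does not guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class GroupKeySketch {
    // Hypothetical stand-in for Utils.convertToString (assumption):
    // null values become the string "null".
    static String convertToString(Object value) {
        return String.valueOf(value);
    }

    // Collects the group-by values in the iteration order of groupBySet,
    // mirroring how the key selector fills the tuple positions.
    static List<String> extractKey(Map<String, Object> inputMap, Set<String> groupBySet) {
        List<String> key = new ArrayList<>(groupBySet.size());
        for (String groupBy : groupBySet) {
            key.add(convertToString(inputMap.get(groupBy)));
        }
        return key;
    }

    public static void main(String[] args) {
        // LinkedHashSet keeps insertion order, so "region" is always field 0.
        Set<String> groupBySet = new LinkedHashSet<>(Arrays.asList("region", "metric"));

        Map<String, Object> record = new HashMap<>();
        record.put("metric", "cpu");
        record.put("region", "us-east");
        record.put("value", 0.93);

        System.out.println(extractKey(record, groupBySet)); // prints [us-east, cpu]
    }
}
```

Fields that are not in groupBySet (here "value") are ignored, so records
that agree on the group-by fields end up with the same key.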
TIA,
On Thu, May 2, 2019 at 4:54 AM Chesnay Schepler <[email protected]> wrote:
> I'm not sure what you're asking.
>
> If you have a deserialization schema that converts the data into a Map,
> you're done as I understand it. What do you believe is missing?
>
> If, for a given job, the number/types of fields are fixed you could look
> into using Row.
>
> On 01/05/2019 22:40, Vijay Balakrishnan wrote:
>
> Hi,
> I asked this question earlier under the topic "Flink - Type Erasure
> Exception trying to use Tuple6 instead of Tuple".
>
> I'm having issues defining a generic Tuple instead of a specific Tuple1,
> Tuple2, etc.
> Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
> Tuple2, etc.) instead.
>
> DataStream<Map<String, Object>> kinesisStream = ...;
> KeyedStream<Map<String, Object>, Tuple> monitoringTupleKeyedStream =
> kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));//<===== complains
> about Tuple type for monitoringTupleKeyedStream
> .....
>
> public static class MapTupleKeySelector implements KeySelector<Map<String,
> Object>, Tuple> {
> private final Set<String> groupBySet;
>
> public MapTupleKeySelector(Set<String> groupBySet) {
> this.groupBySet = groupBySet;
> }
>
> @Override
> public Tuple getKey(Map<String, Object> inputMap) throws Exception
> {
> int groupBySetSize = groupBySet.size();
> Tuple tuple = Tuple.newInstance(groupBySetSize);
> //Tuple1 tuple = new Tuple1();
> int count = 0;
> for (String groupBy : groupBySet) {
> Object groupByValue = inputMap.get(groupBy); // look up the group-by value
> tuple.setField(groupByValue, count++);
> }
> return tuple;
> }
> }
>
> Abhishek replied in that thread as follows (I'm posting in that thread
> as well as creating a new one):
> However, if you are trying to build some generic framework, and different
> streams will have different fields, you can follow the Map approach. For
> the latter approach, you need to write an extra mapper class that converts
> all the fields in the stream into a Map-based stream.
>
> Can I get an example of how to create this extra mapper class?
>
> Currently, I deserialize the incoming byte[] by implementing
> KinesisDeserializationSchema<Map<String, Object>>, which gives me a
> DataStream<Map<String, Object>> kinesisStream.
>
> TIA,
>
>
>