Hi Tim,
Thanks for your reply. I don't see an option to specify
.returns(new TypeHint<Tuple6<String, String, String, String, String, String>>(){})
on a KeyedStream:
monitoringTupleKeyedStream = kinesisStream.keyBy(new
KeySelector<Monitoring, Tuple>() {
public Tuple getKey(Monitoring mon) throws Exception
{......return new Tuple6<>(..} })
I tried using
TypeInformation<Tuple6<String, String, String, String, String,
String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String,
String, String, String, String>>(){});
kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...},
info); //specify typeInfo through
TIA,
Vijay
On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor <vict...@gmail.com> wrote:
Flink needs type information for serializing and deserializing
objects, and that information is lost due to Java type erasure. The
only way to work around this is to specify the return type of the
function called in the lambda.
Fabian's answer here explains it well.
https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554
Tim
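As a rough illustration of the erasure problem described above, here is a minimal plain-Java sketch with no Flink dependency. `ErasureSketch` and `TypeCapture` are hypothetical names invented for this example. `TypeCapture` only mimics the general idea behind a type hint (an anonymous subclass records its generic superclass) and is not Flink's actual `TypeHint` implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

class ErasureSketch {
    // Hypothetical stand-in for the idea behind a type hint (not Flink's TypeHint):
    // an anonymous subclass keeps its generic superclass in the class file,
    // so the type argument can be read back via reflection.
    abstract static class TypeCapture<T> {
        final Type captured;

        protected TypeCapture() {
            ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
            this.captured = superType.getActualTypeArguments()[0];
        }
    }

    // Erasure: differently parameterized lists share one runtime class.
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        return strings.getClass() == ints.getClass();
    }

    // The trailing {} creates the anonymous subclass that preserves List<String>.
    static Type listOfStringHint() {
        return new TypeCapture<List<String>>() {}.captured;
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());              // prints "true"
        System.out.println(listOfStringHint().getTypeName());
    }
}
```

The trailing `{}` after the constructor call is what matters: without the anonymous subclass there is no class file entry from which the full generic type can be recovered.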
On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan <bvija...@gmail.com> wrote:
Hi,
I am trying to use a KeyedStream with Tuple to handle
different types of Tuples, including Tuple6.
I keep getting this exception:
*Exception in thread "main"
org.apache.flink.api.common.functions.InvalidTypesException:
Usage of class Tuple as a type is not allowed. Use a concrete
subclass (e.g. Tuple1, Tuple2, etc.) instead*.
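The kind of check behind a message like this can be sketched in plain Java, without Flink. Everything below is a hypothetical stand-in written for illustration: `Tuple`, `Tuple2`, and `KeySelector` are simplified local types, not Flink's classes, and `isUsableKeyType` is an assumed approximation of the idea, not Flink's real type extraction. It shows that a selector declared with the abstract base exposes no arity or field types, while one declared with a concrete subclass does.

```java
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

class KeyTypeSketch {
    // Hypothetical stand-ins, NOT Flink's classes.
    abstract static class Tuple {}

    static class Tuple2<A, B> extends Tuple {
        final A f0;
        final B f1;
        Tuple2(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    interface KeySelector<IN, KEY> { KEY getKey(IN value); }

    // Reads the KEY type argument that the selector's class declares.
    static Type declaredKeyType(KeySelector<?, ?> selector) {
        ParameterizedType iface =
            (ParameterizedType) selector.getClass().getGenericInterfaces()[0];
        return iface.getActualTypeArguments()[1];
    }

    // Assumed rule for this sketch: an abstract key class carries no field info.
    // Only handles Class and ParameterizedType, which is all this example produces.
    static boolean isUsableKeyType(Type keyType) {
        Class<?> raw = keyType instanceof ParameterizedType
            ? (Class<?>) ((ParameterizedType) keyType).getRawType()
            : (Class<?>) keyType;
        return !Modifier.isAbstract(raw.getModifiers());
    }

    // Declared with the abstract base: only "Tuple" is visible to reflection.
    static KeySelector<String, Tuple> looseSelector() {
        return new KeySelector<String, Tuple>() {
            public Tuple getKey(String v) { return new Tuple2<>(v, v); }
        };
    }

    // Declared with a concrete subclass: arity and field types are visible.
    static KeySelector<String, Tuple2<String, String>> concreteSelector() {
        return new KeySelector<String, Tuple2<String, String>>() {
            public Tuple2<String, String> getKey(String v) { return new Tuple2<>(v, v); }
        };
    }

    public static void main(String[] args) {
        System.out.println(isUsableKeyType(declaredKeyType(looseSelector())));    // prints "false"
        System.out.println(isUsableKeyType(declaredKeyType(concreteSelector()))); // prints "true"
    }
}
```

Both selectors return the same runtime object type; only the declared key type differs, and that declaration is all that reflection can see before any record is processed.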
Is there a way around type erasure here?
I want to declare the stream as KeyedStream<Monitoring, Tuple> so
that a Tuple6 key can be treated as a plain Tuple, as with
monitoringTupleKeyedStream.
Code below:
KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
String keyOperationType = ....;//provided
if (StringUtils.isNotEmpty(keyOperationType)) {
    if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
        monitoringTupleKeyedStream =
            kinesisStream.keyBy("deployment", "gameId", "eventName", "component");
    } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
        monitoringTupleKeyedStream =
            kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "instance");
    } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
        TypeInformation<Tuple6<String, String, String, String, String, String>> info =
            TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
        monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
            public Tuple getKey(Monitoring mon) throws Exception {
                String key = "";
                String keyName = "";
                final String eventName = mon.getEventName();
                if (eventName != null && eventName.equalsIgnoreCase(INGRESS_FPS)) {
                    keyName = PCAM_ID;
                    key = mon.getEventDataMap() != null
                        ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
                } else if (eventName != null && eventName.equalsIgnoreCase(EGRESS_FPS)) {
                    keyName = OUT_BITRATE;
                    key = mon.getEventDataMap() != null
                        ? (String) mon.getEventDataMap().get(OUT_BITRATE) : "";
                    //TODO: identify key to use
                }
                mon.setKeyName(keyName);
                mon.setKeyValue(key);
                return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName,
                    mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
            }
        }); //, info)
    } else if (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
        monitoringTupleKeyedStream =
            kinesisStream.keyBy("deployment", "gameId", "eventName",
                "component", "instance", "container"); //<== this is also a Tuple6 but no complaints ?
    }
}
The variant below requires monitoringTupleKeyedStream to be declared as
KeyedStream<Monitoring, Tuple6<String, String, String, String, String, String>>:
TypeInformation<Tuple6<String, String, String, String, String, String>> info =
    TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
monitoringTupleKeyedStream = kinesisStream.keyBy(
    new KeySelector<Monitoring, Tuple6<String, String, String, String, String, String>>() {
        @Override
        public Tuple6<String, String, String, String, String, String> getKey(Monitoring mon)
                throws Exception {
            String key = "";
            String keyName = "";
            //TODO: extract to a method to pull key to use from a config file
            final String eventName = mon.getEventName();
            if (eventName != null && eventName.equalsIgnoreCase(INGRESS_FPS)) {
                keyName = PCAM_ID;
                key = mon.getEventDataMap() != null
                    ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
            } else if (eventName != null && eventName.equalsIgnoreCase(EGRESS_FPS)) {
                keyName = OUT_BITRATE;
                key = mon.getEventDataMap() != null
                    ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
            }
            mon.setKeyName(keyName);
            mon.setKeyValue(key);
            return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName,
                mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
        }
    }, info);
TIA