Hi,

Below is the sample code I am trying:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    TypeInformation<?>[] types = new TypeInformation<?>[] {
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO
    };
    String[] fieldNames = new String[] {"id", "name", "salary", "department"};
    RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);

    env.registerType(RowTypeInfo.class);
    env.addSource(new EmployeeSourceFunction(), "samplesource", rowTypeInfo)
        .keyBy("department")
        .sum("salary")
        .addSink(new PrintSinkFunction<>());

    public class EmployeeSourceFunction implements SourceFunction<Row> {

        // volatile because cancel() may be called from a different thread than run()
        private volatile boolean continueRead = true;

        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            while (continueRead) {
                for (int i = 0; i < 3 && continueRead; i++) {
                    Row row = new Row(4);
                    row.setField(0, Integer.valueOf(i));
                    row.setField(1, "user" + i);
                    // a Double, to match DOUBLE_TYPE_INFO declared for "salary"
                    row.setField(2, 1000.0 * i);
                    row.setField(3, "DEV");
                    ctx.collect(row);
                }
                // emit the three sample rows once, then stop
                continueRead = false;
            }
        }

        @Override
        public void cancel() {
            continueRead = false;
        }
    }

I am getting the exception below:

    java.lang.ClassCastException: org.apache.flink.api.java.typeutils.RowTypeInfo cannot be cast to org.apache.flink.api.java.typeutils.TupleTypeInfo
        at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:167)

I have checked FieldAccessorFactory.java:167:

    if (typeInfo.isTupleType()) {
        TupleTypeInfo tupleTypeInfo = (TupleTypeInfo) typeInfo;

RowTypeInfo returns 'true' for isTupleType(), but it cannot be cast to TupleTypeInfo.

Can someone please tell me whether I have configured something wrongly, or whether this is a bug in the code?

--
Thank you,
Madan.
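
P.S. To illustrate what I believe is happening, here is a minimal stand-alone sketch of the check-then-cast pattern I quoted from FieldAccessorFactory. The classes are simplified stand-ins I wrote for illustration (the names ending in StandIn, and describeCast, are hypothetical), not the real Flink classes. I am assuming that RowTypeInfo and TupleTypeInfo are sibling types that both report isTupleType() as true, which is what the exception suggests.

```java
// Simplified stand-ins for illustration only; these are NOT the real Flink classes.
public class CastSketch {

    // Assumed shared parent whose isTupleType() returns true for every subclass.
    static class TupleTypeInfoBaseStandIn {
        boolean isTupleType() {
            return true;
        }
    }

    // Two sibling subclasses: neither is a subtype of the other.
    static class TupleTypeInfoStandIn extends TupleTypeInfoBaseStandIn { }

    static class RowTypeInfoStandIn extends TupleTypeInfoBaseStandIn { }

    // Mirrors the pattern from the quoted snippet: check isTupleType(), then cast.
    static String describeCast(TupleTypeInfoBaseStandIn typeInfo) {
        if (typeInfo.isTupleType()) {
            try {
                TupleTypeInfoStandIn tuple = (TupleTypeInfoStandIn) typeInfo;
                return "cast ok: " + tuple.getClass().getSimpleName();
            } catch (ClassCastException e) {
                // The check passed, but the object is a sibling type, so the cast fails.
                return "ClassCastException for " + typeInfo.getClass().getSimpleName();
            }
        }
        return "not a tuple type";
    }

    public static void main(String[] args) {
        System.out.println(describeCast(new TupleTypeInfoStandIn()));
        System.out.println(describeCast(new RowTypeInfoStandIn()));
    }
}
```

If that assumption holds, the isTupleType() check alone is not enough to make the cast safe, so this looks to me more like a problem in the accessor code than in my configuration, but I may be missing something.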