Hi All, I am just trying to read edges which has the following format in Kafka
1,2 1,3 1,5 using the Table API and then converting to DataStream of Edge Objects and printing them. However I am getting java.util.concurrent.ExecutionException but not sure why? Here is the sample code import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.graph.Edge; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.*; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Csv; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.types.NullValue; import org.apache.flink.types.Row; import java.util.UUID; public class Test { public static void main(String... args) throws Exception { EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb")); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings); bsTableEnv.connect( new Kafka() .property("bootstrap.servers", "localhost:9092") .property("zookeeper.connect", "localhost:2181") .property("group.id", UUID.randomUUID().toString()) .startFromEarliest() .version("universal") .topic("edges") ) .withFormat(new Csv().fieldDelimiter(',')) .withSchema( new Schema() .field("source", DataTypes.BIGINT()) .field("target", DataTypes.BIGINT()) ) .createTemporaryTable("kafka_source"); Table kafkaSourceTable = bsTableEnv.sqlQuery("select * from kafka_source"); TypeInformation<Edge<Long, NullValue>> edgeTypeInformation = TypeInformation.of(new TypeHint<Edge<Long, NullValue>>() { @Override public TypeInformation<Edge<Long, NullValue>> getTypeInfo() { return super.getTypeInfo(); } }); DataStream<Edge<Long, NullValue>> edges = bsTableEnv.toAppendStream(kafkaSourceTable, Row.class) .map(row -> new Edge<>((Long) row.getField(0), (Long) row.getField(1), NullValue.getInstance())) .returns(edgeTypeInformation); edges.print(); bsTableEnv.execute("sample job"); } }