Hi,

Can you post the complete stack trace?

Best,
Gary

On Tue, Mar 3, 2020 at 1:08 PM kant kodali <kanth...@gmail.com> wrote:

> Hi All,
>
> I am just trying to read edges which has the following format in Kafka
>
> 1,2
> 1,3
> 1,5
>
> using the Table API and then converting to DataStream of Edge Objects and
> printing them. However I am getting
> java.util.concurrent.ExecutionException but not sure why?
>
> Here is the sample code
>
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.graph.Edge;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.*;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.descriptors.Csv;
> import org.apache.flink.table.descriptors.Kafka;
> import org.apache.flink.table.descriptors.Schema;
> import org.apache.flink.types.NullValue;
> import org.apache.flink.types.Row;
>
> import java.util.UUID;
>
> public class Test {
>
>     public static void main(String... args) throws Exception {
>
>         EnvironmentSettings bsSettings =
>             EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStateBackend((StateBackend) new
>             RocksDBStateBackend("file:///tmp/rocksdb"));
>
>         StreamTableEnvironment bsTableEnv =
>             StreamTableEnvironment.create(env, bsSettings);
>
>         bsTableEnv.connect(
>             new Kafka()
>                 .property("bootstrap.servers", "localhost:9092")
>                 .property("zookeeper.connect", "localhost:2181")
>                 .property("group.id", UUID.randomUUID().toString())
>                 .startFromEarliest()
>                 .version("universal")
>                 .topic("edges")
>         )
>         .withFormat(new Csv().fieldDelimiter(','))
>         .withSchema(
>             new Schema()
>                 .field("source", DataTypes.BIGINT())
>                 .field("target", DataTypes.BIGINT())
>         )
>         .createTemporaryTable("kafka_source");
>
>         Table kafkaSourceTable =
>             bsTableEnv.sqlQuery("select * from kafka_source");
>
>         TypeInformation<Edge<Long, NullValue>> edgeTypeInformation =
>             TypeInformation.of(new TypeHint<Edge<Long, NullValue>>() {
>                 @Override
>                 public TypeInformation<Edge<Long, NullValue>> getTypeInfo() {
>                     return super.getTypeInfo();
>                 }
>             });
>
>         DataStream<Edge<Long, NullValue>> edges =
>             bsTableEnv.toAppendStream(kafkaSourceTable, Row.class)
>                 .map(row -> new Edge<>((Long) row.getField(0), (Long)
>                     row.getField(1), NullValue.getInstance()))
>                 .returns(edgeTypeInformation);
>
>         edges.print();
>
>         bsTableEnv.execute("sample job");
>     }
> }
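
For collecting the complete stack trace requested above: a java.util.concurrent.ExecutionException is normally just a wrapper, and the useful information sits in its cause chain. The following is a minimal sketch in plain Java with no Flink dependency. The class name StackTraceSketch, the helper rootCause, and the failing CompletableFuture are hypothetical stand-ins for illustration only; they are not part of the original program, and the simulated failure says nothing about what actually went wrong in the job.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class StackTraceSketch {

    // Hypothetical helper: walks the cause chain and returns the innermost Throwable.
    public static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: a future that fails stands in for the failing job submission.
        CompletableFuture<Void> job = new CompletableFuture<>();
        job.completeExceptionally(
            new IllegalStateException("hypothetical failure inside the job"));

        try {
            job.get();
        } catch (ExecutionException e) {
            // printStackTrace() emits the wrapper and every "Caused by:" section,
            // which is the part worth posting to the list.
            e.printStackTrace();
            System.err.println("Root cause: " + rootCause(e));
        }
    }
}
```

In the original program the same idea would amount to wrapping the execute call in a try/catch and posting everything printed, including all "Caused by:" sections.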