Hi,

Can you post the complete stacktrace?
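
java.util.concurrent.ExecutionException is usually only a wrapper; the
root cause is in the cause chain. A minimal sketch (wrapping the
execute() call from your example) that surfaces the whole chain:

    try {
        bsTableEnv.execute("sample job");
    } catch (Exception e) {
        // printStackTrace() prints every nested "Caused by" in the chain
        e.printStackTrace();
        throw e;
    }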

Best,
Gary

On Tue, Mar 3, 2020 at 1:08 PM kant kodali <kanth...@gmail.com> wrote:

> Hi All,
>
> I am trying to read edges from Kafka, which have the following format:
>
> 1,2
> 1,3
> 1,5
>
> using the Table API, converting the result to a DataStream of Edge
> objects, and printing them. However, I am getting a
> java.util.concurrent.ExecutionException and I am not sure why.
>
> Here is the sample code:
>
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.graph.Edge;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.*;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.descriptors.Csv;
> import org.apache.flink.table.descriptors.Kafka;
> import org.apache.flink.table.descriptors.Schema;
> import org.apache.flink.types.NullValue;
> import org.apache.flink.types.Row;
>
> import java.util.UUID;
>
> public class Test {
>
>     public static void main(String... args) throws Exception {
>
>         // use the Blink planner in streaming mode
>         EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
>                 .useBlinkPlanner().inStreamingMode().build();
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         // keep state in RocksDB under file:///tmp/rocksdb
>         env.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
>
>         StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
>
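>         // register the Kafka topic "edges" as a streaming table with a CSV format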
>         bsTableEnv.connect(
>             new Kafka()
>                 .property("bootstrap.servers", "localhost:9092")
>                 .property("zookeeper.connect", "localhost:2181")
>                 .property("group.id", UUID.randomUUID().toString())
>                 .startFromEarliest()
>                 .version("universal")
>                 .topic("edges")
>         )
>         .withFormat(new Csv().fieldDelimiter(','))
>         .withSchema(
>             new Schema()
>                 .field("source", DataTypes.BIGINT())
>                 .field("target", DataTypes.BIGINT())
>         )
>         .createTemporaryTable("kafka_source");
>
>         Table kafkaSourceTable = bsTableEnv.sqlQuery("select * from kafka_source");
>
>         // TypeHint captures the generic Edge<Long, NullValue> type for the map() below
>         TypeInformation<Edge<Long, NullValue>> edgeTypeInformation =
>                 TypeInformation.of(new TypeHint<Edge<Long, NullValue>>() {});
>
>         // convert the Table to an append-only stream of Rows, then map each Row to an Edge
>         DataStream<Edge<Long, NullValue>> edges =
>                 bsTableEnv.toAppendStream(kafkaSourceTable, Row.class)
>                         .map(row -> new Edge<>((Long) row.getField(0),
>                                 (Long) row.getField(1), NullValue.getInstance()))
>                         .returns(edgeTypeInformation);
>
>         edges.print();
>
>         bsTableEnv.execute("sample job");
>     }
> }
>