Hi All,

I am just trying to read edges, which have the following format, from Kafka

1,2
1,3
1,5

using the Table API, then converting the result to a DataStream of Edge objects and
printing them. However, I am getting a java.util.concurrent.ExecutionException and
I am not sure why.

Here is the sample code

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.graph.Edge;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.NullValue;
import org.apache.flink.types.Row;

import java.util.UUID;

public class Test {

    public static void main(String... args) throws Exception {

        EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend((StateBackend) new
RocksDBStateBackend("file:///tmp/rocksdb"));

        StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(env, bsSettings);

        bsTableEnv.connect(
            new Kafka()
                .property("bootstrap.servers", "localhost:9092")
                .property("zookeeper.connect", "localhost:2181")
                .property("group.id", UUID.randomUUID().toString())
                .startFromEarliest()
                .version("universal")
                .topic("edges")
        )
        .withFormat(new Csv().fieldDelimiter(','))
        .withSchema(
            new Schema()
                .field("source", DataTypes.BIGINT())
                .field("target", DataTypes.BIGINT())
        )
        .createTemporaryTable("kafka_source");

        Table kafkaSourceTable = bsTableEnv.sqlQuery("select * from
kafka_source");

        TypeInformation<Edge<Long, NullValue>> edgeTypeInformation =
TypeInformation.of(new TypeHint<Edge<Long, NullValue>>() {
            @Override
            public TypeInformation<Edge<Long, NullValue>> getTypeInfo() {
                return super.getTypeInfo();
            }
        });

        DataStream<Edge<Long, NullValue>> edges =
bsTableEnv.toAppendStream(kafkaSourceTable, Row.class)
                .map(row -> new Edge<>((Long) row.getField(0), (Long)
row.getField(1), NullValue.getInstance()))
                .returns(edgeTypeInformation);

        edges.print();

        bsTableEnv.execute("sample job");
    }
}

Reply via email to