liang ding created FLINK-18724: ---------------------------------- Summary: Integration with DataStream and DataSet API report error Key: FLINK-18724 URL: https://issues.apache.org/jira/browse/FLINK-18724 Project: Flink Issue Type: Bug Components: API / Core, Connectors / Kafka, Table SQL / API Affects Versions: 1.11.1 Reporter: liang ding
I want to create a table from a DataStream(kafka) : there is two reason I need to use DataStream: # I need decode msg to columns by custom format, in sql mode I don't known how to do it. # I have realize DeserializationSchema or FlatMapFunction both. when use datastream I can do many things before it become a suitable table, that is my prefer way in any other apply. so I do it like that: {code:java} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings tSet= EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); StreamTableEnvironment tEnv=StreamTableEnvironment.create(env,tSet); DataStream<MyRow> stream = env .addSource(new FlinkKafkaConsumer<>("test-log", new SimpleStringSchema(), properties)) .flatMap(new LogParser()); //stream.printToErr(); tEnv.fromDataStream(stream).select("userId,city").execute().print(); tEnv.execute("test-sql"); //env.execute("test"); {code} then I got message: {noformat} [Kafka Fetcher for Source: Custom Source -> Flat Map ->* -> select: (userId,city) -> to: Row (3/3)] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-flink-3-5, groupId=flink-3] Node 0 sent an invalid full fetch response with extra=(test-log-0, response=( [Kafka Fetcher for Source: Custom Source -> Flat Map ->* -> select: (userId,city) -> to: Row (3/3)] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-flink-3-5, groupId=flink-3] Node 0 sent an invalid full fetch response with extra=(test-log-1, response=({noformat} it seen like both StreamExecutionEnvironment and StreamTableEnvironment start the fetcher and make no one successed. and there is no guide Integration which made me confused: should I do env.execute or tableEnv.execute or both(it's seen not) ? and the blink planner way -- This message was sent by Atlassian Jira (v8.3.4#803005)