Hi kant,

AFAIK, there is no "print to stdout" sink for Table API now, you can
implement one custom sink following this doc[1].

IMO, an out-of-box print table sink is very useful, and I've created an
issue[2] to track this.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
[2] https://issues.apache.org/jira/browse/FLINK-16354

kant kodali <kanth...@gmail.com> 于2020年3月1日周日 上午2:30写道:

> Hi,
>
> Thanks for the pointer. Looks like the documentation says to use
> tableEnv.registerTableSink however in my IDE it shows the method is
> deprecated in Flink 1.10. so I am still not seeing a way to add a sink that
> can print to stdout? what sink should I use to print to stdout and how do I
> add it without converting into DataStream?
>
> Thanks!
>
> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com>
> wrote:
>
>> Hi,
>>
>> You shouldn’t be using `KafkaTableSource` as it’s marked @Internal. It’s
>> not part of any public API.
>>
>> You don’t have to convert DataStream into Table to read from Kafka in
>> Table API. I guess you could, if you had used DataStream API’s
>> FlinkKafkaConsumer as it’s documented here [1].
>>
>> But you should be able to use Kafka Table connector directly, as it is
>> described in the docs [2][3].
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>
>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>>
>> Also why do I need to convert to DataStream to print the rows of a table?
>> Why not have a print method in the Table itself?
>>
>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> Do I need to use DataStream API or Table API to construct sources? I am
>>> just trying to read from Kafka and print it to console. And yes I tried it
>>> with datastreams and it works fine but I want to do it using Table related
>>> APIs. I don't see any documentation or a sample on how to create Kafka
>>> table source or any other source using Table Source API's so after some
>>> digging I wrote the following code. My ultimate goal is to avoid Datastream
>>> API as much as possible and just use Table API & SQL but somehow I feel the
>>> Flink framework focuses on DataStream than the SQL interface. am I wrong?
>>> From the user perspective wouldn't it make more sense to focus on SQL
>>> interfaces for both streaming and batch?
>>>
>>>
>>> import 
>>> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>> import 
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>> import org.apache.flink.table.api.DataTypes;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.TableSchema;
>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>> import org.apache.flink.table.sources.TableSource;
>>> import org.apache.flink.types.Row;
>>>
>>> import java.io.IOException;
>>> import java.util.Properties;
>>>
>>> public class Test {
>>>
>>>     public class MyDeserializationSchema extends 
>>> AbstractDeserializationSchema<Row> {
>>>         @Override
>>>         public Row deserialize(byte[] message) throws IOException {
>>>             return Row.of(new String(message));
>>>         }
>>>     }
>>>
>>>     public static void main(String... args) throws Exception {
>>>         Test test = new Test();
>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>                 .useBlinkPlanner()
>>>                 .inStreamingMode()
>>>                 .build();
>>>
>>>         StreamExecutionEnvironment streamExecutionEnvironment = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>         StreamTableEnvironment tableEnvironment = 
>>> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>
>>>         TableSource tableSource = test.getKafkaTableSource();
>>>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>>
>>>         Table resultTable = tableEnvironment.sqlQuery("select * from 
>>> kafka_source");
>>>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>
>>>         streamExecutionEnvironment.execute("Sample Job");
>>>     }
>>>
>>>     public KafkaTableSource getKafkaTableSource() {
>>>         TableSchema tableSchema = TableSchema.builder().field("f0", 
>>> DataTypes.STRING()).build();
>>>         Properties properties = new Properties();
>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>         properties.setProperty("group.id", "test");
>>>         return new KafkaTableSource(tableSchema, "test-topic1", properties, 
>>> new MyDeserializationSchema());
>>>     }
>>> }
>>>
>>>
>>> I get the following error
>>>
>>> The program finished with the following exception:
>>>
>>> The implementation of the FlinkKafkaConsumerBase is not serializable.
>>> The object probably contains or references non serializable fields.
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>
>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>
>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>
>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>
>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>
>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>
>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>
>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>
>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>
>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>> Test.main(Test.java:40)
>>>
>>> The error seems to be on the line
>>>
>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>
>>> and I am not sure why it is not able to serialize?
>>>
>>> Thanks!
>>>
>>
>>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn

Reply via email to