Hi kant,

> Also why do I need to convert to DataStream to print the rows of a table?
> Why not have a print method in the Table itself?

Flink 1.10 introduces a utility class named TableUtils that converts a Table
to a List<Row>. It is mainly intended for demonstration or testing, and is
only applicable to *small batch jobs* and small, finite *append-only stream
jobs*. For example:
Table table = tEnv.sqlQuery("select ...");
List<Row> result = TableUtils.collectToList(table);
result.....

Currently, we are planning to implement Table#collect [1]; after
that, Table#head and Table#print may also be introduced soon.

> The program finished with the following exception:

Please make sure that the Kafka version used in the Test class and the Kafka
version of the pom dependency are the same. I tested your code successfully.
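
A note on why a mismatch shows up as this particular error: the table factory
is looked up by comparing the requested properties (connector.type,
connector.version, ...) with what each factory on the classpath requires. In
the trace, only the two CSV filesystem factories were considered, which
suggests that no Kafka factory for version 0.11 was found. The sketch below is
a simplified model of that matching in plain Java, not Flink's actual
TableFactoryService; the class FactoryMatchSketch, its Factory type and the
factory names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of context-based factory matching.
// This is NOT Flink's TableFactoryService; all names are illustrative.
class FactoryMatchSketch {

    // A factory is reduced to a name plus the properties it requires.
    static final class Factory {
        final String name;
        final Map<String, String> requiredContext;

        Factory(String name, Map<String, String> requiredContext) {
            this.name = name;
            this.requiredContext = requiredContext;
        }
    }

    // Builds an ordered property map from alternating keys and values.
    static Map<String, String> props(String... kv) {
        Map<String, String> m = new LinkedHashMap<>();
        for (int i = 0; i + 1 < kv.length; i += 2) {
            m.put(kv[i], kv[i + 1]);
        }
        return m;
    }

    // Keeps only the factories whose every required property equals the
    // value that was requested for the same key.
    static List<String> match(List<Factory> onClasspath, Map<String, String> requested) {
        List<String> matches = new ArrayList<>();
        for (Factory f : onClasspath) {
            boolean ok = true;
            for (Map.Entry<String, String> e : f.requiredContext.entrySet()) {
                if (!e.getValue().equals(requested.get(e.getKey()))) {
                    ok = false;
                    break;
                }
            }
            if (ok) {
                matches.add(f.name);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Properties taken from the error message in this thread.
        Map<String, String> requested = props(
                "connector.type", "kafka",
                "connector.version", "0.11",
                "format.type", "csv");

        Factory csvFile = new Factory("csv-filesystem-source",
                props("connector.type", "filesystem"));
        Factory kafka010 = new Factory("kafka-0.10-source",
                props("connector.type", "kafka", "connector.version", "0.10"));
        Factory kafka011 = new Factory("kafka-0.11-source",
                props("connector.type", "kafka", "connector.version", "0.11"));

        // Only CSV on the classpath, then a wrong Kafka version, then the right one.
        System.out.println(match(Arrays.asList(csvFile), requested));
        System.out.println(match(Arrays.asList(csvFile, kafka010), requested));
        System.out.println(match(Arrays.asList(csvFile, kafka010, kafka011), requested));
    }
}
```

In this model nothing matches a request for connector.type=kafka while only
the CSV filesystem factory (or a Kafka factory for another version) is
available, which corresponds to the "Required context properties mismatch"
message. Once a factory requiring kafka and 0.11 is present, it is the single
match.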

Best,
Godfrey

[1] https://issues.apache.org/jira/browse/FLINK-14807
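
Regarding the first error quoted further down in this thread ("The
implementation of the FlinkKafkaConsumerBase is not serializable"): a likely,
though unverified, cause is that MyDeserializationSchema is declared as a
non-static inner class of Test. Such an instance keeps a reference to the
enclosing Test object, and Test itself is not Serializable. The following
minimal sketch shows the effect with plain Java serialization and without
Flink; the class and method names are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a job's main class. Like the Test class in the
// quoted program, it does NOT implement Serializable.
class InnerClassSerializationDemo {

    private final String name = "outer";

    // Non-static inner class: each instance holds a hidden reference to the
    // enclosing InnerClassSerializationDemo instance.
    class InnerSchema implements Serializable {
        private static final long serialVersionUID = 1L;

        String describe() {
            return name; // reads a field of the enclosing instance
        }
    }

    // Static nested class: no reference to an enclosing instance.
    static class StaticSchema implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Returns true if plain Java serialization of obj succeeds.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        InnerClassSerializationDemo outer = new InnerClassSerializationDemo();
        System.out.println("inner class:   " + isSerializable(outer.new InnerSchema()));
        System.out.println("static nested: " + isSerializable(new StaticSchema()));
    }
}
```

Here the inner-class instance fails to serialize because it drags the
non-serializable outer object along, while the static nested one succeeds. If
this is indeed what happens in the quoted program, declaring
MyDeserializationSchema as a static nested class (or as a top-level class)
should avoid the error.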


On Sun, Mar 1, 2020 at 4:44 PM Benchao Li <libenc...@gmail.com> wrote:

> Hi kant,
>
> The CSV format is an independent module; you need to add it as a
> dependency.
>
> <dependency>
>    <groupId>org.apache.flink</groupId>
>    <artifactId>flink-csv</artifactId>
>    <version>${flink.version}</version>
> </dependency>
>
>
> On Sun, Mar 1, 2020 at 3:43 PM kant kodali <kanth...@gmail.com> wrote:
>
>> ------------------------------------------------------------
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: findAndCreateTableSource failed.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: org.apache.flink.table.api.TableException:
>> findAndCreateTableSource failed.
>> at
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>> at
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>> at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>> at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>> at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>> at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>> at Test.main(Test.java:34)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>> ... 8 more
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>> Could not find a suitable table factory for
>> 'org.apache.flink.table.factories.TableSourceFactory' in
>> the classpath.
>>
>> Reason: Required context properties mismatch.
>>
>> The matching candidates:
>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>> Mismatched properties:
>> 'connector.type' expects 'filesystem', but is 'kafka'
>>
>> The following properties are requested:
>> connector.property-version=1
>> connector.topic=test-topic1
>> connector.type=kafka
>> connector.version=0.11
>> format.property-version=1
>> format.type=csv
>> schema.0.data-type=VARCHAR(2147483647)
>> schema.0.name=f0
>> update-mode=append
>>
>> The following factories have been considered:
>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>> at
>> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>> at
>> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>> at
>> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>> at
>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>> at
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
>> ... 34 more
>>
>> On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <pi...@ververica.com>
>> wrote:
>>
>>> Hi,
>>>
>>> > Thanks for the pointer. Looks like the documentation says to use
>>> tableEnv.registerTableSink however in my IDE it shows the method is
>>> deprecated in Flink 1.10.
>>>
>>> It looks like not all of the documentation was updated after methods
>>> were deprecated. However, if you look at the Javadoc of the
>>> `registerTableSink` method, you can find an answer [1].
>>>
>>> >  It still doesn't work because it says for CSV the connector.type
>>> should be filesystem not Kafka.
>>>
>>> Can you post the full stack trace? I’m not familiar with the Table
>>> API; Timo or Dawid, do you maybe know what’s going on here?
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-
>>>
>>> On 1 Mar 2020, at 07:50, kant kodali <kanth...@gmail.com> wrote:
>>>
>>> Here is my updated code after digging through the source code (not sure
>>> if it is correct). It still doesn't work because it says that for CSV the
>>> connector.type should be filesystem, not Kafka, but the documentation says
>>> it is supported.
>>>
>>>
>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>> import org.apache.flink.runtime.state.StateBackend;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.table.api.DataTypes;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>> import org.apache.flink.table.descriptors.Csv;
>>> import org.apache.flink.table.descriptors.Kafka;
>>> import org.apache.flink.table.descriptors.Schema;
>>>
>>> public class Test {
>>>
>>>     public static void main(String... args) throws Exception {
>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>                 .useBlinkPlanner()
>>>                 .inStreamingMode()
>>>                 .build();
>>>
>>>         StreamExecutionEnvironment streamExecutionEnvironment =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>         streamExecutionEnvironment.setStateBackend(
>>>                 (StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
>>>         StreamTableEnvironment tableEnvironment =
>>>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>
>>>         tableEnvironment
>>>             .connect(
>>>                 new Kafka()
>>>                     .version("0.11")
>>>                     .topic("test-topic1")
>>>             )
>>>             .withFormat(new Csv())
>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>             .inAppendMode()
>>>             .createTemporaryTable("kafka_source");
>>>
>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>
>>>         tableEnvironment
>>>             .connect(
>>>                 new Kafka()
>>>                     .version("0.11")
>>>                     .topic("test-topic2")
>>>             )
>>>             .withFormat(new Csv())
>>>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>>>             .inAppendMode()
>>>             .createTemporaryTable("kafka_target");
>>>
>>>         tableEnvironment.insertInto("kafka_target", resultTable);
>>>
>>>         tableEnvironment.execute("Sample Job");
>>>     }
>>> }
>>>
>>>
>>> On Sat, Feb 29, 2020 at 7:48 PM kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi Benchao,
>>>>
>>>> Agreed, a ConsoleSink would be very useful, but that is not the only
>>>> problem here. The documentation says to use tableEnv.registerTableSink
>>>> all over the place
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
>>>> however that function is deprecated. So how do I add any other sink?
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <libenc...@gmail.com> wrote:
>>>>
>>>>> Hi kant,
>>>>>
>>>>> AFAIK, there is currently no "print to stdout" sink for the Table API;
>>>>> you can implement a custom sink by following this doc [1].
>>>>>
>>>>> IMO, an out-of-the-box print table sink would be very useful, and I've
>>>>> created an issue [2] to track this.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>>>>> [2] https://issues.apache.org/jira/browse/FLINK-16354
>>>>>
>>>>> On Sun, Mar 1, 2020 at 2:30 AM kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for the pointer. Looks like the documentation says to use
>>>>>> tableEnv.registerTableSink, however in my IDE it shows the method is
>>>>>> deprecated in Flink 1.10, so I am still not seeing a way to add a sink
>>>>>> that can print to stdout. What sink should I use to print to stdout,
>>>>>> and how do I add it without converting into a DataStream?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <pi...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> You shouldn’t be using `KafkaTableSource` as it’s marked @Internal.
>>>>>>> It’s not part of any public API.
>>>>>>>
>>>>>>> You don’t have to convert DataStream into Table to read from Kafka
>>>>>>> in Table API. I guess you could, if you had used DataStream API’s
>>>>>>> FlinkKafkaConsumer as it’s documented here [1].
>>>>>>>
>>>>>>> But you should be able to use Kafka Table connector directly, as it
>>>>>>> is described in the docs [2][3].
>>>>>>>
>>>>>>> Piotrek
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>>>>> [2]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>>>>>> [3]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>>>>>
>>>>>>> On 29 Feb 2020, at 12:54, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>
>>>>>>> Also why do I need to convert to DataStream to print the rows of a
>>>>>>> table? Why not have a print method in the Table itself?
>>>>>>>
>>>>>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali <kanth...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> Do I need to use the DataStream API or the Table API to construct
>>>>>>>> sources? I am just trying to read from Kafka and print it to the
>>>>>>>> console. And yes, I tried it with DataStreams and it works fine, but I
>>>>>>>> want to do it using the Table-related APIs. I don't see any
>>>>>>>> documentation or a sample on how to create a Kafka table source or any
>>>>>>>> other source using the TableSource APIs, so after some digging I wrote
>>>>>>>> the following code. My ultimate goal is to avoid the DataStream API as
>>>>>>>> much as possible and just use the Table API & SQL, but somehow I feel
>>>>>>>> the Flink framework focuses on DataStream more than on the SQL
>>>>>>>> interface. Am I wrong? From the user's perspective, wouldn't it make
>>>>>>>> more sense to focus on SQL interfaces for both streaming and batch?
>>>>>>>>
>>>>>>>>
>>>>>>>> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>>>>>>> import org.apache.flink.table.api.DataTypes;
>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>> import org.apache.flink.table.api.Table;
>>>>>>>> import org.apache.flink.table.api.TableSchema;
>>>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>>>> import org.apache.flink.table.sources.TableSource;
>>>>>>>> import org.apache.flink.types.Row;
>>>>>>>>
>>>>>>>> import java.io.IOException;
>>>>>>>> import java.util.Properties;
>>>>>>>>
>>>>>>>> public class Test {
>>>>>>>>
>>>>>>>>     public class MyDeserializationSchema
>>>>>>>>             extends AbstractDeserializationSchema<Row> {
>>>>>>>>         @Override
>>>>>>>>         public Row deserialize(byte[] message) throws IOException {
>>>>>>>>             return Row.of(new String(message));
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public static void main(String... args) throws Exception {
>>>>>>>>         Test test = new Test();
>>>>>>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>>>>>                 .useBlinkPlanner()
>>>>>>>>                 .inStreamingMode()
>>>>>>>>                 .build();
>>>>>>>>
>>>>>>>>         StreamExecutionEnvironment streamExecutionEnvironment =
>>>>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>         StreamTableEnvironment tableEnvironment =
>>>>>>>>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>>>>>>
>>>>>>>>         TableSource tableSource = test.getKafkaTableSource();
>>>>>>>>         Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>>>>>>>         tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>>>>>>>
>>>>>>>>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>>>>>>>>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>
>>>>>>>>         streamExecutionEnvironment.execute("Sample Job");
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public KafkaTableSource getKafkaTableSource() {
>>>>>>>>         TableSchema tableSchema =
>>>>>>>>                 TableSchema.builder().field("f0", DataTypes.STRING()).build();
>>>>>>>>         Properties properties = new Properties();
>>>>>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>>>>>         properties.setProperty("group.id", "test");
>>>>>>>>         return new KafkaTableSource(
>>>>>>>>                 tableSchema, "test-topic1", properties, new MyDeserializationSchema());
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> I get the following error
>>>>>>>>
>>>>>>>> The program finished with the following exception:
>>>>>>>>
>>>>>>>> The implementation of the FlinkKafkaConsumerBase is not serializable.
>>>>>>>> The object probably contains or references non serializable fields.
>>>>>>>>
>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>>
>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>>
>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>>>>>>>>
>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>>>>>>>>
>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>>>>>>>>
>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>>>>>>>>
>>>>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>>>>>>>>
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>>>>>>>>
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>>>>>>>>
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>>>>>
>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>>>>>
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>>>>>
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>>>>>
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>
>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>>>>>
>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>>>>>
>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>>>>>
>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>>
>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>>>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>>>>
>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>>>>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>>>>>
>>>>>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>>>>>
>>>>>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>>>>>
>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>>>>>
>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>>>>>
>>>>>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>>>>>> Test.main(Test.java:40)
>>>>>>>>
>>>>>>>> The error seems to be on the line
>>>>>>>>
>>>>>>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>>>>>>
>>>>>>>> and I am not sure why it cannot be serialized.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Benchao Li
>>>>> School of Electronics Engineering and Computer Science, Peking University
>>>>> Tel:+86-15650713730
>>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>>>
>>>>>
>>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
