Hi, Jörn is probably right.
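Concretely, that would mean adding a call to env.execute() after counts.writeToSink(sink) in your program. As a rough illustration of why this matters, here is a toy model in plain Java. It is only a sketch: ToyEnvironment and its methods are hypothetical stand-ins that I made up for this mail, not Flink's API. The assumption it encodes is that print() runs right away, while writeToSink() only registers work that runs once execute() is called.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only. ToyEnvironment is a hypothetical stand-in, NOT Flink's API.
class LazySinkSketch {
    public static void main(String[] args) {
        ToyEnvironment env = new ToyEnvironment();

        env.print("canine,2");        // eager: the row is emitted immediately
        env.writeToSink("canine,2");  // deferred: only registers a sink

        // Nothing has been written yet, because execute() was not called.
        System.out.println("before execute(): " + env.written);

        env.execute();                // runs every sink registered so far
        System.out.println("after execute():  " + env.written);
    }
}

class ToyEnvironment {
    // Sinks that were registered but have not run yet.
    private final List<Runnable> pendingSinks = new ArrayList<>();
    // Rows emitted eagerly by print().
    final List<String> printed = new ArrayList<>();
    // Rows that actually reached the (pretend) output file.
    final List<String> written = new ArrayList<>();

    // Eager: does its work as soon as it is called.
    void print(String row) {
        System.out.println(row);
        printed.add(row);
    }

    // Lazy: remembers what to write, but does not write it.
    void writeToSink(String row) {
        pendingSinks.add(() -> written.add(row));
    }

    // Runs all pending sinks once, then forgets them.
    void execute() {
        for (Runnable sink : pendingSinks) {
            sink.run();
        }
        pendingSinks.clear();
    }
}
```

In the toy model, the written list stays empty until execute() is called, which matches what you observed: output on stdout, but no file.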
In contrast to print(), which immediately triggers an execution, writeToSink() only appends a sink operator to the program. The execution then has to be triggered explicitly by calling env.execute().

The INFO messages of the TypeExtractor only tell you that Row cannot be used as a POJO type, but that's fine here.

Best, Fabian

2018-05-27 19:51 GMT+02:00 Jörn Franke <jornfra...@gmail.com>:

> Do you have the complete source?
>
> I am missing a env.execute at the end
>
> > On 27. May 2018, at 18:55, chrisr123 <chris.rueg...@gmail.com> wrote:
> >
> > I'm using Flink 1.4.0
> >
> > I'm trying to save the results of a Table API query to a CSV file, but I'm
> > getting an error.
> > Here are the details:
> >
> > My Input file looks like this:
> > id,species,color,weight,name
> > 311,canine,golden,75,dog1
> > 312,canine,brown,22,dog2
> > 313,feline,gray,8,cat1
> >
> > I run a query on this to select canines only, and I want to save this to a
> > csv file:
> >
> > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> > BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
> >
> > String inputPath = "location-of-source-file";
> > CsvTableSource petsTableSource = CsvTableSource.builder()
> >     .path(inputPath)
> >     .ignoreFirstLine()
> >     .fieldDelimiter(",")
> >     .field("id", Types.INT())
> >     .field("species", Types.STRING())
> >     .field("color", Types.STRING())
> >     .field("weight", Types.DOUBLE())
> >     .field("name", Types.STRING())
> >     .build();
> >
> > // Register our table source
> > tableEnv.registerTableSource("pets", petsTableSource);
> > Table pets = tableEnv.scan("pets");
> >
> > Table counts = pets
> >     .groupBy("species")
> >     .select("species, species.count as count")
> >     .filter("species === 'canine'");
> >
> > DataSet<Row> result = tableEnv.toDataSet(counts, Row.class);
> > result.print();
> >
> > // Write Results to File
> > TableSink<Row> sink = new CsvTableSink("/home/hadoop/output/pets", ",");
> > counts.writeToSink(sink);
> >
> > When I run this, I get the output from the result.print() call as this:
> >
> > canine,2
> >
> > but I do not see any results written to the file, and I see the error below.
> > How can I save the results I'm seeing in stdout to a CSV file?
> > Thanks!
> >
> > 2018-05-27 12:49:44,411 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1873) - class org.apache.flink.types.Row does not contain a getter for field fields
> > 2018-05-27 12:49:44,411 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1876) - class org.apache.flink.types.Row does not contain a setter for field fields
> > 2018-05-27 12:49:44,411 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid POJO type because not all fields are valid POJO fields.
> > 2018-05-27 12:49:44,411 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1873) - class org.apache.flink.types.Row does not contain a getter for field fields
> > 2018-05-27 12:49:44,412 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1876) - class org.apache.flink.types.Row does not contain a setter for field fields
> > 2018-05-27 12:49:44,412 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid POJO type because not all fields are valid POJO fields.
> >
> > --
> > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/