Hi,

Jörn is probably right.

In contrast to print(), which immediately triggers an execution,
writeToSink() just appends a sink operator and requires to explicitly
trigger the execution.

The INFO messages of the TypeExtractor are "just" telling you, that Row
cannot be used as a POJO type, but that's fine here.

Best, Fabian

2018-05-27 19:51 GMT+02:00 Jörn Franke <jornfra...@gmail.com>:

> Do you have the complete source?
>
> I am missing a env.execute at the end
>
> > On 27. May 2018, at 18:55, chrisr123 <chris.rueg...@gmail.com> wrote:
> >
> > I'm using Flink 1.4.0
> >
> > I'm trying to save the results of a Table API query to a CSV file, but
> I'm
> > getting an error.
> > Here are the details:
> >
> > My Input file looks like this:
> > id,species,color,weight,name
> > 311,canine,golden,75,dog1
> > 312,canine,brown,22,dog2
> > 313,feline,gray,8,cat1
> >
> > I run a query on this to select canines only, and I want to save this to
> a
> > csv file:
> >
> >            ExecutionEnvironment env =
> > ExecutionEnvironment.getExecutionEnvironment();
> >            BatchTableEnvironment tableEnv =
> > TableEnvironment.getTableEnvironment(env);
> >
> >                        String inputPath = "location-of-source-file";
> >            CsvTableSource petsTableSource = CsvTableSource.builder()
> >                    .path(inputPath)
> >                    .ignoreFirstLine()
> >                    .fieldDelimiter(",")
> >                    .field("id", Types.INT())
> >                    .field("species", Types.STRING())
> >                    .field("color", Types.STRING())
> >                    .field("weight", Types.DOUBLE())
> >                    .field("name", Types.STRING())
> >                    .build();
> >
> >            // Register our table source
> >            tableEnv.registerTableSource("pets", petsTableSource);
> >            Table pets = tableEnv.scan("pets");
> >
> >            Table counts = pets
> >                    .groupBy("species")
> >                    .select("species, species.count as count")
> >                    .filter("species === 'canine'");
> >
> >            DataSet<Row> result = tableEnv.toDataSet(counts, Row.class);
> >                        result.print();
> >
> >            // Write Results to File
> >            TableSink<Row> sink = new 
> > CsvTableSink("/home/hadoop/output/pets",
> ",");
> >            counts.writeToSink(sink);
> >
> > When I run this, I get the output from the result.print() call as this:
> >
> > canine,2
> >
> > but I do not see any results written
> > to the file, and I see the error below.
> > How can I save the results I'm seeing in stdout to a CSV file?
> > Thanks!
> >
> >
> >
> > 2018-05-27 12:49:44,411 INFO  [main] typeutils.TypeExtractor
> > (TypeExtractor.java:1873) - class org.apache.flink.types.Row does not
> > contain a getter for field fields
> > 2018-05-27 12:49:44,411 INFO  [main] typeutils.TypeExtractor
> > (TypeExtractor.java:1876) - class org.apache.flink.types.Row does not
> > contain a setter for field fields
> > 2018-05-27 12:49:44,411 INFO  [main] typeutils.TypeExtractor
> > (TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a
> valid
> > POJO type because not all fields are valid POJO fields.
> > 2018-05-27 12:49:44,411 INFO  [main] typeutils.TypeExtractor
> > (TypeExtractor.java:1873) - class org.apache.flink.types.Row does not
> > contain a getter for field fields
> > 2018-05-27 12:49:44,412 INFO  [main] typeutils.TypeExtractor
> > (TypeExtractor.java:1876) - class org.apache.flink.types.Row does not
> > contain a setter for field fields
> > 2018-05-27 12:49:44,412 INFO  [main] typeutils.TypeExtractor
> > (TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a
> valid
> > POJO type because not all fields are valid POJO fields.
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Reply via email to