[ https://issues.apache.org/jira/browse/FLINK-2410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14713450#comment-14713450 ]
ASF GitHub Bot commented on FLINK-2410: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/943#discussion_r37983471 --- Diff: flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.table.test; + +import java.io.Serializable; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.table.Table; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class PojoGroupingITCase extends MultipleProgramsTestBase { + + public PojoGroupingITCase(TestExecutionMode mode) { + super(mode); + } + + private String resultPath; + private String expected = ""; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception { + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception { + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testPojoGrouping() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<String, Double, String>> data = env.fromElements( + new Tuple3<String, Double, String>("A", 23.0, "Z"), + new Tuple3<String, Double, String>("A", 24.0, "Y"), + new Tuple3<String, Double, String>("B", 1.0, "Z")); + + TableEnvironment tableEnv = new TableEnvironment(); + + Table table = tableEnv + .fromDataSet(data, "groupMe, value, name") + .select("groupMe, value, name") + .where("groupMe != 'B'"); + + DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class); + + DataSet<MyPojo> result = myPojos.groupBy("groupMe") + .sortGroup("value", Order.DESCENDING) + .first(1); + 
result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
--- End diff --

Use `collect` instead of writing to disk (see FLINK-2032)

> PojoTypeInfo is not completely serializable
> -------------------------------------------
>
> Key: FLINK-2410
> URL: https://issues.apache.org/jira/browse/FLINK-2410
> Project: Flink
> Issue Type: Bug
> Components: Java API
> Reporter: Timo Walther
> Assignee: Timo Walther
>
> Table API requires PojoTypeInfo to be serializable. The following code fails:
> {code}
> Table finishedEtlTable = maxMeasurements
> .join(stationTable).where("s_station_id = m_station_id")
> .select("year, month, day, value, country, name");
> DataSet<MaxTemperature> maxTemp = tableEnv.toDataSet(finishedEtlTable,
> MaxTemperature.class);
> maxTemp
> .groupBy("year")
> .sortGroup("value", Order.DESCENDING)
> .first(1)
> .print();
> {code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)