Hi Chris,
First thing, FxRate is not POJO, a POJO should have a constructor without
arguments. In this way, you can read from a POJO DataStream directly.
Second, if you want get field from POJO, please use get function
like: fx.get('currency'), if you have a POJO field, you can use this way to
get nested field from POJO.
Best,
Jingsong Lee
On Wed, Dec 4, 2019 at 12:33 AM Chris Miller <[email protected]> wrote:
> I'm having trouble dealing with a DataStream of POJOs. In particular, when
> I perform SQL operations on it I can't figure out the syntax for referring
> to individual fields within the POJO.
>
> Below is an example that illustrates the problem and the various
> approaches I've tried. Can anyone please point me in the right direction?
>
> import java.util.Arrays;
> import java.util.List;
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
>
> public class PojoTest {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment streamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(streamEnv);
>
> // Using tuples, this works as expected
> List<Tuple2<String, Double>> tupleData = Arrays.asList(
> new Tuple2<>("USD", 1.0),
> new Tuple2<>("GBP", 1.3),
> new Tuple2<>("EUR", 1.11));
> DataStreamSource<Tuple2<String, Double>> tupleStream =
> streamEnv.fromCollection(tupleData);
> tableEnv.fromDataStream(tupleStream, "currency, rate").printSchema();
>
> // Using a DataStream of POJOs, how do I obtain an equivalent table to
> the above?
> List<FxRate> pojoData = Arrays.asList(
> new FxRate("USD", 1.0),
> new FxRate("GBP", 1.3),
> new FxRate("EUR", 1.11));
> DataStreamSource<FxRate> pojoStream = streamEnv.fromCollection(pojoData);
>
> Table pojoTable = tableEnv.fromDataStream(pojoStream, "fx");
> pojoTable.printSchema();
>
> // This fails with "ValidationException: Cannot resolve field [currency],
> input field list:[fx]"
> pojoTable.select("currency, rate").printSchema();
>
> // This fails with "ValidationException: Undefined function: currency"
> pojoTable.select("fx.currency AS currency, fx.rate AS
> rate").printSchema();
>
> // This fails with "ValidationException: Too many fields referenced from
> an atomic type"
> tableEnv.fromDataStream(pojoStream, "currency, rate").printSchema();
>
> // This fails with "ValidationException: Field reference expression
> expected"
> tableEnv.fromDataStream(pojoStream, "fx.currency, fx.rate").printSchema();
>
> streamEnv.execute();
> }
>
> public static class FxRate {
> public String currency;
> public double rate;
>
> public FxRate(String currency, double rate) {
> this.currency = currency;
> this.rate = rate;
> }
>
> @Override
> public String toString() {
> return "FxRate{currency='" + currency + '\'' + ", rate=" + rate + '}';
> }
> }
> }
>
>
--
Best, Jingsong Lee