Thanks so much for your answer , but then how should I perform such as
comparison ? Which options do we have ?
Thanks

Le mer. 24 juill. 2019 10:01 p.m., Ruidong Li <leonxp...@gmail.com> a
écrit :

> Hi, it's because the Outer Joins will generate retractions, consider the
> behavior of Left Outer Join
>
> 1.  left record arrives, no matched right record, so  +(left, null) will
> be generated.
> 2  right record arrives, the previous result should be retracted, so
> -(left, null) and +(left, right) will be generated
>
> Andres Angel <ingenieroandresan...@gmail.com> 于2019年7月25日周四 上午8:15写道:
>
>> Hello guys I have registered some table environments and now I'm trying
>> to perform a query on these using LEFT JOIN like the example below:
>>
>>  Table fullenrichment = tenv.sqlQuery(
>>                 "SELECT   pp.a,pp.b,pp.c,pp.d,pp.a " +
>>                         " FROM t1 pp LEFT JOIN t2 ent" +
>>                         " ON pp.b = ent.b" +
>>                         " LEFT JOIN t3 act " +
>>                         " ON pp.a = act.a "
>>         );
>>
>> Once the query is complete I need to read this into a Row DS
>>
>> DS<Row> results = tenv.toAppendStream(fullenrichment,Row.class);
>>
>> I'm getting the following error, however, if I execute the same code but
>> instead that LEFT JOIN I switch by INNER JOIN the error is vanished and the
>> code works , why this behavior?
>>
>> 1930 [main] INFO
>>  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  -
>> Flink Kinesis Consumer is going to read the following streams:
>> tr-stream-ingestion,
>> 3698 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
>> class org.apache.flink.types.Row does not contain a getter for field fields
>> 3698 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
>> class org.apache.flink.types.Row does not contain a setter for field fields
>> 3698 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
>> Class class org.apache.flink.types.Row cannot be used as a POJO type
>> because not all fields are valid POJO fields, and must be processed as
>> GenericType. Please read the Flink documentation on "Data Types &
>> Serialization" for details of the effect on performance.
>> 3730 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
>> class org.apache.flink.types.Row does not contain a getter for field fields
>> 3730 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
>> class org.apache.flink.types.Row does not contain a setter for field fields
>> 3730 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
>> Class class org.apache.flink.types.Row cannot be used as a POJO type
>> because not all fields are valid POJO fields, and must be processed as
>> GenericType. Please read the Flink documentation on "Data Types &
>> Serialization" for details of the effect on performance.
>> 3753 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
>> class org.apache.flink.types.Row does not contain a getter for field fields
>> 3753 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
>> class org.apache.flink.types.Row does not contain a setter for field fields
>> 3753 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
>> Class class org.apache.flink.types.Row cannot be used as a POJO type
>> because not all fields are valid POJO fields, and must be processed as
>> GenericType. Please read the Flink documentation on "Data Types &
>> Serialization" for details of the effect on performance.
>> Exception in thread "main" org.apache.flink.table.api.TableException:
>> Table is not an append-only table. Use the toRetractStream() in order to
>> handle add and retract messages.
>> at
>> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:920)
>> at
>> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:896)
>> at
>> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308)
>> at
>> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262)
>> at consumer.trconsumer.main(trconsumer.java:180)
>>
>

Reply via email to