Hi Kant,
A "FULL OUTER JOIN" job generates retract messages, so toRetractStream is
required to guarantee correctness.
I think it's better to use StreamExecutionEnvironment.execute, because you
have converted the Table to a DataStream.
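
To illustrate why an append-only stream cannot carry this result, here is a
minimal plain-Java sketch. It is not Flink code and not how the planner is
implemented; the class RetractJoinSketch and its methods are made up for this
example. It only shows the idea: a row first emitted with a null side has to
be withdrawn once a matching row arrives on the other input.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: plain Java, not the Flink API.
class RetractJoinSketch {
    // Rows seen so far on each input, grouped by join key.
    private final Map<String, List<String>> leftRows = new HashMap<>();
    private final Map<String, List<String>> rightRows = new HashMap<>();
    // Emitted messages: "+" is an add message, "-" is a retract message.
    private final List<String> changelog = new ArrayList<>();

    void onLeft(String key, String row) {
        List<String> matches = rightRows.getOrDefault(key, new ArrayList<>());
        boolean firstLeftForKey = !leftRows.containsKey(key);
        if (matches.isEmpty()) {
            // No partner yet: emit the row padded with null.
            emit(true, row, null);
        }
        for (String r : matches) {
            if (firstLeftForKey) {
                // The right row was emitted as (null, r) earlier; withdraw it.
                emit(false, null, r);
            }
            emit(true, row, r);
        }
        leftRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
    }

    void onRight(String key, String row) {
        List<String> matches = leftRows.getOrDefault(key, new ArrayList<>());
        boolean firstRightForKey = !rightRows.containsKey(key);
        if (matches.isEmpty()) {
            emit(true, null, row);
        }
        for (String l : matches) {
            if (firstRightForKey) {
                // The left row was emitted as (l, null) earlier; withdraw it.
                emit(false, l, null);
            }
            emit(true, l, row);
        }
        rightRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
    }

    List<String> changelog() {
        return changelog;
    }

    private void emit(boolean add, String left, String right) {
        changelog.add((add ? "+" : "-") + "(" + left + ", " + right + ")");
    }

    public static void main(String[] args) {
        RetractJoinSketch join = new RetractJoinSketch();
        join.onLeft("k1", "L1");   // prints +(L1, null)
        join.onRight("k1", "R1");  // prints -(L1, null) then +(L1, R1)
        join.changelog().forEach(System.out::println);
    }
}
```

In the Flink Java API, toRetractStream(result, Row.class) gives you a
DataStream<Tuple2<Boolean, Row>>, where the Boolean plays the role of the
"+" / "-" marker above (true for an add message, false for a retract message).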

On Sun, Jan 19, 2020 at 11:59 AM, kant kodali <kanth...@gmail.com> wrote:

> Hi Godfrey,
>
> I was just clicking the run button in my IDE, which doesn't really show me
> errors, so I ran flink run <jar> from the command line, and that shows me
> what the error is. It tells me I need to change to toRetractStream(), and both
> StreamExecutionEnvironment.execute and StreamTableEnvironment.execute seem to
> work fine, although I am not sure which one is the correct usage.
>
> Thanks!
>
> On Sat, Jan 18, 2020 at 6:52 PM kant kodali <kanth...@gmail.com> wrote:
>
>> Hi Godfrey,
>>
>> Thanks a lot for your response. I just tried it with env.execute("simple
>> job") but I still get the same error message.
>>
>> Kant
>>
>> On Sat, Jan 18, 2020 at 6:26 PM godfrey he <godfre...@gmail.com> wrote:
>>
>>> hi kant,
>>>
>>> > 1) The Documentation says full outer join is supported however the
>>> below code just exits with value 1. No error message.
>>> If you have converted the Table to a DataStream, please execute it
>>> with StreamExecutionEnvironment (call env.execute("simple job")).
>>>
>>> > 2) If I am using a blink planner should I use TableEnvironment or
>>> StreamTableEnvironment ?
>>> For a streaming job, both environments can be used. The differences are:
>>>   TableEnvironment optimizes multiple queries into one DAG when
>>> executing, while StreamTableEnvironment optimizes each query
>>> independently.
>>>   StreamTableEnvironment supports conversion from/to DataStream,
>>> while TableEnvironment does not.
>>>   StreamTableEnvironment supports registering TableFunction
>>> and AggregateFunction, while TableEnvironment does not support that yet.
>>>
>>> For a batch job, TableEnvironment is the only choice, because
>>> DataStream does not support batch jobs yet.
>>>
>>> > 3) Why does the current stable Flink documentation (1.9) recommend the
>>> old planner? Is there any rough timeline for when we would be able to use
>>> the blink planner in production? Perhaps 1.10 or 1.11?
>>> 1.9 is the blink planner's first release, and it is not yet stable. In
>>> 1.10, the blink planner is more stable, and we are switching it to be the
>>> default step by step [0].
>>>
>>> [0]
>>> http://mail-archives.apache.org/mod_mbox/flink-dev/202001.mbox/%3CCAELO930%2B3RJ5m4hGQ7fbS-CS%3DcfJe5ENcRmZ%3DT_hey-uL6c27g%40mail.gmail.com%3E
>>>
>>> On Sat, Jan 18, 2020 at 5:40 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> 1) The Documentation says full outer join is supported however the
>>>> below code just exits with value 1. No error message.
>>>>
>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>>>> import org.apache.flink.table.api.*;
>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>> import org.apache.flink.types.Row;
>>>>
>>>> import java.util.Properties;
>>>>
>>>> public class Test {
>>>>
>>>>     public static void main(String... args) throws Exception {
>>>>
>>>>         EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
>>>>                 .useBlinkPlanner()
>>>>                 .inStreamingMode()
>>>>                 .build();
>>>>         final StreamExecutionEnvironment env =
>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>         StreamTableEnvironment bsTableEnv =
>>>>                 StreamTableEnvironment.create(env, bsSettings);
>>>>
>>>>         Properties properties = new Properties();
>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>         properties.setProperty("group.id", "test");
>>>>
>>>>         FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<>(
>>>>                 java.util.regex.Pattern.compile("test-topic1"),
>>>>                 new SimpleStringSchema(),
>>>>                 properties);
>>>>         FlinkKafkaConsumer<String> consumer2 = new FlinkKafkaConsumer<>(
>>>>                 java.util.regex.Pattern.compile("test-topic2"),
>>>>                 new SimpleStringSchema(),
>>>>                 properties);
>>>>
>>>>         DataStream<String> stream1 = env.addSource(consumer1);
>>>>         DataStream<String> stream2 = env.addSource(consumer2);
>>>>
>>>>         bsTableEnv.registerDataStream("sample1", stream1);
>>>>         bsTableEnv.registerDataStream("sample2", stream2);
>>>>
>>>>         Table result = bsTableEnv.sqlQuery(
>>>>                 "SELECT * FROM sample1 FULL OUTER JOIN sample2 ON sample1.f0 = sample2.f0");
>>>>         result.printSchema();
>>>>
>>>>         bsTableEnv.toAppendStream(result, Row.class).print();
>>>>         bsTableEnv.execute("sample job");
>>>>     }
>>>> }
>>>>
>>>>
>>>> 2) If I am using a blink planner should I use TableEnvironment or
>>>> StreamTableEnvironment ?
>>>>
>>>> 3) Why does the current stable Flink documentation (1.9) recommend the
>>>> old planner? Is there any rough timeline for when we would be able to use
>>>> the blink planner in production? Perhaps 1.10 or 1.11?
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>
