hi kant,

a "FULL OUTER JOIN" job will generate retract messages, so toRetractStream is
required to guarantee correctness. I think it's better to use
StreamExecutionEnvironment.execute, because you have converted the Table to a
DataStream.
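For example, a minimal sketch of the quoted program with just those two changes
(the class name RetractTest is a placeholder; everything else, including the
Kafka topics and the f0 join key, is taken from the example quoted below):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Properties;

public class RetractTest {

    public static void main(String... args) throws Exception {
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
        final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment bsTableEnv =
            StreamTableEnvironment.create(env, bsSettings);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        // same Kafka sources as in the quoted program below
        FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<>(
            java.util.regex.Pattern.compile("test-topic1"),
            new SimpleStringSchema(),
            properties);
        FlinkKafkaConsumer<String> consumer2 = new FlinkKafkaConsumer<>(
            java.util.regex.Pattern.compile("test-topic2"),
            new SimpleStringSchema(),
            properties);

        DataStream<String> stream1 = env.addSource(consumer1);
        DataStream<String> stream2 = env.addSource(consumer2);

        bsTableEnv.registerDataStream("sample1", stream1);
        bsTableEnv.registerDataStream("sample2", stream2);

        Table result = bsTableEnv.sqlQuery(
            "SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0");

        // toRetractStream instead of toAppendStream: each element is a
        // Tuple2<Boolean, Row>, where the Boolean flag marks whether the Row
        // is being added (true) or retracted (false)
        DataStream<Tuple2<Boolean, Row>> retractStream =
            bsTableEnv.toRetractStream(result, Row.class);
        retractStream.print();

        // execute on the StreamExecutionEnvironment, since the Table has been
        // converted to a DataStream
        env.execute("sample job");
    }
}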
kant kodali <kanth...@gmail.com> wrote on Sun, Jan 19, 2020, 11:59 AM:

> Hi Godfrey,
>
> I was just clicking the run button in my IDE and it doesn't really show me
> errors, so I used the command line (flink run <jar>) and that shows me what
> the error is. It tells me I need to change to toRetractStream(), and both
> StreamExecutionEnvironment.execute and StreamTableEnvironment.execute seem
> to work fine, although I am not sure which one is the correct usage.
>
> Thanks!
>
> On Sat, Jan 18, 2020 at 6:52 PM kant kodali <kanth...@gmail.com> wrote:
>
>> Hi Godfrey,
>>
>> Thanks a lot for your response. I just tried it with env.execute("simple
>> job") but I still get the same error message.
>>
>> Kant
>>
>> On Sat, Jan 18, 2020 at 6:26 PM godfrey he <godfre...@gmail.com> wrote:
>>
>>> hi kant,
>>>
>>> > 1) The documentation says full outer join is supported; however, the
>>> > code below just exits with value 1. No error message.
>>> if you have converted the Table to a DataStream, please execute it with
>>> the StreamExecutionEnvironment (call env.execute("simple job")).
>>>
>>> > 2) If I am using the blink planner, should I use TableEnvironment or
>>> > StreamTableEnvironment?
>>> for streaming jobs, either environment can be used. The differences are:
>>> TableEnvironment optimizes multiple queries into one DAG when executing,
>>> while StreamTableEnvironment optimizes each query independently.
>>> StreamTableEnvironment supports converting from/to DataStream, while
>>> TableEnvironment does not.
>>> StreamTableEnvironment supports registering TableFunction and
>>> AggregateFunction, while TableEnvironment does not support that yet.
>>>
>>> for batch jobs, TableEnvironment is the only choice, because the
>>> DataStream API does not support batch jobs yet.
>>>
>>> > 3) Why does Flink's current stable documentation (1.9) recommend the
>>> > old planner? Any rough timeline on when we would be able to use the
>>> > blink planner in production? Perhaps 1.10 or 1.11?
>>> 1.9 is the blink planner's first version, and it is unstable. In 1.10 the
>>> blink planner is more stable, and we are switching to the blink planner
>>> as the default step by step [0].
>>>
>>> [0]
>>> http://mail-archives.apache.org/mod_mbox/flink-dev/202001.mbox/%3CCAELO930%2B3RJ5m4hGQ7fbS-CS%3DcfJe5ENcRmZ%3DT_hey-uL6c27g%40mail.gmail.com%3E
>>>
>>> kant kodali <kanth...@gmail.com> wrote on Sat, Jan 18, 2020, 5:40 PM:
>>>
>>>> Hi All,
>>>>
>>>> 1) The documentation says full outer join is supported; however, the
>>>> code below just exits with value 1. No error message.
>>>>
>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>>>> import org.apache.flink.table.api.*;
>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>> import org.apache.flink.types.Row;
>>>>
>>>> import java.util.Properties;
>>>>
>>>> public class Test {
>>>>
>>>>     public static void main(String... args) throws Exception {
>>>>
>>>>         EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
>>>>             .useBlinkPlanner()
>>>>             .inStreamingMode()
>>>>             .build();
>>>>         final StreamExecutionEnvironment env =
>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>         StreamTableEnvironment bsTableEnv =
>>>>             StreamTableEnvironment.create(env, bsSettings);
>>>>
>>>>         Properties properties = new Properties();
>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>         properties.setProperty("group.id", "test");
>>>>
>>>>         FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<>(
>>>>             java.util.regex.Pattern.compile("test-topic1"),
>>>>             new SimpleStringSchema(),
>>>>             properties);
>>>>         FlinkKafkaConsumer<String> consumer2 = new FlinkKafkaConsumer<>(
>>>>             java.util.regex.Pattern.compile("test-topic2"),
>>>>             new SimpleStringSchema(),
>>>>             properties);
>>>>
>>>>         DataStream<String> stream1 = env.addSource(consumer1);
>>>>         DataStream<String> stream2 = env.addSource(consumer2);
>>>>
>>>>         bsTableEnv.registerDataStream("sample1", stream1);
>>>>         bsTableEnv.registerDataStream("sample2", stream2);
>>>>
>>>>         Table result = bsTableEnv.sqlQuery(
>>>>             "SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0");
>>>>         result.printSchema();
>>>>
>>>>         bsTableEnv.toAppendStream(result, Row.class).print();
>>>>         bsTableEnv.execute("sample job");
>>>>     }
>>>> }
>>>>
>>>> 2) If I am using the blink planner, should I use TableEnvironment or
>>>> StreamTableEnvironment?
>>>>
>>>> 3) Why does Flink's current stable documentation (1.9) recommend the old
>>>> planner? Any rough timeline on when we would be able to use the blink
>>>> planner in production? Perhaps 1.10 or 1.11?
>>>>
>>>> Thanks!
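As a side note on quoted question 2, a minimal sketch of how each environment
is created with the blink planner (the class name EnvironmentChoice is a
placeholder; the calls are the ones used in the quoted program plus
TableEnvironment.create(EnvironmentSettings)):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class EnvironmentChoice {

    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();

        // Pure Table API / SQL job: no DataStream interop; multiple queries
        // can be optimized into a single DAG at execution time.
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Job that also converts between Table and DataStream (toAppendStream,
        // toRetractStream, registering DataStreams), as in the quoted program.
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment streamTableEnv =
            StreamTableEnvironment.create(env, settings);
    }
}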