Hi Godfrey, I was just clicking the run button on my IDE and it doesn't really show me errors so I used command line fink run <jar> and that shows me what the error is. It tells me I need to change to toRetractStream() and both StreamExecutionEnvrionment and StreamTableEnvrionment .execute seems to work fine although I am not sure which one is the correct usage.
Thanks! On Sat, Jan 18, 2020 at 6:52 PM kant kodali <kanth...@gmail.com> wrote: > Hi Godfrey, > > Thanks a lot for your response. I just tried it with env.execute("simple > job") but I still get the same error message. > > Kant > > On Sat, Jan 18, 2020 at 6:26 PM godfrey he <godfre...@gmail.com> wrote: > >> hi kant, >> >> > 1) The Documentation says full outer join is supported however the >> below code just exits with value 1. No error message. >> if you have converted Table to DataStream, please execute it >> with StreamExecutionEnvironment ( call env.execute("simple job") ) >> >> > 2) If I am using a blink planner should I use TableEnvironment or >> StreamTableEnvironment ? >> for streaming job, both Environment can be used. the difference is: >> TableEnvironment will optimize multiple queries into one DAG when >> executing, while StreamTableEnvironment will independent optimize each >> query. >> StreamTableEnvironment supports convert from/to DataStream, >> while TableEnvironment does not support it. >> StreamTableEnvironment supports register TableFunction >> and AggregateFunction, while TableEnvironment does not support it now. >> >> for batch job, only TableEnvironment is the only choice, because >> DataStream does not support batch job now. >> >> > 3) Why flink current stable documentation(1.9) recommends (old >> planner)? any rough timeline on when we would be able to use blink planner >> in production? perhaps 1.10 or 1.11? >> 1.9 is blink planner's first version, and it is unstable. In 1.10, blink >> planner is more statable, we are switching the blink planner to the default >> step by step [0]. >> >> [0] >> http://mail-archives.apache.org/mod_mbox/flink-dev/202001.mbox/%3CCAELO930%2B3RJ5m4hGQ7fbS-CS%3DcfJe5ENcRmZ%3DT_hey-uL6c27g%40mail.gmail.com%3E >> >> kant kodali <kanth...@gmail.com> 于2020年1月18日周六 下午5:40写道: >> >>> Hi All, >>> >>> 1) The Documentation says full outer join is supported however the below >>> code just exits with value 1. No error message. >>> >>> import org.apache.flink.api.common.serialization.SimpleStringSchema; >>> import org.apache.flink.streaming.api.datastream.DataStream; >>> import >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; >>> import org.apache.flink.table.api.*; >>> import org.apache.flink.table.api.java.StreamTableEnvironment; >>> import org.apache.flink.types.Row; >>> >>> import java.util.Properties; >>> >>> public class Test { >>> >>> public static void main(String... args) throws Exception { >>> >>> EnvironmentSettings bsSettings = >>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >>> final StreamExecutionEnvironment env = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> StreamTableEnvironment bsTableEnv = >>> StreamTableEnvironment.create(env, bsSettings); >>> >>> Properties properties = new Properties(); >>> properties.setProperty("bootstrap.servers", "localhost:9092"); >>> properties.setProperty("group.id", "test"); >>> >>> FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<>( >>> java.util.regex.Pattern.compile("test-topic1"), >>> new SimpleStringSchema(), >>> properties); >>> FlinkKafkaConsumer<String> consumer2 = new FlinkKafkaConsumer<>( >>> java.util.regex.Pattern.compile("test-topic2"), >>> new SimpleStringSchema(), >>> properties); >>> >>> DataStream<String> stream1 = env.addSource(consumer1); >>> DataStream<String> stream2 = env.addSource(consumer2); >>> >>> bsTableEnv.registerDataStream("sample1", stream1); >>> bsTableEnv.registerDataStream("sample2", stream2); >>> >>> Table result = bsTableEnv.sqlQuery("SELECT * FROM sample1 FULL >>> OUTER JOIN sample2 on sample1.f0=sample2.f0"); >>> result.printSchema(); >>> >>> bsTableEnv.toAppendStream(result, Row.class).print(); >>> bsTableEnv.execute("sample job"); >>> } >>> } >>> >>> >>> 2) If I am using a blink planner should I use TableEnvironment or >>> StreamTableEnvironment ? >>> >>> 3) Why flink current stable documentation(1.9) recommends (old planner)? >>> any rough timeline on when we would be able to use blink planner in >>> production? perhaps 1.10 or 1.11? >>> >>> Thanks! >>> >>> >>>