Re:Re: How to use both SQL and DataStream in 1.11

2020-11-02 Thread izual
…, and then submit the job with the TableResult returned by StatementSet.execute(). izual wrote on Mon, Nov 2, 2020 at 11:28 AM: Hi, community: We use Flink 1.9.1 with both the SQL and DataStream APIs to support multiple sinks in our production environment. For example, tableEnv.sqlUpdate("INSERT INTO dest1 S…
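The 1.11 approach the reply describes, as a minimal sketch: group the INSERT statements into a StatementSet so they are optimized and submitted together, then use the returned TableResult. The tables src, dest1, and dest2 are hypothetical and would be declared via CREATE TABLE DDL beforehand.

```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object StatementSetJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // `src`, `dest1`, `dest2` are hypothetical tables assumed to be
    // declared via CREATE TABLE DDL before this point.
    val stmtSet = tableEnv.createStatementSet()
    stmtSet.addInsertSql("INSERT INTO dest1 SELECT id, name FROM src")
    stmtSet.addInsertSql("INSERT INTO dest2 SELECT id, name FROM src")

    // execute() submits all buffered INSERTs together as one job and
    // returns a TableResult handle for it.
    stmtSet.execute()
  }
}
```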

How to use both SQL and DataStream in 1.11

2020-11-01 Thread izual
Hi, community: We use Flink 1.9.1 with both the SQL and DataStream APIs to support multiple sinks in our production environment. For example, tableEnv.sqlUpdate("INSERT INTO dest1 SELECT ...") and table.toAppendStream[Row].addSink(new RichSinkFunction[Row] {...}).name("dest2"), and env.execute() to submit t…
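For reference, a runnable sketch of the 1.9.1 pattern the question describes, where a single env.execute() drives both the SQL sink and the DataStream sink; the dest1 DDL is omitted, so that line is left commented as a placeholder:

```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.types.Row

object MultiSinkJob19 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val stream = env.fromElements((1, "a"), (2, "b"))
    tableEnv.registerDataStream("src", stream, 'id, 'name)
    val table = tableEnv.sqlQuery("SELECT id, name FROM src")

    // SQL sink; `dest1` is a hypothetical registered sink table:
    // tableEnv.sqlUpdate("INSERT INTO dest1 SELECT id, name FROM src")

    // DataStream sink on the same pipeline.
    table.toAppendStream[Row]
      .addSink(new RichSinkFunction[Row] {
        override def invoke(value: Row): Unit = println(value) // stand-in for dest2
      })
      .name("dest2")

    // In 1.9, one env.execute() submits everything as a single job.
    env.execute("multi-sink-job")
  }
}
```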

Re:Re: Questions about "State Processing API in Scala"

2020-08-31 Thread izual
At 2020-01-21 15:54:56, "Tzu-Li (Gordon) Tai" wrote: >Hi Izual, > >Thanks for reporting this! I'm also forwarding this to the user mailing >list, as that is the more suitable place for this question. > >I think the usability of the State Processor API in Scala is indeed …

Re:Re: How to retain the column names when converting a Table to DataStream

2020-07-31 Thread izual
…good way how you could retain the column names. :( Best, Dawid On 28/07/2020 10:26, izual wrote: Hi, Community: I met some field-name errors when trying to convert between Table and DataStream. Flink version: 1.9.1. First, init a DataStream and convert it to table 'source', regi…

How to retain the column names when converting a Table to DataStream

2020-07-28 Thread izual
Hi, Community: I met some field-name errors when trying to convert between Table and DataStream. Flink version: 1.9.1. First, init a DataStream and convert it to table 'source', and register a TableFunction named 'foo'. val sourceStream = env.socketTextStream("127.0.0.1", 8010) .map(line => line.toInt) tab…
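A sketch of one way to keep the names through the round trip, assuming the 1.9 Scala API: convert with the table's own row type (TableSchema#toRowType keeps the field names) and re-declare the fields explicitly when registering again.

```
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object RetainColumnNames {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val sourceStream = env.socketTextStream("127.0.0.1", 8010).map(_.toInt)
    tableEnv.registerDataStream("source", sourceStream, 'id)
    val table = tableEnv.sqlQuery("SELECT id FROM source")

    // Use the table's own row type so the resulting RowTypeInfo keeps the
    // original field names instead of falling back to f0, f1, ...
    implicit val rowType: TypeInformation[Row] = table.getSchema.toRowType
    val rows: DataStream[Row] = table.toAppendStream[Row]
    rows.print()

    // Re-declare the field names on the way back to be safe.
    tableEnv.registerDataStream("source2", rows, 'id)

    env.execute("retain-names")
  }
}
```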

Re:Re: Re: How to disable the state behind `COUNT` SQL?

2020-04-27 Thread izual
…ound when the result does not change? At 2020-04-28 10:53:34, "Jark Wu" wrote: Hi izual, In this case, I think you should try COUNT DISTINCT instead of COUNT. DISTINCT will help deduplicate, so no matter how many times you receive id=1, the region count should always be 3. SELE…
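A sketch of the suggested query, assuming the tableEnv from the thread and a source table src with (id, region) columns:

```
// COUNT(DISTINCT ...) deduplicates per key, so receiving id=1 again
// leaves its region count unchanged.
val result = tableEnv.sqlQuery(
  """
    |SELECT id, COUNT(DISTINCT region) AS region_cnt
    |FROM src
    |GROUP BY id
  """.stripMargin)
```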

Re:Re: How to disable the state behind `COUNT` SQL?

2020-04-27 Thread izual
…1min can resolve your issue. If not, maybe you need to change your dimension table, making it return the count directly instead of returning the details. izual wrote on Mon, Apr 27, 2020 at 5:06 PM: I implemented my DimTable by extending `LookupTableSource`[1], which stores data like: id=1 -> (SH, BJ, SD)…
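The "1min" suggestion refers to idle state retention. A minimal sketch, assuming tableEnv is the job's StreamTableEnvironment:

```
import org.apache.flink.api.common.time.Time

// State for keys that stay idle is cleaned up somewhere between the
// min and max retention times (1.9/1.10-era TableConfig API).
tableEnv.getConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(10))
```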

How to disable the state behind `COUNT` SQL?

2020-04-27 Thread izual
I implemented my DimTable by extending `LookupTableSource`[1], which stores data like: id=1 -> (SH, BJ, SD), id=2 -> (...), and then extended `TableFunction` to return the values corresponding to the lookup keys, possibly returning multiple rows. For example, when the lookup key is id=1, then in the `TableFun…
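A toy sketch of the lookup function described here (1.9-era TableFunction API); the in-memory map stands in for whatever store backs the dim table:

```
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

class DimLookupFunction extends TableFunction[Row] {
  // Toy data; a real implementation would query an external store.
  private val data = Map(1 -> Seq("SH", "BJ", "SD"))

  // One lookup key may emit several rows, e.g. id=1 -> SH, BJ, SD.
  def eval(id: Int): Unit =
    data.getOrElse(id, Seq.empty).foreach { region =>
      collect(Row.of(Int.box(id), region))
    }

  // Declare the produced row type for the planner.
  override def getResultType: TypeInformation[Row] =
    Types.ROW(Types.INT, Types.STRING)
}
```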

Re:Re: RuntimeException: Could not instantiate generated class 'StreamExecCalc$23166'

2020-04-23 Thread izual
…, but Flink should have split it into many shorter methods (see TableConfig#maxGeneratedCodeLength). By default Flink will split methods longer than 64KB into shorter ones. izual wrote on Thu, Apr 23, 2020 at 6:34 PM: Hi, Community: I added 4 complicated SQLs to one job, and the job appears to run well.…
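A minimal sketch of the knob the reply mentions, assuming tableEnv is the job's StreamTableEnvironment:

```
// Lower the threshold at which the planner splits generated code into
// smaller methods (default 64000, i.e. roughly the JVM's 64KB limit).
tableEnv.getConfig.setMaxGeneratedCodeLength(4000)
```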

RuntimeException: Could not instantiate generated class 'StreamExecCalc$23166'

2020-04-23 Thread izual
Hi, Community: I added 4 complicated SQLs to one job, and the job appears to run well. But when I try to add a 5th SQL, the job fails right at startup and throws the error below: java.lang.RuntimeException: Could not instantiate generated class 'StreamExecCalc$23166' at org.apache.flink.table.…

How to make two INSERT INTO SQLs run in order

2020-03-25 Thread izual
We have two output sinks, and the ordering guarantee is achieved by code like this: record.apply(insert_into_sink1).thenApply( record_2 = foo(record); record_2.insert_into_sink2 ). So when sink2 receives record_2, the record must already exist in sink1, and we can then look up the corresponding value of rec…
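A hedged DataStream sketch of the same ordering idea: perform the sink1 write synchronously inside a map(), so a record reaches sink2 only after the first write has returned. All helper names are hypothetical.

```
import org.apache.flink.streaming.api.scala._

object OrderedSinks {
  def writeToSink1(r: String): Unit = ()   // hypothetical blocking write
  def deriveRecord2(r: String): String = r // hypothetical foo(record)
  def writeToSink2(r: String): Unit = ()   // hypothetical second write

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val records = env.socketTextStream("127.0.0.1", 8010)

    // The sink1 write happens synchronously in map(); only records that
    // survived it flow on, so sink2 always sees them afterwards.
    records
      .map { record => writeToSink1(record); deriveRecord2(record) }
      .addSink(r2 => writeToSink2(r2))
      .name("sink2")

    env.execute("ordered-sinks")
  }
}
```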

Re:Re: Question about SQL join on a nested field of the right table

2020-03-25 Thread izual
…join, which will use the equality-condition fields as the lookup keys. However, nested fields can't be lookup keys for now. Is it possible to use a top-level field as the join key in your case? Best, Jark On Wed, 25 Mar 2020 at 18:08, izual wrote: Hi, Community: I defined a di…

Question about SQL join on a nested field of the right table

2020-03-25 Thread izual
Hi, Community: I defined a dim table (tblDim) with this schema: root |-- dim_nested_fields: ROW<`id` INT, `product_name` STRING>, and the relevant part of the SQL is: JOIN ... ON leftTable.`nested_field`.id = tblDim.`dim_nested_fields`.id, which throws an exception like: Exception in thread "main" o…
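A hedged sketch of the direction the reply suggests, assuming the tableEnv from the question; leftFlat and join_id are hypothetical names:

```
// Project the nested id up to a top-level column first, then use that
// column as the join key (the dim table's key would likewise need to
// be a top-level column).
val leftFlat = tableEnv.sqlQuery(
  "SELECT t.*, t.`nested_field`.id AS join_id FROM leftTable t")
tableEnv.registerTable("leftFlat", leftFlat)
// ... JOIN ... ON leftFlat.join_id = <top-level dim key>
```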

How to write a retract table to Kafka?

2020-02-26 Thread izual
Hi, community: SQL containing aggregate functions, GROUP BY, etc., will generate a RetractStream, whose type is DataStream[(Boolean, Row)]. It is not allowed to be written to Kafka because the Kafka table is based on AppendStreamTableSink. If I only need to write ADD messages to Kafka, is it possible to achieve…
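A sketch of the usual workaround, assuming a tableEnv and an aggregated resultTable as in the question: keep only the ADD messages and sink them like any append-only stream. The Kafka producer line is a placeholder.

```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// (Boolean, Row): true = ADD message, false = RETRACT message.
val retracts: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](resultTable)
val addsOnly: DataStream[Row] = retracts.filter(_._1).map(_._2)
// addsOnly.addSink(...)  // e.g. a Kafka producer with a Row serializer
```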

SideOutput Exception: "Output Tag must not be null"

2020-01-26 Thread izual
I followed the docs[1] and SideOutputITCase.scala (a unit-test case from flink-master), but encountered an exception: Caused by: java.lang.IllegalArgumentException: OutputTag must not be null. Code snippet implemented in Scala: ``` private final val backupOutputTag = OutputTag[String]("backup") v…
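For comparison, a working Scala side-output sketch following the docs the thread cites: the Scala OutputTag captures its TypeInformation implicitly, and the same fully-initialized tag instance is used for both output() and getSideOutput().

```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

object SideOutputExample {
  val backupOutputTag: OutputTag[String] = OutputTag[String]("backup")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val mainStream = env.fromElements("a", "b")
      .process(new ProcessFunction[String, String] {
        override def processElement(value: String,
                                    ctx: ProcessFunction[String, String]#Context,
                                    out: Collector[String]): Unit = {
          out.collect(value)
          ctx.output(backupOutputTag, value) // the tag must be non-null here
        }
      })

    mainStream.getSideOutput(backupOutputTag).print()
    env.execute("side-output")
  }
}
```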

Re: Questions about "State Processing API in Scala"

2020-01-21 Thread Izual
Sorry for the wrong post. > This can probably be confirmed by looking at the exception stack trace. > Can you post a full copy of that? I missed the history jobs, but I think you are right. When I debugged the program to find the reason, I came across this code snippet: ``` TypeSerializerSchemaCompatibility result…