Re: Upgrade calcite version

2021-03-10 Thread Danny Chan
basic and SQL planning (the relational algebra) that very few people have, so I would not suggest doing that yourself at all. Why do you need to upgrade Calcite? Can you share some details? ~ Best, Danny Chan 盛森林 wrote on Thu, Feb 4, 2021 at 10:47 PM: > Hi, > I want to upgrade calcite to 1

Re: Flink reads data from JDBC table only on startup

2020-12-30 Thread Danny Chan
For your case, you should use the temporal table join syntax and set a TTL for the lookup cache on the right-hand side (RHS) of the join, so that the cache gets refreshed. Taras Moisiuk wrote on Mon, Dec 28, 2020 at 7:21 PM: > Hi Danny, > > I use regular join and it looks like: > > SELECT > ... > FROM *dynamic_kafka_table* k > JOIN *jdbc_table* j ON k.id = j.k_id > > > S

Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

2020-12-27 Thread Danny Chan
> SQL parse failed. Encount What syntax did you use? > TypeConversions.fromDataTypeToLegacyInfo cannot convert a plain RAW type back to TypeInformation. Did you try to construct the type information with a new TypeInformationRawType? Yuval Itzchakov wrote on Thu, Dec 24, 2020 at 7:24 PM: > An expansion to

Re: Flink reads data from JDBC table only on startup

2020-12-27 Thread Danny Chan
Hi Taras ~ There is a lookup cache for the temporal join, but it is disabled by default, see [1]. That means that, by default, Flink SQL looks up the external database for every record from the LHS of the join. Did you use the temporal table join syntax or the normal stream-stream join syntax? The temporal table join u

Re: Long latency when consuming a message from KAFKA and checkpoint is enabled

2020-12-27 Thread Danny Chan
Hi, Nick ~ The behavior is expected: the Kafka source/sink relies on checkpoints to implement exactly-once write semantics. A checkpoint snapshots the state at a point in time, and that snapshot is used for recovery. Internally, the Kafka sink currently writes to Kafka but only commits
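To see why this adds latency, here is a minimal Python model of a transactional sink. It assumes the downstream consumer reads with `isolation.level=read_committed` (the reply is cut off before saying so): writes made between checkpoints sit in an open transaction and only become visible once the checkpoint completes. The class and method names are hypothetical, not Flink's API.

```python
class TransactionalSinkModel:
    """Hypothetical model of a two-phase-commit style sink: records written
    between checkpoints stay in an open transaction and become visible to a
    read_committed consumer only when the checkpoint completes."""

    def __init__(self):
        self.pending = []    # records in the currently open transaction
        self.committed = []  # records visible to read_committed consumers

    def write(self, record):
        self.pending.append(record)

    def on_checkpoint_complete(self):
        # Commit the open transaction and start a new, empty one.
        self.committed.extend(self.pending)
        self.pending = []

    def visible(self):
        return list(self.committed)


sink = TransactionalSinkModel()
sink.write("a")
sink.write("b")
print(sink.visible())  # [] : nothing visible before the checkpoint completes
sink.on_checkpoint_complete()
print(sink.visible())  # ['a', 'b']
```

Under these assumptions, the delay such a consumer observes is roughly bounded by the checkpoint interval plus the checkpoint duration, so a shorter interval reduces it.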

Re: NullPointerException while calling TableEnvironment.sqlQuery in Flink 1.12

2020-12-21 Thread Danny Chan
Hi Yuval Itzchakov ~ The thread you pasted has a different stack trace from your case. In that thread, the JaninoRelMetadataProvider was missing because we only set it once in a thread-local variable; when the RelMetadataQuery was used in a different worker thread, the JaninoRelMetadataProvi
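The failure mode described above (a value stored once in a thread-local on one thread, then missing on another worker thread) can be reproduced in a few lines with Python's `threading.local`. The stored string is only a placeholder, not Calcite's JaninoRelMetadataProvider.

```python
import threading

_holder = threading.local()


def set_provider(provider):
    # Stored only for the calling thread.
    _holder.provider = provider


def get_provider():
    # Other threads have no attribute, i.e. the provider is "missing".
    return getattr(_holder, "provider", None)


set_provider("metadata-provider")  # placeholder, set once on the main thread
results = {}


def worker():
    results["worker"] = get_provider()


t = threading.Thread(target=worker)
t.start()
t.join()
results["main"] = get_provider()
print(results)  # {'worker': None, 'main': 'metadata-provider'}
```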

Re: Flink 1.12, Kafka Connector, JSON format - "Could not find a suitable table factory"

2020-12-10 Thread Danny Chan
One thing to note is that the old connectors are still in the release-1.11/release-1.12 jars, so the old options still work, but with the old connector code. You may need to find the root cause of why the new options do not work; could you share a stack trace here? abelm wrote on Thu, Dec 10, 2020 at 10:54 PM:

Re: Flink 1.12, Kafka Connector, JSON format - "Could not find a suitable table factory"

2020-12-09 Thread Danny Chan
Hi, abelm ~ Which Flink version did you use? We refactored the connector options in Flink 1.11. The METADATA syntax is only supported since version 1.12. In 1.11, to ignore parse errors, you need to use the option "json.ignore-parse-errors" [1] [1] https://ci.apache.org/projects/

Re: How User-Defined AggregateFunctions handle deletes of all aggregated rows.

2020-12-09 Thread Danny Chan
me, we'll be storing all these empty data sets in rocks for no > reason. To clarify, we're using Debezium as our source and using Flink as a > materialization engine, so we never want to explicitly set a timeout on any > of our data, we just want to scale up predictably with our

Re: what's meaning of the "true/false" from "groupy...select"?THANKS

2020-12-09 Thread Danny Chan
The "true" flag means the message is an insert (or the new value of an update); "false" means the message is a retraction of an old record that needs to be modified. Appleyuchi wrote on Wed, Dec 9, 2020 at 12:20 PM: > > The complete code is: > https://paste.ubuntu.com/p/hpWB87kT6P/ > > The result is: > 2> (true,1,diaper,4) >
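To illustrate the flag semantics, here is a toy Python model of a grouped count that emits `(flag, key, count)` tuples in the same shape as the `(true,1,diaper,4)` output quoted above: `True` carries the new result, `False` retracts the previous one. It is a sketch of the idea, not Flink's implementation.

```python
def grouped_count_changelog(keys):
    """Emit (flag, key, count) messages for a running COUNT(*) GROUP BY key.
    flag True = insert / new value, False = retraction of the old value."""
    counts = {}
    out = []
    for key in keys:
        old = counts.get(key)
        if old is not None:
            out.append((False, key, old))  # retract the previous result
        counts[key] = (old or 0) + 1
        out.append((True, key, counts[key]))  # emit the new result
    return out


for msg in grouped_count_changelog(["diaper", "beer", "diaper"]):
    print(msg)
# (True, 'diaper', 1)
# (True, 'beer', 1)
# (False, 'diaper', 1)
# (True, 'diaper', 2)
```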

Re: How User-Defined AggregateFunctions handle deletes of all aggregated rows.

2020-12-09 Thread Danny Chan
Hi, Rex Fenley ~ If a stateful operator consumes the output of the aggregate function, then each time the function receives an update (or delete) for a key, the agg operator emits 2 messages: one retracting the old record and one for the new record. For your case, the new message is the
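A toy Python model of the retract/emit pair described above, extended to the case the thread asks about. The reply is cut off before covering it, so the behavior when the last row of a key is deleted (emit only the retraction and drop the key's state) is an assumption of this sketch, as is the rule that deletes only target rows inserted earlier.

```python
def sum_changelog(events):
    """events: list of (op, key, value), op is "+" (insert) or "-" (delete).
    Emits (flag, key, sum); flag False retracts the previously emitted sum.
    ASSUMPTIONS: deletes only target rows that were inserted earlier, and when
    the last row of a key is deleted, only a retraction is emitted and the
    key's state is dropped."""
    state = {}  # key -> (row_count, running_sum)
    out = []
    for op, key, value in events:
        count, total = state.get(key, (0, 0))
        if count > 0:
            out.append((False, key, total))  # retract the old aggregate
        if op == "+":
            count, total = count + 1, total + value
        else:
            count, total = count - 1, total - value
        if count == 0:
            state.pop(key, None)  # no rows left: clear state, emit no new row
        else:
            state[key] = (count, total)
            out.append((True, key, total))  # emit the new aggregate
    return out


events = [("+", "a", 3), ("+", "a", 4), ("-", "a", 3), ("-", "a", 4)]
for msg in sum_changelog(events):
    print(msg)
```

In this model the final delete produces a lone retraction, so nothing is kept for the key afterwards instead of an empty aggregate.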

Re: lookup cache clarification

2020-12-09 Thread Danny Chan
Yes, your understanding is correct. Marco Villalobos wrote on Wed, Dec 9, 2020 at 4:23 AM: > I set up the following lookup cache values: > > 'lookup.cache.max-rows' = '20' > 'lookup.cache.ttl' = '1min' > > for a jdbc connector. > > This table currently only has about 2 records in it. However, > since I
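A simplified Python model of a lookup cache bounded by the two options quoted above (`'lookup.cache.max-rows'` and `'lookup.cache.ttl'`): an entry expires a fixed time after it is written, and when the row limit is exceeded the least recently used entry is evicted. This is a sketch under those assumptions, not the JDBC connector's actual cache; the class name and the injectable clock are hypothetical.

```python
import time
from collections import OrderedDict


class LookupCache:
    """Sketch of a TTL + max-rows lookup cache (hypothetical).
    Entries expire ttl_s seconds after they are written; when max_rows is
    exceeded, the least recently used entry is evicted."""

    def __init__(self, max_rows, ttl_s, clock=time.monotonic):
        self.max_rows = max_rows
        self.ttl_s = ttl_s
        self.clock = clock
        self.entries = OrderedDict()  # key -> (written_at, rows)

    def get(self, key, load):
        now = self.clock()
        hit = self.entries.get(key)
        if hit is not None and now - hit[0] < self.ttl_s:
            self.entries.move_to_end(key)  # mark as recently used
            return hit[1]
        rows = load(key)  # miss or expired entry: query the database
        self.entries[key] = (now, rows)
        self.entries.move_to_end(key)
        if len(self.entries) > self.max_rows:
            self.entries.popitem(last=False)  # evict least recently used
        return rows


now = [0.0]  # fake clock, in seconds
queries = []


def load(key):
    queries.append(key)  # records every database round trip
    return [key * 10]


cache = LookupCache(max_rows=20, ttl_s=60, clock=lambda: now[0])
cache.get(1, load)
cache.get(1, load)  # served from the cache
now[0] = 61.0
cache.get(1, load)  # older than 1 minute: queries the database again
print(queries)  # [1, 1]
```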

Re: How can I optimize joins or cache misses in SQL api?

2020-12-09 Thread Danny Chan
Hi, Marco Villalobos ~ It's nice to see that you chose the SQL API, which is more concise and expressive. To answer some of your questions: > Q: Is there a way to control that? I don't want the N + 1 query problem. No, SQL is evaluated row by row; there may be some internal optimizations that bu

Re: How to tell what mode a Table operator is in

2020-12-03 Thread Danny Chan
If a stateful operator also has a stateful operator in its input sub-pipeline, it may receive retract messages. Operators like group aggregation, stream-stream join, or rank are stateful. We cannot currently show in the UI whether an operator is receiving retract messages, but your request is reasonable. Rex Fenl

Re: Performance consequence of leftOuterJoinLateral

2020-12-01 Thread Danny Chan
Hi, Rex ~ By "leftOuterJoinLateral", do you mean joining a table function through a lateral table? If so, yes, the complexity is O(1) for each probe key from the LHS: the table function evaluates the extra columns and appends them to the left columns. Rex Fenley wrote on Wed, Dec 2, 2020 at 7:54 AM: > Hello, > > I'm cu
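A small Python model of a left outer lateral join with a table function, matching the description above: the function is evaluated once per left row with no join state, and its output columns are appended; if it yields nothing, the left row is kept and padded with `None` (SQL NULL). `split_tags` is a hypothetical table function.

```python
def left_outer_join_lateral(left_rows, table_fn, n_extra_cols):
    """For each left row, evaluate table_fn and append its columns.
    Rows for which table_fn yields nothing are kept, padded with None."""
    out = []
    for row in left_rows:
        produced = list(table_fn(row))  # one function call per left row
        if produced:
            for extra in produced:
                out.append(row + extra)
        else:
            out.append(row + (None,) * n_extra_cols)
    return out


def split_tags(row):
    # Hypothetical table function: one output row per comma-separated tag.
    _, tags = row
    for tag in (tags.split(",") if tags else []):
        yield (tag,)


print(left_outer_join_lateral([(1, "a,b"), (2, "")], split_tags, 1))
# [(1, 'a,b', 'a'), (1, 'a,b', 'b'), (2, '', None)]
```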

Re: Filter Null in Array in SQL Connector

2020-12-01 Thread Danny Chan
> "type": "string", >>>>> "optional": true, >>>>> "name": "io.debezium.data.Enum", >>>>> "version": 1, >>>>> "pa

Re: Is there a way we can specify operator ID for DDLs?

2020-11-26 Thread Danny Chan
Here is the issue: https://issues.apache.org/jira/browse/FLINK-20368 Kevin Kwon wrote on Thu, Nov 26, 2020 at 8:50 AM: > thanks alot :) > > On Wed, Nov 25, 2020 at 3:26 PM Danny Chan wrote: > >> SQL does not support that now. But i think your request is reasonable. >> AFAIK . S

Re: Is there a way we can specify operator ID for DDLs?

2020-11-25 Thread Danny Chan
tor compatibility. I use Kafka as > source and I'd want to save the offsets through checkpoint > > I know how to do with DataStream API but not with plain SQL DDL > > On Wed, Nov 25, 2020, 3:09 AM Danny Chan wrote: > >> Hi Kevin Kwon ~ >> >> Do you want to cu

Re: Flink 1.11 avro format question

2020-11-25 Thread Danny Chan
For your question 1: this does not work as expected. I will check soon whether it is a bug and submit a fix. Hongjian Peng wrote on Wed, Nov 25, 2020 at 4:45 PM: > In Flink 1.10, we can pass this schema with 'format.avro-schema' property > to SQL DDL, but in Flink 1.11, the Avro schema is always derived fr

Re: Flink 1.11 avro format question

2020-11-25 Thread Danny Chan
Hi Hongjian Peng ~ For your question 1, it does not work as expected. If that is true, there is definitely a bug; I will check and fix it later. For your question 2, yes, this is intentional. There is a rule in the type inference: all the fields of a nullable struct type should also be nulla
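A tiny Python model of the type-inference rule stated above: once the enclosing struct is nullable, every field is treated as nullable, because reading a field of a NULL struct yields NULL. The function and field names are hypothetical; this is not Flink's type-system code.

```python
def effective_nullability(struct_nullable, declared_fields):
    """declared_fields: dict field_name -> declared nullable flag.
    If the enclosing struct is nullable, every field becomes nullable,
    since reading a field of a NULL struct yields NULL."""
    if struct_nullable:
        return {name: True for name in declared_fields}
    return dict(declared_fields)


print(effective_nullability(True, {"id": False, "name": True}))
# {'id': True, 'name': True}
```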

Re: Is there a way we can specify operator ID for DDLs?

2020-11-24 Thread Danny Chan
Hi Kevin Kwon ~ Do you want to customize only the source operator name, or all operator names, for state compatibility? State compatibility is an orthogonal topic, and keeping the operator names stable is one way to address it. Kevin Kwon wrote on Wed, Nov 25, 2020 at 1:11 AM: > For SQLs, I know that the o

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Danny Chan
", > "fields": [ > { "type": "string", "optional": false, "field": "version" }, > { "type": "string", "optional": false, "field": "connector" }, >

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Danny Chan
Can you also share the problematic JSON string here, so that we can determine the cause of the error? Rex Fenley wrote on Thu, Nov 19, 2020 at 2:51 PM: > Hi, > > I recently discovered some of our data has NULL values arriving in an > ARRAY column. This column is being consumed by Flink via the Kafka > c

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Danny Chan
Hi, Fenley ~ You are right, parsing nulls in an ARRAY field is not supported yet. I have logged an issue [1] and will fix it soon ~ [1] https://issues.apache.org/jira/browse/FLINK-20234 Rex Fenley wrote on Thu, Nov 19, 2020 at 2:51 PM: > Hi, > > I recently discovered some of our data has NULL values arriving

Re: CREATE TABLE LIKE clause from different catalog or database

2020-11-18 Thread Danny Chan
le(SqlCreateTableConverter.java:76) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190) >>> at >>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)

Re: Table SQL Filesystem CSV recursive directory traversal

2020-11-13 Thread Danny Chan
In the current master code base, FileInputFormat adds files recursively under the given paths by default (e.g. the #addFilesInDir method), so it should be supported by default for SQL. Timo Walther wrote on Mon, Nov 9, 2020 at 11:25 PM: > Hi Ruben, > > by looking at the code, it seems you should be a

Re: CREATE TABLE LIKE clause from different catalog or database

2020-11-13 Thread Danny Chan
Hi Dongwon ~ Referring to a table from a different catalog/database is supported; you need to specify the full path of the source table: CREATE TABLE Orders_with_watermark (...) WITH (...) LIKE my_catalog.my_db.Orders; Dongwon Kim wrote on Wed, Nov 11, 2020 at 2:53 PM: > Hi, > > Is it disallowed to refer to a table

Re: Failure to execute streaming SQL query

2020-11-06 Thread Danny Chan
Hi, Satyam ~ Which Flink release did you use? I tested your first SQL statements locally and they both work fine. Your second SQL statement fails because we currently do not support stream-stream joins on time attributes, since the join would break the semantics of the time attribute (

Re: Re: How to use both of SQL and DataStream in 1.11

2020-11-03 Thread Danny Chan
} > > ``` > > > So if use `StatementSet.addInsert`, should I must use > UpsertStreamTableSink and StreamTableSourceFactory to wrap the > RichSinkFunction? > > Is there a way to keep using DataStream API in table environment? which is > more expressive. > > > > &g

Re: How to use both of SQL and DataStream in 1.11

2020-11-02 Thread Danny Chan
You can still convert the DataStream to a Table and register it with the method void TableEnvironment.createTemporaryView(String path, Table view). Then create a StatementSet with StatementSet TableEnvironment.createStatementSet(). With the StatementSet, you can execute multiple insert statements alto

Re: quoted identifiers in Table SQL give SQL parse failed.

2020-10-29 Thread Danny Chan
Yes, Flink SQL uses the backquote ` as the identifier quote character. For your SQL, it should be: CREATE TABLE table1(`ts` TIMESTAMP) WITH(...) Ruben Laguna wrote on Thu, Oct 29, 2020 at 6:32 PM: > I made this question on [Stackoverflow][1] but I'm cross posting here. > > > Are double quoted identifiers allowed in Fl
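If identifiers are generated programmatically, a small helper can wrap them in backquotes. This sketch assumes that an embedded backquote is escaped by doubling it (Calcite's back-tick quoting convention); check this against your Flink version. The helper names are hypothetical and the WITH clause is left elided as in the reply.

```python
def quote_identifier(name: str) -> str:
    """Quote a Flink SQL identifier with backquotes.
    ASSUMPTION: an embedded backquote is escaped by doubling it."""
    return "`" + name.replace("`", "``") + "`"


def create_table_ddl(table, columns):
    # columns: list of (name, type) pairs; the WITH clause stays elided.
    cols = ", ".join(f"{quote_identifier(n)} {t}" for n, t in columns)
    return f"CREATE TABLE {quote_identifier(table)}({cols}) WITH(...)"


print(create_table_ddl("table1", [("ts", "TIMESTAMP")]))
# CREATE TABLE `table1`(`ts` TIMESTAMP) WITH(...)
```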

Re: Working with bounded Datastreams - Flink 1.11.1

2020-10-28 Thread Danny Chan
In SQL, you can use an OVER window to deduplicate the messages by id [1], but I'm not sure whether there are operators with the same semantics in DataStream. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication s_penakalap...@yahoo.com wrote on Wed, Oct 28, 2020 at 12:34 PM
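The deduplication pattern in [1] keeps, per key, the row whose ROW_NUMBER() is 1 over a window partitioned by the key and ordered by a time attribute. The Python sketch below models the "keep first row" variant on a bounded list; the table and field names are hypothetical.

```python
# Models the SQL deduplication pattern (table/column names are hypothetical):
#   SELECT * FROM (
#     SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts ASC) AS row_num
#     FROM events
#   ) WHERE row_num = 1


def dedup_keep_first(records, key, order_by):
    """Keep the earliest record per key, i.e. the row with row_num = 1."""
    first = {}
    for rec in sorted(records, key=lambda r: r[order_by]):
        first.setdefault(rec[key], rec)  # later rows for the key are dropped
    return list(first.values())


events = [
    {"id": 1, "ts": 3, "v": "late-dup"},
    {"id": 1, "ts": 1, "v": "first"},
    {"id": 2, "ts": 2, "v": "only"},
]
print(dedup_keep_first(events, "id", "ts"))
# [{'id': 1, 'ts': 1, 'v': 'first'}, {'id': 2, 'ts': 2, 'v': 'only'}]
```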

Re: How to understand NOW() in SQL when using Table & SQL API to develop a streaming app?

2020-10-27 Thread Danny Chan
Our behavior also conflicts with the SQL standard; we should mention this in the documentation as well. Till Rohrmann wrote on Tue, Oct 27, 2020 at 10:37 PM: > Thanks for the clarification. This improvement would be helpful, I believe. > > Cheers, > Till > > On Tue, Oct 27, 2020 at 1:19 PM Jark Wu wrote: > >> Hi Til

Re: NullPointerException when trying to read null array in Postgres using JDBC Connector

2020-10-22 Thread Danny Chan
Yes, the current code throws an exception directly on NULLs. Can you log an issue for it? Dylan Forciea wrote on Wed, Oct 21, 2020 at 4:30 AM: > I believe I am getting an error because I have a nullable postgres array > of text that is set to NULL that I’m reading using the JDBC SQL Connector. > Is this something that sh

Re: "stepless" sliding windows?

2020-10-22 Thread Danny Chan
A sliding window always fires at each step (slide). What do you mean by "stepless"? Alex Cruise wrote on Wed, Oct 21, 2020 at 1:52 AM: > whoops.. as usual, posting led me to find some answers myself. Does this > make sense given my requirements? > > Thanks! > > private class MyWindowAssigner(val windowSize:
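To make "fires at each step" concrete: with window size `size` and slide `slide`, each element belongs to `size / slide` overlapping windows whose starts are aligned to multiples of the slide, and results are produced at those window ends. This Python sketch mirrors that assignment arithmetic, assuming zero window offset and non-negative timestamps.

```python
def sliding_windows(ts, size, slide):
    """Return the [start, end) sliding windows that contain timestamp ts.
    Assumes zero offset and ts >= 0."""
    last_start = ts - ts % slide  # latest window start at or before ts
    windows = []
    start = last_start
    while start > ts - size:  # every window still covering ts
        windows.append((start, start + size))
        start -= slide
    return windows


print(sliding_windows(7, size=10, slide=5))  # [(5, 15), (0, 10)]
```

A smaller slide gives finer-grained firing at the cost of assigning each element to more windows.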

Re: Extract column and table lineage from flink sql

2020-10-22 Thread Danny Chan
after the scan or join is a source. If you have the rel tree, you can collect the info with a shuttle (RelShuttle); if you have an AST instead, you can use a SqlVisitor. Danny Chan wrote on Thu, Oct 22, 2020 at 5:24 PM: > Hi, dawangli ~ > > Usually people build the lineage of tables through a self-built platform, > there was a D

Re: flink watermark strategy

2020-08-30 Thread Danny Chan
Watermarks mainly serve windows that have to deal with late-arriving data; generating them actually costs you some performance. Best, Danny Chan On Aug 29, 2020 at 3:09 AM +0800, Vijayendra Yadav wrote: > Hi Team, > > For regular unbounded streaming application streaming through kafka, which > does not use any event ti
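For context on what a watermark tracks, here is a Python sketch of a bounded-out-of-orderness generator: it remembers the maximum event timestamp seen and reports that value minus the allowed delay minus 1 ms. The class is hypothetical, and `is_late` is a simplified element-level check rather than Flink's window-level lateness rule.

```python
class BoundedOutOfOrdernessWatermarks:
    """Sketch: watermark = max event time seen - allowed delay - 1 (ms)."""

    def __init__(self, max_delay_ms):
        self.max_delay_ms = max_delay_ms
        self.max_ts = None  # no events seen yet

    def on_event(self, ts_ms):
        self.max_ts = ts_ms if self.max_ts is None else max(self.max_ts, ts_ms)

    def current_watermark(self):
        if self.max_ts is None:
            return None
        return self.max_ts - self.max_delay_ms - 1

    def is_late(self, ts_ms):
        # Simplified: an element at or below the watermark counts as late.
        wm = self.current_watermark()
        return wm is not None and ts_ms <= wm


gen = BoundedOutOfOrdernessWatermarks(max_delay_ms=5000)
for ts in (10000, 12000, 11000):
    gen.on_event(ts)
print(gen.current_watermark())  # 6999
```

A job with no event-time windows or timers never consults this value, so under that assumption producing it is overhead only.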

user@flink.apache.org

2020-08-30 Thread Danny Chan
considered “late”. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/joining.html Best, Danny Chan On Aug 29, 2020 at 12:59 AM +0800, Sofya T. Irwin wrote: > Hi Danny, >

user@flink.apache.org

2020-08-27 Thread Danny Chan
Hi, Sofya T. Irwin ~ Can you share your use case and why you need a timed-window join there? The timed-window join is not supported in SQL yet, and I want to hear whether it is necessary to support it. Sofya T. Irwin wrote on Thu, Jul 30, 2020 at 10:44 PM: > Hi, > I'm trying to investigate a SQL job usi

[Survey] Demand collection for stream SQL window join

2020-08-26 Thread Danny Chan
/projects/flink/flink-docs-release-1.11/dev/stream/operators/joining.html Best, Danny Chan

Re: How to visit outer service in batch for sql

2020-08-26 Thread Danny Chan
Hi, did you try to define a UDAF in your group window SQL, where you can call your custom service? I'm afraid you are right: SQL only supports time windows. Best, Danny Chan On Aug 26, 2020 at 8:02 PM +0800, 刘建刚 wrote: > For API, we can visit outer service in batch through countWin
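A Python sketch of the suggested approach: an aggregate function whose accumulator collects the keys seen in a window and whose final step calls the outer service once with the whole batch. The method names only echo the createAccumulator/accumulate/getValue contract of a Flink AggregateFunction, and `batch_service` is a hypothetical stand-in for your service client.

```python
class BatchLookupAgg:
    """Hypothetical aggregate: collect keys per window, then call an outer
    service once per window when the result is requested."""

    def __init__(self, batch_service):
        self.batch_service = batch_service  # callable: list of keys -> dict

    def create_accumulator(self):
        return []

    def accumulate(self, acc, key):
        acc.append(key)  # no service call per row

    def get_value(self, acc):
        results = self.batch_service(acc)  # one batched call for the window
        return sum(results.get(k, 0) for k in acc)


calls = []


def fake_service(keys):
    calls.append(list(keys))
    return {k: len(k) for k in keys}


agg = BatchLookupAgg(fake_service)
acc = agg.create_accumulator()
for key in ("ab", "c"):
    agg.accumulate(acc, key)
print(agg.get_value(acc), calls)  # 3 [['ab', 'c']]
```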

Re: Question about window grouping after a Flink interval join

2020-08-26 Thread Danny Chan
operators. See KeyedCoProcessOperatorWithWatermarkDelay and RowTimeIntervalJoin.getMaxOutputDelay for details. Best, Danny Chan On Jul 3, 2020 at 3:29 PM +0800, 元始(Bob Hu) <657390...@qq.com> wrote: > Hello, I'd like to ask a question: > Is there a problem with doing a window group after an interval join of two Flink stream tables? Some left join rows that fail to match get dropped. > For example, the join condition is se

Re: How to write a customer sink partitioner when using flinksql kafka-connector

2020-08-18 Thread Danny Chan
-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java Best, Danny Chan On Aug 18, 2020 at 8:18 PM +0800, wangl...@geekplus.com wrote: > > > CREATE TABLE kafka_sink_table( > warehouse_id INT, > pack_task_order_id BIGINT, > out
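The FlinkFixedPartitioner linked above pins each parallel sink instance to one Kafka partition; its core rule is, in effect, `partitions[subtask_index % len(partitions)]`. The Python below only models that rule, plus a hypothetical custom alternative that routes by the `warehouse_id` column from the quoted DDL (crc32 is used so the hash is stable across runs). Neither class is Flink's API.

```python
import zlib


class FixedPartitionerModel:
    """Model of a fixed partitioner: sink subtask i always writes to
    partitions[i % len(partitions)], regardless of the record."""

    def open(self, subtask_index):
        self.subtask_index = subtask_index

    def partition(self, record, partitions):
        if not partitions:
            raise ValueError("partitions must not be empty")
        return partitions[self.subtask_index % len(partitions)]


class KeyHashPartitionerModel(FixedPartitionerModel):
    """Hypothetical custom partitioner: records with the same warehouse_id
    always land in the same partition."""

    def partition(self, record, partitions):
        if not partitions:
            raise ValueError("partitions must not be empty")
        h = zlib.crc32(str(record["warehouse_id"]).encode())
        return partitions[h % len(partitions)]


p = FixedPartitionerModel()
p.open(5)
print(p.partition({"warehouse_id": 1}, [0, 1, 2]))  # 2
```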

Re: Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

2020-08-13 Thread Danny Chan
Weizheng ~ tEnv.executeSql executes the SQL asynchronously, i.e. it submits a job to the backend cluster with a built-in job name, and tEnv.executeSql itself returns a TableResult immediately with a constant affected-row count of -1. Best, Danny Chan On Aug 13, 2020 at 3:46 PM +0800, Lu Weizheng wrote

Re: Two Queries and a Kafka Topic

2020-08-11 Thread Danny Chan
/temporal_tables.html#temporal-table Best, Danny Chan On Aug 5, 2020 at 4:34 AM +0800, Marco Villalobos wrote: > Lets say that I have: > > SQL Query One from data in PostgreSQL (200K records). > SQL Query Two from data in PostgreSQL (1000 records). > and Kafka Topic One. > > Let's also say that mai

Re: Does Flink automatically apply any backpressure ?

2020-08-03 Thread Danny Chan
Yes, as Jake said, back pressure happens automatically and usually there is no need to tweak it; you can monitor it through the back pressure metrics, see [1] [1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html Best, Danny Chan On Jul 31, 2020

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Danny Chan
ing ) partitioned by (date string), in which you can declare the partition column name and type at the same time. Best, Danny Chan On Jul 21, 2020 at 11:30 PM +0800, Dongwon Kim wrote: > Thanks Jark for the update. > > However, getting back to the original question, can I use a nested column > direct

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Danny Chan
' = 'json' |) LIKE navi (EXCLUDING ALL) |""".stripMargin tableEnv.executeSql(sql) On the master branch, both are correct. Can you share your stack trace details? Which version did you use, and which SQL context throws the error? Best, Danny Chan On Jul 21, 2020 +0800

Re: Encoding problem in WHERE LIKE

2020-07-20 Thread Danny Chan
Hi, Dongwon ~ > Caused by: org.apache.calcite.sql.parser.SqlParseException: Lexical error at > line 1, column 96. Encountered The error does report the position; you can use it to see which syntax context caused the problem. Best, Danny Chan On Jul 20, 2020 at 11:10 PM +0800, Dongwon

Re: Flink SQL - Join Lookup Table

2020-07-20 Thread Danny Chan
Best, Danny Chan On Jul 21, 2020 at 6:32 AM +0800, Kelly Smith wrote: > Hi folks, > > I have a question Flink SQL. What I want to do is this: > > > • Join a simple lookup table (a few rows) to a stream of data to enrich the > stream by adding a column from the lookup table. > >

Fwd: Re: [Table API] how to configure a nested timestamp field

2020-07-20 Thread Danny Chan
Best, Danny Chan -- Forwarded message -- From: Danny Chan Date: Jul 20, 2020 at 4:51 PM +0800 To: Dongwon Kim Subject: Re: [Table API] how to configure a nested timestamp field > Or is it possible you pre-define a catalog there and register through the SQL > CLI yaml ? > > Best, > Danny

Re: HELP!!! Flink1.10 sql insert into HBase, error like validateSchemaAndApplyImplicitCast

2020-07-16 Thread Danny Chan
I suspect there is some inconsistency in the nullability of the whole record field. Can you compare the 2 schemas and see the diff? For a table, you can get the TableSchema first and print it out. Best, Danny Chan On Jul 16, 2020 at 10:56 AM +0800, Leonard Xu wrote: > Hi, Jim > > Could you p

Re: Table API throws "No FileSystem for scheme: file" when loading local parquet

2020-07-12 Thread Danny Chan
> No FileSystem for scheme: file It seems that your path does not work correctly; from the path you gave, the directory name 'test.parquet' seems invalid. Best, Danny Chan On Jul 11, 2020 at 8:07 AM +0800, Danny Chan wrote: > > It seems that your path does not work correctly, from the patch

Re: Does Flink support TFRecordFileOutputFormat?

2020-07-12 Thread Danny Chan
I didn't see any class named TFRecordFileOutputFormat in Spark. By TF, do you mean TensorFlow? Best, Danny Chan On Jul 10, 2020 at 5:28 PM +0800, 殿李 wrote: > Hi, > > Does Flink support TFRecordFileOutputFormat? I can't find the relevant > information in the document. > > As

Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-08 Thread Danny Chan
Thanks Zhijiang and Piotr for the great work as release managers, and thanks to everyone who made the release possible! Best, Danny Chan On Jul 8, 2020 at 4:59 PM +0800, Congxian Qiu wrote: > > Thanks Zhijiang and Piotr for the great work as release manager, and thanks > everyone who makes th

Re: can't exectue query when table type is datagen

2020-07-06 Thread Danny Chan
well. [1] https://github.com/apache/flink/blob/1b1c343e8964c6400c7c1de3c70212522ba59a64/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java#L86 [2] https://issues.apache.org/jira/browse/FLINK-18500 Best, Danny Chan On Jul 5, 2020 at 10:52 AM +0800, xin Destiny wrote

Re: Dynamic source and sink.

2020-07-01 Thread Danny Chan
, Danny Chan On Jun 28, 2020 at 3:30 PM +0800, C DINESH wrote: > Hi All, > > In a flink job I have a pipeline. It is consuming data from one kafka topic > and storing data to Elastic search cluster. > > without restarting the job can we add another kafka cluster and another > elastic s

Re: Flink table modules

2020-04-26 Thread Danny Chan
ays resolves the object reference to the one in the first loaded module. Here is the design document explaining why we introduced this feature [1] [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-68%3A+Extend+Core+Table+System+with+Pluggable+Modules Best, Danny Chan On Apr 23, 2020 at 11:12 PM +0800, Flavio Pompermaier
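The resolution rule described above (a name resolves to the definition in the first loaded module that provides it) amounts to an ordered lookup. The module names and function sets in this Python model are hypothetical placeholders.

```python
def resolve_function(name, loaded_modules):
    """loaded_modules: list of (module_name, set_of_function_names) in load
    order. Return the first module that defines `name`, or None."""
    for module_name, functions in loaded_modules:
        if name in functions:
            return module_name
    return None


modules = [
    ("core", {"concat", "upper"}),
    ("my_module", {"concat", "get_json_object"}),  # hypothetical module
]
print(resolve_function("concat", modules))           # core (first loaded wins)
print(resolve_function("get_json_object", modules))  # my_module
```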

Re: LookupableTableSource from kafka consumer

2020-04-21 Thread Danny Chan
, Danny Chan On Apr 21, 2020 at 8:02 PM +0800, Jark Wu wrote: > Hey, > > You can take JDBCTableSource [1] as an example about how to implement a > LookupableTableSource. > However, I'm not sure how to support lookup for kafka. Because AFAIK, kafka > doesn't have the ability to loo

Re: Flink upgrade to 1.10: function

2020-04-21 Thread Danny Chan
JSON_VALUE is hard-coded into the parser and is always parsed as the built-in operator, so there is no way to override it yet. I have filed an issue [1] to track this and hope we can resolve it in the next Calcite release. [1] https://issues.apache.org/jira/browse/CALCITE-3943 Best, Danny