Re: Job Manager is taking very long time to finalize the Checkpointing.

2020-11-18 Thread Yun Tang
Hi Slim, You could check the TaskManager logs to see whether incremental checkpointing is really enabled (or check whether any files exist under /opt/flink/pv/checkpoints/c0580ec8f55fcf1e0ceaa46fc3778b99/shared to judge). If your configuration of RocksDB and incremental checkpoint is real
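A minimal sketch of the directory check suggested above, in plain Java. It assumes the layout <checkpoint root>/<job id>/shared taken from the path in the message; the class and method names are hypothetical, and an empty or missing `shared` directory is only a hint, not proof, that incremental checkpointing is off.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

final class IncrementalCheckpointCheck {
    // Returns true if <checkpointRoot>/<jobId>/shared exists and holds at least one entry.
    // Assumption: incremental RocksDB checkpoints place reusable files under "shared",
    // while with full checkpoints this directory is expected to stay empty or be absent.
    static boolean hasSharedFiles(Path checkpointRoot, String jobId) throws IOException {
        Path shared = checkpointRoot.resolve(jobId).resolve("shared");
        if (!Files.isDirectory(shared)) {
            return false; // no shared directory at all
        }
        try (Stream<Path> entries = Files.list(shared)) {
            return entries.findAny().isPresent(); // any file or subdirectory counts
        }
    }
}
```

For the job above, the call would be `hasSharedFiles(Paths.get("/opt/flink/pv/checkpoints"), "c0580ec8f55fcf1e0ceaa46fc3778b99")`.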

RE: Random Task executor shutdown

2020-11-18 Thread LINZ, Arnaud
Hello, Actually the log is more complete when the application ends, and it’s a ZooKeeper-related issue. I took another log. Job Manager’s log: (…) 2020-11-12 14:34:09,798 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint atte

Re: What happens when a job is rescaled

2020-11-18 Thread Yun Tang
Hi Richard, Since you did not say which state backend you use, I'll describe the phases of rescaling from an externalized checkpoint for two different state backends. For RocksDB: 1) If the parallelism has not changed, all SST files are downloaded and then simply opened as one roc
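Whether a restore can simply reopen one RocksDB instance depends on which key groups each subtask owns. The sketch below reproduces, to the best of my knowledge, the range arithmetic used by Flink's `KeyGroupRangeAssignment`, in plain Java with hypothetical class and method names: with unchanged parallelism each new subtask maps to exactly one old subtask, but after rescaling a new subtask's key-group range can overlap several old ones.

```java
import java.util.ArrayList;
import java.util.List;

final class KeyGroupRanges {
    // Inclusive key-group range [start, end] owned by subtask `operatorIndex`
    // when `maxParallelism` key groups are spread over `parallelism` subtasks.
    static int[] rangeFor(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    static boolean overlaps(int[] a, int[] b) {
        return a[0] <= b[1] && b[0] <= a[1];
    }

    // Old subtasks whose key groups intersect new subtask `newIndex` after rescaling oldP -> newP.
    static List<Integer> oldSubtasksToRead(int maxParallelism, int oldP, int newP, int newIndex) {
        int[] target = rangeFor(maxParallelism, newP, newIndex);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < oldP; i++) {
            if (overlaps(rangeFor(maxParallelism, oldP, i), target)) {
                result.add(i);
            }
        }
        return result;
    }
}
```

For example, with a max parallelism of 128, rescaling from 2 to 3 gives new subtask 1 the range [43, 85], which straddles the old ranges [0, 63] and [64, 127], so that subtask needs state from both old instances.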

RE: Random Task executor shutdown

2020-11-18 Thread LINZ, Arnaud
Hello, We are wondering whether it is related to https://issues.apache.org/jira/browse/ZOOKEEPER-2775 or not. What version of the shaded ZooKeeper client does Flink 1.10.0 use? Best, Arnaud From: LINZ, Arnaud Sent: Wednesday, 18 November 2020 09:39 To: 'Guowei Ma' Cc: user Subject: RE: Rando

Union SingleOutputStreamOperator and getSideOutput doesn't work

2020-11-18 Thread Efrat Abramovitz
Cheers, We have stumbled upon an issue with union streams that have a tagged side output: it seems we can no longer extract the side output. Issue: a SingleOutputStreamOperator stream ceases to be a SingleOutputStreamOperator after union and cannot call getSideOutput. Specifically i

RE: Random Task executor shutdown

2020-11-18 Thread LINZ, Arnaud
Hi, It’s 3.4.10 and does contain the bug. I’ll patch my Flink client and see if it happens again. Best regards, Arnaud From: LINZ, Arnaud Sent: Wednesday, 18 November 2020 10:35 To: 'Guowei Ma' Cc: 'user' Subject: RE: Random Task executor shutdown Hello, We are wondering whether it is related

Re: IllegalStateException Printing Plan

2020-11-18 Thread Dawid Wysakowicz
I think it is currently not possible to get the AST as JSON. There is a similar feature request here: https://issues.apache.org/jira/browse/FLINK-19687 Best, Dawid On 17/11/2020 20:59, Rex Fenley wrote: > So I tried `userDocsTable.explain()`; however, it doesn't give me the AST > as JSON so that I

Re: execution.runtime-mode=BATCH when reading from Hive

2020-11-18 Thread Aljoscha Krettek
Hi Dongwon, Unfortunately, it's not that easy right now because normal Sinks that rely on checkpointing to write out data, such as Kafka, don't work in BATCH execution mode because we don't have checkpoints there. It will work, however, if you use a source that doesn't rely on checkpointing i

Re: How to run a per-job cluster for a Beam pipeline w/ FlinkRunner on YARN

2020-11-18 Thread Aljoscha Krettek
Yes, these options are yarn-specific, but you can specify arbitrary options using -Dfoo=bar. And yes, sorry about the confusion but -e is the parameter to use on Flink 1.10, it's equivalent. Best, Aljoscha On 17.11.20 16:37, Dongwon Kim wrote: Hi Aljoscha, Thanks for the input. The '-t' op

Strange behaviour when using RMQSource in Flink 1.11.2

2020-11-18 Thread Thomas Eckestad
Hi, we are using the RabbitMQ source connector with exactly-once guarantees. For this to work, according to the official Flink documentation, we are supplying correlation IDs with each published message and we use a parallelism of one with the Flink job being the single/only consumer of the que
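A simplified model of deduplicating by correlation ID, assuming the point of the IDs is to let the source drop messages that RabbitMQ redelivers. This is not RMQSource's actual code and the class name is hypothetical. It also hints at why a single consumer matters: the set of seen IDs is local to one instance, so a redelivery that reached a different consumer would not be recognized.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CorrelationIdDedup {
    // Correlation IDs already emitted. A real source would need this set in checkpointed
    // state and would have to bound it, e.g. by dropping IDs once their messages are acknowledged.
    private final Set<String> seenIds = new HashSet<>();

    // Returns true for the first delivery of an ID and false for a redelivery.
    boolean accept(String correlationId) {
        if (correlationId == null) {
            throw new IllegalArgumentException("every message needs a correlation ID");
        }
        return seenIds.add(correlationId);
    }

    // Convenience helper: keeps the first occurrence of each ID, preserving arrival order.
    static List<String> deduplicate(List<String> correlationIds) {
        CorrelationIdDedup dedup = new CorrelationIdDedup();
        List<String> out = new ArrayList<>();
        for (String id : correlationIds) {
            if (dedup.accept(id)) {
                out.add(id);
            }
        }
        return out;
    }
}
```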

[no subject]

2020-11-18 Thread Denis Nutiu
Hello everyone! I'm new to Apache Flink and I would like to get some opinions on how I should deploy my Flink jobs. Let's say I want to do sentiment analysis for Slack workspaces. I have 10 companies, each having 2 Slack workspaces. How should I deploy Flink jobs if I'd like to efficiently utiliz

Re: Union SingleOutputStreamOperator and getSideOutput doesn't work

2020-11-18 Thread Aljoscha Krettek
Hi, I'm afraid you stumbled across an inconsistency in the API. In the Java API we differentiate between DataStream and SingleOutputStreamOperator where the latter is used for "physical" operations that, among other things, allow things like getting side outputs. The Scala API hides this dif

Re: Lateral join not finding correlate variable

2020-11-18 Thread godfrey he
Hi Dylan, Which Flink version did you encounter the problem with? I tested the above query on master and got the plan; no errors occurred. Here is my test case: @Test def testLateralJoin(): Unit = { util.addTableSource[(String, String, String, String, String)]("table1", 'id, 'attr1,

Re: Flink on YARN: delegation token expired prevent job restart

2020-11-18 Thread Truong Duc Kien
Hi all, So I've checked the log and it seems that the expired delegation token error was triggered during resource localization. Maybe there's something wrong with my Hadoop setup; NMs are supposed to get a good token from RM in order to localize resources automatically. Regards, Kiên 2020-11-17 10:28

Re: Lateral join not finding correlate variable

2020-11-18 Thread Dylan Forciea
Godfrey, I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and am still having the same issue. Note that I am using the JDBC Connector for the input tables, and table1 and table2 are actually created from queries on those connector tables and not directly. Since you indicat

Re: Upsert UDFs

2020-11-18 Thread Jark Wu
Hi Rex, Sorry for the late response. Under the hood, if the UDTAF only implements `emitValue`, then the framework will call `emitValue` for every input record. Assuming this is a TopN UDTAF, the implementation of `emitValue` returns set [A, B, C] for input1 and returns set [A, B, D] for input2, t
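A plain-Java model of the difference, assuming a TopN result represented as a set and a changelog written as "-X" (retract) and "+X" (insert). `fullRefresh` re-emits the complete result after an input, while `incremental` emits only what changed, which is what `emitUpdateWithRetract` is intended for. This is not the UDTAF API; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class TopNChangelog {
    // Retract everything previously emitted, then insert the whole new result.
    static List<String> fullRefresh(Set<String> previous, Set<String> current) {
        List<String> out = new ArrayList<>();
        for (String v : previous) out.add("-" + v);
        for (String v : current) out.add("+" + v);
        return out;
    }

    // Retract only entries that left the result and insert only entries that joined it.
    static List<String> incremental(Set<String> previous, Set<String> current) {
        List<String> out = new ArrayList<>();
        for (String v : previous) {
            if (!current.contains(v)) out.add("-" + v);
        }
        for (String v : current) {
            if (!previous.contains(v)) out.add("+" + v);
        }
        return out;
    }
}
```

With [A, B, C] followed by [A, B, D], `fullRefresh` produces six changelog rows, while `incremental` produces only "-C" and "+D".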

Re: Force Join Unique Key

2020-11-18 Thread Jark Wu
Hi Rex, Currently, the join operator may use 3 kinds of state structure depending on the input key and join key information. 1) input doesn't have a unique key => MapState, where the map key is the input row and the map value is the number of equal rows. 2) input has unique key, but the unique k
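Case 1 above (no unique key, MapState from row to the number of equal rows) can be modeled in memory as below. A HashMap stands in for keyed MapState, rows are plain strings, and the names are hypothetical; the point is that finding the matches for one join key means walking every row stored under that key, so the lookup is not O(1) when many rows share a join key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class NoUniqueKeyJoinState {
    // joinKey -> (row -> number of identical rows); a rough stand-in for keyed MapState.
    private final Map<String, Map<String, Integer>> state = new HashMap<>();

    void add(String joinKey, String row) {
        state.computeIfAbsent(joinKey, k -> new HashMap<>()).merge(row, 1, Integer::sum);
    }

    void retract(String joinKey, String row) {
        Map<String, Integer> rows = state.get(joinKey);
        if (rows == null) {
            return;
        }
        // Decrement the count; returning null removes the row entirely.
        rows.computeIfPresent(row, (r, count) -> count > 1 ? count - 1 : null);
        if (rows.isEmpty()) {
            state.remove(joinKey);
        }
    }

    // Producing the matches requires iterating all rows stored under the join key.
    List<String> matches(String joinKey) {
        List<String> out = new ArrayList<>();
        Map<String, Integer> rows = state.get(joinKey);
        if (rows == null) {
            return out;
        }
        for (Map.Entry<String, Integer> e : rows.entrySet()) {
            for (int i = 0; i < e.getValue(); i++) {
                out.add(e.getKey());
            }
        }
        return out;
    }
}
```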

How do I load MySQL data into a task

2020-11-18 Thread Jiazhi
Hi all, How can I use DataStream to load MySQL data into the memory of a Flink task when the task is initialized? Please give me a demo. Thanks, Jiazhi

Re:

2020-11-18 Thread Arvid Heise
It's not clear to me if you deploy streaming applications or batch jobs. In case of a batch job, you probably want to get everything into one big job to use resources as efficiently as possible. I'm assuming stream for the remainder of this mail. The granularity of the job depends more on your op

Re: execution.runtime-mode=BATCH when reading from Hive

2020-11-18 Thread Dongwon Kim
Hi Aljoscha, Unfortunately, it's not that easy right now because normal Sinks that > rely on checkpointing to write out data, such as Kafka, don't work in > BATCH execution mode because we don't have checkpoints there. It will > work, however, if you use a source that doesn't rely on checkpointing

Re: Force Join Unique Key

2020-11-18 Thread Rex Fenley
Thanks for the info. So even if there is no unique key inferred for a Row, the set of rows to join on each Join key should effectively still be an O(1) lookup if the join key is unique, right? Also, I've been digging around the code to find where the lookup of rows for a join key happens and haven

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-18 Thread Pierre Oberholzer
Hi Wei, It works! Thanks a lot for your support. I hadn't tried this last combination for option 1, and I had the wrong syntax for option 2. So, to summarize: Methods working: - Current: DataTypeHint in UDF definition + SQL for UDF registering - Outdated: override getResultType in UDF definition +

Jdbc input format and system properties

2020-11-18 Thread Flavio Pompermaier
Hi to all, while trying to solve a leak with dynamic class loading I found out that the MySQL connector creates an AbandonedConnectionCleanupThread that is retained in the ChildFirstClassLoader. From version 8.0.22 it is possible to inhibit this thread by passing the system property com.mysql.dis

How to set keystore.jks location on EMR when reading Kafka topics via SSL

2020-11-18 Thread Fanbin Bu
Hi, This is a repost with modified subject per Sri Tummala's suggestion. I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I tried to put keystore.jks location under /usr/lib/flink/... like: export SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/

Re: How to convert Int to Date

2020-11-18 Thread Rex Fenley
Looks like using a cast from INT to DATE worked out just fine. Thanks! On Tue, Nov 17, 2020 at 6:39 AM Timo Walther wrote: > Hi Rex, > > the classes mentioned in the documentation such as `int` and > `java.lang.Integer` are only used when you leave the SQL world to a UDF > or to a Java implemen
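If the conversion were done in a Java UDF instead of a SQL CAST, it could look like the sketch below. It assumes the INT column holds days since 1970-01-01, which is the value Flink's DATE type bridges to as `int`/`Integer`; whether that matches your data is an assumption, and the class name is hypothetical.

```java
import java.time.LocalDate;

final class EpochDayDates {
    // Assumption: `days` counts days since 1970-01-01 (negative values are earlier dates).
    static LocalDate fromEpochDay(int days) {
        return LocalDate.ofEpochDay(days);
    }

    // Inverse conversion; fails loudly if the date does not fit in an int.
    static int toEpochDay(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }
}
```

For example, `fromEpochDay(18584)` is 2020-11-18.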

Re: How to set keystore.jks location on EMR when reading Kafka topics via SSL

2020-11-18 Thread Fanbin Bu
I have to put the keystore file on the nodes. On Wed, Nov 18, 2020 at 4:29 PM Fanbin Bu wrote: > Hi, > > This is a repost with modified subject per Sri Tummala's suggestion. > > I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I > tried to put keystore.jks location under /usr/

Re: Kafka SQL table Re-partition via Flink SQL

2020-11-18 Thread Slim Bouguerra
Hi Jark, Thanks very much. Will this work with Avro? On Tue, Nov 17, 2020 at 07:44 Jark Wu wrote: > Hi Slim, > > In 1.11, I think you have to implement a custom FlinkKafkaPartitioner and > set the class name to 'sink.partitioner' option. > > In 1.12, you can re-partition the data by specifying the
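The routing logic a custom partitioner for 1.11 might contain can be sketched in plain Java. In Flink it would live in a FlinkKafkaPartitioner subclass's `partition(record, key, value, targetTopic, partitions)` method, whose class name is then set via 'sink.partitioner' as described above; please check that signature against your Flink version. The class and method below are hypothetical, and the null-key fallback is an arbitrary choice.

```java
import java.util.Arrays;

final class KeyHashPartitioning {
    // Picks one of the available partitions for a serialized record key,
    // so records with equal key bytes always go to the same partition.
    static int choosePartition(byte[] key, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("no partitions available");
        }
        if (key == null) {
            return partitions[0]; // arbitrary fallback for records without a key
        }
        // floorMod keeps the index non-negative even for negative hash codes.
        return partitions[Math.floorMod(Arrays.hashCode(key), partitions.length)];
    }
}
```

`Arrays.hashCode` is not the murmur2 hash used by Kafka's default partitioner, so records with the same key written by other producers may land in different partitions.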

Re: Kafka SQL table Re-partition via Flink SQL

2020-11-18 Thread Jark Wu
Yes, it works with all the formats supported by the kafka connector. On Thu, 19 Nov 2020 at 10:18, Slim Bouguerra wrote: > Hi Jark > Thanks very much will this work with Avro > > On Tue, Nov 17, 2020 at 07:44 Jark Wu wrote: > >> Hi Slim, >> >> In 1.11, I think you have to implement a custom Fli

Re: Lateral join not finding correlate variable

2020-11-18 Thread godfrey he
Dylan, Thanks for your feedback. If the planner encounters the "unexpected correlate variable $cor2 in the plan" exception, there's a high probability that FlinkDecorrelateProgram has some bugs or the query pattern is not supported yet. I tried using the JDBC Connector for the input tables, but I still don't

Re: Force Join Unique Key

2020-11-18 Thread Jark Wu
Actually, if there is no unique key, it's not O(1), because there may be multiple rows joined by the join key, i.e. the operator has to iterate all the values in the MapState under the current key, which is a "seek" operation on RocksDB and is not efficient. Are you asking where the join key is set? The join key

Re: Kafka SQL table Re-partition via Flink SQL

2020-11-18 Thread Slim Bouguerra
Great, thanks! On Wed, Nov 18, 2020 at 18:21 Jark Wu wrote: > Yes, it works with all the formats supported by the kafka connector. > > On Thu, 19 Nov 2020 at 10:18, Slim Bouguerra > wrote: > >> Hi Jark >> Thanks very much will this work with Avro >> >> On Tue, Nov 17, 2020 at 07:44 Jark Wu wro

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-18 Thread Wei Zhong
Hi Pierre, Currently there is no type hint like ‘Map[String, Any]’. The recommended way is to declare your type more explicitly. If you insist on doing this, you can try declaring a RAW data type for java.util.HashMap [1], but you may encounter some troubles [2] related to the kryo serialize

Re: CREATE TABLE LIKE clause from different catalog or database

2020-11-18 Thread Danny Chan
Yes, this is a bug that was fixed recently [1] for releases 1.12 and 1.11.3. You can also switch to the source table's catalog first, before you execute the CREATE TABLE LIKE DDL, as Ingo suggested. [1] https://issues.apache.org/jira/browse/FLINK-19281 김동원 wrote on Tue, Nov 17, 2020 at 12:19 AM: > Hi Ingo,

Re: Job Manager is taking very long time to finalize the Checkpointing.

2020-11-18 Thread Yun Tang
Hi Slim, Have you checked whether the job is under backpressure during the checkpoint? You could also check the checkpoint details via the web UI [1] to see the durations of the sync and async phases. BTW, I cannot see the "IOException: The rpc invocation size 199965215 exceeds the maximum akka framesize.

Re: Lateral join not finding correlate variable

2020-11-18 Thread Dylan Forciea
Godfrey, I confirmed that in Flink 1.11.2 and in 1.12-SNAPSHOT I get the stack trace running exactly this code: import org.apache.flink.api.scala._ import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.

Re: Upsert UDFs

2020-11-18 Thread Rex Fenley
Wow, that sounds definitely better. I'll try porting our aggregates over to using `emitUpdateWithRetract` then. I'm assuming the Elasticsearch SQL connector will respond appropriately. Thanks for the help! On Wed, Nov 18, 2020 at 7:20 AM Jark Wu wrote: > Hi Rex, > > Sorry for the late respons

Re: Force Join Unique Key

2020-11-18 Thread Rex Fenley
Ok, but if there is only 1 row per Join key on either side of the join, then wouldn't "iterate all the values in the MapState under the current key" effectively be "iterate 1 value in MapState under the current key" which would be O(1)? Or are you saying that it must seek across the entire dataset

Re: Force Join Unique Key

2020-11-18 Thread Jark Wu
Yes, exactly. The rocksdb has to "seek" data sets because it doesn't know how many entries are under the join key. On Thu, 19 Nov 2020 at 13:38, Rex Fenley wrote: > Ok, but if there is only 1 row per Join key on either side of the join, > then wouldn't "iterate all the values in the MapState und

Filter Null in Array in SQL Connector

2020-11-18 Thread Rex Fenley
Hi, I recently discovered some of our data has NULL values arriving in an ARRAY column. This column is being consumed by Flink via the Kafka connector Debezium format. We seem to be receiving NullPointerExceptions when these NULL values in the arrays arrive, which restarts the source operator i

Re: Strange behaviour when using RMQSource in Flink 1.11.2

2020-11-18 Thread Andrey Zagrebin
Hi Thomas, I am not an expert on RMQSource connector but your concerns look valid. Could you file a Jira issue in Flink issue tracker? [1] I cannot immediately refer to a committer who could help with this but let's hope that the issue gets attention. If you want to contribute an improvement for