Re: Lateral join not finding correlate variable

2020-11-19 Thread godfrey he
Hi Dylan, I have reproduced your issue based on your code. Currently, Flink does not support such a nested correlate pattern query. I have created an issue to track this [1]. Thanks for reporting and for your help. [1] https://issues.apache.org/jira/browse/FLINK-20255 Best, Godfrey Dylan Forciea on 2020

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-19 Thread Arvid Heise
I still haven't fully understood. Do you mean you can't infer the timestamp in source A because it depends on some internal field of source B? How is that actually working in a parallel setting? Which timestamp is used in the different instances of a source? Say, we have task A1 which is the firs

Re: Jdbc input format and system properties

2020-11-19 Thread Arvid Heise
Hi Flavio, if it arrives in the java process then you are doing everything right already (or almost). Are you shading the mysql connector? I suspect that the property also gets shaded then. You could decompile your jar to be sure. Have you verified that this is working as intended without Fli

Re: Flink on block storage in k8s

2020-11-19 Thread Yang Wang
Hi George, If your PVCs can be mounted as ReadWriteMany [1], then I think Flink could be deployed on these PVs. However, with high availability enabled, you still need a distributed coordination system (ZooKeeper, or the newly introduced Kubernetes HA [2]) for the leader election/retrieval and meta st

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Rex Fenley
Thanks! Update: We've confirmed with a test copy of our data now that if we remove all the null values from arrays everything works smoothly and as expected. So this definitely appears to be the culprit. On Thu, Nov 19, 2020 at 6:41 PM Jark Wu wrote: > Thanks Rex! This is very helpful. Will che
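The workaround described here, removing null values from arrays before the data reaches Flink, can be sketched as a preprocessing step. This is only an illustration in plain Python, not part of Flink or of the poster's pipeline; the function name `strip_array_nulls` and the sample record are hypothetical.

```python
import json


def strip_array_nulls(value):
    """Recursively drop None entries from every list in a decoded JSON value.

    Null object fields are kept; only nulls that sit inside arrays are removed.
    """
    if isinstance(value, list):
        return [strip_array_nulls(v) for v in value if v is not None]
    if isinstance(value, dict):
        return {k: strip_array_nulls(v) for k, v in value.items()}
    return value


# Hypothetical record with a null inside an array column.
record = json.loads('{"id": 1, "roles": [null]}')
cleaned = strip_array_nulls(record)
print(json.dumps(cleaned))
```

Whether an empty array is an acceptable substitute for an array holding a null depends on the downstream query, so this is a stopgap rather than a fix.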

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Danny Chan
I checked with the following json: { "schema":{ "type":"struct", "fields":[ { "type":"struct", "fields":[ { "type":"int32", "optional":false, "field":"id" },

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-19 Thread Si-li Liu
Thanks for your help! Now the timestamps already go with the items in the stream. My streaming pipeline is like this: source -> parser --shuffle--> join -> sink Streaming A and streaming B both go through this pipeline. I keep logs from streaming A in an in-memory cache (LinkedHashMap) in the join operator, then

Re: Force Join Unique Key

2020-11-19 Thread Rex Fenley
I'm reading your response as RocksDB having to seek across the whole dataset for the whole table, which we hope to avoid. What are the rules for the unique key and unique join key inference? Maybe we can reorganize our plan to allow it to infer unique keys more correctly. Thanks On Wed, Nov 18,

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Jark Wu
Thanks Rex! This is very helpful. Will check it out later. On Fri, 20 Nov 2020 at 03:02, Rex Fenley wrote: > Below is a highly redacted set of data that should represent the problem. > As you can see, the "roles" field has "[null]" in it, a null value within > the array. We also see in our DB c

Re: State of Machine Learning with Flink and especially FLIP-39

2020-11-19 Thread Becket Qin
Hi Niklas, We dropped the Flink ML lib in 1.9 and plan to replace it with a new machine learning library for traditional machine learning algorithms. That library will be based on FLIP-39. The plan was pushed back a little bit because we plan to deprecate the DataSet API but haven't got the ba

Re: Job Manager is taking very long time to finalize the Checkpointing.

2020-11-19 Thread Slim Bouguerra
@Arvid thanks, will try that. The NFS server I am using should be able to have TP. In my observation, the Serde is taking most of the CPU. @Yun Tang Please find the logs. Also, what are your thoughts about the Source Task Data Gen causing this, i.e. pushing the checkpoint to the JM instead of the filesystem? T

Re: Dynamic ad hoc query deployment strategy

2020-11-19 Thread Kostas Kloudas
Hi, Thanks for reaching out! First of all, I would like to point out that an interesting alternative to the per-job cluster could be running your jobs in application mode [1]. Given that you want to run arbitrary SQL queries, I do not think you can "share" across queries the part of the job grap

Re: Jdbc input format and system properties

2020-11-19 Thread Flavio Pompermaier
The properties arrive at the task manager, because I can see them in the java process (using ps aux). Or do you mean some special line of code? On Thu, Nov 19, 2020, 20:53 Arvid Heise wrote: > Hi Flavio, > > you are right, all looks good. > > Can you please verify if the properties arrived at

Re: Flink on block storage in k8s

2020-11-19 Thread George Costea
Hi there, Can Flink be deployed to PVCs backed by block storage? It seems the only option is blob storage today. Thanks, George

Re: Jdbc input format and system properties

2020-11-19 Thread Arvid Heise
Hi Flavio, you are right, all looks good. Can you please verify if the properties arrived at the task manager in the remote debugger session? For example, you could check the JVisualVM Overview tab. On Thu, Nov 19, 2020 at 8:38 PM Flavio Pompermaier wrote: > At the moment I use a standalone cl

Re: State of Machine Learning with Flink and especially FLIP-39

2020-11-19 Thread Arvid Heise
Hi Niklas, indeed some efforts on the machine learning libraries are pushed back in favor of getting proper PyTorch and TensorFlow support through PyFlink. Native implementations in Flink have been done so far in the DataSet API, which is going to be deprecated in the next few releases in favor of t

Re: Jdbc input format and system properties

2020-11-19 Thread Flavio Pompermaier
At the moment I use a standalone cluster. Isn't using env.java.opts the right way to do it? On Thu, Nov 19, 2020, 20:11 Arvid Heise wrote: > Hi Flavio, > > -D afaik passes only the system property to the entry point (client or > jobmanager depending on setup), while you probably want to have

Re: How to achieve co-location constraints in Flink 1.9.1

2020-11-19 Thread Arvid Heise
Hi Si-li, slot sharing is indeed the way that Flink performs co-location. It's actually enabled by default. It should work as expected if upstream and downstream operators have the same parallelism. In certain cases, two operators can even be chained into one task where no serialization/network t

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-19 Thread Arvid Heise
Hi Si-li, couldn't you also add the timestamp as a state to the source? So the time would store the timestamp of the last emitted record. It's nearly identical to your solution but would fit the recovery model of Flink much better. If you want to go further back to account for the records that hav

Re: How to set keystore.jks location on EMR when reading Kafka topics via SSL

2020-11-19 Thread Arvid Heise
Glad to hear that you worked it out. Indeed, the path has to be accessible by the worker nodes. A common solution is also to put it on some DFS like HDFS and reference that. Then you only need to update one file if the key changes. On Thu, Nov 19, 2020 at 2:14 AM Fanbin Bu wrote: > i have to pu

Re: Job Manager is taking very long time to finalize the Checkpointing.

2020-11-19 Thread Arvid Heise
Hi Slim, for your initial question concerning the size of _metadata. When Flink writes the checkpoint, it assumes some kind of DFS. Pretty much all known DFS implementations behave poorly for many small files. If you run a job with 5 tasks and parallelism of 120, then you'd get 600 small checkpoin
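The count in this reply follows from simple multiplication. As a rough sketch, assuming one small state file per parallel subtask per checkpoint (an assumption made for illustration, since the snippet is cut off), the number can be reproduced as follows; the function name is hypothetical.

```python
def small_files_per_checkpoint(tasks: int, parallelism: int) -> int:
    """Upper-bound estimate: one small file per parallel subtask (assumed)."""
    return tasks * parallelism


# The figures quoted in the reply: 5 tasks, parallelism of 120.
estimate = small_files_per_checkpoint(tasks=5, parallelism=120)
print(estimate)
```

The estimate grows linearly with both factors, which is why many small files become a concern for the DFS implementations mentioned in the reply.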

Re: Jdbc input format and system properties

2020-11-19 Thread Arvid Heise
Hi Flavio, -D afaik passes only the system property to the entry point (client or jobmanager depending on setup), while you probably want to have it on the task managers. The specific options to pass it to the task managers depend on the way you deploy. -yD for yarn for example. For docker or k8s

Re: How do i load mysql data into task

2020-11-19 Thread Arvid Heise
Hi Jiazhi, you can use a rich function and query all static data in open [1] as you'd do it in Java if you want to load the data into main memory. If you want to dynamically query the database (enriching a record), you should use Async IO instead. [2] Alternatively, you can also use the data sour

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Rex Fenley
Below is a highly redacted set of data that should represent the problem. As you can see, the "roles" field has "[null]" in it, a null value within the array. We also see in our DB corresponding rows like the following. id | roles ---+ 16867433 | {NULL} We have confir

Re: SSL setup for YARN deployment when hostnames are unknown.

2020-11-19 Thread Jiahui Jiang
Yeah there is no wildcard hostname it can be using. Went ahead and started the implementation for the start up wrapper, but just realized after generating the key-cert pair in the JM wrapper, we will need to ping back to the client with the cert. Another question I have is, currently we are usi

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Dylan Forciea
Ah yes, I missed the Kafka part and just saw the array part. FLINK-19771 definitely was solely in the Postgres-specific code. Dylan From: Jark Wu Date: Thursday, November 19, 2020 at 9:12 AM To: Dylan Forciea Cc: Danny Chan , Rex Fenley , Flink ML Subject: Re: Filter Null in Array in SQL Conne

Re: Logs of JobExecutionListener

2020-11-19 Thread Flavio Pompermaier
You're right. I removed my flink dir and re-extracted it, and now it works. Unfortunately I didn't keep the old version to understand what the difference was, but the error was probably caused by the fact that I had a previous version of the WordCount.jar (without the listener) in the flink lib di

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-19 Thread Simone Cavallarin
Many thanks for the Help!! Simone From: Aljoscha Krettek Sent: 19 November 2020 11:46 To: user@flink.apache.org Subject: Re: How to use EventTimeSessionWindows.withDynamicGap() On 17.11.20 17:37, Simone Cavallarin wrote: > Hi, > > I have been working on the sug

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Jark Wu
Hi Dylan, I think Rex encountered another issue, because he is using Kafka with the Debezium format. Hi Rex, If you can share the JSON data and the exception stack, that would be helpful! Besides, you can try to enable the 'debezium-json.ignore-parse-errors' option [1] to skip the dirty data. Best, Ja

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Dylan Forciea
Do you mean that the array contains values that are null, or that the entire array itself is null? If it’s the latter, I have an issue written, along with a PR to fix it that has been pending review [1]. Regards, Dylan Forciea [1] https://issues.apache.org/jira/browse/FLINK-19771 From: Danny C

Re: Caching Mechanism in Flink

2020-11-19 Thread Andrey Zagrebin
Hi Iacovos, As Matthias mentioned, the tasks' off-heap memory has nothing to do with the memory segments. This memory component is reserved only for the user code. The memory segments are managed by Flink and used for batch workloads, like in-memory joins etc. They are part of managed memory (taskmanager.mem

Re: Logs of JobExecutionListener

2020-11-19 Thread Andrey Zagrebin
I also tried 1.11.0 and 1.11.2, both work for me. On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek wrote: > Hmm, there was this issue: > https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed > in your version. > > On 19.11.20 12:58, Flavio Pompermaier wrote: > > Which version

Re: Logs of JobExecutionListener

2020-11-19 Thread Aljoscha Krettek
Hmm, there was this issue: https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed in your version. On 19.11.20 12:58, Flavio Pompermaier wrote: Which version are you using? I used the exact same commands on Flink 1.11.0 and I didn't get the job listener output.. Il gio 19 no

Re: Strange behaviour when using RMQSource in Flink 1.11.2

2020-11-19 Thread Thomas Eckestad
Hi Andrey, Thank you for your response. I created https://issues.apache.org/jira/browse/FLINK-20244. Best Regards, Thomas From: Andrey Zagrebin Sent: Thursday, November 19, 2020 8:41 To: Thomas Eckestad Cc: user@flink.apache.org Subject: Re: Strange behaviour

State of Machine Learning with Flink and especially FLIP-39

2020-11-19 Thread Niklas Wilcke
Hi Flink-Community, I'm digging through the history of FlinkML and FLIP-39 [0]. What I understood so far is that FlinkML has been removed in 1.9, because it had become unmaintained. I'm not really able to find out whether FLIP-39 and providing a replacement for FlinkML is currently being worked on. The Umbre

Re: Logs of JobExecutionListener

2020-11-19 Thread Flavio Pompermaier
Which version are you using? I used the exact same commands on Flink 1.11.0 and I didn't get the job listener output. On Thu, Nov 19, 2020, 12:53 Andrey Zagrebin wrote: > Hi Flavio and Aljoscha, > > Sorry for the late heads up. I could not actually reproduce the reported > problem with 'flin

Re: Logs of JobExecutionListener

2020-11-19 Thread Flavio Pompermaier
Hi Aljoscha, in my main class, within the jar, I create the env and I call env.execute(). The listener is not called if the job is run by the CLI client or FlinkRestClient, and I don't see anything on the job manager or task manager. To me this is a bug and you can verify it attaching a listener to the

Re: Logs of JobExecutionListener

2020-11-19 Thread Andrey Zagrebin
Hi Flavio and Aljoscha, Sorry for the late heads up. I could not actually reproduce the reported problem with 'flink run' and local standalone cluster on master. I get the expected output with the suggested modification of WordCount program: $ bin/start-cluster.sh $ rm -rf out; bin/flink run fli

How to achieve co-location constraints in Flink 1.9.1

2020-11-19 Thread Si-li Liu
Hi, Flink only has the slotSharingGroup API on the DataStream class; I can't find any public API to achieve co-location constraints. Could anyone provide me an example? Another question is that if I use a slot sharing group, it is possible that Flink will schedule two sub tasks to the same slot. I think such schedule w

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-19 Thread Aljoscha Krettek
On 17.11.20 17:37, Simone Cavallarin wrote: Hi, I have been working on the suggestion that you gave me, thanks! The first part is to add the gap to the message. 1) I receive the event, 2) I take that event and I map it using StatefulsessionCalculator, that is where I put together "The message",

Re: execution.runtime-mode=BATCH when reading from Hive

2020-11-19 Thread Aljoscha Krettek
Thanks! It's good to see that it is helpful to you. Best, Aljoscha On 18.11.20 18:11, Dongwon Kim wrote: Hi Aljoscha, Unfortunately, it's not that easy right now because normal Sinks that rely on checkpointing to write out data, such as Kafka, don't work in BATCH execution mode because we don

Re: Logs of JobExecutionListener

2020-11-19 Thread Aljoscha Krettek
JobListener.onJobExecuted() is only invoked in ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none of these is still in the call chain with that setup then the listener will not be invoked. Also, this would only happen on the client, not on the broker (in your case) or th

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Danny Chan
Can you also share your problematic JSON string here, so that we can determine the specific cause of the error? Rex Fenley wrote on Thu, Nov 19, 2020 at 2:51 PM: > Hi, > > I recently discovered some of our data has NULL values arriving in an > ARRAY column. This column is being consumed by Flink via the Kafka > c

Re: Logs of JobExecutionListener

2020-11-19 Thread Flavio Pompermaier
I have a Spring Boot job server that acts as a broker between our application and a Flink session cluster. To submit a job I use the FlinkRestClient (which is also the one used by the CLI client for the run action, if I'm not wrong). However, neither method triggers the job listener. Il g

Re: Logs of JobExecutionListener

2020-11-19 Thread Aljoscha Krettek
@Flavio, when you're saying you're using the RestClusterClient, you are not actually using that manually, right? You're just submitting your job via "bin/flink run ...", right? What's the exact invocation of "bin/flink run" that you're using? On 19.11.20 09:29, Andrey Zagrebin wrote: Hi Flavi

Re: Logs of JobExecutionListener

2020-11-19 Thread Andrey Zagrebin
Hi Flavio, I think I can reproduce what you are reporting (assuming you also pass '--output' to 'flink run'). I am not sure why it behaves like this. I would suggest filing a Jira ticket for this. Best, Andrey On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier wrote: > is this a bug or is it a

Re: Problems with FlinkKafkaProducer - closing after timeoutMillis = 9223372036854775807 ms.

2020-11-19 Thread Tzu-Li (Gordon) Tai
Hi, One thing to clarify first: I think the "Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms" log doesn't necessarily mean that a producer was closed due to timeout (Long.MAX_VALUE). I guess that is just a Kafka log message that is logged when a Kafka producer is closed with
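The timeout value in that log message is exactly Java's Long.MAX_VALUE, which is why it reads as "no effective timeout" rather than a real deadline. A small sketch in plain Python (not Kafka or Flink code; the variable names are illustrative) confirms the value and shows how long it would be in years.

```python
# Java's Long.MAX_VALUE is the largest signed 64-bit integer.
JAVA_LONG_MAX = 2**63 - 1

# Value copied from the log message quoted in the thread subject.
timeout_ms = 9223372036854775807

is_long_max = timeout_ms == JAVA_LONG_MAX

# Convert milliseconds to years (365-day years) to show the scale.
ms_per_year = 1000 * 60 * 60 * 24 * 365
timeout_years = timeout_ms / ms_per_year

print(is_long_max, f"{timeout_years:.3e} years")
```

A timeout of roughly 290 million years is effectively unbounded, consistent with the reply's point that the message does not by itself indicate a timeout-triggered close.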

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Danny Chan
Hi Fenley, you are right, parsing nulls in an ARRAY field is not supported right now. I have logged an issue [1] and will fix it soon. [1] https://issues.apache.org/jira/browse/FLINK-20234 Rex Fenley wrote on Thu, Nov 19, 2020 at 2:51 PM: > Hi, > > I recently discovered some of our data has NULL values arriving