Filesystem as a stream source in Table/SQL API

2020-11-20 Thread eef hhj
Hi, I'm facing a situation where I want the Flink app to dynamically detect changes to a filesystem batch data source. When I tried the following example in sql-client.sh, the SELECT returned all the records under the folder. When I add a new file to the folder, the query does
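
A continuously monitored file source works roughly like this: it re-lists the directory periodically and emits only the files it has not processed yet. The sketch below shows that idea in plain Java and assumes a single local directory. `DirectoryPoller` is a hypothetical name, and this is not Flink's implementation. In the DataStream API, continuous monitoring is reached through `readFile` with `FileProcessingMode.PROCESS_CONTINUOUSLY`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: plain-Java illustration of continuous directory
// monitoring, NOT Flink's file source implementation.
class DirectoryPoller {
    private final Path dir;
    // Paths already reported, so each file is emitted exactly once.
    private final Set<Path> seen = new HashSet<>();

    DirectoryPoller(Path dir) {
        this.dir = dir;
    }

    // Re-lists the directory and returns only regular files not seen before.
    List<Path> poll() {
        List<Path> fresh = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path p : stream) {
                if (Files.isRegularFile(p) && seen.add(p)) {
                    fresh.add(p);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        fresh.sort(null); // natural Path order, for deterministic output
        return fresh;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("poll-demo");
        DirectoryPoller poller = new DirectoryPoller(tmp);
        Files.writeString(tmp.resolve("part-0.csv"), "a,1\n");
        System.out.println(poller.poll()); // contains part-0.csv
        System.out.println(poller.poll()); // empty until a new file appears
        Files.writeString(tmp.resolve("part-1.csv"), "b,2\n");
        System.out.println(poller.poll()); // contains only part-1.csv
    }
}
```

A real source would also need to checkpoint the set of seen files (or a modification-time watermark) to stay fault-tolerant after a restart.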

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-20 Thread Si-li Liu
Thanks for your reply! Yes, I want A_i and T_i to run in the same slot. Ideally, the T operator should have a parallelism of 1 in the topology, and all A_i can start from the same timestamp, but a minor difference in the resume timestamp across different A_i sources is also acceptable. So I think multiple T operators are a

Re: Concise example of how to deploy flink on Kubernetes

2020-11-20 Thread Xingbo Huang
Hi George, Have you referred to the official documentation [1]? [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html Best, Xingbo On Saturday, November 21, 2020, George Costea wrote: > Hi there, > > Is there an example of how to deploy a flink cluster on Kubernetes? > I'd lik

Re: Force Join Unique Key

2020-11-20 Thread Rex Fenley
I have a few more questions. Even if a join has no unique keys, couldn't the join key be used to organize records into a tree of record groups, one group per join key, so that lookups are faster? I have also been looking at the RocksDB docs, and it looks like it has a RangeScan operation. I'm guessing then
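
The question describes grouping records under their join key in a sorted structure. That gives point lookups per key, and a range scan over neighbouring keys comes for free. The sketch below is a minimal in-memory illustration of that idea using `TreeMap`. `KeyGroupedIndex` is a hypothetical name, and nothing here claims this is how Flink's join operator or its RocksDB state backend actually lays out state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration: records grouped per join key in a sorted map.
// Not Flink's actual join state layout.
class KeyGroupedIndex {
    // Sorted by join key, so lookups and range scans cost O(log n) plus output.
    private final NavigableMap<String, List<String>> byKey = new TreeMap<>();

    void add(String joinKey, String record) {
        byKey.computeIfAbsent(joinKey, k -> new ArrayList<>()).add(record);
    }

    // All records sharing exactly this join key (empty list if none).
    List<String> lookup(String joinKey) {
        return byKey.getOrDefault(joinKey, List.of());
    }

    // All records whose join key lies in [from, to), in key order, similar in
    // spirit to a range scan over a sorted key-value store.
    List<String> range(String from, String to) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : byKey.subMap(from, true, to, false).entrySet()) {
            out.addAll(e.getValue());
        }
        return out;
    }
}
```

With keys like `user:1` and `user:2`, `range("user:", "user;")` acts as a prefix scan, because `;` sorts directly after `:`.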

Concise example of how to deploy flink on Kubernetes

2020-11-20 Thread George Costea
Hi there, Is there an example of how to deploy a Flink cluster on Kubernetes? I'd like to deploy the Flink cluster, a Kafka broker, and then the greeter example to give it a try. Thanks, George

Re: Jackson de/serialization exception?

2020-11-20 Thread Arvid Heise
Hm, yes, those are good reasons. The issue is that if you put it into Flink, then it's part of Flink's system classloader, so there is no way to unload classes or protect Flink's classes (and its dependencies) from being overwritten by your dependencies. I'm thinking that this may cause differences

Re: Logs of JobExecutionListener

2020-11-20 Thread Flavio Pompermaier
I think that the problem is that my REST service submits the job to the Flink standalone cluster and responds to the client with the submitted job ID. To achieve this, I was using the RestClusterClient because with that I can use the following code and retrieve the JobID: (1) JobID flinkJobId

Re: Jackson de/serialization exception?

2020-11-20 Thread Alexander Bagerman
A couple of reasons I've done that: it's listed as an option here: https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html#advanced-customization under optional libraries; also, I have over 200 jobs running that rely on the same core functionality provided by the jar in ques

Re: Jackson de/serialization exception?

2020-11-20 Thread Arvid Heise
Oh no, please never put user code (with included libraries) into Flink's classpath. It's not supported precisely for classloader reasons. Why did you think that this would be a good approach? Is your jar too big? Maybe a different deployment mode would be more appropriate? [1] Alternatively, if you

Hi, I'm having problems with self-signed certificate trust with Native K8S

2020-11-20 Thread Kevin Kwon
Hi, I am using MinIO as an S3 mock backend for Native K8S. Everything seems to be fine, except that it cannot connect to S3 because the trust store for the self-signed certificates is not cloned into the Deployment resources. Below, in order, is how I add the trusted keystore using keytool and how I run my app wit

Re: Jackson de/serialization exception?

2020-11-20 Thread Alexander Bagerman
I just switched from providing my jar when creating a remote environment to providing this jar on Flink's classpath. It used to work just fine when the jar was shipped to Flink with the job graph. Now, when the jar is available to Flink at startup, the same job that used to run is failing with exce

Re: Jackson de/serialization exception?

2020-11-20 Thread Alexander Bagerman
Thanks, Arvid. That is what I thought too. I went through all the instances where it might have been a member variable and made sure that it's declared as transient. :-( Is there anything else I can check? Alex On Fri, Nov 20, 2020 at 11:50 AM Arvid Heise wrote: > Are you using ObjectMapper as a n

Re: Jackson de/serialization exception?

2020-11-20 Thread Arvid Heise
The easiest approach for non-trivial issues like this is to start the application locally in a local executor, so you can debug in your IDE. Additionally, double-check that you have no lambdas/anonymous classes that reference outer classes holding an ObjectMapper. ObjectMapper should also be static

Re: Filter Null in Array in SQL Connector

2020-11-20 Thread Rex Fenley
Btw, this is what our source and sink essentially look like, with some columns redacted. CREATE TABLE source_kafka_data ( id BIGINT, roles ARRAY, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = 'topic', 'properties.bootstrap.servers' = 'kafka', '

Re: Jackson de/serialization exception?

2020-11-20 Thread Arvid Heise
Are you using ObjectMapper as a non-transient field? If so, please make it transient and initialize it in open() of a Rich*Function. On Fri, Nov 20, 2020 at 7:56 PM Alexander Bagerman wrote: > Hi, > I added my custom jar (that includes dependencies on Jackson) to Flink > classpath. It seems to be l
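
The "transient field, initialized in open()" advice can be shown with the JDK alone. The sketch below assumes that user functions are shipped via Java serialization. `NonSerializableMapper` is a hypothetical stand-in for Jackson's `ObjectMapper`, and `open()` here only imitates `RichFunction#open`; it is not Flink's interface. Without `transient`, `writeObject` would fail with `NotSerializableException`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a heavyweight, non-serializable helper such as
// Jackson's ObjectMapper, so the sketch needs only the JDK.
class NonSerializableMapper {
    String write(int value) {
        return "{\"value\":" + value + "}";
    }
}

// Imitates the Rich*Function pattern: the function object is serialized and
// shipped, and open() runs on the worker before any record is processed.
class MappingFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by Java serialization, so shipping never touches it.
    private transient NonSerializableMapper mapper;

    // Called after deserialization, like RichFunction#open.
    void open() {
        mapper = new NonSerializableMapper();
    }

    String map(int value) {
        return mapper.write(value);
    }

    boolean isOpened() {
        return mapper != null;
    }
}

class TransientDemo {
    // Round-trips an object through Java serialization, as when a job is shipped.
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

After the round trip the mapper is `null` again, which is exactly why it has to be created in `open()` on the receiving side.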

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-20 Thread fuyao . li
Hi Timo, One more question: the blog also mentioned a JIRA ticket to solve this issue: https://issues.apache.org/jira/browse/FLINK-10886. Will this feature be available in 1.12? Thanks! Best, Fuyao On 11/20/20 11:37, fuyao...@oracle.com wrote: Hi Timo, Thanks for your reply! I think your s

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-20 Thread fuyao . li
Hi Timo, Thanks for your reply! I think your suggestions are really helpful! The good news is that I had managed to figure some of it out by myself a few days ago. 1. Thanks for the update about the table parallelism issue! 2. I tried out the idleness setting. It prevents some idle subta

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-20 Thread Arvid Heise
Your topology is definitely interesting and makes sense to me at a high level. The main remaining question is the parallelism. I'm assuming you run your pipeline with parallelism p and both source A and timestamp calculator T are run with parallelism p. You want to create a situation where for A_i

Jackson de/serialization exception?

2020-11-20 Thread Alexander Bagerman
Hi, I added my custom jar (which includes dependencies on Jackson) to the Flink classpath. It seems to be loaded just fine. But when the job starts, I get the exception below. I am not sure how to interpret the exception and would appreciate it if somebody could give me advice on it. Thanks, Alex 202

Re: Dynamic ad hoc query deployment strategy

2020-11-20 Thread lalala
Hi Kostas, Thank you for your response. Is what you are saying valid for session mode? If I submit my jobs to the existing Flink session, will they be able to share the sources? We do register our Kafka tables in `GenericInMemoryCatalog`, and the documentation says `The GenericInMemoryCatalog i

Non uniform distribution of subtasks even with cluster.evenly-spread-out-slots

2020-11-20 Thread Harshit Hajela
Hi Flink Community, I'm currently running a heavy Flink job on Flink 1.9.3 that has a lot of subtasks, and I'm observing some subtask distribution issues. The job in question has 9288 subtasks, running on a large set of TMs (1792 slots available in total). I'm using the *cluster.evenly-s

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-20 Thread Timo Walther
Hi Fuyao, sorry for not replying earlier. You posted a lot of questions. I scanned the thread quickly; let me try to answer some of them, and feel free to ask further questions afterwards. "is it possible to configure the parallelism for Table operation at operator level" No, this is not pos

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-20 Thread Si-li Liu
Thanks for your reply. I want to join two streams, A and B. Items in stream A arrive first, and I keep them in an in-memory cache as (join key, item) pairs; several minutes later the items in stream B arrive and the join is performed. The timestamp of the latest expired item in the memory cache
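
The pattern described above has four parts: buffer stream-A items by join key, join later stream-B items against that buffer, expire old A items, and remember the newest expired timestamp. The sketch below is a minimal single-threaded, in-memory version of it. `ExpiringJoinCache`, the retention parameter, and the `a|b` output format are hypothetical choices. The sketch ignores Flink state, parallelism, and checkpointing, and `expire` does a linear scan for clarity.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: buffer stream-A items per join key, join stream-B items
// against the buffer, expire old A items, and track the newest expired timestamp.
class ExpiringJoinCache {
    // One buffered stream-A item together with its timestamp.
    static final class Entry {
        final String item;
        final long timestamp;

        Entry(String item, long timestamp) {
            this.item = item;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Entry> byKey = new HashMap<>();
    private final long retentionMillis;
    private long latestExpiredTimestamp = Long.MIN_VALUE;

    ExpiringJoinCache(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    void putA(String joinKey, String item, long timestamp) {
        byKey.put(joinKey, new Entry(item, timestamp));
    }

    // Joins a stream-B item; empty if the A item never arrived or already expired.
    Optional<String> joinB(String joinKey, String bItem) {
        Entry a = byKey.get(joinKey);
        return a == null ? Optional.empty() : Optional.of(a.item + "|" + bItem);
    }

    // Drops A items older than (now - retention) and records the newest dropped timestamp.
    void expire(long now) {
        Iterator<Entry> it = byKey.values().iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (e.timestamp < now - retentionMillis) {
                latestExpiredTimestamp = Math.max(latestExpiredTimestamp, e.timestamp);
                it.remove();
            }
        }
    }

    long latestExpiredTimestamp() {
        return latestExpiredTimestamp;
    }
}
```

A B item that arrives after its A item has expired finds nothing to join with. The latest expired timestamp is therefore a natural value to compare late B items against.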

Re: Jdbc input format and system properties

2020-11-20 Thread Arvid Heise
AFAIK there is no magic for JVM properties (and I just looked in the code base for the most obvious candidates). There is also nothing to gain from overwriting properties. I'm also certain that it should work, as it's used in most secured setups to inject keys/keytabs. What happens if you execute the Flink

Re: Jdbc input format and system properties

2020-11-20 Thread Flavio Pompermaier
Yes, that's what is surprising. I already did a remote debug on the TM and that property is not read, which is really weird. Could it be that the JVM properties get cleared before the tasks are invoked? On Fri, Nov 20, 2020, 12:50 Arvid Heise wrote: > All looks good and as it should be. > > C

Re: Jdbc input format and system properties

2020-11-20 Thread Arvid Heise
All looks good and as it should be. Can you do a remote debugging session on the TM once more and check Boolean.getBoolean("com.mysql.disableAbandonedConnectionCleanup")? There is no magic involved in system properties in Flink. If the property is set on the process, the configuration works. If it
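
The check suggested above can be wrapped in a tiny helper that is run inside the TaskManager JVM, for example from a debug session or a log statement. Note that this thread uses two different property names: `com.mysql.disableAbandonedConnectionCleanup` here and `com.mysql.cj.disableAbandonedConnectionCleanup` in the standalone test. The sketch therefore reports both. `SystemPropertyCheck` is a hypothetical name. `Boolean.getBoolean` returns true only if the system property exists and equals "true", ignoring case.

```java
// Hypothetical helper: prints the raw value and Boolean.getBoolean result for
// both property names mentioned in this thread, as seen by the current JVM.
class SystemPropertyCheck {
    static final String[] NAMES = {
        "com.mysql.disableAbandonedConnectionCleanup",
        "com.mysql.cj.disableAbandonedConnectionCleanup"
    };

    static String report() {
        StringBuilder sb = new StringBuilder();
        for (String name : NAMES) {
            // Boolean.getBoolean reads JVM system properties only (not env vars)
            // and is true only for a case-insensitive "true".
            sb.append(name)
              .append(" raw=").append(System.getProperty(name))
              .append(" getBoolean=").append(Boolean.getBoolean(name))
              .append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(report());
    }
}
```

If only one of the two names is set with `-D`, this report makes the mismatch visible immediately.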

Re: State of Machine Learning with Flink and especially FLIP-39

2020-11-20 Thread Niklas Wilcke
Hi Arvid and Jiangjie, thanks to both of you for the quick and valuable response. I will take a look at the linked projects. Kind Regards, Niklas

Re: Jdbc input format and system properties

2020-11-20 Thread Flavio Pompermaier
I've just tested the following code in a Java class, and the property (-Dcom.mysql.cj.disableAbandonedConnectionCleanup=true) is read correctly: with abandonedConnectionCleanupDisabled set, the cleanupThreadExecutorService is not initialized (in my other test, that service was causing a dynamic classloading m

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-20 Thread Aljoscha Krettek
Sure, my pleasure! Aljoscha On 19.11.20 16:12, Simone Cavallarin wrote: Many thanks for the Help!! Simone From: Aljoscha Krettek Sent: 19 November 2020 11:46 To: user@flink.apache.org Subject: Re: How to use EventTimeSessionWindows.withDynamicGap() On 17.11

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-20 Thread Pierre Oberholzer
Hi Wei, Thanks for the hint. May I follow up by adding more context and asking for your guidance? In case the aforementioned Map[String,Any] object returned by Scala:
- has a defined schema (incl. nested) with up to 100k (!) different possible keys
- has only some portion of the keys populated for

Re: Jdbc input format and system properties

2020-11-20 Thread Flavio Pompermaier
No, no, I didn't relocate any class related to JDBC. On Fri, Nov 20, 2020, 10:02 Arvid Heise wrote: > I was particularly asking if you relocate classes. Since the property name > looks like a class name, it could have been changed as well. Could you > check the value of > PropertyDefinitions.SYS

Re: Jdbc input format and system properties

2020-11-20 Thread Arvid Heise
I was specifically asking whether you relocate classes. Since the property name looks like a class name, it could have been rewritten by relocation as well. Could you check the value of PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup in your final jar? On Fri, Nov 20, 2020 at 9:35 AM Flavio Pompermaier wro
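
One way to check the constant as the final jar actually ships it is to read it by reflection, with that jar on the classpath. The sketch below is a minimal example of that approach. `ConstantInspector` is a hypothetical name. The default class name `com.mysql.cj.conf.PropertyDefinitions` is an assumption about MySQL Connector/J 8 and may differ in other versions or after relocation, so both names can be passed as arguments.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical helper: reads a public static field from whatever class the
// current classpath (e.g. the final shaded jar) actually provides.
class ConstantInspector {
    static Object readStaticField(String className, String fieldName) throws ReflectiveOperationException {
        Class<?> cls = Class.forName(className);
        Field f = cls.getField(fieldName); // public fields only
        if (!Modifier.isStatic(f.getModifiers())) {
            throw new IllegalArgumentException(fieldName + " is not static");
        }
        return f.get(null); // null receiver is valid for static fields
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        // Assumed Connector/J 8 names; pass others as arguments if they differ.
        String cls = args.length > 0 ? args[0] : "com.mysql.cj.conf.PropertyDefinitions";
        String field = args.length > 1 ? args[1] : "SYSP_disableAbandonedConnectionCleanup";
        try {
            System.out.println(cls + "." + field + " = " + readStaticField(cls, field));
        } catch (ClassNotFoundException e) {
            System.out.println("Not on classpath (relocated or missing?): " + cls);
        }
    }
}
```

If the printed value differs from the `-D` name being passed to the TaskManager, the property is silently ignored. A `ClassNotFoundException` under the original package name also hints at relocation.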

Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-11-20 Thread Kostas Kloudas
Hi Hector, The main reasons for deprecating readFileStream() were: 1) it could only parse Strings, and in a rather limited way, as one could not even specify the encoding; 2) it was not fault-tolerant, so your concerns about exactly-once were not covered. One concern that I can fin

Re: Dynamic ad hoc query deployment strategy

2020-11-20 Thread Kostas Kloudas
I am also cc'ing Timo to see if he has anything more to add on this. Cheers, Kostas On Thu, Nov 19, 2020 at 9:41 PM Kostas Kloudas wrote: > > Hi, > > Thanks for reaching out! > > First of all, I would like to point out that an interesting > alternative to the per-job cluster could be running you

Re: Jdbc input format and system properties

2020-11-20 Thread Flavio Pompermaier
The MySQL connector is on the client classpath and in the Flink lib dir. When I debugged remotely, the AbandonedConnectionCleanupThread was initialized by the TaskManager at the first run of the job. Today I'll try to run the MySQL connector in a standalone Java app to see if the property is rea