Re: Periodic batch job leads to OutOfMemoryError: Metaspace problem

2021-04-07 Thread Arvid Heise
Hi, ChildFirstClassLoaders are created (more or less) per application jar, and seeing so many of them looks like a classloader leak to me. I'd expect you to see a new ChildFirstClassLoader popping up with each new job submission. Can you check who is referencing the ChildFirstClassLoader transitively? Usual
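One common way to find out who is referencing the ChildFirstClassLoader is to take a heap dump and inspect the GC-root paths of the loader instances in a heap analyzer such as Eclipse MAT. From outside the process, `jcmd <pid> GC.heap_dump <file>` does this. Below is a minimal in-process sketch, assuming a HotSpot-based JVM (it uses the JDK-specific `com.sun.management.HotSpotDiagnosticMXBean`). `HeapDumper` and the output path are hypothetical names, not part of Flink.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Path;

// Hypothetical helper: writes an .hprof heap dump that can be opened in a heap
// analyzer to trace which objects keep ChildFirstClassLoader instances alive.
final class HeapDumper {
    private HeapDumper() {}

    /**
     * Dumps the heap of the current JVM to {@code target}.
     * The file must not exist yet and, on recent JDKs, must end in ".hprof".
     * live=true runs a full GC first and keeps only reachable objects, so any
     * classloader still present in the dump is referenced from somewhere.
     */
    static Path dump(Path target, boolean live) throws IOException {
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        bean.dumpHeap(target.toString(), live);
        return target;
    }
}
```

Taking one dump after several job submissions makes retained loaders easy to spot: there should not be one ChildFirstClassLoader per past submission still reachable.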

Re: Async + Broadcast?

2021-04-07 Thread Arvid Heise
Hi Alex, your approach is completely valid. What you want to achieve is a chain between your state-managing operator and the consuming async operations. That way, you have no serialization overhead. To achieve that, you want to:
- use Flink 1.11+ [1]
- make sure that if you have a

Re: Periodic batch job leads to OutOfMemoryError: Metaspace problem

2021-04-07 Thread Yangze Guo
I went through the JM & TM logs but could not find any valuable clue. The exception is actually thrown by kafka-producer-network-thread. Maybe @Qingsheng could also take a look? Best, Yangze Guo On Thu, Apr 8, 2021 at 10:39 AM 太平洋 <495635...@qq.com> wrote: > > I have configured to 512M, but prob
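Because the error is an OutOfMemoryError in Metaspace, it can help to watch Metaspace usage and class (un)loading counters across successive job submissions. If the number of loaded classes keeps growing while almost nothing is unloaded, classloaders are probably being retained. Below is a minimal sketch using the standard `java.lang.management` API, assuming a HotSpot JVM, where the Metaspace memory pool is named "Metaspace". `MetaspaceProbe` is a hypothetical name. The pool's max is -1 when no limit is set; otherwise it reflects `-XX:MaxMetaspaceSize`, which Flink sets from `taskmanager.memory.jvm-metaspace.size`.

```java
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.Optional;

// Hypothetical probe: one snapshot of Metaspace usage and class-loading counters.
final class MetaspaceProbe {
    final long loadedClasses;   // classes currently loaded
    final long totalLoaded;     // classes loaded since JVM start
    final long unloadedClasses; // classes unloaded since JVM start
    final long metaspaceUsed;   // bytes, or -1 if no "Metaspace" pool exists
    final long metaspaceMax;    // bytes, or -1 if undefined (no MaxMetaspaceSize)

    private MetaspaceProbe(long loaded, long total, long unloaded, long used, long max) {
        this.loadedClasses = loaded;
        this.totalLoaded = total;
        this.unloadedClasses = unloaded;
        this.metaspaceUsed = used;
        this.metaspaceMax = max;
    }

    static MetaspaceProbe sample() {
        ClassLoadingMXBean cl = ManagementFactory.getClassLoadingMXBean();
        Optional<MemoryUsage> meta = ManagementFactory.getMemoryPoolMXBeans().stream()
                .filter(p -> p.getName().equals("Metaspace"))
                .map(MemoryPoolMXBean::getUsage)
                .findFirst();
        return new MetaspaceProbe(
                cl.getLoadedClassCount(),
                cl.getTotalLoadedClassCount(),
                cl.getUnloadedClassCount(),
                meta.map(MemoryUsage::getUsed).orElse(-1L),
                meta.map(MemoryUsage::getMax).orElse(-1L));
    }

    @Override
    public String toString() {
        return String.format("loaded=%d totalLoaded=%d unloaded=%d metaspaceUsed=%d metaspaceMax=%d",
                loadedClasses, totalLoaded, unloadedClasses, metaspaceUsed, metaspaceMax);
    }
}
```

Logging `MetaspaceProbe.sample()` once per submission (for example, from a diagnostic thread) gives a simple trend line to compare against the configured limit.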

Re: Async + Broadcast?

2021-04-07 Thread Alex Cruise
Thanks Austin! I'll proceed with my idea, and keep the bootstrap config :) -0xe1a On Wed, Apr 7, 2021 at 2:18 PM Austin Cawley-Edwards < austin.caw...@gmail.com> wrote: > Hey Alex, > > I'm not sure if there is a best practice here, but what I can tell you is > that I worked on a job that did exa

Re: Zigzag shape in TM JVM used memory

2021-04-07 Thread Lu Niu
Hi Piotr, thanks for replying. I asked this because such a pattern might imply memory oversubscription. For example, I tuned down the heap of one app from 2.63 GB to 367 MB and the job still runs fine: before: https://drive.google.com/file/d/1o8k9Vv3yb5gXITi4GvmlXMteQcRfmOhr/view?usp=sharing
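A zigzag (sawtooth) in used heap is what you would typically see from a garbage-collected JVM: usage climbs as objects are allocated and drops at each collection, so the peaks alone say little about how much heap the job actually needs, which is consistent with the job still running after the heap was cut. One way to check this is to sample used heap alongside the cumulative GC count and line up the drops with collections. Below is a minimal in-process sketch with the standard `java.lang.management` API. `HeapSampler` is a hypothetical name, and this is not Flink's metrics system.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sampler: records used heap and the total GC count at each sample,
// so drops in used heap can be matched to garbage collections.
final class HeapSampler {
    static final class Sample {
        final long usedHeapBytes;
        final long gcCount; // cumulative collections across all collectors

        Sample(long usedHeapBytes, long gcCount) {
            this.usedHeapBytes = usedHeapBytes;
            this.gcCount = gcCount;
        }
    }

    private HeapSampler() {}

    static Sample sample() {
        long used = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
        long gcs = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long c = gc.getCollectionCount();
            if (c > 0) {
                gcs += c; // -1 means "undefined" for this collector, so skip it
            }
        }
        return new Sample(used, gcs);
    }

    /** Takes n samples, sleeping intervalMillis between consecutive samples. */
    static List<Sample> sampleEvery(int n, long intervalMillis) throws InterruptedException {
        List<Sample> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(sample());
            if (i < n - 1) {
                Thread.sleep(intervalMillis);
            }
        }
        return out;
    }
}
```

If used heap drops each time `gcCount` increases and the post-GC floor stays flat, the sawtooth is ordinary GC behavior rather than a sign the heap is undersized.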

Re: Async + Broadcast?

2021-04-07 Thread Austin Cawley-Edwards
Hey Alex, I'm not sure if there is a best practice here, but what I can tell you is that I worked on a job that did exactly what you're suggesting with a non-async operator to create a [record, config] tuple, which was then passed to the async stage. Our config objects were also not tiny (~500kb)
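The pattern described here can be modelled in plain Java: a non-async operator pairs each record with the current config and hands the pair to the async stage. This is a framework-free sketch, not Flink's API. In a real job the config side would arrive on a separate (for example, broadcast) stream. `ConfigEnricher`, `Enriched`, and the generic types are hypothetical. The constructor requires an initial config, mirroring the "bootstrap config" that remains in use until the first update arrives.

```java
import java.util.Objects;

/** The [record, config] tuple handed to the downstream async stage (hypothetical). */
final class Enriched<R, C> {
    final R record;
    final C config;

    Enriched(R record, C config) {
        this.record = record;
        this.config = config;
    }
}

// Hypothetical, framework-free model of "pair each record with the latest config".
final class ConfigEnricher<R, C> {
    private C current; // latest config snapshot seen on the config side

    ConfigEnricher(C bootstrapConfig) {
        this.current = Objects.requireNonNull(bootstrapConfig, "bootstrap config");
    }

    /** Called when a new config snapshot arrives; only later records see it. */
    void onConfig(C snapshot) {
        current = Objects.requireNonNull(snapshot, "config snapshot");
    }

    /** Called for each data record; pairs it with the config current at that moment. */
    Enriched<R, C> onRecord(R record) {
        return new Enriched<>(record, current);
    }
}
```

Because the async stage receives the config inside each pair, it never has to read shared mutable state of its own, which keeps it free of config-refresh logic.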

Async + Broadcast?

2021-04-07 Thread Alex Cruise
Hi folks, I have a somewhat complex Flink job that has a few async stages, and a few stateful stages. It currently loads its configuration on startup, and doesn't attempt to refresh it. Now I'm working on dynamic reconfiguration. I've written a polling source which sends a configuration snapshot
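A polling source like this usually only needs to emit when the configuration actually changes. Below is a minimal, framework-free sketch of the polling step, assuming configs can be compared with `equals()`. `ConfigPoller`, the `Supplier`-based fetch, and the `Consumer`-based emit are hypothetical stand-ins for the real fetch logic and the source's collect call. In a real source, `pollOnce()` would run in a loop with a sleep between polls.

```java
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical polling step: fetch the config and emit it only if it changed.
final class ConfigPoller<C> {
    private final Supplier<C> fetch; // e.g. reads a file, a DB row, or an HTTP endpoint
    private final Consumer<C> emit;  // stand-in for the source's collect()
    private C last;                  // last emitted snapshot; null before the first emit

    ConfigPoller(Supplier<C> fetch, Consumer<C> emit) {
        this.fetch = Objects.requireNonNull(fetch);
        this.emit = Objects.requireNonNull(emit);
    }

    /** Polls once; returns true if a new snapshot was emitted. */
    boolean pollOnce() {
        C snapshot = fetch.get();
        if (snapshot == null || snapshot.equals(last)) {
            return false; // unchanged (or nothing fetched): downstream keeps its current config
        }
        last = snapshot;
        emit.accept(snapshot);
        return true;
    }
}
```

Emitting only on change keeps the config stream quiet between updates, so downstream operators are not repeatedly handed identical snapshots.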

Compression with rocksdb backed state

2021-04-07 Thread deepthi Sridharan
I am trying to understand this section on compression of checkpoints, which has me a bit confused: https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression Could you please confirm whether my understanding is correct:
1. Compression is disabled by default for f

Re: questions regarding stateful functions

2021-04-07 Thread Marco Villalobos
That is more roundabout than I had hoped for. I was hoping that a process function could call a stateful function and receive and act upon the response, but that's not the case. Thank you. On Wed, Apr 7, 2021 at 11:38 AM Igal Shilman wrote: > Yes it is possible. A process function upstream to a

Re: entrypoint for executing job in task manager

2021-04-07 Thread Bob Tiernay
Please see FLINK-14184 which should fully support such use cases in the future. Feel free to vote for it if you believe it would help your use case.

Re: questions regarding stateful functions

2021-04-07 Thread Igal Shilman
Yes, it is possible. A process function upstream of a stateful function can emit a message that in turn will be routed to that function using the DataStream integration. On Wed, Apr 7, 2021 at 7:16 PM Marco Villalobos wrote: > Thank you for the clarification. > > But there was one question n

FLINK Kinesis consumer Checkpointing data loss

2021-04-07 Thread Vijayendra Yadav
Hi Team, We are trying to make sure we do not lose data when the Kinesis consumer is down. Our Kinesis streaming job has the following checkpointing properties:
// checkpoint every X msecs
env.enableCheckpointing(Conf.getFlinkCheckpointInterval());
// enable externalized checkpoints whi

Re: questions regarding stateful functions

2021-04-07 Thread Marco Villalobos
Thank you for the clarification. But there was one question not addressed: can a stateful function be called by a process function? On Wed, Apr 7, 2021 at 8:19 AM Igal Shilman wrote: > Hello Marco! > > Your understanding is correct, but in addition > You can also use StateFun within a DataS

Re: Flink Taskmanager failure recovery and large state

2021-04-07 Thread Yaroslav Tkachenko
Hi Dhanesh, Thanks for the recommendation! I'll try it out. On Wed, Apr 7, 2021 at 1:59 AM dhanesh arole wrote: > Hi Yaroslav, > > We faced similar issues in our large stateful stream processing job. I had > asked question >

SingleValueAggFunction received more than one element error with LISTAGG

2021-04-07 Thread soumoks123
I receive the following error when trying to use the LISTAGG function in the Table API:
java.lang.RuntimeException: SingleValueAggFunction received more than one element.
    at GroupAggsHandler$1460.accumulate(Unknown Source)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAgg

Re: questions regarding stateful functions

2021-04-07 Thread Igal Shilman
Hello Marco! Your understanding is correct, but in addition, you can also use StateFun within a DataStream application [1].
[1] https://ci.apache.org/projects/flink/flink-statefun-docs-master/docs/sdk/flink-datastream/
On Wed, Apr 7, 2021 at 2:49 AM Marco Villalobos wrote: > Upon reading about s

Reducing Task Manager Count Greatly Increases Savepoint Restore

2021-04-07 Thread Kevin Lam
Hi all, We are trying to benchmark savepoint size vs. restore time. One thing we've observed is that when we reduce the number of task managers, the time to restore from a savepoint increases drastically:
1/ Restoring from a 9.7 TB savepoint onto 156 task managers takes 28 minutes
2/ Restoring from
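For scale, the first data point works out as follows, assuming decimal units (1 TB = 10^12 bytes) and state spread evenly across task managers (neither assumption is stated in the message):

```latex
\frac{9.7\times10^{12}\,\text{B}}{156\ \text{TMs}} \approx 62\ \text{GB per TM},
\qquad
\frac{9.7\times10^{12}\,\text{B}}{28\times60\,\text{s}} \approx 5.8\ \text{GB/s aggregate}
\approx 37\ \text{MB/s per TM}.
```

If the per-TM restore rate stayed constant, halving the TM count would roughly double the restore time. An increase larger than that would suggest the per-TM rate itself drops as each TM takes on more state.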

Re: How to know if task-local recovery kicked in for some nodes?

2021-04-07 Thread Sonam Mandal
Hi Till and Dhanesh, thanks for the insights, both on how to check that this kicks in and on the expected behavior. My understanding too was that if multiple TMs are used for the job, any TMs that don’t go down can take advantage of local recovery. Do you have any insights on a good minimum

Re: How to know if task-local recovery kicked in for some nodes?

2021-04-07 Thread dhanesh arole
Hi Till, you are right. To give you more context about our setup, we are running 1 task slot per task manager, and the total number of task manager replicas equals the job parallelism. The issue actually gets worse during a rolling deployment of task managers, as each TM goes offline and comes back online

Flink 1.13 and CSV (batch) writing

2021-04-07 Thread Flavio Pompermaier
Hi all, I'm testing writing to a CSV file using Flink 1.13, and I get the following error:
The matching candidates:
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
Unsupported property keys:
format.quote-character
I create the table env using this:
final EnvironmentSettings envSettings = Envi

Re: Will env.execute() and executeInsert() submit 2 jobs?

2021-04-07 Thread Yik San Chan
I believe you should delete that. You can delete it and see whether that gets rid of the extra job. On Wed, Apr 7, 2021 at 5:02 PM 王 浩成 wrote: > My program firstly did something on a data stream using DataStream API, > and then I converted it into a table and inserted it into another table for > sink, at th

Re: How to know if task-local recovery kicked in for some nodes?

2021-04-07 Thread Till Rohrmann
Hi Dhanesh, if some of the previously used TMs are still available, then Flink should try to redeploy tasks onto them, even in the case of a global failover. Only those tasks which have been executed on the lost TaskManager will need new slots and have to download the state from the remote storage. Ch
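The rule described here can be made concrete with a small bookkeeping model. Given which TaskManager each subtask ran on and which TMs were lost, only the subtasks from the lost TMs need new slots and a remote state download. The others can be redeployed to their previous TM. This is a plain-Java sketch, not Flink code; `RecoveryPlan` and the string TM ids are hypothetical. With 1 slot per TM and as many TMs as the job parallelism (the setup Dhanesh describes), losing one TM means exactly one subtask restores from remote storage.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical bookkeeping model of which subtasks must restore remotely after a failover.
final class RecoveryPlan {
    private RecoveryPlan() {}

    /**
     * @param placement subtask index -> id of the TM it ran on before the failure
     * @param lostTms   ids of TMs that are no longer available
     * @return subtask indices that need a new slot and must download state remotely
     */
    static Set<Integer> remoteRestores(Map<Integer, String> placement, Set<String> lostTms) {
        Set<Integer> remote = new TreeSet<>();
        for (Map.Entry<Integer, String> e : placement.entrySet()) {
            if (lostTms.contains(e.getValue())) {
                remote.add(e.getKey());
            }
        }
        return remote;
    }
}
```

During a rolling deployment, each step loses one TM, so the model predicts one remote restore per step rather than a full remote restore of the whole job.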

Will env.execute() and executeInsert() submit 2 jobs?

2021-04-07 Thread 王 浩成
My program first did something on a data stream using the DataStream API, then converted it into a table and inserted it into another table as the sink. At the end, I wrote env.execute("jobname") to make sure the DataStream API part works. But I found that the program creates 2 Flink jobs, and from

Re: Flink Taskmanager failure recovery and large state

2021-04-07 Thread dhanesh arole
Hi Yaroslav, we faced similar issues in our large stateful stream processing job. I asked a question about it on the user mailing list a few days back. Based on the reply to

flink cdc postgres connector 1.1 - heartbeat.interval.ms - WAL consumption

2021-04-07 Thread bat man
Hi there, I am using Flink 1.11 and CDC connector 1.1 to stream changes from a Postgres table. I see WAL consumption increasing gradually even though there are very few writes to the tables. I am using AWS RDS; from [1] I understand that setting the parameter heartbeat.interval.ms solves this W