@Arvid thanks, will try that. The NFS server I am using should be able to
handle the throughput. From my observation, the serde is taking most of the CPU.

@Yun Tang <myas...@live.com>
Please find the logs below. Also, what are your thoughts: could the datagen
source task be causing this, i.e. pushing the checkpoint to the JM instead of
the filesystem?
The TM stacktrace
https://gist.github.com/b-slim/971a069dd0754eb770d0e319a12657fb
The JM stacktrace
https://gist.github.com/b-slim/24808478c3e857be563e513a3d65e223

On Thu, Nov 19, 2020 at 11:20 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Slim,
>
> for your initial question concerning the size of _metadata: when Flink
> writes a checkpoint, it assumes some kind of DFS. Pretty much all known
> DFS implementations behave poorly with many small files. If you run a job
> with 5 tasks and a parallelism of 120, you'd get 600 small checkpoint
> files (or more, depending on the configuration).
>
> To solve this, Flink inlines very small state files into the _metadata
> file according to a threshold [1]. These small files can quickly add up,
> though. You can disable that behavior by setting the threshold to 0.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#advanced-state-backends-options
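>
> As a minimal flink-conf.yaml sketch of that knob (key name taken from the
> 1.11 docs linked above; a sketch, not a drop-in config):
>
>     # 0 disables inlining: every state handle goes to the DFS instead of
>     # being sent back to the JM and embedded in _metadata
>     state.backend.fs.memory-threshold: 0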
>
> On Thu, Nov 19, 2020 at 12:57 AM Slim Bouguerra <slim.bougue...@gmail.com>
> wrote:
>
>> Hi Yun,
>> Thanks for the help. After applying your recommendation, I am getting the
>> same issue, i.e. very long checkpoints followed by a timeout.
>> My guess is now that the datagen source is pushing its checkpoint state
>> over the network to the JM; is there a way to double-check?
>> If that is the case, is there a way to exclude the source operators from
>> the checkpoints?
>> Thanks.
>> Please find the attached logs. Also:
>> 1. I checked the shared folder and it contains the shared operator state.
>> 2. I did set state.backend.fs.memory-threshold to 1 kb.
>>
>> This is the source of the SQL testing job:
>>
>> CREATE TABLE datagen (
>>                           f_sequence INT,
>>                           f_random INT,
>>                           f_random_str STRING,
>>                           f_random_str_4 STRING,
>>                           f_random_str_3 STRING,
>>                           f_random_str_2 STRING,
>>                           f_random_str_1 STRING,
>>                           ts AS localtimestamp,
>>                           WATERMARK FOR ts AS ts
>> ) WITH (
>>       'connector' = 'datagen',
>>       -- optional options --
>>       'rows-per-second'='500000',
>>       'fields.f_sequence.kind'='sequence',
>>       'fields.f_sequence.start'='1',
>>       'fields.f_sequence.end'='200000000',
>>       'fields.f_random.min'='1',
>>       'fields.f_random.max'='100',
>>       'fields.f_random_str.length'='100000',
>>       'fields.f_random_str_4.length'='100000',
>>       'fields.f_random_str_3.length'='100000',
>>       'fields.f_random_str_2.length'='100000',
>>       'fields.f_random_str_1.length'='100000'
>>       );
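>>
>> As a rough back-of-envelope on the volume this DDL asks for (assuming
>> about one byte per generated character): each row carries five random
>> strings of 100,000 chars, i.e. roughly 500 KB per row, so at
>> 'rows-per-second' = 500,000 the generator targets on the order of
>> 250 GB/s of raw data.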
>>
>> ---------------------------------------
>> With more debugging, I see this exception stack on the job manager:
>> java.io.IOException: The rpc invocation size 199965215 exceeds the
>> maximum akka framesize.
>>
>>      at
>> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>      at
>> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>      at
>> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>      at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>      at com.sun.proxy.$Proxy25.acknowledgeCheckpoint(Unknown Source)
>> [?:?]
>>      at
>> org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder.acknowledgeCheckpoint(RpcCheckpointResponder.java:46)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>      at
>> org.apache.flink.runtime.state.TaskStateManagerImpl.reportTaskStateSnapshots(TaskStateManagerImpl.java:117)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>      at
>> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.reportCompletedSnapshotStates(AsyncCheckpointRunnable.java:160)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>      at
>> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:121)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>      at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> [?:1.8.0_172]
>>      at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> [?:1.8.0_172]
>>
>> ----------------------------------------------
>> And sometimes the JM dies with this OOM:
>>  java.lang.OutOfMemoryError: Java heap space
>>   at java.util.Arrays.copyOf(Arrays.java:3236) ~[?:1.8.0_172]
>>   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
>> ~[?:1.8.0_172]
>>   at
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> ~[?:1.8.0_172]
>>   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> ~[?:1.8.0_172]
>>   at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>> ~[?:1.8.0_172]
>>   at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>> ~[?:1.8.0_172]
>>   at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>> ~[?:1.8.0_172]
>>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> ~[?:1.8.0_172]
>>   at
>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:324)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at
>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:324)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at
>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:324)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at akka.serialization.JavaSerializer.toBinary(Serializer.scala:324)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:53)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at
>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:906)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at
>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:906)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:905)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at akka.remote.EndpointWriter.writeSend(Endpoint.scala:793)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at akka.remote.EndpointWriter.delegate$1(Endpoint.scala:682)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at akka.remote.EndpointWriter.writeLoop$1(Endpoint.scala:693)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at akka.remote.EndpointWriter.sendBufferedMessages(Endpoint.scala:706)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at
>> akka.remote.EndpointWriter$$anonfun$3.applyOrElse(Endpoint.scala:637)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:458)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>   at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>
>>
>> On Wed, Nov 18, 2020 at 12:16 AM Yun Tang <myas...@live.com> wrote:
>>
>>> Hi Slim,
>>>
>>> You could check the taskmanager logs to see whether incremental
>>> checkpointing is really enabled (or check whether files exist under
>>> /opt/flink/pv/checkpoints/c0580ec8f55fcf1e0ceaa46fc3778b99/shared to
>>> judge).
>>> If RocksDB with incremental checkpointing is really enabled, I think the
>>> large metadata size is caused by the memory threshold [1], which sends
>>> state back to the JM directly as bytes whenever a state handle is
>>> smaller than that threshold.
>>> Try decreasing this value to '1 kb' to see whether the size of the
>>> metadata also decreases.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-backend-fs-memory-threshold
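>>>
>>> For example, in flink-conf.yaml (just a sketch of the value syntax; see
>>> the linked docs for details):
>>>
>>>     state.backend.fs.memory-threshold: 1 kb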
>>>
>>> Best
>>> Yun Tang
>>> ------------------------------
>>> *From:* Slim Bouguerra <bs...@apache.org>
>>> *Sent:* Wednesday, November 18, 2020 6:16
>>> *To:* user@flink.apache.org <user@flink.apache.org>
>>> *Subject:* Job Manager is taking very long time to finalize the
>>> Checkpointing.
>>>
>>>
>>> Originally posted to the dev list
>>> ---------- Forwarded message ---------
>>> From: *Slim Bouguerra* <bs...@apache.org>
>>> Date: Tue, Nov 17, 2020 at 8:09 AM
>>> Subject: Job Manager is taking very long time to finalize the
>>> Checkpointing.
>>> To: <d...@flink.apache.org>
>>>
>>>
>>> Hi Devs,
>>> I am very new to the Flink code base and am working on an evaluation of
>>> the checkpointing strategy.
>>>
>>> In my current setup I am using an NFS-based file system as the
>>> checkpoint store. (The NAS/NFS has very high throughput, over 2 GB/s on
>>> one node, and I am using 12 NFS servers.)
>>> When pushing the system to a relatively medium scale, i.e. 120 subtasks
>>> over 6 workers with a total state of 100 GB, I observe that the job
>>> manager takes over 2 minutes to finalize the checkpoint (observed on the
>>> UI and via CPU profiling of the JM; see the flame graph from a 30-second
>>> sample).
>>> As you can see from the attached flame graphs, the JM is very busy
>>> serializing the metadata
>>> (org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.serializeOperatorState
>>> (2,875 samples, 99.65%)).
>>> Now the question is why this metadata file is so big, on the order of
>>> 3 GB in my case.
>>> How does this size scale? num_of_tasks * num_states?
>>>
>>> /opt/flink/pv/checkpoints/c0580ec8f55fcf1e0ceaa46fc3778b99/chk-1
>>> bash-4.2$ ls -all -h
>>> -rw-r--r-- 1 flink flink 3.0G Nov 17 01:42 _metadata
>>>
>>> The second question: how can I better measure the time taken by the JM
>>> to commit the checkpoint, i.e. time_done_checkpoint -
>>> time_got_all_acks_from_tm?
>>> Is there a config flag I am missing to make this last step faster?
>>>
>>> My current configs for checkpointing:
>>> state.backend: rocksdb
>>> # Note: the PV mount path needs to be the same as <mountPath: "/opt/flink/pv">
>>> state.checkpoints.dir: file:///opt/flink/pv/checkpoints
>>> state.savepoints.dir: file:///opt/flink/pv/savepoints
>>> state.backend.incremental: true
>>> # https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#checkpointing
>>> execution.checkpointing.interval: 60000
>>> execution.checkpointing.mode: AT_LEAST_ONCE
>>> # was hitting "The rpc invocation size 19598830 exceeds the maximum akka framesize"
>>> akka.framesize: 100485760b
>>> # https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#heartbeat-timeout
>>> heartbeat.timeout: 70000
>>> # https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-timeout
>>> execution.checkpointing.timeout: 15minutes
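>>>
>>> (Note the mismatch: akka.framesize is 100485760 bytes, roughly 96 MB,
>>> while the failed RPC above was 199965215 bytes, roughly 191 MB, so the
>>> checkpoint acknowledgement still exceeds the frame size.)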
>>>
>>>
>>> Some metadata about the checkpoint:
>>>
>>> {"@class":"completed","id":1,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1605315046120,"latest_ack_timestamp":1605315093466,"state_size":12239786229,"end_to_end_duration":47346,"alignment_buffered":0,"num_subtasks":120,"num_acknowledged_subtasks":120,"tasks":{},"external_path":"file:/opt/flink/pv/checkpoints/7474752476036c14d7fdeb4e86af3638/chk-1"}
>>>
>>
>>
>> --
>>
>> B-Slim
>> _______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 

B-Slim
_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______
