Hi Fabian,

Thanks for your response.

Sure, let me tell you a bit more about the job.

   - Flink version 1.13.1 (I also tried 1.13.2 because I saw FLINK-22886
   <https://issues.apache.org/jira/browse/FLINK-22886>, but this didn't
   help)
   - We're running on Kubernetes in an application cluster.
   - taskmanager.memory.process.size = 16GB, but we give our task manager
   pods a memory limit of 20GB. Our full config is below [0].

We've followed the steps at
https://erikwramner.files.wordpress.com/2017/10/native-memory-leaks-in-java.pdf,
https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/tooldescr007.html,
and
https://technology.blog.gov.uk/2015/12/11/using-jemalloc-to-get-to-the-bottom-of-a-memory-leak/
to try to diagnose the leak, but none of them gave us much to go on.

Notably, we took a Native Memory Tracking baseline (jcmd $(pgrep java)
VM.native_memory baseline) and diffed it against the profile after the
post-restart memory spike, and nothing in the diff accounts for the few GB
of increased usage.
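
For reference, that's just the standard jcmd NMT baseline/diff sequence,
roughly:

```
# NMT has to be enabled on the JVM, e.g. -XX:NativeMemoryTracking=detail
jcmd $(pgrep java) VM.native_memory baseline
# ... wait for the post-restart memory spike ...
jcmd $(pgrep java) VM.native_memory summary.diff   # or detail.diff
```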

What exactly was changed in Flink 1.14 to restrict RocksDB's memory
consumption? And what other issues have you seen in the past?

Also, I came across
https://medium.com/expedia-group-tech/solving-a-native-memory-leak-71fe4b6f9463
while researching RocksDB. It suggests that unclosed RocksDB iterators can
be a source of native memory leaks. Is there any chance iterators are being
left open after a job restart?
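
To illustrate the pattern that article describes (this is just a generic
RocksDB Java API sketch with a made-up path, not Flink code): a
RocksIterator pins native memory until close() is called, so the safe
pattern is try-with-resources.

```
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public class IteratorCloseSketch {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-iter-sketch")) {

            // A RocksIterator holds a native handle that pins memory (index and
            // data blocks, table readers) outside the JVM heap. If close() is
            // never called, that native memory is not reliably reclaimed by GC,
            // which is the leak pattern the article above describes.
            try (RocksIterator it = db.newIterator()) {
                for (it.seekToFirst(); it.isValid(); it.next()) {
                    // read it.key() / it.value() here
                }
            } // try-with-resources releases the native handle promptly
        }
    }
}
```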

[0]
```
jobmanager.memory.process.size: 16Gb

taskmanager.rpc.port: 6122
taskmanager.memory.process.size: 16Gb
taskmanager.memory.managed.fraction: 0.4
taskmanager.numberOfTaskSlots: 4

high-availability.storageDir: <redacted>
kubernetes.cluster-id: <redacted>
kubernetes.namespace: <redacted>
high-availability.jobmanager.port: 50010
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
restart-strategy: exponential-delay
resourcemanager.taskmanager-registration.timeout: 30 min

blob.server.port: 6124
queryable-state.proxy.ports: 6125

heartbeat.interval: 60000
heartbeat.timeout: 120000

web.timeout: 1800000
rest.flamegraph.enabled: true

state.backend: rocksdb
state.checkpoints.dir: <redacted>
state.savepoints.dir: <redacted>

state.backend.rocksdb.localdir: /rocksdb
state.backend.incremental: true
state.backend.fs.memory-threshold: 1m
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.checkpoint.transfer.thread.num: 4
state.backend.rocksdb.block.blocksize: 16KB
state.backend.rocksdb.block.cache-size: 64MB
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED

jobmanager.execution.failover-strategy: region

metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager.job
metrics.scope.tm: flink.taskmanager
metrics.scope.tm.job: flink.taskmanager.job
metrics.scope.task: flink.task
metrics.scope.operator: flink.operator

state.backend.rocksdb.metrics.actual-delayed-write-rate: true
state.backend.rocksdb.metrics.background-errors: true
state.backend.rocksdb.metrics.block-cache-capacity: true
state.backend.rocksdb.metrics.block-cache-pinned-usage: true
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.compaction-pending: true
state.backend.rocksdb.metrics.cur-size-active-mem-table: true
state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
state.backend.rocksdb.metrics.estimate-live-data-size: true
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.estimate-pending-compaction-bytes: true
state.backend.rocksdb.metrics.estimate-table-readers-mem: true
state.backend.rocksdb.metrics.is-write-stopped: true
state.backend.rocksdb.metrics.mem-table-flush-pending: true
state.backend.rocksdb.metrics.num-deletes-active-mem-table: true
state.backend.rocksdb.metrics.num-deletes-imm-mem-tables: true
state.backend.rocksdb.metrics.num-entries-active-mem-table: true
state.backend.rocksdb.metrics.num-entries-imm-mem-tables: true
state.backend.rocksdb.metrics.num-immutable-mem-table: true
state.backend.rocksdb.metrics.num-live-versions: true
state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.num-running-flushes: true
state.backend.rocksdb.metrics.num-snapshots: true
state.backend.rocksdb.metrics.size-all-mem-tables: true

env.java.opts: -Djavax.net.ssl.keyStore=/app/kafka/certs/certificate.jks -Djavax.net.ssl.keyStorePassword=changeit -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.rmi.port=1099 -Djava.rmi.server.hostname=127.0.0.1 -XX:NativeMemoryTracking=detail
env.java.opts.taskmanager: -Dtaskmanager.host=10.12.72.181 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/rocksdb/memdump.hprof -Djava.rmi.server.hostname=127.0.0.1 -XX:NativeMemoryTracking=detail
jobmanager.rpc.address: flink-jobmanager
query.server.port: 6125
```

On Fri, Oct 1, 2021 at 9:38 AM Fabian Paul <fabianp...@ververica.com> wrote:

> Hi Kevin,
>
> You are right, RocksDB is probably responsible for the memory consumption
> you are noticing. We have definitely seen similar issues in the past, and
> with the latest Flink version 1.14 we tried to restrict the RocksDB memory
> consumption even more to make it more controllable.
>
> Can you tell us a bit more about the job you are running and the respective
> Flink version? I would also be interested in what kind of memory
> configuration you applied on the Flink cluster, i.e.
> taskmanager.memory.process.size. You can also have a look at the following
> docs page [1] to fine-tune the memory consumption of your job.
>
> Please let me know if that helps.
>
> Best,
> Fabian
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_setup/#configure-total-memory
>
>
>
