Btw, with regard to:

> The default writer-buffer-number is 2 at most for each column family, and
the default write-buffer-memory size is 4MB.

This isn't what I see when looking at the OPTIONS-XXXXXX file in the
rocksdb directories in state:

[CFOptions "xxxxxx"]
  ttl=0
  report_bg_io_stats=false

compaction_options_universal={allow_trivial_move=false;size_ratio=1;min_merge_width=2;max_size_amplification_percent=200;max_merge_width=4294967295;compression_size_percent=-1;stop_style=kCompactionStopStyleTotalSize;}
  table_factory=BlockBasedTable
  paranoid_file_checks=false
  compression_per_level=
  inplace_update_support=false
  soft_pending_compaction_bytes_limit=68719476736
  max_successive_merges=0
  max_write_buffer_number=2
  level_compaction_dynamic_level_bytes=false
  max_bytes_for_level_base=268435456
  optimize_filters_for_hits=false
  force_consistency_checks=false
  disable_auto_compactions=false
  max_compaction_bytes=1677721600
  hard_pending_compaction_bytes_limit=274877906944

compaction_options_fifo={allow_compaction=false;max_table_files_size=1073741824;ttl=0;}
  max_bytes_for_level_multiplier=10.000000
  level0_file_num_compaction_trigger=4
  level0_slowdown_writes_trigger=20
  compaction_pri=kByCompensatedSize
  compaction_filter=nullptr
  level0_stop_writes_trigger=36
 * write_buffer_size=67108864*
  min_write_buffer_number_to_merge=1
  num_levels=7
  target_file_size_multiplier=1
  arena_block_size=8388608
  memtable_huge_page_size=0
  bloom_locality=0
  inplace_update_num_locks=10000
  memtable_prefix_bloom_size_ratio=0.000000
  max_sequential_skip_in_iterations=8
  max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1
  compression=kSnappyCompression
  max_write_buffer_number_to_maintain=0
  bottommost_compression=kDisableCompressionOption
  comparator=leveldb.BytewiseComparator
  prefix_extractor=nullptr
  target_file_size_base=67108864
  merge_operator=StringAppendTESTOperator
  memtable_insert_with_hint_prefix_extractor=nullptr
  memtable_factory=SkipListFactory
  compaction_filter_factory=nullptr
  compaction_style=kCompactionStyleLevel

Are these options somehow not applied or overridden?

On Mon, Jul 29, 2019 at 4:42 PM wvl <lee...@gmail.com> wrote:

> Excellent. Thanks for all the answers so far.
>
> So there was another issue I mentioned which we made some progress gaining
> insight into, namely our metaspace growth when faced with job restarts.
>
> We can easily hit 1Gb metaspace usage within 15 minutes if we restart
> often.
> We attempted to troubleshoot this issue by looking at all the classes in
> metaspace using `jcmd <pid> GC.class_stats`.
>
> Here we observed that after every job restart another entry is created for
> every class in our job. Where the old classes have InstBytes=0. So far so
> good, but moving to the Total column for these entries show that memory is
> still being used.
> Also, adding up all entries in the Total column indeed corresponds to our
> metaspace usage. So far we could only conclude that our job classes - none
> of them - were being unloaded.
>
> Then we stumbled upon this ticket. Now here are our results running the
> SocketWindowWordCount jar in a flink 1.8.0 cluster with one taskmanager.
>
> We achieve a class count by doing a jcmd 3052 GC.class_stats | grep -i
> org.apache.flink.streaming.examples.windowing.SessionWindowing | wc -l
>
> *First* run:
>   Class Count: 1
>   Metaspace: 30695K
>
> After *800*~ runs:
>   Class Count: 802
>   Metaspace: 39406K
>
>
> Interesting when we looked a bit later the class count *slowly* went
> down, slowly, step by step, where just to be sure we used `jcmd <pid>
> GC.run` to force GC every 30s or so. If I had to guess it took about 20
> minutes to go from 800~ to 170~, with metaspace dropping to 35358K. In a
> sense we've seen this behavior, but with much much larger increases in
> metaspace usage over far fewer job restarts.
>
> I've added this information to
> https://issues.apache.org/jira/browse/FLINK-11205.
>
> That said, I'd really like to confirm the following:
> - classes should usually only appear once in GC.class_stats output
> - flink / the jvm has very slow cleanup of the metaspace
> - something clearly is leaking during restarts
>
> On Mon, Jul 29, 2019 at 9:52 AM Yu Li <car...@gmail.com> wrote:
>
>> For the memory usage of RocksDB, there's already some discussion in
>> FLINK-7289 <https://issues.apache.org/jira/browse/FLINK-7289> and a good
>> suggestion
>> <https://issues.apache.org/jira/browse/FLINK-7289?focusedCommentId=16874305&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16874305>
>> from Mike to use the WriteBufferManager to limit the total memory usage,
>> FYI.
>>
>> We will drive to make the memory management of state backends more "hands
>> free" in latter release (probably in release 1.10) and please watch the
>> release plan and/or the weekly community update [1] threads.
>>
>> [1] https://s.apache.org/ix7iv
>>
>> Best Regards,
>> Yu
>>
>>
>> On Thu, 25 Jul 2019 at 15:12, Yun Tang <myas...@live.com> wrote:
>>
>>> Hi
>>>
>>> It's definitely not easy to calculate the accurate memory usage of
>>> RocksDB, but formula of "block-cache-memory + column-family-number *
>>> write-buffer-memory * write-buffer-number + index&filter memory"  should
>>> give enough sophisticated hints.
>>> When talking about the column-family-number, they are equals to the
>>> number of your states which are the declared state descriptors in one
>>> operator and potential one window state (if you're using window).
>>> The default writer-buffer-number is 2 at most for each column family,
>>> and the default write-buffer-memory size is 4MB. Pay attention that if you
>>> ever configure the options for RocksDB, these memory usage would differ
>>> from default values.
>>> The last part of index&filter memory is not easy to estimate, but from
>>> my experience this part of memory would not occupy too much only if you
>>> have many open files.
>>>
>>> Last but not least, Flink would enable slot sharing by default, and even
>>> if you only one slot per taskmanager, there might exists many RocksDB
>>> within that TM due to many operator with keyed state running.
>>>
>>> Apart from the theoretical analysis, you'd better to open RocksDB native
>>> metrics or track the memory usage of pods through Prometheus with k8s.
>>>
>>> Best
>>> Yun Tang
>>> ------------------------------
>>> *From:* wvl <lee...@gmail.com>
>>> *Sent:* Thursday, July 25, 2019 17:50
>>> *To:* Yang Wang <danrtsey...@gmail.com>
>>> *Cc:* Yun Tang <myas...@live.com>; Xintong Song <tonysong...@gmail.com>;
>>> user <user@flink.apache.org>
>>> *Subject:* Re: Memory constrains running Flink on Kubernetes
>>>
>>> Thanks for all the answers so far.
>>>
>>> Especially clarifying was that RocksDB memory usage isn't accounted for
>>> in the flink memory metrics. It's clear that we need to experiment to
>>> understand it's memory usage and knowing that we should be looking at the
>>> container memory usage minus all the jvm managed memory, helps.
>>>
>>> In mean while, we've set MaxMetaspaceSize to 200M based on our metrics.
>>> Sadly the resulting OOM does not result a better behaved job, because it
>>> would seem that the (taskmanager) JVM itself is not restarted - which makes
>>> sense in a multijob environment.
>>> So we're looking into ways to simply prevent this metaspace growth (job
>>> library jars in /lib on TM).
>>>
>>> Going back to RocksDB, the given formula "block-cache-memory +
>>> column-family-number * write-buffer-memory * write-buffer-number +
>>> index&filter memory." isn't completely clear to me.
>>>
>>> Block Cache: "Out of box, RocksDB will use LRU-based block cache
>>> implementation with 8MB capacity"
>>> Index & Filter Cache: "By default index and filter blocks are cached
>>> outside of block cache, and users won't be able to control how much memory
>>> should be use to cache these blocks, other than setting max_open_files.".
>>> The default settings doesn't set max_open_files and the rocksdb default
>>> seems to be 1000 (
>>> https://github.com/facebook/rocksdb/blob/master/include/rocksdb/utilities/leveldb_options.h#L89)
>>> .. not completely sure about this.
>>> Write Buffer Memory: "The default is 64 MB. You need to budget for 2 x
>>> your worst case memory use."
>>>
>>> May I presume a unique ValueStateDescriptor equals a Column Family?
>>> If so, say I have 10 of those.
>>> 8MB + (10 * 64 * 2) + $Index&FilterBlocks
>>>
>>> So is that correct and how would one calculate $Index&FilterBlocks? The
>>> docs suggest a relationship between max_open_files (1000) and the amount
>>> index/filter of blocks that can be cached, but is this a 1 to 1
>>> relationship? Anyway, this concept of blocks is very unclear.
>>>
>>> > Have you ever set the memory limit of your taskmanager pod when
>>> launching it in k8s?
>>>
>>> Definitely. We settled on 8GB pods with taskmanager.heap.size: 5000m and
>>> 1 slot and were looking into downsizing a bit to improve our pod to VM
>>> ratio.
>>>
>>> On Wed, Jul 24, 2019 at 11:07 AM Yang Wang <danrtsey...@gmail.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>>
>>> The heap in a flink TaskManager k8s pod include the following parts:
>>>
>>>    - jvm heap, limited by -Xmx
>>>    - jvm non-heap, limited by -XX:MaxMetaspaceSize
>>>    - jvm direct memory, limited by -XX:MaxDirectMemorySize
>>>    - native memory, used by rocksdb, just as Yun Tang said, could be
>>>    limited by rocksdb configurations
>>>
>>>
>>> So if your k8s pod is terminated by OOMKilled, the cause may be the
>>> non-heap memory or native memory. I suggest you add an environment
>>> FLINK_ENV_JAVA_OPTS_TM="-XX:MaxMetaspaceSize=512m" in your
>>> taskmanager.yaml. And then only the native memory could cause OOM. Leave
>>> enough memory for rocksdb, and then hope your job could run smoothly.
>>>
>>> Yun Tang <myas...@live.com> 于2019年7月24日周三 下午3:01写道:
>>>
>>> Hi William
>>>
>>> Have you ever set the memory limit of your taskmanager pod when
>>> launching it in k8s? If not, I'm afraid your node might come across node
>>> out-of-memory [1]. You could increase the limit by analyzing your memory
>>> usage
>>> When talking about the memory usage of RocksDB, a rough calculation
>>> formula could be: block-cache-memory + column-family-number *
>>> write-buffer-memory * write-buffer-number + index&filter memory. The block
>>> cache, write buffer memory&number could be mainly configured. And the
>>> column-family number is decided by the state number within your operator.
>>> The last part of index&filter memory cannot be measured well only if you
>>> also cache them in block cache [2] (but this would impact the performance).
>>> If you want to the memory stats of rocksDB, turn on the native metrics
>>> of RocksDB [3] is a good choice.
>>>
>>>
>>> [1]
>>> https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/#node-oom-behavior
>>> [2]
>>> https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#rocksdb-native-metrics
>>>
>>> Best
>>> Yun Tang
>>> ------------------------------
>>> *From:* Xintong Song <tonysong...@gmail.com>
>>> *Sent:* Wednesday, July 24, 2019 11:59
>>> *To:* wvl <lee...@gmail.com>
>>> *Cc:* user <user@flink.apache.org>
>>> *Subject:* Re: Memory constrains running Flink on Kubernetes
>>>
>>> Hi,
>>>
>>> Flink acquires these 'Status_JVM_Memory' metrics through the MXBean
>>> library. According to MXBean document, non-heap is "the Java virtual
>>> machine manages memory other than the heap (referred as non-heap memory)".
>>> Not sure whether that is equivalent to the metaspace. If the
>>> '-XX:MaxMetaspaceSize', it should trigger metaspcae clean up when the limit
>>> is reached.
>>>
>>> As for RocksDB, it mainly uses non-java memory. Heap, non-heap and
>>> direct memory could be considered as java memory (or at least allocated
>>> through the java process). That means, RocksDB is actually using the memory
>>> that is accounted in the total K8s container memory but not accounted in
>>> neither of java heap / non-heap / direct memory, which in your case the 1GB
>>> unaccounted. To leave more memory for RocksDB, you need to either configure
>>> more memory for the K8s containers, or configure less java memory through
>>> the config option 'taskmanager.heap.size'.
>>>
>>> The config option 'taskmanager.heap.size', despite the 'heap' in its
>>> key, also accounts for network memory (which uses direct buffers).
>>> Currently, memory configurations in Flink is quite complicated and
>>> confusing. The community is aware of this, and is planing for an overall
>>> improvement.
>>>
>>> To my understanding, once you set '-XX:MaxMetaspaceSize', there should
>>> be limits on heap, non-heap and direct memory in JVM. You should be able to
>>> find which part that requires memory more than the limit from the java OOM
>>> error message. If there is no java OOM but a K8s container OOM, then it
>>> should be non-java memory used by RocksDB.
>>>
>>> [1]
>>> https://docs.oracle.com/javase/8/docs/api/java/lang/management/MemoryMXBean.html
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Tue, Jul 23, 2019 at 8:42 PM wvl <lee...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> We're running a relatively simply Flink application that uses a bunch of
>>> state in RocksDB on Kubernetes.
>>> During the course of development and going to production, we found that
>>> we were often running into memory issues made apparent by Kubernetes
>>> OOMKilled and Java OOM log events.
>>>
>>> In order to tackle these, we're trying to account for all the memory
>>> used in the container, to allow proper tuning.
>>> Metric-wise we have:
>>> - container_memory_working_set_bytes = 6,5GB
>>> - flink_taskmanager_Status_JVM_Memory_Heap_Max =  4,7GB
>>> - flink_taskmanager_Status_JVM_Memory_NonHeap_Used = 325MB
>>> - flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed = 500MB
>>>
>>> This is my understanding based on all the documentation and observations:
>>> container_memory_working_set_bytes will be the total amount of memory in
>>> use, disregarding OS page & block cache.
>>> Heap will be heap.
>>> NonHeap is mostly the metaspace.
>>> Direct_Memory is mostly network buffers.
>>>
>>> Running the numbers I have 1 GB unaccounted for. I'm also uncertain as
>>> to RocksDB. According to the docs RocksDB has a "Column Family Write
>>> Buffer" where "You need to budget for 2 x your worst case memory use".
>>> We have 17 ValueStateDescriptors (ignoring state for windows) which I'm
>>> assuming corresponds to a "Column Family" in RockDB. Meaning our budget
>>> should be around 2GB.
>>> Is this accounted for in one of the flink_taskmanager metrics above?
>>> We've also enabled various rocksdb metrics, but it's unclear where this
>>> Write Buffer memory would be represented.
>>>
>>> Finally, we've seen that when our job has issues and is restarted
>>> rapidly, NonHeap_Used grows from an initial 50Mb to 700MB, before our
>>> containers are killed. We're assuming this is due
>>> to no form of cleanup in the metaspace as classes get (re)loaded.
>>>
>>> These are our taskmanager JVM settings: -XX:+UseG1GC
>>> -XX:MaxDirectMemorySize=1G -XX:+UnlockExperimentalVMOptions
>>> -XX:+UseCGroupMemoryLimitForHeap -XX:MaxRAMFraction=2
>>> With flink config:
>>>       taskmanager.heap.size: 5000m
>>>       state.backend: rocksdb
>>>       state.backend.incremental: true
>>>       state.backend.rocksdb.timer-service.factory: ROCKSDB
>>>
>>> Based on what we've observed we're thinking about setting
>>> -XX:MaxMetaspaceSize to a reasonable value, so that we at least get an
>>> error message which can easily be traced back to the behavior we're seeing.
>>>
>>> Okay, all that said let's sum up what we're asking here:
>>> - Is there any more insight into how memory is accounted for than our
>>> current metrics?
>>> - Which metric, if any accounts for RocksDB memory usage?
>>> - What's going on with the Metaspace growth we're seeing during job
>>> restarts, is there something we can do about this such as setting
>>> -XX:MaxMetaspaceSize?
>>> - Any other tips to improve reliability running in resource constrained
>>> environments such as Kubernetes?
>>>
>>> Thanks,
>>>
>>> William
>>>
>>>

Reply via email to