Hello everyone,
I have set up the Flink StateFun runtime on Kubernetes following this tutorial: https://github.com/apache/flink-statefun-playground/tree/main/deployments/k8s. I developed 12 custom StateFun functions in Java and deployed them in the same way the tutorial shows. Many function instances are running at the same time. After receiving about 1 million messages, the StateFun master restarts, and I see the following in the StateFun worker's log:
```
2024-08-31 18:52:14,327 WARN org.apache.flink.statefun.flink.core.nettyclient.NettyRequest [] - Exception caught while trying to deliver a message: (attempt #0) ToFunctionRequestSummary(address=Address(func, parse, 0), batchSize=22680, totalSizeInBytes=5947037, numberOfStates=0)
java.lang.OutOfMemoryError: Direct buffer memory
	at java.nio.Bits.reserveMemory(Unknown Source) ~[?:?]
	at java.nio.DirectByteBuffer.<init>(Unknown Source) ~[?:?]
	at java.nio.ByteBuffer.allocateDirect(Unknown Source) ~[?:?]
	at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:632) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:607) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:202) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:186) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:136) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:126) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:395) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.statefun.flink.core.nettyclient.NettyProtobuf.serializeProtobuf(NettyProtobuf.java:39) ~[statefun-flink-core.jar:3.2.0]
	at org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyHandler.write(NettyRequestReplyHandler.java:81) ~[statefun-flink-core.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:306) ~[statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.statefun.flink.core.nettyclient.NettyClient.writeAndFlush(NettyClient.java:188) ~[statefun-flink-core.jar:3.2.0]
	at org.apache.flink.statefun.flink.core.nettyclient.NettyRequest.onChannelAcquisitionComplete(NettyRequest.java:146) ~[statefun-flink-core.jar:3.2.0]
	at org.apache.flink.statefun.flink.core.nettyclient.NettyClient.lambda$acquireChannel$0(NettyClient.java:129) ~[statefun-flink-core.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) [statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552) [statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35) [statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:502) [statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) [statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) [statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) [statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [statefun-flink-distribution.jar:3.2.0]
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [statefun-flink-distribution.jar:3.2.0]
	at java.lang.Thread.run(Unknown Source) [?:?]
```
I am seeking advice on how to address this issue. Specifically, I would like to know:
- How can I increase the memory (in particular the direct memory) available to the workers via the StateFun manifests?
- What might be causing this crash?
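For reference, this is the change to `flink-conf.yaml` I was considering, but I am not sure it is the right knob. My understanding is that `taskmanager.memory.task.off-heap.size` (currently `0b` in my setup) feeds into `-XX:MaxDirectMemorySize`; the values below are guesses on my part:

```yaml
# Sketch only: raise the overall process size and explicitly grant task
# off-heap memory, which (to my understanding) increases MaxDirectMemorySize.
taskmanager.memory.process.size: 3072m
taskmanager.memory.task.off-heap.size: 512m
```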
I would be really grateful for your help and advice. Thank you very much.
Best regards,
Oliver
Here are some details from the resource configuration and logs for context:
```
RESOURCE_PARAMS extraction logs:
jvm_params: -Xmx697932173 -Xms697932173 -XX:MaxDirectMemorySize=300647712 -XX:MaxMetaspaceSize=268435456
dynamic_configs:
  -D taskmanager.memory.network.min=166429984b
  -D taskmanager.cpu.cores=2.0
  -D taskmanager.memory.task.off-heap.size=0b
  -D taskmanager.memory.jvm-metaspace.size=268435456b
  -D external-resources=none
  -D taskmanager.memory.jvm-overhead.min=214748368b
  -D taskmanager.memory.framework.off-heap.size=134217728b
  -D taskmanager.memory.network.max=166429984b
  -D taskmanager.memory.framework.heap.size=134217728b
  -D taskmanager.memory.managed.size=665719939b
  -D taskmanager.memory.task.heap.size=563714445b
  -D taskmanager.numberOfTaskSlots=2
  -D taskmanager.memory.jvm-overhead.max=214748368b
logs:
WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
INFO [] - Loading configuration property: jobmanager.rpc.address, statefun-master
INFO [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2
INFO [] - Loading configuration property: blob.server.port, 6124
INFO [] - Loading configuration property: jobmanager.rpc.port, 6123
INFO [] - Loading configuration property: taskmanager.rpc.port, 6122
INFO [] - Loading configuration property: classloader.parent-first-patterns.additional, org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
INFO [] - Loading configuration property: state.backend, rocksdb
INFO [] - Loading configuration property: state.backend.rocksdb.timer-service.factory, ROCKSDB
INFO [] - Loading configuration property: state.backend.rocksdb.timer-service.factory, ROCKSDB
INFO [] - Loading configuration property: state.backend.incremental, true
INFO [] - Loading configuration property: parallelism.default, 1
INFO [] - Loading configuration property: s3.access-key, minioadmin
INFO [] - Loading configuration property: s3.secret-key, ******
INFO [] - Loading configuration property: state.checkpoints.dir, s3://checkpoints/subscriptions
INFO [] - Loading configuration property: s3.endpoint, http://minio.statefun.svc.cluster.local:9000
INFO [] - Loading configuration property: s3.path-style-access, true
INFO [] - Loading configuration property: jobmanager.memory.process.size, 2048m
INFO [] - Loading configuration property: taskmanager.memory.process.size, 2048m
INFO [] - Loading configuration property: metrics.reporters, prom
INFO [] - Loading configuration property: metrics.reporter.prom.class, org.apache.flink.metrics.prometheus.PrometheusReporter
INFO [] - Loading configuration property: metrics.reporter.prom.port, 9999
INFO [] - Loading configuration property: taskmanager.latency-tracking-interval, 30000
INFO [] - Loading configuration property: metrics.latency.interval, 30000
INFO [] - Loading configuration property: taskmanager.network.memory.buffer-debloat.enabled, true
INFO [] - Final TaskExecutor Memory configuration:
INFO [] -   Total Process Memory:          2.000gb (2147483648 bytes)
INFO [] -     Total Flink Memory:          1.550gb (1664299824 bytes)
INFO [] -       Total JVM Heap Memory:     665.600mb (697932173 bytes)
INFO [] -         Framework:               128.000mb (134217728 bytes)
INFO [] -         Task:                    537.600mb (563714445 bytes)
INFO [] -       Total Off-heap Memory:     921.600mb (966367651 bytes)
INFO [] -         Managed:                 634.880mb (665719939 bytes)
INFO [] -         Total JVM Direct Memory: 286.720mb (300647712 bytes)
INFO [] -           Framework:             128.000mb (134217728 bytes)
INFO [] -           Task:                  0 bytes
INFO [] -           Network:               158.720mb (166429984 bytes)
INFO [] -     JVM Metaspace:               256.000mb (268435456 bytes)
INFO [] -     JVM Overhead:                204.800mb (214748368 bytes)
```
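If I read the memory breakdown above correctly, the direct-memory ceiling is just the sum of framework off-heap, task off-heap, and network memory, so with task off-heap at `0b` the worker only ever gets ~287 MB of direct memory. A quick sanity check of the numbers (values copied from the log, arithmetic my own):

```python
# Sanity check of the worker's direct-memory budget. To my understanding,
# Flink derives -XX:MaxDirectMemorySize as the sum of framework off-heap,
# task off-heap, and network memory. Values copied from RESOURCE_PARAMS above.
framework_off_heap = 134_217_728  # taskmanager.memory.framework.off-heap.size
task_off_heap = 0                 # taskmanager.memory.task.off-heap.size
network = 166_429_984             # taskmanager.memory.network.max

max_direct = framework_off_heap + task_off_heap + network
print(max_direct)  # 300647712, matching -XX:MaxDirectMemorySize in jvm_params
```

So the Netty request above (batchSize=22680, ~5.9 MB per batch) is competing with the network stack for this fairly small budget.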
```
---
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: statefun
  name: flink-config
  labels:
    app: statefun
    release: prometheus
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: statefun-master
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
    state.backend: rocksdb
    state.backend.rocksdb.timer-service.factory: ROCKSDB
    state.backend.incremental: true
    parallelism.default: 1
    s3.access-key: minioadmin
    s3.secret-key: minioadmin
    state.checkpoints.dir: s3://checkpoints/subscriptions
    s3.endpoint: http://minio.statefun.svc.cluster.local:9000
    s3.path-style-access: true
    jobmanager.memory.process.size: 2048m
    taskmanager.memory.process.size: 2048m
    # Added
    metrics.reporters: prom
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: 9999
    # Added for latency tracking
    taskmanager.latency-tracking-interval: 30000  # 30000 milliseconds (30 seconds)
    metrics.latency.interval: 30000
    taskmanager.network.memory.buffer-debloat.enabled: true
  log4j-console.properties: |+
    monitorInterval=30
    rootLogger.level = DEBUG
    rootLogger.appenderRef.console.ref = ConsoleAppender
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    # Changed from OFF to ON
    logger.netty.level = ON
---
apiVersion: v1
kind: Service
metadata:
  name: statefun-master-rest
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  type: NodePort
  ports:
    - name: rest
      port: 8081
      targetPort: 8081
    - name: metrics
      port: 9999
  selector:
    app: statefun
    component: master
---
apiVersion: v1
kind: Service
metadata:
  name: statefun-master
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob
      port: 6124
    - name: ui
      port: 8081
    - name: metrics
      port: 9999
  selector:
    app: statefun
    component: master
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-master
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: statefun
      component: master
  template:
    metadata:
      labels:
        app: statefun
        component: master
    spec:
      containers:
        - name: master
          image: apache/flink-statefun
          imagePullPolicy: IfNotPresent
          env:
            - name: ROLE
              value: master
            - name: MASTER_HOST
              value: statefun-master
          resources:
            requests:
              # Changed from 0.5Gi to 1Gi to 2048m
              memory: "7Gi"
              # Changed from 2 to 1
              cpu: "1"
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
            - containerPort: 9999
              name: metrics
              protocol: TCP
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: module-config-volume
              mountPath: /opt/statefun/modules/func
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: module-config-volume
          configMap:
            name: module-config
            items:
              - key: module.yaml
                path: module.yaml
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-worker
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: statefun
      component: worker
  template:
    metadata:
      labels:
        app: statefun
        component: worker
    spec:
      containers:
        - name: worker
          image: apache/flink-statefun
          imagePullPolicy: IfNotPresent
          env:
            - name: ROLE
              value: worker
            - name: MASTER_HOST
              value: statefun-master
          resources:
            requests:
              # Changed from 0.5Gi to 1Gi to 2048m
              memory: "7Gi"
              cpu: "1"
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
            - containerPort: 9999
              name: metrics
              protocol: TCP
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: module-config-volume
              mountPath: /opt/statefun/modules/func
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: module-config-volume
          configMap:
            name: module-config
            items:
              - key: module.yaml
                path: module.yaml
```
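One thing I noticed while writing this up: the pods request 7Gi of memory, but `taskmanager.memory.process.size` is only 2048m, so the JVM never uses most of what the pod requests; I am not sure which of the two I should be adjusting. I also only set `resources.requests` and no `limits`, so I sketched adding limits as an experiment (values are my guess), although the stack trace looks like a JVM-level cap rather than a container OOM-kill:

```yaml
# Sketch only: adding explicit limits alongside the existing requests.
resources:
  requests:
    memory: "7Gi"
    cpu: "1"
  limits:
    memory: "7Gi"  # hypothetical value
    cpu: "2"
```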
```
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: statefun
  name: module-config
data:
  module.yaml: |+
    kind: io.statefun.endpoints.v2/http
    spec:
      functions: func/*
      urlPathTemplate: http://functions.statefun.svc.cluster.local:8000/statefun
      transport:
        timeouts:
          call: 2 min
          connect: 2 min
          read: 2 min
          write: 2 min
    ---
    kind: io.statefun.kafka.v1/ingress
    spec:
      id: func/in
      address: kafka-cluster-kafka-bootstrap.default.svc:9092
      consumerGroupId: my-group-id
      topics:
        - topic: source
          valueType: io.statefun.types/string
          targets:
            - func/source_func
```
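Since the warning shows `batchSize=22680` (~5.9 MB in a single request), I also wondered whether capping the per-function batch size would help. I found `maxNumBatchRequests` in older StateFun module docs, but I am not sure the key is still valid for `io.statefun.endpoints.v2/http`; this is what I would try:

```yaml
# Sketch only: I am unsure whether maxNumBatchRequests applies to v2 endpoints.
kind: io.statefun.endpoints.v2/http
spec:
  functions: func/*
  urlPathTemplate: http://functions.statefun.svc.cluster.local:8000/statefun
  maxNumBatchRequests: 500  # hypothetical; key name taken from older docs
```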