Hello everyone,

I have set up the Flink StateFun runtime on Kubernetes following this tutorial: https://github.com/apache/flink-statefun-playground/tree/main/deployments/k8s. I developed 12 custom StateFun functions in Java and deployed them the same way as in the tutorial, so many function instances run at the same time. After receiving about 1 million messages, the StateFun master restarts, and I see the following in the StateFun worker's log:


```

2024-08-31 18:52:14,327 WARN org.apache.flink.statefun.flink.core.nettyclient.NettyRequest [] - Exception caught while trying to deliver a message: (attempt #0) ToFunctionRequestSummary(address=Address(func, parse, 0), batchSize=22680, totalSizeInBytes=5947037, numberOfStates=0)
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Unknown Source) ~[?:?]
    at java.nio.DirectByteBuffer.<init>(Unknown Source) ~[?:?]
    at java.nio.ByteBuffer.allocateDirect(Unknown Source) ~[?:?]
    at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:632) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:607) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:202) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:186) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:136) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:126) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:395) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.nettyclient.NettyProtobuf.serializeProtobuf(NettyProtobuf.java:39) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyHandler.write(NettyRequestReplyHandler.java:81) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:306) ~[statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.nettyclient.NettyClient.writeAndFlush(NettyClient.java:188) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.nettyclient.NettyRequest.onChannelAcquisitionComplete(NettyRequest.java:146) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.nettyclient.NettyClient.lambda$acquireChannel$0(NettyClient.java:129) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) [statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552) [statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35) [statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:502) [statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) [statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) [statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) [statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [statefun-flink-distribution.jar:3.2.0]
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [statefun-flink-distribution.jar:3.2.0]
    at java.lang.Thread.run(Unknown Source) [?:?]

```

I am seeking advice on how to address this issue. Specifically:

  1. How can I increase the memory available to the StateFun runtime (in particular the direct memory) through the manifests?
  2. What might be causing this crash?
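For context, the kind of flink-conf.yaml change I have been considering looks like this; the values are placeholders I made up, and I am unsure which of these knobs is the right one:

```yaml
# Sketch only - placeholder values, not a verified fix.
# taskmanager.memory.process.size must still fit inside the pod's memory request.
taskmanager.memory.process.size: 4096m
# Off-heap budget that Flink adds to -XX:MaxDirectMemorySize, which is what
# the Netty-based remote function transport allocates from:
taskmanager.memory.task.off-heap.size: 512m
```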

I would be really grateful for your help and advice. Thank you very much.

Best regards,

Oliver


Here are some details from the resource configuration and logs for context:

```

RESOURCE_PARAMS extraction logs:
jvm_params: -Xmx697932173 -Xms697932173 -XX:MaxDirectMemorySize=300647712 -XX:MaxMetaspaceSize=268435456
dynamic_configs:
  -D taskmanager.memory.network.min=166429984b
  -D taskmanager.cpu.cores=2.0
  -D taskmanager.memory.task.off-heap.size=0b
  -D taskmanager.memory.jvm-metaspace.size=268435456b
  -D external-resources=none
  -D taskmanager.memory.jvm-overhead.min=214748368b
  -D taskmanager.memory.framework.off-heap.size=134217728b
  -D taskmanager.memory.network.max=166429984b
  -D taskmanager.memory.framework.heap.size=134217728b
  -D taskmanager.memory.managed.size=665719939b
  -D taskmanager.memory.task.heap.size=563714445b
  -D taskmanager.numberOfTaskSlots=2
  -D taskmanager.memory.jvm-overhead.max=214748368b
logs:
WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
INFO [] - Loading configuration property: jobmanager.rpc.address, statefun-master
INFO [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2
INFO [] - Loading configuration property: blob.server.port, 6124
INFO [] - Loading configuration property: jobmanager.rpc.port, 6123
INFO [] - Loading configuration property: taskmanager.rpc.port, 6122
INFO [] - Loading configuration property: classloader.parent-first-patterns.additional, org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
INFO [] - Loading configuration property: state.backend, rocksdb
INFO [] - Loading configuration property: state.backend.rocksdb.timer-service.factory, ROCKSDB
INFO [] - Loading configuration property: state.backend.incremental, true
INFO [] - Loading configuration property: parallelism.default, 1
INFO [] - Loading configuration property: s3.access-key, minioadmin
INFO [] - Loading configuration property: s3.secret-key, ******
INFO [] - Loading configuration property: state.checkpoints.dir, s3://checkpoints/subscriptions
INFO [] - Loading configuration property: s3.endpoint, http://minio.statefun.svc.cluster.local:9000
INFO [] - Loading configuration property: s3.path-style-access, true
INFO [] - Loading configuration property: jobmanager.memory.process.size, 2048m
INFO [] - Loading configuration property: taskmanager.memory.process.size, 2048m
INFO [] - Loading configuration property: metrics.reporters, prom
INFO [] - Loading configuration property: metrics.reporter.prom.class, org.apache.flink.metrics.prometheus.PrometheusReporter
INFO [] - Loading configuration property: metrics.reporter.prom.port, 9999
INFO [] - Loading configuration property: taskmanager.latency-tracking-interval, 30000
INFO [] - Loading configuration property: metrics.latency.interval, 30000
INFO [] - Loading configuration property: taskmanager.network.memory.buffer-debloat.enabled, true

INFO [] - Final TaskExecutor Memory configuration:
INFO [] -   Total Process Memory:          2.000gb (2147483648 bytes)
INFO [] -     Total Flink Memory:          1.550gb (1664299824 bytes)
INFO [] -       Total JVM Heap Memory:     665.600mb (697932173 bytes)
INFO [] -         Framework:               128.000mb (134217728 bytes)
INFO [] -         Task:                    537.600mb (563714445 bytes)
INFO [] -       Total Off-heap Memory:     921.600mb (966367651 bytes)
INFO [] -         Managed:                 634.880mb (665719939 bytes)
INFO [] -         Total JVM Direct Memory: 286.720mb (300647712 bytes)
INFO [] -           Framework:             128.000mb (134217728 bytes)
INFO [] -           Task:                  0 bytes
INFO [] -           Network:               158.720mb (166429984 bytes)
INFO [] -     JVM Metaspace:               256.000mb (268435456 bytes)
INFO [] -     JVM Overhead:                204.800mb (214748368 bytes)

```
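If I read these numbers correctly, they match Flink's memory model: -XX:MaxDirectMemorySize is derived as the sum of framework off-heap, task off-heap, and network memory, so Netty (which the remote function transport uses) has only about 287 MiB of direct memory to work with:

```
MaxDirectMemorySize = framework.off-heap + task.off-heap + network
                    = 134217728 + 0 + 166429984
                    = 300647712 bytes (286.720mb)
```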

```

---
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: statefun
  name: flink-config
  labels:
    app: statefun
    release: prometheus
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: statefun-master
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
    state.backend: rocksdb
    state.backend.rocksdb.timer-service.factory: ROCKSDB
    state.backend.incremental: true
    parallelism.default: 1
    s3.access-key: minioadmin
    s3.secret-key: minioadmin
    state.checkpoints.dir: s3://checkpoints/subscriptions
    s3.endpoint: http://minio.statefun.svc.cluster.local:9000
    s3.path-style-access: true
    jobmanager.memory.process.size: 2048m
    taskmanager.memory.process.size: 2048m
    # Added
    metrics.reporters: prom
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: 9999
    # Added for latency tracking
    taskmanager.latency-tracking-interval: 30000  # 30000 milliseconds (30 seconds)
    metrics.latency.interval: 30000
    taskmanager.network.memory.buffer-debloat.enabled: true
  log4j-console.properties: |+
    monitorInterval=30
    rootLogger.level = DEBUG
    rootLogger.appenderRef.console.ref = ConsoleAppender
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    # Changed from OFF to ON
    logger.netty.level = ON
---
apiVersion: v1
kind: Service
metadata:
  name: statefun-master-rest
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  type: NodePort
  ports:
    - name: rest
      port: 8081
      targetPort: 8081
    - name: metrics
      port: 9999
  selector:
    app: statefun
    component: master
---
apiVersion: v1
kind: Service
metadata:
  name: statefun-master
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob
      port: 6124
    - name: ui
      port: 8081
    - name: metrics
      port: 9999
  selector:
    app: statefun
    component: master
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-master
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: statefun
      component: master
  template:
    metadata:
      labels:
        app: statefun
        component: master
    spec:
      containers:
        - name: master
          image: apache/flink-statefun
          imagePullPolicy: IfNotPresent
          env:
            - name: ROLE
              value: master
            - name: MASTER_HOST
              value: statefun-master
          resources:
            requests:
              # Changed from 0.5Gi to 1Gi to 2048m
              memory: "7Gi"
              # Changed from 2 to 1
              cpu: "1"
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
            - containerPort: 9999
              name: metrics
              protocol: TCP
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: module-config-volume
              mountPath: /opt/statefun/modules/func
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: module-config-volume
          configMap:
            name: module-config
            items:
              - key: module.yaml
                path: module.yaml
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-worker
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: statefun
      component: worker
  template:
    metadata:
      labels:
        app: statefun
        component: worker
    spec:
      containers:
        - name: worker
          image: apache/flink-statefun
          imagePullPolicy: IfNotPresent
          env:
            - name: ROLE
              value: worker
            - name: MASTER_HOST
              value: statefun-master
          resources:
            requests:
              # Changed from 0.5Gi to 1Gi to 2048m
              memory: "7Gi"
              cpu: "1"
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
            - containerPort: 9999
              name: metrics
              protocol: TCP
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: module-config-volume
              mountPath: /opt/statefun/modules/func
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: module-config-volume
          configMap:
            name: module-config
            items:
              - key: module.yaml
                path: module.yaml

```
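On the Kubernetes side, I suspect the memory: "7Gi" request above is largely wasted, because taskmanager.memory.process.size is still 2048m. The only idea I have had so far is to align the two and add a limit, roughly like this (hypothetical values):

```yaml
# Hypothetical: pod resources aligned with a larger
# taskmanager.memory.process.size (e.g. 4096m) in flink-conf.yaml.
resources:
  requests:
    memory: "4Gi"
    cpu: "1"
  limits:
    memory: "4Gi"
```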


```

apiVersion: v1
kind: ConfigMap
metadata:
  namespace: statefun
  name: module-config
data:
  module.yaml: |+
    kind: io.statefun.endpoints.v2/http
    spec:
      functions: func/*
      urlPathTemplate: http://functions.statefun.svc.cluster.local:8000/statefun
      transport:
        timeouts:
          call: 2 min
          connect: 2 min
          read: 2 min
          write: 2 min
    ---
    kind: io.statefun.kafka.v1/ingress
    spec:
      id: func/in
      address: kafka-cluster-kafka-bootstrap.default.svc:9092
      consumerGroupId: my-group-id
      topics:
        - topic: source
          valueType: io.statefun.types/string
          targets:
            - func/source_func

```
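Since the failing request reports batchSize=22680 (about 5.9 MB in a single ToFunction payload), I also wonder whether capping the batch per function address would help. I have seen maxNumBatchRequests on endpoint specs in older module formats, but I am not sure whether it is still supported with io.statefun.endpoints.v2/http; something like:

```yaml
# Unverified assumption: I don't know if maxNumBatchRequests
# is valid in the v2 endpoint spec.
kind: io.statefun.endpoints.v2/http
spec:
  functions: func/*
  urlPathTemplate: http://functions.statefun.svc.cluster.local:8000/statefun
  maxNumBatchRequests: 500
```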
