Hello everyone,
 
 
I have set up the Flink StateFun runtime on Kubernetes following this tutorial: https://github.com/apache/flink-statefun-playground/tree/main/deployments/k8s . I developed my own StateFun functions in Java and deployed them the same way as shown in the tutorial.
 
Problem:
When Kafka events arrive at a very low rate, everything works fine. But when the incoming rate is high (166 records per second, sustained for about an hour), the whole statefun-master crashes after about 8 minutes. The logs of the master pod show the following exception:
 
```
org.apache.flink.statefun.flink.core.functions.StatefulFunctionInvocationException: An error occurred when attempting to invoke function FunctionType(func, fun1).
    at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:74) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:60) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:164) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.functions.AsyncSink.drainOnOperatorThread(AsyncSink.java:119) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:86) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:88) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: Failure forwarding a message to a remote function Address(func, fun, 1)
    at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.onAsyncResult(RequestReplyFunction.java:170) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:124) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48) ~[statefun-flink-core.jar:3.2.0]
    ... 28 more
 
```
 
Steps taken:
I tried many settings in the StateFun manifest (increasing parallelism, more memory, ...), but nothing helped. I also tried some additional settings for my Undertow server, but that did not help either.
 
Observations:
For debugging purposes, I tried using just one function, and everything worked fine. But when I use all my functions, the problem comes back. In the web UI, the ingress shows 100% backpressure and the union -> function operator is 100% busy. Just before the crash, the union -> function operator has received about 300 fewer messages than the ingress. The function that cannot be reached is arbitrary; it changes every time.
Similar problem to mine:
https://stackoverflow.com/questions/73707416/flink-statefun-under-backpressure-application-crashes
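To put the load in perspective, here is a back-of-the-envelope sketch based on Little's law (average in-flight requests = arrival rate x average latency). The fan-out (function calls triggered per Kafka record) and the per-call latency are assumed values, not measurements. StateFun may also batch several messages for the same address into one HTTP request, so treat the result as an upper bound on concurrent requests:

```java
/**
 * Back-of-the-envelope load estimate (a sketch, not StateFun internals).
 * Little's law: average in-flight requests = arrival rate x average latency.
 */
public final class LoadEstimate {

    /** Records that arrive in the given number of seconds at a constant rate. */
    static long recordsArrived(double recordsPerSecond, double seconds) {
        return Math.round(recordsPerSecond * seconds);
    }

    /**
     * Average number of concurrent calls the function endpoint has to absorb.
     * fanOut is a hypothetical parameter: how many function invocations one
     * ingress record triggers across the whole function chain.
     */
    static double inFlightRequests(double recordsPerSecond, int fanOut, double latencySeconds) {
        return recordsPerSecond * fanOut * latencySeconds;
    }

    public static void main(String[] args) {
        double rate = 166.0;      // records per second, as observed
        int workerThreads = 100;  // Undertow worker threads in my server setup
        System.out.println("Records before crash (~8 min): " + recordsArrived(rate, 8 * 60));
        // Hypothetical values: 4 functions per record, 250 ms per call.
        double inFlight = inFlightRequests(rate, 4, 0.25);
        System.out.printf("Average in-flight calls: %.1f (worker threads: %d)%n", inFlight, workerThreads);
    }
}
```

If each call held an Undertow worker thread for its whole duration (an assumption about my handler), an estimate above 100 would mean requests queue inside the function server, which would fit the observation that one function works while the full set does not.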
 
 
Question:
Does anyone have an idea what the problem could be?
 
 
I would be really grateful for your help and advice. Thank you very much.
 
 
Best regards,
Oliver
 
My StateFun manifest:
 
```
---
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: statefun
  name: flink-config
  labels:
    app: statefun
    release: prometheus
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: statefun-master
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
    state.backend: rocksdb
    state.backend.rocksdb.timer-service.factory: ROCKSDB
    state.backend.incremental: true
    parallelism.default: 1
    s3.access-key: minioadmin
    s3.secret-key: minioadmin
    state.checkpoints.dir: s3://checkpoints/subscriptions
    s3.endpoint: http://minio.statefun.svc.cluster.local:9000
    s3.path-style-access: true
    jobmanager.memory.process.size: 2048m
    taskmanager.memory.process.size: 2048m
    #Added
    metrics.reporters: prom
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: 9999
    # Added for latency tracking
    taskmanager.latency-tracking-interval: 30000  # 30000 milliseconds (30 seconds)
    metrics.latency.interval: 30000
    taskmanager.network.memory.buffer-debloat.enabled: true

  log4j-console.properties: |+
          monitorInterval=30
          rootLogger.level = DEBUG
          rootLogger.appenderRef.console.ref = ConsoleAppender
          logger.akka.name = akka
          logger.akka.level = INFO
          logger.kafka.name= org.apache.kafka
          logger.kafka.level = INFO
          logger.hadoop.name = org.apache.hadoop
          logger.hadoop.level = INFO
          logger.zookeeper.name = org.apache.zookeeper
          logger.zookeeper.level = INFO
          appender.console.name = ConsoleAppender
          appender.console.type = CONSOLE
          appender.console.layout.type = PatternLayout
          appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
          logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
          #Changed from OFF to INFO ("ON" is not a valid log4j level)
          logger.netty.level = INFO
---
apiVersion: v1
kind: Service
metadata:
  name: statefun-master-rest
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  type: NodePort
  ports:
    - name: rest
      port: 8081
      targetPort: 8081
    - name: metrics
      port: 9999
  selector:
    app: statefun
    component: master
---
apiVersion: v1
kind: Service
metadata:
  name: statefun-master
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob
      port: 6124
    - name: ui
      port: 8081
    - name: metrics
      port: 9999
  selector:
    app: statefun
    component: master
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-master
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: statefun
      component: master
  template:
    metadata:
      labels:
        app: statefun
        component: master
    spec:
      containers:
        - name: master
          image: apache/flink-statefun
          imagePullPolicy: IfNotPresent
          env:
            - name: ROLE
              value: master
            - name: MASTER_HOST
              value: statefun-master
          resources:
            requests:
              #Changed from 0.5Gi to 1Gi to 2048m, now 7Gi
              memory: "7Gi"
              #Changed from 2 to 1
              cpu: "1"
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
            - containerPort: 9999
              name: metrics
              protocol: TCP
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: module-config-volume
              mountPath: /opt/statefun/modules/func
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: module-config-volume
          configMap:
            name: module-config
            items:
              - key: module.yaml
                path: module.yaml
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-worker
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: statefun
      component: worker
  template:
    metadata:
      labels:
        app: statefun
        component: worker
    spec:
      containers:
        - name: worker
          image: apache/flink-statefun
          imagePullPolicy: IfNotPresent
          env:
            - name: ROLE
              value: worker
            - name: MASTER_HOST
              value: statefun-master
          resources:
            requests:
              #Changed from 0.5Gi to 1Gi to 2048m, now 7Gi
              memory: "7Gi"
              cpu: "1"
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
            - containerPort: 9999
              name: metrics
              protocol: TCP
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: module-config-volume
              mountPath: /opt/statefun/modules/func
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: module-config-volume
          configMap:
            name: module-config
            items:
              - key: module.yaml
                path: module.yaml
```
 
```
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: statefun
  name: module-config
data:
  module.yaml: |+
    kind: io.statefun.endpoints.v2/http
    spec:
      functions: func/*
      urlPathTemplate: http://functions.statefun.svc.cluster.local:8000/statefun
      transport:
        timeouts:
          call: 2 min
          connect: 2 min
          read: 2 min
          write: 2 min
    ---
    kind: io.statefun.kafka.v1/ingress
    spec:
      id: func/in
      address: kafka-cluster-kafka-bootstrap.default.svc:9092
      consumerGroupId: my-group-id
      topics:
        - topic: source
          valueType: io.statefun.types/string
          targets:
            - func/source_func
```
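A related sanity check: from StateFun's side, any time a request spends queued inside the function server (for example, waiting for a free worker thread) counts against the call timeout. The sketch below estimates how long a backlog takes to drain at a given service rate and compares that with the 2-minute call timeout configured in module.yaml. The backlog size and service rate are hypothetical inputs, and the class and method names are illustrative only:

```java
import java.time.Duration;

/**
 * Rough check (a sketch under stated assumptions): would the last queued
 * request wait longer than the configured call timeout?
 */
public final class TimeoutBudget {

    /** Time needed to drain a backlog at a steady service rate. */
    static Duration drainTime(long backlog, double servedPerSecond) {
        if (servedPerSecond <= 0) {
            throw new IllegalArgumentException("servedPerSecond must be positive");
        }
        return Duration.ofMillis(Math.round(backlog / servedPerSecond * 1000.0));
    }

    /** True if draining the backlog takes longer than the call timeout. */
    static boolean exceedsTimeout(long backlog, double servedPerSecond, Duration callTimeout) {
        return drainTime(backlog, servedPerSecond).compareTo(callTimeout) > 0;
    }

    public static void main(String[] args) {
        Duration callTimeout = Duration.ofMinutes(2); // "call: 2 min" from module.yaml
        long queued = 30_000;                         // hypothetical backlog at the function server
        double servedPerSecond = 166.0;               // hypothetical: server keeps pace with the ingress
        System.out.println("Drain time: " + drainTime(queued, servedPerSecond));
        System.out.println("Exceeds call timeout: " + exceedsTimeout(queued, servedPerSecond, callTimeout));
    }
}
```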
 
 
My Undertow Server:
 
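```
import io.undertow.Undertow;
import io.undertow.UndertowOptions;
import java.io.IOException;
import org.apache.flink.statefun.sdk.java.StatefulFunctions;
import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;

// UndertowHttpHandler is my own adapter class (same pattern as in the statefun-playground examples).
public class FunctionServer { // hypothetical class name; the real one is not shown here

    public static void main(String[] args) throws IOException {
        final StatefulFunctions functions = new StatefulFunctions();
        ....

        final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();
        final Undertow httpServer =
                Undertow.builder()
                        .addHttpListener(8000, "0.0.0.0")
                        .setHandler(new UndertowHttpHandler(requestReplyHandler))
                        .setWorkerThreads(100)
                        // NO_REQUEST_TIMEOUT is in milliseconds: 100 * 1000 = 100 seconds
                        .setServerOption(UndertowOptions.NO_REQUEST_TIMEOUT, 100 * 1000)
                        .setServerOption(UndertowOptions.ENABLE_HTTP2, true)
                        .build();
        try {
            httpServer.start();
        } catch (Exception e) {
            System.out.println(e);
            // Keep the original exception as the cause so its stack trace is not lost
            throw new RuntimeException("Server start failed", e);
        }
    }
}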
```
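Since UndertowOptions.NO_REQUEST_TIMEOUT takes an int in milliseconds, one way to keep such values readable is to derive them from java.time.Duration. A minimal sketch; the helper name is hypothetical and the code uses only the JDK, so it does not touch Undertow itself:

```java
import java.time.Duration;

public final class ServerTimeouts {

    /** Hypothetical helper: converts a Duration into the int milliseconds Undertow options expect. */
    static int toMillisOption(Duration timeout) {
        // toIntExact throws instead of silently overflowing for very long durations
        return Math.toIntExact(timeout.toMillis());
    }

    public static void main(String[] args) {
        int noRequestTimeoutMs = toMillisOption(Duration.ofSeconds(100));
        System.out.println("NO_REQUEST_TIMEOUT = " + noRequestTimeoutMs + " ms");
        // In the server: .setServerOption(UndertowOptions.NO_REQUEST_TIMEOUT, noRequestTimeoutMs)
    }
}
```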
 
 
