Hello everyone,
I have set up the Flink statefun runtime on Kubernetes according to this tutorial https://github.com/apache/flink-statefun-playground/tree/main/deployments/k8s . I developed my own statefun-Functions in Java and deployed them in the same way as shown in the tutorial.
Problem:
When receiving kafka-events at a very low rate everything works fine. But when the incoming rate is very high (166 records per second for about an hour), the whole statefun-master crashes after about 8 minutes. In the logs of the master pod the following message can be seen:
```
org.apache.flink.statefun.flink.core.functions.StatefulFunctionInvocationException: An error occurred when attempting to invoke function FunctionType(func, fun1).
at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:74) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:60) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:164) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.statefun.flink.core.functions.AsyncSink.drainOnOperatorThread(AsyncSink.java:119) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:86) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:88) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: Failure forwarding a message to a remote function Address(func, fun, 1)
at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.onAsyncResult(RequestReplyFunction.java:170) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:124) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48) ~[statefun-flink-core.jar:3.2.0]
... 28 more
at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:74) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:60) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:164) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.statefun.flink.core.functions.AsyncSink.drainOnOperatorThread(AsyncSink.java:119) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:86) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:88) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: Failure forwarding a message to a remote function Address(func, fun, 1)
at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.onAsyncResult(RequestReplyFunction.java:170) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:124) ~[statefun-flink-core.jar:3.2.0]
at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48) ~[statefun-flink-core.jar:3.2.0]
... 28 more
```
Steps taken:
I tried a lot of settings in the statefun-manifest( increasing parallellism, more memory, ..), but nothing helped. I also tried some additional settings for my Undertow Server, but that did not help either.
Observations:
For debugging purposes, I tried using just one function and everything worked fine. But when I use all my functions I get the problems again. I also checked in the web UI and the ingress has 100% backpressure and the union-> function is 100% busy. Just before the crash the union-> function has received about 300 messages less than the ingress. The function that cannot be reached is arbitrary, it changes all the time.
Similar problem to mine:
https://stackoverflow.com/questions/73707416/flink-statefun-under-backpressure-application-crashes
Question:
My questions is I anyone of you has an idea what the problem could be?
I would be really grateful for your help and advice. Thank you very much.
Best regards,
Oliver
My statefun mainfest:
```
---
apiVersion: v1
kind: ConfigMap
metadata:
namespace: statefun
name: flink-config
labels:
app: statefun
release: prometheus
data:
flink-conf.yaml: |+
jobmanager.rpc.address: statefun-master
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
state.backend: rocksdb
state.backend.rocksdb.timer-service.factory: ROCKSDB
state.backend.incremental: true
parallelism.default: 1
s3.access-key: minioadmin
s3.secret-key: minioadmin
state.checkpoints.dir: s3://checkpoints/subscriptions
s3.endpoint: http://minio.statefun.svc.cluster.local:9000
s3.path-style-access: true
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 2048m
#Added
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9999
# Added for latency tracking
taskmanager.latency-tracking-interval: 30000 # 30000 milliseconds (30 seconds)
metrics.latency.interval: 30000
taskmanager.network.memory.buffer-debloat.enabled: true
apiVersion: v1
kind: ConfigMap
metadata:
namespace: statefun
name: flink-config
labels:
app: statefun
release: prometheus
data:
flink-conf.yaml: |+
jobmanager.rpc.address: statefun-master
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
state.backend: rocksdb
state.backend.rocksdb.timer-service.factory: ROCKSDB
state.backend.incremental: true
parallelism.default: 1
s3.access-key: minioadmin
s3.secret-key: minioadmin
state.checkpoints.dir: s3://checkpoints/subscriptions
s3.endpoint: http://minio.statefun.svc.cluster.local:9000
s3.path-style-access: true
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 2048m
#Added
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9999
# Added for latency tracking
taskmanager.latency-tracking-interval: 30000 # 30000 milliseconds (30 seconds)
metrics.latency.interval: 30000
taskmanager.network.memory.buffer-debloat.enabled: true
log4j-console.properties: |+
monitorInterval=30
rootLogger.level = DEBUG
rootLogger.appenderRef.console.ref = ConsoleAppender
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
#Change from OFF to ON
logger.netty.level = ON
---
apiVersion: v1
kind: Service
metadata:
name: statefun-master-rest
namespace: statefun
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9999"
spec:
type: NodePort
ports:
- name: rest
port: 8081
targetPort: 8081
- name: metrics
port: 9999
selector:
app: statefun
component: master
---
apiVersion: v1
kind: Service
metadata:
name: statefun-master
namespace: statefun
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9999"
spec:
type: ClusterIP
ports:
- name: rpc
port: 6123
- name: blob
port: 6124
- name: ui
port: 8081
- name: metrics
port: 9999
selector:
app: statefun
component: master
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: statefun-master
namespace: statefun
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9999"
spec:
replicas: 1
selector:
matchLabels:
app: statefun
component: master
template:
metadata:
labels:
app: statefun
component: master
spec:
containers:
- name: master
image: apache/flink-statefun
imagePullPolicy: IfNotPresent
env:
- name: ROLE
value: master
- name: MASTER_HOST
value: statefun-master
resources:
requests:
#Changed from 0.5Gi to 1Gi to 2048m
memory: "7Gi"
#Change from 2 to 1
cpu: "1"
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob
- containerPort: 8081
name: ui
- containerPort: 9999
name: metrics
protocol: TCP
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
- name: module-config-volume
mountPath: /opt/statefun/modules/func
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
- name: module-config-volume
configMap:
name: module-config
items:
- key: module.yaml
path: module.yaml
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: statefun-worker
namespace: statefun
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9999"
spec:
replicas: 1
selector:
matchLabels:
app: statefun
component: worker
template:
metadata:
labels:
app: statefun
component: worker
spec:
containers:
- name: worker
image: apache/flink-statefun
imagePullPolicy: IfNotPresent
env:
- name: ROLE
value: worker
- name: MASTER_HOST
value: statefun-master
resources:
requests:
#Changed from 0.5Gi to 1Gi to 2048m
memory: "7Gi"
cpu: "1"
ports:
- containerPort: 6122
name: rpc
- containerPort: 6124
name: blob
- containerPort: 8081
name: ui
- containerPort: 9999
name: metrics
protocol: TCP
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
- name: module-config-volume
mountPath: /opt/statefun/modules/func
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
- name: module-config-volume
configMap:
name: module-config
items:
- key: module.yaml
path: module.yaml
apiVersion: v1
kind: Service
metadata:
name: statefun-master-rest
namespace: statefun
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9999"
spec:
type: NodePort
ports:
- name: rest
port: 8081
targetPort: 8081
- name: metrics
port: 9999
selector:
app: statefun
component: master
---
apiVersion: v1
kind: Service
metadata:
name: statefun-master
namespace: statefun
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9999"
spec:
type: ClusterIP
ports:
- name: rpc
port: 6123
- name: blob
port: 6124
- name: ui
port: 8081
- name: metrics
port: 9999
selector:
app: statefun
component: master
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: statefun-master
namespace: statefun
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9999"
spec:
replicas: 1
selector:
matchLabels:
app: statefun
component: master
template:
metadata:
labels:
app: statefun
component: master
spec:
containers:
- name: master
image: apache/flink-statefun
imagePullPolicy: IfNotPresent
env:
- name: ROLE
value: master
- name: MASTER_HOST
value: statefun-master
resources:
requests:
#Changed from 0.5Gi to 1Gi to 2048m
memory: "7Gi"
#Change from 2 to 1
cpu: "1"
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob
- containerPort: 8081
name: ui
- containerPort: 9999
name: metrics
protocol: TCP
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
- name: module-config-volume
mountPath: /opt/statefun/modules/func
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
- name: module-config-volume
configMap:
name: module-config
items:
- key: module.yaml
path: module.yaml
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: statefun-worker
namespace: statefun
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9999"
spec:
replicas: 1
selector:
matchLabels:
app: statefun
component: worker
template:
metadata:
labels:
app: statefun
component: worker
spec:
containers:
- name: worker
image: apache/flink-statefun
imagePullPolicy: IfNotPresent
env:
- name: ROLE
value: worker
- name: MASTER_HOST
value: statefun-master
resources:
requests:
#Changed from 0.5Gi to 1Gi to 2048m
memory: "7Gi"
cpu: "1"
ports:
- containerPort: 6122
name: rpc
- containerPort: 6124
name: blob
- containerPort: 8081
name: ui
- containerPort: 9999
name: metrics
protocol: TCP
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
- name: module-config-volume
mountPath: /opt/statefun/modules/func
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
- name: module-config-volume
configMap:
name: module-config
items:
- key: module.yaml
path: module.yaml
```
```
apiVersion: v1
kind: ConfigMap
metadata:
namespace: statefun
name: module-config
data:
module.yaml: |+
kind: io.statefun.endpoints.v2/http
spec:
functions: func/*
urlPathTemplate: http://functions.statefun.svc.cluster.local:8000/statefun
transport:
timeouts:
call: 2 min
connect: 2 min
read: 2 min
write: 2 min
---
kind: io.statefun.kafka.v1/ingress
spec:
id: func/in
address: kafka-cluster-kafka-bootstrap.default.svc:9092
consumerGroupId: my-group-id
topics:
- topic: source
valueType: io.statefun.types/string
targets:
- func/source_func
kind: ConfigMap
metadata:
namespace: statefun
name: module-config
data:
module.yaml: |+
kind: io.statefun.endpoints.v2/http
spec:
functions: func/*
urlPathTemplate: http://functions.statefun.svc.cluster.local:8000/statefun
transport:
timeouts:
call: 2 min
connect: 2 min
read: 2 min
write: 2 min
---
kind: io.statefun.kafka.v1/ingress
spec:
id: func/in
address: kafka-cluster-kafka-bootstrap.default.svc:9092
consumerGroupId: my-group-id
topics:
- topic: source
valueType: io.statefun.types/string
targets:
- func/source_func
```
My Undertow Server:
```
public static void main(String[] args) throws IOException {
final StatefulFunctions functions = new StatefulFunctions();
....
final StatefulFunctions functions = new StatefulFunctions();
....
final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();
final Undertow httpServer =
Undertow.builder()
.addHttpListener(8000, "0.0.0.0")
.setHandler(new UndertowHttpHandler(requestReplyHandler))
.setWorkerThreads(100)
.setServerOption(UndertowOptions.NO_REQUEST_TIMEOUT, 100 * 1000) // 60 seconds no request timeout
.setServerOption(UndertowOptions.ENABLE_HTTP2, true)
.build();
try {
httpServer.start();
}
catch (Exception e){
System.out.println(e);
throw new RuntimeException("Server start: "+e);
}
}
}
```