Hi,
I'm currently trying to set up a Flink Stateful Functions Job with the
following architecture:
* Kinesis Ingress (embedded)
* Stateful Function (embedded) that calls an external business logic
function and receives its responses (a Python worker similar to the one
in the Python greeter example)
* Kinesis Egress (embedded)
For the time being I am working with a local docker-compose cluster, but
the goal is to move this to Kubernetes for production. The stream
processing itself is working fine, but I can't solve two problems with
respect to fault tolerance:
1) The app is not writing checkpoints or savepoints at all (RocksDB
state backend, local filesystem). A checkpoint dir is created on startup
but stays empty the whole time. When stopping the job, a savepoint dir is
created, but the stop ultimately fails with a
java.util.concurrent.TimeoutException and the job continues to run.
2) When I try to simulate a failure of the external function
("docker-compose stop python-worker && sleep 10 && docker-compose start
python-worker"), I lose all messages sent between the stop and the
restart, even though the documentation states that "For both state and
messaging, Stateful Functions is able to provide the exactly-once
guarantees users expect from a modern data processing framework".
See the relevant parts of my configs below.
Any input or help would be greatly appreciated.
Best regards
Jan
------
flink-conf.yaml
-------
jobmanager.rpc.address: jobmanager
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
state.backend: rocksdb
state.backend.rocksdb.timer-service.factory: ROCKSDB
state.checkpoints.dir: file:///checkpoint-dir
state.savepoints.dir: file:///checkpoint-dir
jobmanager.execution.failover-strategy: region
blob.server.port: 6124
query.server.port: 6125
classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
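
One way to see whether the configuration above results in checkpoints being triggered at all is to ask the JobManager's REST API directly. The following is only a minimal sketch, not part of the setup described in this mail. It assumes the REST endpoint is reachable on host port 8082 (the "8082:8081" mapping in docker-compose.yaml) and uses the Flink REST paths /jobs, /jobs/<jobid>/checkpoints/config and /jobs/<jobid>/checkpoints. The names fetch_json, summarize_checkpoints and report are hypothetical helpers introduced for illustration. As far as I know, the two checkpoint endpoints answer with an HTTP error when checkpointing is not enabled for a job, so the sketch reports the status code instead of failing.

```python
import json
from urllib.error import HTTPError
from urllib.request import urlopen

# Assumption: JobManager REST API exposed on the host via "8082:8081".
BASE_URL = "http://localhost:8082"


def fetch_json(path, base_url=BASE_URL):
    """GET a Flink REST path and decode the JSON response body."""
    with urlopen(base_url + path, timeout=10) as resp:
        return json.load(resp)


def summarize_checkpoints(stats):
    """Reduce a /jobs/<jobid>/checkpoints response to its counters."""
    counts = stats.get("counts", {})
    return {
        "completed": counts.get("completed", 0),
        "failed": counts.get("failed", 0),
        "in_progress": counts.get("in_progress", 0),
    }


def report(base_url=BASE_URL):
    """Print checkpoint interval and counters for every job on the cluster."""
    for job in fetch_json("/jobs", base_url)["jobs"]:
        job_id = job["id"]
        try:
            config = fetch_json(f"/jobs/{job_id}/checkpoints/config", base_url)
            stats = fetch_json(f"/jobs/{job_id}/checkpoints", base_url)
        except HTTPError as err:
            # Assumed to happen when checkpointing is not enabled for the job.
            print(job_id, job["status"], "checkpoint endpoints returned HTTP", err.code)
            continue
        print(job_id, job["status"], "interval:", config.get("interval"),
              summarize_checkpoints(stats))
```

Calling report() while the job is running prints one line per job. A job whose "completed" counter stays at 0, or whose checkpoint endpoints return an error, would match the empty checkpoint dir described above.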
--------
docker-compose.yaml
-------
jobmanager:
  image: flink:1.11.2-scala_2.12-java8
  expose:
    - "6123"
  ports:
    - "8082:8081"
  volumes:
    - ./streamProcessor/checkpoint-dir:/checkpoint-dir
    - ./streamProcessor/conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml:ro
  command: jobmanager
  environment:
    - JOB_MANAGER_RPC_ADDRESS=jobmanager
    - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"
taskmanager:
  image: flink:1.11.2-scala_2.12-java8
  expose:
    - "6121"
    - "6122"
  depends_on:
    - jobmanager
  command: taskmanager
  links:
    - "jobmanager:jobmanager"
  environment:
    - JOB_MANAGER_RPC_ADDRESS=jobmanager
    - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"