Hi,

I'm currently trying to set up a Flink Stateful Functions job with the following architecture:

* Kinesis Ingress (embedded)

* Stateful Function (embedded) that calls an external business-logic function and consumes its responses (a Python worker similar to the one in the Python greeter example)

* Kinesis Egress (embedded)


For now I am running a local docker-compose cluster, but the goal is to move this to Kubernetes for production. The stream processing itself works fine, but I can't solve two fault-tolerance problems:

1) The app does not write any checkpoints or savepoints (RocksDB state backend, local filesystem). A checkpoint directory is created on startup but stays empty the whole time. When I stop the job, a savepoint directory is created, but the stop eventually fails with a java.util.concurrent.TimeoutException and the job keeps running.

2) When I simulate a failure of the external function ("docker-compose stop python-worker && sleep 10 && docker-compose start python-worker"), I lose all messages sent between the stop and the restart. This is despite the documentation stating that "For both state and messaging, Stateful Functions is able to provide the exactly-once guarantees users expect from a modern data processing framework".

See the relevant parts of my configs below.

Any input or help would be greatly appreciated.


Best regards

Jan


------

flink-conf.yaml

-------

jobmanager.rpc.address: jobmanager
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
state.backend: rocksdb
state.backend.rocksdb.timer-service.factory: ROCKSDB
state.checkpoints.dir: file:///checkpoint-dir
state.savepoints.dir: file:///checkpoint-dir
jobmanager.execution.failover-strategy: region
blob.server.port: 6124
query.server.port: 6125
classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
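
The file above sets a checkpoint directory but no checkpoint interval, and Flink 1.11 only takes periodic checkpoints when an interval is configured. A minimal sketch of the extra lines that would enable them, assuming the execution.checkpointing.* options available since Flink 1.10 (the 10-second interval is an illustrative assumption):

--------

flink-conf.yaml (sketch of checkpointing additions)

-------

# Assumption: trigger a checkpoint every 10 s; without an interval, checkpointing stays disabled
execution.checkpointing.interval: 10s
# EXACTLY_ONCE is already the default mode; stated explicitly here
execution.checkpointing.mode: EXACTLY_ONCE

This only enables checkpointing; a file:// checkpoint directory must still be reachable at the same path from the JobManager and every TaskManager.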


--------

docker-compose.yaml

-------

services:
  jobmanager:
    image: flink:1.11.2-scala_2.12-java8
    expose:
      - "6123"
    ports:
      - "8082:8081"
    volumes:
      - ./streamProcessor/checkpoint-dir:/checkpoint-dir
      - ./streamProcessor/conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml:ro
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"
  taskmanager:
    image: flink:1.11.2-scala_2.12-java8
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"
