Hi Igal,

Thanks for your quick and detailed reply! For me, this is the defining feature of Stateful Functions: separating stream-processing "infrastructure" from business logic code, possibly maintained by a different team.

Regarding your points: I did add the checkpoint interval to the flink-conf, to no avail. state.checkpoints.dir was already set, and all the necessary subfolders get created on job startup. They just stay empty...

Thanks for the pointer to the helm charts! Just what I was looking for!

A question regarding the StateFun docker images: I would actually prefer using them, but my fear is that they would take away my options to:

1) deploy a new release of my StateFun job without killing the cluster, because...

2) ... I would like to schedule regular flink jobs or additional StateFun jobs on the same cluster alongside my original job.

Could you give a quick opinion on whether these fears are even justified and, if so, what a recommended setup would be to satisfy these use cases?


Best regards

Jan


On 05.11.20 17:02, Igal Shilman wrote:
Hi Jan,

The architecture you outlined sounds good, and we have successfully run mixed architectures like this.
Let me try to address your questions:

1)
To enable checkpointing you need to set the relevant values in your flink-conf.yaml file.
execution.checkpointing.interval: <duration> (see [1])
state.checkpoints.dir: <path> (see [2])

You can take a look here for an example [3]. The easiest way to incorporate the changes would be to add your custom flink-conf.yaml into your docker image (here is an example [4]). Once you are running on Kubernetes, you can mount a config map as flink-conf.yaml; check out the helm charts here: [5]
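
As a minimal sketch of what those two settings might look like in flink-conf.yaml: the 10s interval is an arbitrary illustrative value, and the path simply reuses the file:///checkpoint-dir location from the configuration quoted further down in this thread.

```yaml
# Assumption: 10s is only an illustrative interval; pick one that suits the job.
execution.checkpointing.interval: 10s
# Assumption: same local directory as in the flink-conf.yaml quoted below.
state.checkpoints.dir: file:///checkpoint-dir
```

Without an interval configured, periodic checkpointing stays disabled, which would be consistent with a checkpoint directory that is created but remains empty.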

2)
When the remote function is unavailable, StateFun buffers the messages addressed to it, up to the specified timeout (the default is 1 minute; you can set it here [6]), before the job is considered failed and is restarted. It seems that in your example you are waiting for 10 seconds, so the messages should be delivered. Do you set function.spec.timeout or .withMaxRequestDuration() to something else?
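
For orientation, a remote function entry with an explicit timeout might look roughly like the sketch below, assuming a StateFun 2.2-style module.yaml. The function type example/python-worker and the endpoint URL are hypothetical placeholders, not values taken from your setup, and 2min is just an illustrative duration.

```yaml
version: "2.0"
module:
  meta:
    type: remote
  spec:
    functions:
      - function:
          meta:
            kind: http
            # Hypothetical function type; use the one your module registers.
            type: example/python-worker
          spec:
            # Hypothetical endpoint of the Python worker container.
            endpoint: http://python-worker:8000/statefun
            # function.spec.timeout: how long requests may be retried
            # before the job is considered failed.
            timeout: 2min
```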


Good luck!
Igal.

P.S.
Consider using StateFun docker images[7], see any of the examples in the statefun repository.


[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#execution-checkpointing-interval
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-checkpoints-dir
[3] https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
[4] https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/Dockerfile#L20
[5] https://github.com/apache/flink-statefun/tree/master/tools/k8s
[6] look for function.spec.timeout at https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/index.html
[7] https://ci.apache.org/projects/flink/flink-statefun-docs-master/deployment-and-operations/packaging.html#images

On Thu, Nov 5, 2020 at 3:35 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:

    Hi,

    I'm currently trying to set up a Flink Stateful Functions Job with the
    following architecture:

    * Kinesis Ingress (embedded)

    * Stateful Function (embedded) that calls to and takes responses from
    an external business logic function (python worker similar to the one
    in the python greeter example)

    * Kinesis Egress (embedded)


    For the time being I am working with a local docker-compose cluster,
    but the goal would be to move this to kubernetes for production. The
    stream processing itself is working fine, but I can't solve two
    problems with respect to Fault Tolerance:

    1) The app is not writing checkpoints or savepoints at all (rocksDB,
    local filesystem). A checkpoint dir is created on startup but stays
    empty the whole time. When stopping the job, a savepoint dir is
    created but the stop ultimately fails with a
    java.util.concurrent.TimeoutException and the job continues to run.

    2) When I try to simulate failure in the external Function
    ("docker-compose stop python-worker && sleep 10 && docker-compose
    start python-worker"), I lose all messages in between restarts,
    although the documentation states that "For both state and messaging,
    Stateful Functions is able to provide the exactly-once guarantees
    users expect from a modern data processing framework".

    See the relevant parts of my configs below.

    Any input or help would be greatly appreciated.


    Best regards

    Jan


    ------

    flink-conf.yaml

    -------

    jobmanager.rpc.address: jobmanager
    jobmanager.rpc.port: 6123
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    taskmanager.numberOfTaskSlots: 1
    parallelism.default: 1
    state.backend: rocksdb
    state.backend.rocksdb.timer-service.factory: ROCKSDB
    state.checkpoints.dir: file:///checkpoint-dir
    state.savepoints.dir: file:///checkpoint-dir
    jobmanager.execution.failover-strategy: region
    blob.server.port: 6124
    query.server.port: 6125
    classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf


    --------

    docker-compose.yaml

    -------

       jobmanager:
         image: flink:1.11.2-scala_2.12-java8
         expose:
           - "6123"
         ports:
           - "8082:8081"
         volumes:
           - ./streamProcessor/checkpoint-dir:/checkpoint-dir
           - ./streamProcessor/conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml:ro
         command: jobmanager
         environment:
           - JOB_MANAGER_RPC_ADDRESS=jobmanager
           - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"
       taskmanager:
         image: flink:1.11.2-scala_2.12-java8
         expose:
           - "6121"
           - "6122"
         depends_on:
           - jobmanager
         command: taskmanager
         links:
           - "jobmanager:jobmanager"
         environment:
           - JOB_MANAGER_RPC_ADDRESS=jobmanager
           - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"

--
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501
