Hi Igal,

thanks for these pointers!

I currently deploy a Flink jar via docker copy, but this is only a spike setup anyway. I will now discard it and switch directly to working in Kubernetes.

So, just so I understand this right, the recommended production setup would be:

* Build a docker image containing the job and custom flink-conf.yaml based on this: https://github.com/apache/flink-statefun/tree/master/tools/docker

* Deploy the job image to Kubernetes via Helm as a standalone StateFun job cluster: https://github.com/apache/flink-statefun/tree/master/tools/k8s

* Repeat the above steps for each StateFun Job separately


Two questions left:

1) Does the recommendation of one Cluster per Job in kubernetes setups also hold for "regular" Flink Jobs?

2) Do you (ververica) offer developer training with special focus on Stateful Functions? I would probably feel a bit safer moving into production with that as a background... :-)


Thanks again for your quick and comprehensive replies!

Best regards and a nice evening!

Jan

On 05.11.20 17:55, Igal Shilman wrote:
How do you deploy the job currently?
Are you using the DataStream integration, or deploying it as a Flink Jar [1]?

(Also, please note that the directories might be created, but without a checkpoint interval set they will stay empty.)

Regarding your two questions:

It is true that you can theoretically share the same cluster to submit additional jobs besides StateFun. However, StateFun requires a specific set of configurations that might not apply to your other jobs. Considering your end goal of eventually using Kubernetes, the recommended way is actually to use a cluster per job, and StateFun Docker images are a convenient way to package your modules.

[1] https://ci.apache.org/projects/flink/flink-statefun-docs-master/deployment-and-operations/packaging.html#flink-jar


On Thu, Nov 5, 2020 at 5:29 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:

    Hi Igal,

    thanks for your quick and detailed reply! For me, this is the
    really great defining feature of Stateful Functions: Separating
    StreamProcessing "Infrastructure" from Business Logic Code,
    possibly maintained by a different team.

    Regarding your points: I did add the checkpoint interval to the
    flink-conf, to no avail. state.checkpoints.dir was already set and
    all the necessary subfolders get created on job startup. They just
    stay empty...

    Thanks for the pointer to the helm charts! Just what I was looking
    for!

    A question regarding StateFun docker images: I would actually
    prefer using them, but my fear is that they would take away my
    options to:

    1) deploy a new release of my StateFun job without killing the
    cluster, because...

    2) ... I would like to schedule regular flink jobs or additional
    StateFun jobs on the same cluster alongside my original job.

    Could you give a quick opinion if these fears are even true and if
    so, what would be a recommended setup to satisfy these use cases?


    Best regards

    Jan


    On 05.11.20 17:02, Igal Shilman wrote:
    Hi Jan,

    The architecture you outlined sounds good, and we have successfully
    run mixed architectures like this.
    Let me try to address your questions:

    1)
    To enable checkpointing you need to set the relevant values in
    your flink-conf.yaml file.
    execution.checkpointing.interval: <duration> (see [1])
    state.checkpoints.dir: <path> (see [2])

    You can take a look here for an example [3]. The easiest way to
    incorporate the changes would be to add your custom
    flink-conf.yaml into your docker image (here is an example [4]).
    Once you are using Kubernetes, you can mount a config map as
    flink-conf.yaml; check out the helm charts here: [5]
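
    A minimal sketch of the two settings above as they could appear in
    flink-conf.yaml. The 10s interval is only an illustrative assumption;
    the directory is the one from the original poster's config. Note that
    the documented key is state.checkpoints.dir (plural), matching the
    anchor of link [2].

    ```yaml
    # Without an interval, periodic checkpointing stays disabled.
    execution.checkpointing.interval: 10s   # assumed value, tune for your job
    # Must be reachable from the JobManager and all TaskManagers.
    state.checkpoints.dir: file:///checkpoint-dir
    ```

    In the docker-compose file quoted further down, only the jobmanager
    service mounts this directory and the config file, which may be worth
    checking.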

    2)
    When the remote function is unavailable, StateFun buffers the
    messages addressed to it up to the specified timeout (the default
    is 1 minute; you can set it here [6]). After that, the job is
    considered failed and is restarted.
    It seems like in your example you are waiting for 10 seconds, so
    the messages should be delivered.
    Do you set function.spec.timeout or .withMaxRequestDuration() to
    something else?
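
    For a remote function declared in module.yaml, the timeout mentioned
    above could be set roughly as follows. This is only a sketch, assuming
    the StateFun 2.2 module format; the function type
    example/python-worker and the endpoint port and path are hypothetical
    placeholders, and other keys (such as states) are omitted.

    ```yaml
    version: "2.0"
    module:
      meta:
        type: remote
      spec:
        functions:
          - function:
              meta:
                kind: http
                type: example/python-worker   # hypothetical namespace/name
              spec:
                # hypothetical endpoint; "python-worker" is the compose service name
                endpoint: http://python-worker:8000/statefun
                # total time budget for one request, retries included (assumed value)
                timeout: 2min
    ```

    A value well above the expected downtime of the worker leaves room for
    the buffered messages to be delivered once it is back.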


    Good luck!
    Igal.

    P.S.
    Consider using StateFun docker images [7]; see any of the examples
    in the statefun repository.


    [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#execution-checkpointing-interval
    [2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-checkpoints-dir
    [3] https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
    [4] https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/Dockerfile#L20
    [5] https://github.com/apache/flink-statefun/tree/master/tools/k8s
    [6] look for function.spec.timeout at https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/index.html
    [7] https://ci.apache.org/projects/flink/flink-statefun-docs-master/deployment-and-operations/packaging.html#images

    On Thu, Nov 5, 2020 at 3:35 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:

        Hi,

        I'm currently trying to set up a Flink Stateful Functions Job with the
        following architecture:

        * Kinesis Ingress (embedded)

        * Stateful Function (embedded) that calls to and takes responses from an
          external business logic function (python worker similar to the one in
          the python greeter example)

        * Kinesis Egress (embedded)


        For the time being I am working with a local docker-compose cluster, but
        the goal would be to move this to kubernetes for production. The stream
        processing itself is working fine, but I can't solve two problems with
        respect to Fault Tolerance:

        1) The app is not writing checkpoints or savepoints at all (rocksDB,
        local filesystem). A checkpoint dir is created on startup but stays
        empty the whole time. When stopping the job, a savepoint dir is created
        but the stop ultimately fails with a
        java.util.concurrent.TimeoutException and the job continues to run.

        2) When I try to simulate failure in the external Function
        ("docker-compose stop python-worker && sleep 10 && docker-compose start python-worker"),
        I lose all messages in between restarts, although the documentation
        states that "For both state and messaging, Stateful Functions is able
        to provide the exactly-once guarantees users expect from a modern data
        processing framework".

        See the relevant parts of my configs below.

        Any input or help would be greatly appreciated.


        Best regards

        Jan


        ------

        flink-conf.yaml

        -------

        jobmanager.rpc.address: jobmanager
        jobmanager.rpc.port: 6123
        jobmanager.memory.process.size: 1600m
        taskmanager.memory.process.size: 1728m
        taskmanager.numberOfTaskSlots: 1
        parallelism.default: 1
        state.backend: rocksdb
        state.backend.rocksdb.timer-service.factory: ROCKSDB
        state.checkpoints.dir: file:///checkpoint-dir
        state.savepoints.dir: file:///checkpoint-dir
        jobmanager.execution.failover-strategy: region
        blob.server.port: 6124
        query.server.port: 6125
        classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf


        --------

        docker-compose.yaml

        -------

           jobmanager:
             image: flink:1.11.2-scala_2.12-java8
             expose:
               - "6123"
             ports:
               - "8082:8081"
             volumes:
               - ./streamProcessor/checkpoint-dir:/checkpoint-dir
               - ./streamProcessor/conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml:ro
             command: jobmanager
             environment:
               - JOB_MANAGER_RPC_ADDRESS=jobmanager
               - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"
           taskmanager:
             image: flink:1.11.2-scala_2.12-java8
             expose:
               - "6121"
               - "6122"
             depends_on:
               - jobmanager
             command: taskmanager
             links:
               - "jobmanager:jobmanager"
             environment:
               - JOB_MANAGER_RPC_ADDRESS=jobmanager
               - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"

-- neuland – Büro für Informatik GmbH
    Konsul-Smidt-Str. 8g, 28217 Bremen

    Telefon (0421) 380107 57
    Fax (0421) 380107 99
    https://www.neuland-bfi.de

    https://twitter.com/neuland
    https://facebook.com/neulandbfi
    https://xing.com/company/neulandbfi


    Geschäftsführer: Thomas Gebauer, Jan Zander
    Registergericht: Amtsgericht Bremen, HRB 23395 HB
    USt-ID. DE 246585501

