Hello Flink users,

I have an application with around 10 enrichment joins. All events are read from Kafka and carry event timestamps. The joins are built using .coGroup with a global window, a trigger that fires on every event, and a custom evictor that drops a record once a newer record for the same ID has been processed. Deletes are represented by empty events that carry only a timestamp and an ID (tombstones). That way, we can drop records when business logic dictates, as opposed to when a maximum retention has been reached. The application runs RocksDBStateBackend on Kubernetes on AWS with local SSDs.
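
To make the retention rule concrete, here is a minimal sketch in plain Java, independent of Flink's window and evictor APIs. The names (LatestPerId, Event, apply, get, size) are hypothetical and exist only for illustration, and treating a null payload as a tombstone is an assumption. The sketch keeps the newest record per ID and drops the ID when a newer tombstone arrives. It does not handle a late, older record that shows up after its tombstone has already removed the entry.

```java
import java.util.HashMap;
import java.util.Map;

public class LatestPerId {
    /** Hypothetical event shape: a null payload marks a tombstone (delete). */
    public static final class Event {
        public final String id;
        public final long timestamp;
        public final String payload;

        public Event(String id, long timestamp, String payload) {
            this.id = id;
            this.timestamp = timestamp;
            this.payload = payload;
        }

        public boolean isTombstone() {
            return payload == null;
        }
    }

    // Newest live record per ID; tombstoned IDs are removed entirely.
    private final Map<String, Event> latest = new HashMap<>();

    /** Applies one event: ignore older records, replace on newer, delete on tombstone. */
    public void apply(Event e) {
        Event current = latest.get(e.id);
        if (current != null && current.timestamp > e.timestamp) {
            return; // older than the record already held, so it is dropped
        }
        if (e.isTombstone()) {
            latest.remove(e.id);
        } else {
            latest.put(e.id, e);
        }
    }

    /** Returns the current payload for an ID, or null if none is retained. */
    public String get(String id) {
        Event e = latest.get(id);
        return e == null ? null : e.payload;
    }

    public int size() {
        return latest.size();
    }

    public static void main(String[] args) {
        LatestPerId state = new LatestPerId();
        state.apply(new Event("a", 1L, "v1"));
        state.apply(new Event("a", 3L, "v3"));
        state.apply(new Event("a", 2L, "v2")); // older than v3, ignored
        System.out.println(state.get("a"));    // prints v3
        state.apply(new Event("a", 4L, null)); // tombstone removes the ID
        System.out.println(state.size());      // prints 0
    }
}
```

The point of this shape is that retained state is bounded by the number of live IDs rather than by a time-based retention window.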

Unit tests show that the joins produce expected results. On an 8-node cluster, watermark output progress seems to indicate I should be able to bootstrap my state of around 500GB in around 1 day. I am able to save and restore savepoints for the first half hour of run time.
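
As a rough sanity check on those numbers, the sketch below works out the implied ingest rate. It assumes state grows linearly during bootstrap and uses decimal units (1 GB = 1000 MB). Both are assumptions, not measurements, and the class and method names are hypothetical.

```java
import java.util.Locale;

public class BootstrapRate {
    /** Aggregate state growth in MB/s, assuming linear growth and 1 GB = 1000 MB. */
    public static double aggregateMbPerSec(double totalGb, double days) {
        return totalGb * 1000.0 / (days * 86_400.0);
    }

    /** Hours of run time until the state reaches targetGb, under the same linear assumption. */
    public static double hoursToReach(double targetGb, double totalGb, double days) {
        return targetGb / totalGb * days * 24.0;
    }

    public static void main(String[] args) {
        double totalGb = 500.0; // target state size from the post
        double days = 1.0;      // estimated bootstrap time from the post
        int nodes = 8;          // cluster size from the post

        double aggregate = aggregateMbPerSec(totalGb, days);
        System.out.printf(Locale.ROOT, "aggregate: %.2f MB/s%n", aggregate);        // 5.79
        System.out.printf(Locale.ROOT, "per node:  %.2f MB/s%n", aggregate / nodes); // 0.72
        System.out.printf(Locale.ROOT, "hours to 50 GB: %.1f%n",
                hoursToReach(50.0, totalGb, days));                                  // 2.4
    }
}
```

Under that assumption, 50GB of state corresponds to roughly 2.4 hours of run time, and the first half hour corresponds to roughly 10GB.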

My current trouble is that after around 50GB of state, I stop being able to reliably take checkpoints or savepoints. Some time after that, I start getting a variety of failures where the first suspicious log event is a generic cluster connectivity error, such as:

    1) java.io.IOException: Connecting the channel failed: Connecting
    to remote task manager + '/10.67.7.101:38955' has failed. This
    might indicate that the remote task manager has been lost.

    2) org.apache.flink.runtime.io.network.netty.exception
    .RemoteTransportException: Connection unexpectedly closed by remote
    task manager 'null'. This might indicate that the remote task
    manager was lost.

    3) Association with remote system
    [akka.tcp://flink@10.67.6.66:34987] has failed, address is now
    gated for [50] ms. Reason: [Association failed with
    [akka.tcp://flink@10.67.6.66:34987]] Caused by:
    [java.net.NoRouteToHostException: No route to host]

I don't see any obvious out of memory errors on the TaskManager UI.

Adding nodes to the cluster does not seem to increase the maximum savable state size.

I could enable HA, but for the time being I have been leaving it out to avoid the possibility of masking deterministic faults.

Below are my configurations.

Thanks in advance for any advice.

Regards,


Jeff Henrikson



Flink version: 1.10

Configuration set via code:
    parallelism=8
    maxParallelism=64
    setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
    setTolerableCheckpointFailureNumber(1000)
    setMaxConcurrentCheckpoints(1)
    enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    RocksDBStateBackend
    setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
    setNumberOfTransferThreads(25)
    setDbStoragePath points to a local NVMe SSD

Configuration in flink-conf.yaml:

    jobmanager.rpc.address: localhost
    jobmanager.rpc.port: 6123
    jobmanager.heap.size: 28000m
    taskmanager.memory.process.size: 28000m
    taskmanager.memory.jvm-metaspace.size: 512m
    taskmanager.numberOfTaskSlots: 1
    parallelism.default: 1
    jobmanager.execution.failover-strategy: full

    cluster.evenly-spread-out-slots: false

    taskmanager.memory.network.fraction: 0.2           # default 0.1
    taskmanager.memory.framework.off-heap.size: 2GB
    taskmanager.memory.task.off-heap.size: 2GB
    taskmanager.network.memory.buffers-per-channel: 32 # default 2
    taskmanager.memory.managed.fraction: 0.4           # docs say default 0.1, but something seems to set 0.4
    taskmanager.memory.task.off-heap.size: 2048MB      # default 128M

    state.backend.fs.memory-threshold: 1048576
    state.backend.fs.write-buffer-size: 10240000
    state.backend.local-recovery: true
    state.backend.rocksdb.writebuffer.size: 64MB
    state.backend.rocksdb.writebuffer.count: 8
    state.backend.rocksdb.writebuffer.number-to-merge: 4
    state.backend.rocksdb.timer-service.factory: heap
    state.backend.rocksdb.block.cache-size: 64000000 # default 8MB
    state.backend.rocksdb.write-batch-size: 16000000 # default 2MB

    web.checkpoints.history: 250
