[ 
https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-17923:
----------------------------
    Description: 
For the following job:
{code}
import logging
import os
import shutil
import sys
import tempfile

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf


def word_count():
    content = "line Licensed to the Apache Software Foundation ASF under one " \
              "line or more contributor license agreements See the NOTICE file 
" \
              "line distributed with this work for additional information " \
              "line regarding copyright ownership The ASF licenses this file " \
              "to you under the Apache License Version the " \
              "License you may not use this file except in compliance " \
              "with the License"

    t_config = TableConfig()
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env, t_config)

    # register Results table in table environment
    tmp_dir = tempfile.gettempdir()
    result_path = tmp_dir + '/result'
    if os.path.exists(result_path):
        try:
            if os.path.isfile(result_path):
                os.remove(result_path)
            else:
                shutil.rmtree(result_path)
        except OSError as e:
            logging.error("Error removing directory: %s - %s.", e.filename, 
e.strerror)

    logging.info("Results directory: %s", result_path)

    sink_ddl = """
        create table Results(
            word VARCHAR,
            `count` BIGINT
        ) with (
            'connector' = 'blackhole'
        )
        """
    t_env.sql_update(sink_ddl)

    @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
    def inc(count):
        return count + 1
    t_env.register_function("inc", inc)

    elements = [(word, 1) for word in content.split(" ")]
    t_env.from_elements(elements, ["word", "count"]) \
         .group_by("word") \
         .select("word, count(1) as count") \
         .select("word, inc(count) as count") \
         .insert_into("Results")

    t_env.execute("word_count")


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, 
format="%(message)s")

    word_count()
{code}

It will throw the following exception if rocksdb state backend is used:
{code}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from 
any of the 1 provided restore options.
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
        ... 9 more
Caused by: java.io.IOException: Failed to acquire shared cache resource for 
RocksDB
        at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212)
        at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
        ... 11 more
Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not 
created the shared memory resource of size 536870920. Not enough memory left to 
reserve from the slot's managed memory.
        at 
org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:603)
        at 
org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
        at 
org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
        at 
org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:617)
        at 
org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:566)
        at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:208)
        ... 15 more
Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could 
not allocate 536870920 bytes. Only 454033416 bytes are remaining.
        at 
org.apache.flink.runtime.memory.MemoryManager.reserveMemory(MemoryManager.java:461)
        at 
org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:601)
        ... 20 more
{code}

  was:
For the following job:
{code}
import logging
import os
import shutil
import sys
import tempfile

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf


def word_count():
    content = "line Licensed to the Apache Software Foundation ASF under one " \
              "line or more contributor license agreements See the NOTICE file 
" \
              "line distributed with this work for additional information " \
              "line regarding copyright ownership The ASF licenses this file " \
              "to you under the Apache License Version the " \
              "License you may not use this file except in compliance " \
              "with the License"

    t_config = TableConfig()
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env, t_config)

    # register Results table in table environment
    tmp_dir = tempfile.gettempdir()
    result_path = tmp_dir + '/result'
    if os.path.exists(result_path):
        try:
            if os.path.isfile(result_path):
                os.remove(result_path)
            else:
                shutil.rmtree(result_path)
        except OSError as e:
            logging.error("Error removing directory: %s - %s.", e.filename, 
e.strerror)

    logging.info("Results directory: %s", result_path)

    sink_ddl = """
        create table Results(
            word VARCHAR,
            `count` BIGINT
        ) with (
            'connector' = 'blackhole'
        )
        """
    t_env.sql_update(sink_ddl)

    @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
    def inc(count):
        return count + 1
    t_env.register_function("inc", inc)

    elements = [(word, 1) for word in content.split(" ")]
    t_env.from_elements(elements, ["word", "count"]) \
         .group_by("word") \
         .select("word, count(1) as count") \
         .select("word, inc(count) as count") \
         .insert_into("Results")

    t_env.execute("word_count")


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, 
format="%(message)s")

    word_count()
{code}

It will throw the following exception:
{code}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from 
any of the 1 provided restore options.
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
        ... 9 more
Caused by: java.io.IOException: Failed to acquire shared cache resource for 
RocksDB
        at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212)
        at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
        ... 11 more
Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not 
created the shared memory resource of size 536870920. Not enough memory left to 
reserve from the slot's managed memory.
        at 
org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:603)
        at 
org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
        at 
org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
        at 
org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:617)
        at 
org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:566)
        at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:208)
        ... 15 more
Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could 
not allocate 536870920 bytes. Only 454033416 bytes are remaining.
        at 
org.apache.flink.runtime.memory.MemoryManager.reserveMemory(MemoryManager.java:461)
        at 
org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:601)
        ... 20 more
{code}


> It will throw MemoryAllocationException if rocksdb statebackend and Python 
> UDF are used in the same slot  
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17923
>                 URL: https://issues.apache.org/jira/browse/FLINK-17923
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python, Runtime / State Backends
>    Affects Versions: 1.10.0, 1.11.0
>            Reporter: Dian Fu
>            Priority: Major
>             Fix For: 1.11.0
>
>
> For the following job:
> {code}
> import logging
> import os
> import shutil
> import sys
> import tempfile
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
> def word_count():
>     content = "line Licensed to the Apache Software Foundation ASF under one 
> " \
>               "line or more contributor license agreements See the NOTICE 
> file " \
>               "line distributed with this work for additional information " \
>               "line regarding copyright ownership The ASF licenses this file 
> " \
>               "to you under the Apache License Version the " \
>               "License you may not use this file except in compliance " \
>               "with the License"
>     t_config = TableConfig()
>     env = StreamExecutionEnvironment.get_execution_environment()
>     t_env = StreamTableEnvironment.create(env, t_config)
>     # register Results table in table environment
>     tmp_dir = tempfile.gettempdir()
>     result_path = tmp_dir + '/result'
>     if os.path.exists(result_path):
>         try:
>             if os.path.isfile(result_path):
>                 os.remove(result_path)
>             else:
>                 shutil.rmtree(result_path)
>         except OSError as e:
>             logging.error("Error removing directory: %s - %s.", e.filename, 
> e.strerror)
>     logging.info("Results directory: %s", result_path)
>     sink_ddl = """
>         create table Results(
>             word VARCHAR,
>             `count` BIGINT
>         ) with (
>             'connector' = 'blackhole'
>         )
>         """
>     t_env.sql_update(sink_ddl)
>     @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
>     def inc(count):
>         return count + 1
>     t_env.register_function("inc", inc)
>     elements = [(word, 1) for word in content.split(" ")]
>     t_env.from_elements(elements, ["word", "count"]) \
>          .group_by("word") \
>          .select("word, count(1) as count") \
>          .select("word, inc(count) as count") \
>          .insert_into("Results")
>     t_env.execute("word_count")
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, 
> format="%(message)s")
>     word_count()
> {code}
> It will throw the following exception if rocksdb state backend is used:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) 
> from any of the 1 provided restore options.
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
>       ... 9 more
> Caused by: java.io.IOException: Failed to acquire shared cache resource for 
> RocksDB
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>       ... 11 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could 
> not created the shared memory resource of size 536870920. Not enough memory 
> left to reserve from the slot's managed memory.
>       at 
> org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:603)
>       at 
> org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
>       at 
> org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
>       at 
> org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:617)
>       at 
> org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:566)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:208)
>       ... 15 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could 
> not allocate 536870920 bytes. Only 454033416 bytes are remaining.
>       at 
> org.apache.flink.runtime.memory.MemoryManager.reserveMemory(MemoryManager.java:461)
>       at 
> org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:601)
>       ... 20 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to