Dian Fu created FLINK-22124:
-------------------------------

             Summary: The job finished without any exception if error was thrown during state access
                 Key: FLINK-22124
                 URL: https://issues.apache.org/jira/browse/FLINK-22124
             Project: Flink
          Issue Type: Sub-task
          Components: API / Python
    Affects Versions: 1.13.0
            Reporter: Dian Fu
             Fix For: 1.13.0


For the following job:

{code}
import logging

from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import (FileSink, OutputFileConfig,
                                           NumberSequenceSource)
from pyflink.datastream.execution_mode import RuntimeExecutionMode
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import MapStateDescriptor


env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)
env.set_runtime_mode(RuntimeExecutionMode.BATCH)

seq_num_source = NumberSequenceSource(1, 1000)

file_sink = FileSink \
    .for_row_format('/Users/dianfu/code/src/apache/playgrounds/examples/output/data_stream_batch_state',
                    Encoder.simple_string_encoder()) \
    .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
    .build()

ds = env.from_source(
    source=seq_num_source,
    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
    source_name='file_source',
    type_info=Types.LONG())


class MyKeyedProcessFunction(KeyedProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        logging.info("open")
        state_desc = MapStateDescriptor('map', Types.LONG(), Types.LONG())
        self.state = runtime_context.get_map_state(state_desc)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
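        # Per this report, MapState.get raises KeyError (instead of returning
        # None) when the key is not in the state yet, e.g. for a key's first element.
        existing = self.state.get(value[0])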
        if existing is None:
            result = value[1]
            self.state.put(value[0], result)
        elif existing <= 10:
            result = value[1] + existing
            self.state.put(value[0], result)
        else:
            result = existing
        yield result


ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
    .key_by(lambda a: a[0]) \
    .process(MyKeyedProcessFunction(), Types.LONG()) \
    .sink_to(file_sink)

env.execute('data_stream_batch_state')
{code}
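To see where the job fails without a cluster, the per-key logic of `MyKeyedProcessFunction` can be replayed in plain Python. This is a sketch under stated assumptions: `DictMapState` and `replay` are hypothetical helpers, not PyFlink APIs, and `DictMapState.get` is assumed to raise `KeyError` for a missing key, which is the behaviour this report describes for `MapState.get`. Because the job keys the stream by `value[0]` and also uses `value[0]` as the map key, one shared dict behaves like Flink's per-key state here.

```python
class DictMapState:
    """Hypothetical in-memory stand-in for the job's map state.

    Assumption (from this report): get() raises KeyError for a missing key
    rather than returning None.
    """

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data[key]  # raises KeyError when the key is absent

    def put(self, key, value):
        self._data[key] = value

    def contains(self, key):
        return key in self._data


def process_element(state, value):
    """Same branching as MyKeyedProcessFunction.process_element."""
    existing = state.get(value[0])  # fails on the first element of each key
    if existing is None:  # unreachable for a missing key under the assumption above
        result = value[1]
        state.put(value[0], result)
    elif existing <= 10:
        result = value[1] + existing
        state.put(value[0], result)
    else:
        result = existing
    yield result


def process_element_guarded(state, value):
    """Variant that checks for the key before reading it."""
    existing = state.get(value[0]) if state.contains(value[0]) else None
    if existing is None:
        result = value[1]
        state.put(value[0], result)
    elif existing <= 10:
        result = value[1] + existing
        state.put(value[0], result)
    else:
        result = existing
    yield result


def replay(process_fn, numbers):
    """Feed numbers through the job's map step, Row(a % 4, 1), as plain tuples."""
    state = DictMapState()
    results = []
    for a in numbers:
        results.extend(process_fn(state, (a % 4, 1)))
    return results
```

With the guard, `replay(process_element_guarded, range(1, 9))` returns `[1, 1, 1, 1, 2, 2, 2, 2]`, while the unguarded `process_element` raises `KeyError` on the very first element. In the real job, the equivalent guard would presumably use `MapState.contains`; that is an assumption to check against the PyFlink state API of the version in use.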

The call `self.state.get(value[0])` raises a KeyError for a key that is not yet in 
the map state, yet the job finishes without any error message. This should be 
fixed: we should make sure the error is reported and its message appears in the 
log file, so that users can figure out what happened.
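The behaviour requested above is an error-propagation contract: a failure in user code must be logged and must fail the job. The sketch below illustrates that contract with a hypothetical `run_user_function` helper; it is not how PyFlink's Python worker is implemented. One detail worth noting: `process_element` is a generator, so an exception in its body is raised only when the generator is iterated, which means the iteration, not just the call, has to happen inside the error-handling block.

```python
import logging

logger = logging.getLogger("hypothetical_runner")


def run_user_function(func, elements):
    """Apply func to each element and collect everything it yields.

    On failure, log the error with its traceback and re-raise it, so the
    caller fails instead of finishing as if nothing happened.
    """
    results = []
    for element in elements:
        try:
            # func may be a generator function: its body (and any exception
            # it raises) only runs while extend() iterates over it here.
            results.extend(func(element))
        except Exception:
            # ERROR-level record that includes the full traceback.
            logger.exception("Error while processing element %r", element)
            raise
    return results
```

Passing a generator function whose body does `yield {}[element]` makes `run_user_function` log a `KeyError` traceback at ERROR level and then re-raise the same `KeyError`, so the failure is both visible in the log and fatal to the caller.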



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
