Hi all,
I'm using PyFlink to develop a module whose main job is to process user
data according to specific rules. The program involves two datastreams:
data and rule. They have different types, so I connect them and use the
field 'product_id' as the key for the key_by method. The code is as
follows (demo code only, not the actual program):

import random

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedCoProcessFunction
from pyflink.datastream.state import MapStateDescriptor
from pyflink.datastream import RuntimeContext


def test(data):
    product_ids = set()
    for key, value in data.items():
        product_ids.add(value[0])
    return list(product_ids)


class MyFunction(KeyedCoProcessFunction):
    def open(self, ctx):
        data_desc = MapStateDescriptor('data', Types.STRING(),
                                       Types.ROW([Types.INT()]))
        self.data = ctx.get_map_state(data_desc)

        rule_desc = MapStateDescriptor('rule', Types.STRING(),
                                       Types.ROW([Types.INT()]))
        self.rules = ctx.get_map_state(rule_desc)

    def process_element1(self, data_value, ctx):
        row_id, others = data_value[0], data_value[1:]
        self.data.put(row_id, others)
        result = []
        for key, value_list in self.rules.items():
            product_id, random_0, random_1 = value_list
            # Do some calculations
            product_ids_of_state_data = test(self.data)
            result.append([random_0, random_1, product_id,
                           product_ids_of_state_data])
        return result

    def process_element2(self, rule_value, ctx):
        row_id, others = rule_value[0], rule_value[1:]
        self.rules.put(row_id, others)

def generate_data1(count):
    collection = []
    for i in range(count):
        collection.append(['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2])
    return collection

def generate_data2(count):
    collection = []
    for i in range(count):
        collection.append(['row_%d' % i, random.choice([1, 2]),
                           'a_%d' % i, i * 2])
    return collection


def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    data = env.from_collection(generate_data1(50))
    rules = env.from_collection([
        ['row_0', 1, 'rule1_value0', 'rule1_value1'],
        ['row_1', 2, 'rule2_value0', 'rule2_value1']
    ], type_info=Types.ROW([Types.STRING(), Types.INT(),
                            Types.STRING(), Types.STRING()]))
    results = data.connect(rules).key_by(lambda x: x[1],
                                         lambda y: y[1]).process(MyFunction())
    results.print()

    env.execute("test_job")

if __name__ == "__main__":
    main()


When processing the first datastream, which contains the user data, I read
the registered MapState and collect the unique product_id values in it.
According to the description on the official site:

> Keyed state is maintained in what can be thought of as an embedded
> key/value store. The state is partitioned and distributed strictly together
> with the streams that are read by the stateful operators. Hence, access to
> the key/value state is only possible on *keyed streams*, i.e. after a
> keyed/partitioned data exchange, and *is restricted to the values
> associated with the current event’s key*

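To make my reading of that paragraph concrete, here is a minimal
plain-Python sketch. It is not PyFlink code: KeyedMapStateModel and
visible_product_ids are hypothetical names made up for illustration, and
the sketch assumes that the runtime sets the current key before each
element and that MapState entries are visible only under that key.

```python
from collections import defaultdict


class KeyedMapStateModel:
    """Toy model of keyed MapState (an assumption, not Flink's implementation)."""

    def __init__(self):
        # One independent map per key of the keyed stream.
        self._per_key = defaultdict(dict)
        self._current_key = None

    def set_current_key(self, key):
        # Assumed to be done by the runtime before each element is processed.
        self._current_key = key

    def put(self, map_key, value):
        self._per_key[self._current_key][map_key] = value

    def items(self):
        # Only entries stored under the current key are returned.
        return list(self._per_key[self._current_key].items())


def visible_product_ids(records):
    """Replay (row_id, product_id) records; list the product_ids seen in state each time."""
    state = KeyedMapStateModel()
    seen = []
    for row_id, product_id in records:
        state.set_current_key(product_id)
        state.put(row_id, (product_id,))
        seen.append(sorted({value[0] for _, value in state.items()}))
    return seen


records = [("row_0", 1), ("row_1", 2), ("row_2", 1), ("row_3", 2)]
print(visible_product_ids(records))  # [[1], [2], [1], [2]]
```

Under this model every access sees exactly one product_id, which is the
behaviour I expected from the real job.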

I therefore assumed that each access to the MapState would return only one
product_id value. But I get the following output after running the
script:

> ['rule1_value0', 'rule1_value1', 1, [1]]
> ['rule1_value0', 'rule1_value1', 1, [1]]
> ['rule1_value0', 'rule1_value1', 1, [1]]
> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
> ['rule2_value0', 'rule2_value1', 2, [1, 2]]

This shows that each user record is processed with the corresponding rule,
but the MapState also contains data for other products (see the last
element of each row). There are some other strange points:
1. I have two functions to generate data. The problem above is more likely
to occur with data generated by generate_data2 than with generate_data1.
2. The problem is more likely to occur when the data size is more than 50.
3. With the same code and the same data, repeated runs produce different
output. Sometimes the output is as expected, sometimes it is not.

Does anybody know why? Or is my understanding of keyed state wrong?

Thanks.
