Hi,

I think your understanding is correct. The results seem a little weird. I'm
looking into this and will let you know when I have any findings.
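
For reference, the semantics you expect can be sketched in plain Python. This
is only a toy model, not PyFlink itself: the class KeyedMapStateModel, its
set_current_key method, and the helper product_ids_seen are hypothetical names
made up for illustration. It assumes the runtime switches to the element's key
before each element is processed, so a MapState lookup should only ever see
entries stored under that key.

```python
from collections import defaultdict


class KeyedMapStateModel:
    """Toy model of keyed MapState: one independent dict per stream key."""

    def __init__(self):
        self._per_key = defaultdict(dict)
        self._current_key = None

    def set_current_key(self, key):
        # Assumption: a keyed runtime does this implicitly before each element.
        self._current_key = key

    def put(self, map_key, value):
        self._per_key[self._current_key][map_key] = value

    def items(self):
        # Only entries stored under the current key are visible.
        return list(self._per_key[self._current_key].items())


def product_ids_seen(state, rows):
    """Store each row, then record which product_ids are visible in state."""
    seen = []
    for row_id, product_id in rows:
        state.set_current_key(product_id)
        state.put(row_id, (product_id,))
        seen.append(sorted({value[0] for _, value in state.items()}))
    return seen


# Same key pattern as generate_data1: product_id alternates between 1 and 2.
rows = [("row_%d" % i, i % 2 + 1) for i in range(6)]
result = product_ids_seen(KeyedMapStateModel(), rows)
print(result)
```

Under this model every lookup yields exactly one product_id, so an output
such as [1, 2] in the last column would not be expected.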

Best,
Xingbo

赵飞 <emanonc...@gmail.com> wrote on Mon, Jul 12, 2021 at 4:48 PM:

> Hi all,
> I'm using PyFlink to develop a module whose main function is to process
> user data based on specific rules. The program involves two data streams:
> data and rule. They have different types, so I connect them and use the
> field 'product_id' as the key for the key_by method. The code is as
> follows (demo code only, not the actual code):
>
> import random
>
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import KeyedCoProcessFunction
> from pyflink.datastream.state import MapStateDescriptor
> from pyflink.datastream import RuntimeContext
>
>
> def test(data):
>     product_ids = set()
>     for key, value in data.items():
>         product_ids.add(value[0])
>     return list(product_ids)
>
>
> class MyFunction(KeyedCoProcessFunction):
>     def open(self, ctx):
>         data_desc = MapStateDescriptor(
>             'data', Types.STRING(), Types.ROW([Types.INT()]))
>         self.data = ctx.get_map_state(data_desc)
>
>         rule_desc = MapStateDescriptor(
>             'rule', Types.STRING(), Types.ROW([Types.INT()]))
>         self.rules = ctx.get_map_state(rule_desc)
>
>     def process_element1(self, data_value, ctx):
>         row_id, others = data_value[0], data_value[1:]
>         self.data.put(row_id, others)
>         result = []
>         for key, value_list in self.rules.items():
>             product_id, random_0, random_1  = value_list
>             # Do some calculations
>             product_ids_of_state_data = test(self.data)
>             result.append([random_0, random_1, product_id,
>                            product_ids_of_state_data])
>         return result
>
>     def process_element2(self, rule_value, ctx):
>         row_id, others = rule_value[0], rule_value[1:]
>         self.rules.put(row_id, others)
>
> def generate_data1(count):
>     collection = []
>     for i in range(count):
>         collection.append(['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2])
>     return collection
>
> def generate_data2(count):
>     collection = []
>     for i in range(count):
>         collection.append(['row_%d' % i, random.choice([1, 2]),
>                            'a_%d' % i, i * 2])
>     return collection
>
>
> def main():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>
>     data = env.from_collection(generate_data1(50))
>     rules = env.from_collection([
>         ['row_0', 1, 'rule1_value0', 'rule1_value1'],
>         ['row_1', 2, 'rule2_value0', 'rule2_value1']
>     ], type_info=Types.ROW([Types.STRING(), Types.INT(), Types.STRING(),
>                             Types.STRING()]))
>     results = data.connect(rules).key_by(
>         lambda x: x[1], lambda y: y[1]).process(MyFunction())
>     results.print()
>
>     env.execute("test_job")
>
> if __name__ == "__main__":
>     main()
>
>
> When processing the first data stream, which contains user data, I
> access the registered MapState and collect the unique product_id values
> in it. According to the description on the official site:
>
>> Keyed state is maintained in what can be thought of as an embedded
>> key/value store. The state is partitioned and distributed strictly together
>> with the streams that are read by the stateful operators. Hence, access to
>> the key/value state is only possible on *keyed streams*, i.e. after a
>> keyed/partitioned data exchange, and *is restricted to the values
>> associated with the current event’s key*
>
>
> I assumed that each access to the MapState should return only one
> product_id value. But I get the following output after running the
> script:
>
>> ['rule1_value0', 'rule1_value1', 1, [1]]
>> ['rule1_value0', 'rule1_value1', 1, [1]]
>> ['rule1_value0', 'rule1_value1', 1, [1]]
>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>>
>
> It shows that each user record is processed against the corresponding
> rules, but the MapState also contains data for other products (see the
> last element of each row). There are other strange points:
> 1. I have two functions to generate data. The problem is more likely to
> occur with data generated by generate_data2 than with generate_data1.
> 2. The problem is more likely to occur when the data size is more than 50.
> 3. With the same code and the same data, the output differs between
> runs. Sometimes it is as expected, sometimes it is not.
>
> Does anybody know why? Or is my understanding of keyed state wrong?
>
> Thanks.
>
>
