Thanks. In addition, I am running the program in local mini-cluster mode;
I'm not sure whether that affects the results.
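
To make the expectation from my original message (quoted below) concrete,
here is a minimal plain-Python sketch of how I understand keyed MapState to
behave. It does not use Flink at all: KeyedMapStateSketch, set_current_key
and seen_product_ids are hypothetical names I made up for illustration, and
the real runtime may well work differently internally.

```python
from collections import defaultdict


class KeyedMapStateSketch:
    """Toy stand-in for keyed MapState: one independent dict per key.

    Hypothetical helper for illustration only, not a PyFlink class.
    """

    def __init__(self):
        self._per_key = defaultdict(dict)
        self._current_key = None

    def set_current_key(self, key):
        # In a real job the runtime would set the key before each element.
        self._current_key = key

    def put(self, map_key, value):
        self._per_key[self._current_key][map_key] = value

    def items(self):
        # Only entries stored under the current key are visible.
        return list(self._per_key[self._current_key].items())


def seen_product_ids(rows):
    """For each row, list the product_ids visible in state after the put."""
    state = KeyedMapStateSketch()
    seen = []
    for row_id, product_id, *rest in rows:
        state.set_current_key(product_id)  # key_by(lambda x: x[1])
        state.put(row_id, [product_id, *rest])
        seen.append(sorted({value[0] for _, value in state.items()}))
    return seen


# Same row layout as generate_data1 in the quoted code, with 4 rows.
rows = [['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2] for i in range(4)]
print(seen_product_ids(rows))  # prints [[1], [2], [1], [2]]
```

Under this model every access sees exactly one product_id, which is why the
[1, 2] entries in the output below surprise me.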

On Mon, Jul 12, 2021 at 9:02 PM, Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi,
>
> I think your understanding is correct. The results seem a little weird.
> I'm looking into this and will let you know when there are any findings.
>
> Best,
> Xingbo
>
> On Mon, Jul 12, 2021 at 4:48 PM, 赵飞 <emanonc...@gmail.com> wrote:
>
>> Hi all,
>> I'm using PyFlink to develop a module whose main functionality is
>> processing user data based on specific rules. The program involves two
>> data streams: data and rule. They have different types, so I connect them
>> and use the field 'product_id' as the key for the key_by method. The code
>> is as follows (just demo code, not the actual program):
>>
>> import random
>>
>> from pyflink.common.typeinfo import Types
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.datastream.functions import KeyedCoProcessFunction
>> from pyflink.datastream.state import MapStateDescriptor
>> from pyflink.datastream import RuntimeContext
>>
>>
>> def test(data):
>>     product_ids = set()
>>     for key, value in data.items():
>>         product_ids.add(value[0])
>>     return list(product_ids)
>>
>>
>> class MyFunction(KeyedCoProcessFunction):
>>     def open(self, ctx):
>>         data_desc = MapStateDescriptor(
>>             'data', Types.STRING(), Types.ROW([Types.INT()]))
>>         self.data = ctx.get_map_state(data_desc)
>>
>>         rule_desc = MapStateDescriptor(
>>             'rule', Types.STRING(), Types.ROW([Types.INT()]))
>>         self.rules = ctx.get_map_state(rule_desc)
>>
>>     def process_element1(self, data_value, ctx):
>>         row_id, others = data_value[0], data_value[1:]
>>         self.data.put(row_id, others)
>>         result = []
>>         for key, value_list in self.rules.items():
>>             product_id, random_0, random_1 = value_list
>>             # Do some calculations
>>             product_ids_of_state_data = test(self.data)
>>             result.append(
>>                 [random_0, random_1, product_id, product_ids_of_state_data])
>>         return result
>>
>>     def process_element2(self, rule_value, ctx):
>>         row_id, others = rule_value[0], rule_value[1:]
>>         self.rules.put(row_id, others)
>>
>> def generate_data1(count):
>>     collection = []
>>     for i in range(count):
>>         collection.append(['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2])
>>     return collection
>>
>> def generate_data2(count):
>>     collection = []
>>     for i in range(count):
>>         collection.append(
>>             ['row_%d' % i, random.choice([1, 2]), 'a_%d' % i, i * 2])
>>     return collection
>>
>>
>> def main():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     env.set_parallelism(1)
>>
>>     data = env.from_collection(generate_data1(50))
>>     rules = env.from_collection([
>>         ['row_0', 1, 'rule1_value0', 'rule1_value1'],
>>         ['row_1', 2, 'rule2_value0', 'rule2_value1']
>>     ], type_info=Types.ROW([
>>         Types.STRING(), Types.INT(), Types.STRING(), Types.STRING()]))
>>     results = data.connect(rules).key_by(
>>         lambda x: x[1], lambda y: y[1]).process(MyFunction())
>>     results.print()
>>
>>     env.execute("test_job")
>>
>> if __name__ == "__main__":
>>     main()
>>
>>
>> When processing the first data stream, which contains the user data, I
>> access the registered MapState and collect the unique product_id values
>> in it. According to the description on the official site:
>>
>>> Keyed state is maintained in what can be thought of as an embedded
>>> key/value store. The state is partitioned and distributed strictly together
>>> with the streams that are read by the stateful operators. Hence, access to
>>> the key/value state is only possible on *keyed streams*, i.e. after a
>>> keyed/partitioned data exchange, and *is restricted to the values
>>> associated with the current event’s key*
>>
>>
>> I assumed that each access to the MapState would return only one
>> product_id value. But I get the following output after running the
>> script:
>>
>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>> ['rule1_value0', 'rule1_value1', 1, [1]]
>>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>>>
>>
>> This shows that each user data record is processed according to the
>> matching rules, but the MapState also contains data for other products
>> (see the last element of each row). There are some other strange points:
>> 1. I have two functions to generate data. The problem is more likely to
>> occur with data generated by generate_data2 than with generate_data1.
>> 2. The problem is more likely to occur when the data size is more than
>> 50.
>> 3. With the same code and the same data, different runs produce different
>> outputs. Sometimes the output is as expected, sometimes it is not.
>>
>> Does anybody know why? Or is my understanding of keyed state wrong?
>>
>> Thanks.
>>
>>
