Hi, I think your understanding is correct. The results do seem a little weird. I'm looking into this and will let you know when there are any findings.
Best,
Xingbo

赵飞 <emanonc...@gmail.com> wrote on Monday, July 12, 2021, at 4:48 PM:

> Hi all,
>
> I'm using PyFlink to develop a module whose main functionality is
> processing user data based on specific rules. The program involves two
> datastreams: data and rule. They have different types, so I connect them
> and use the field 'product_id' as the key for the key_by method. The code
> is as follows (just demo code, not the actual one):
>
> import random
>
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import KeyedCoProcessFunction
> from pyflink.datastream.state import MapStateDescriptor
> from pyflink.datastream import RuntimeContext
>
>
> def test(data):
>     product_ids = set()
>     for key, value in data.items():
>         product_ids.add(value[0])
>     return list(product_ids)
>
>
> class MyFunction(KeyedCoProcessFunction):
>     def open(self, ctx):
>         data_desc = MapStateDescriptor('data', Types.STRING(),
>                                        Types.ROW([Types.INT()]))
>         self.data = ctx.get_map_state(data_desc)
>
>         rule_desc = MapStateDescriptor('rule', Types.STRING(),
>                                        Types.ROW([Types.INT()]))
>         self.rules = ctx.get_map_state(rule_desc)
>
>     def process_element1(self, data_value, ctx):
>         row_id, others = data_value[0], data_value[1:]
>         self.data.put(row_id, others)
>         result = []
>         for key, value_list in self.rules.items():
>             product_id, random_0, random_1 = value_list
>             # Do some calculations
>             product_ids_of_state_data = test(self.data)
>             result.append([random_0, random_1, product_id,
>                            product_ids_of_state_data])
>         return result
>
>     def process_element2(self, rule_value, ctx):
>         row_id, others = rule_value[0], rule_value[1:]
>         self.rules.put(row_id, others)
>
>
> def generate_data1(count):
>     collection = []
>     for i in range(count):
>         collection.append(['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2])
>     return collection
>
>
> def generate_data2(count):
>     collection = []
>     for i in range(count):
>         collection.append(['row_%d' % i, random.choice([1, 2]),
>                            'a_%d' % i, i * 2])
>     return collection
>
>
> def main():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>
>     data = env.from_collection(generate_data1(50))
>     rules = env.from_collection([
>         ['row_0', 1, 'rule1_value0', 'rule1_value1'],
>         ['row_1', 2, 'rule2_value0', 'rule2_value1']
>     ], type_info=Types.ROW([Types.STRING(), Types.INT(), Types.STRING(),
>                             Types.STRING()]))
>     results = data.connect(rules).key_by(
>         lambda x: x[1], lambda y: y[1]).process(MyFunction())
>     results.print()
>
>     env.execute("test_job")
>
>
> if __name__ == "__main__":
>     main()
>
>
> When processing the first datastream, which contains user data, I access
> the registered MapState and collect the unique product_ids in it.
> According to the description on the official site:
>
>> Keyed state is maintained in what can be thought of as an embedded
>> key/value store. The state is partitioned and distributed strictly
>> together with the streams that are read by the stateful operators.
>> Hence, access to the key/value state is only possible on *keyed
>> streams*, i.e. after a keyed/partitioned data exchange, and *is
>> restricted to the values associated with the current event's key*
>
> I would therefore assume that each access to the MapState should see only
> one product_id value. But I get the following output after running the
> script:
>
>> ['rule1_value0', 'rule1_value1', 1, [1]]
>> ['rule1_value0', 'rule1_value1', 1, [1]]
>> ['rule1_value0', 'rule1_value1', 1, [1]]
>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>
> It shows that each user record is indeed processed against the matching
> rules, but the MapState also contains other products' data (the last
> element of each row). There are other strange points:
>
> 1. The problem occurs more often with data generated by generate_data2
>    than with generate_data1.
> 2. The problem occurs more often when the data size exceeds 50.
> 3. Running the same code on the same data multiple times produces
>    different outputs: sometimes the output is as expected, sometimes
>    it is not.
>
> Does anybody know why? Or is my understanding of keyed state wrong?
>
> Thanks.
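For reference, the per-key scoping that the quoted documentation describes can be sketched in plain Python, with no Flink involved. This is only an illustration of the expected semantics, not PyFlink code: `SimulatedKeyedState` and every other name below are made up for the sketch. With correct scoping, the set of product_ids visible in state while processing a record is always exactly the current key, never a mixture of keys as in the output above.

```python
class SimulatedKeyedState:
    """Illustrative model of keyed state: one isolated map per key."""

    def __init__(self):
        self._per_key = {}       # key -> that key's private state map
        self._current_key = None

    def set_current_key(self, key):
        # The runtime sets the current key before each element is processed.
        self._current_key = key

    def state(self):
        # State access is always resolved against the current key only.
        return self._per_key.setdefault(self._current_key, {})


def process(records, keyed_state):
    """Store each record in keyed state, then report the distinct
    product_ids visible in state at that moment (mirroring test())."""
    seen = []
    for row_id, product_id, *rest in records:
        keyed_state.set_current_key(product_id)
        data = keyed_state.state()
        data[row_id] = (product_id, *rest)
        # With correct scoping this set can only ever be {current key}.
        seen.append(sorted({v[0] for v in data.values()}))
    return seen


records = [
    ['row_0', 1, 'a_0', 0],
    ['row_1', 2, 'a_1', 2],
    ['row_2', 1, 'a_2', 4],
]
result = process(records, SimulatedKeyedState())
print(result)  # each inner list holds a single product_id: [[1], [2], [1]]
```

Under these semantics no inner list can ever contain two product_ids, which is why the `[1, 2]` entries in the reported output look like state leaking across keys rather than expected behavior.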