[ https://issues.apache.org/jira/browse/FLINK-20046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dian Fu closed FLINK-20046. --------------------------- Resolution: Fixed. Fixed in master (1.12.0) via b9b9ff3e88b1de42895229099ec149473de1a055 > StreamTableAggregateTests.test_map_view_iterate is unstable > ----------------------------------------------------------- > > Key: FLINK-20046 > URL: https://issues.apache.org/jira/browse/FLINK-20046 > Project: Flink > Issue Type: Bug > Components: API / Python > Affects Versions: 1.12.0 > Reporter: Dian Fu > Assignee: Wei Zhong > Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.12.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279&view=logs&j=821b528f-1eed-5598-a3b4-7f748b13f261&t=4fad9527-b9a5-5015-1b70-8356e5c91490 > {code} > 2020-11-07T22:50:57.4180758Z _______________ > StreamTableAggregateTests.test_map_view_iterate ________________ > 2020-11-07T22:50:57.4181301Z > 2020-11-07T22:50:57.4181965Z self = > <pyflink.table.tests.test_aggregate.StreamTableAggregateTests > testMethod=test_map_view_iterate> > 2020-11-07T22:50:57.4182348Z > 2020-11-07T22:50:57.4182535Z def test_map_view_iterate(self): > 2020-11-07T22:50:57.4182812Z test_iterate = > udaf(TestIterateAggregateFunction()) > 2020-11-07T22:50:57.4183320Z > self.t_env.get_config().set_idle_state_retention(datetime.timedelta(days=1)) > 2020-11-07T22:50:57.4183763Z > self.t_env.get_config().get_configuration().set_string( > 2020-11-07T22:50:57.4297555Z "python.fn-execution.bundle.size", > "2") > 2020-11-07T22:50:57.4297922Z # trigger the cache eviction in a bundle. 
> 2020-11-07T22:50:57.4308028Z > self.t_env.get_config().get_configuration().set_string( > 2020-11-07T22:50:57.4308653Z "python.state.cache-size", "2") > 2020-11-07T22:50:57.4308945Z > self.t_env.get_config().get_configuration().set_string( > 2020-11-07T22:50:57.4309382Z "python.map-state.read-cache-size", > "2") > 2020-11-07T22:50:57.4309676Z > self.t_env.get_config().get_configuration().set_string( > 2020-11-07T22:50:57.4310428Z "python.map-state.write-cache-size", > "2") > 2020-11-07T22:50:57.4310701Z > self.t_env.get_config().get_configuration().set_string( > 2020-11-07T22:50:57.4311130Z > "python.map-state.iterate-response-batch-size", "2") > 2020-11-07T22:50:57.4311361Z t = self.t_env.from_elements( > 2020-11-07T22:50:57.4311691Z [(1, 'Hi_', 'hi'), > 2020-11-07T22:50:57.4312004Z (1, 'Hi', 'hi'), > 2020-11-07T22:50:57.4312316Z (2, 'hello', 'hello'), > 2020-11-07T22:50:57.4312639Z (3, 'Hi_', 'hi'), > 2020-11-07T22:50:57.4312975Z (3, 'Hi', 'hi'), > 2020-11-07T22:50:57.4313285Z (4, 'hello', 'hello'), > 2020-11-07T22:50:57.4313609Z (5, 'Hi2_', 'hi'), > 2020-11-07T22:50:57.4313908Z (5, 'Hi2', 'hi'), > 2020-11-07T22:50:57.4314238Z (6, 'hello2', 'hello'), > 2020-11-07T22:50:57.4314558Z (7, 'Hi', 'hi'), > 2020-11-07T22:50:57.4315053Z (8, 'hello', 'hello'), > 2020-11-07T22:50:57.4315396Z (9, 'Hi2', 'hi'), > 2020-11-07T22:50:57.4315773Z (13, 'Hi3', 'hi')], ['a', 'b', 'c']) > 2020-11-07T22:50:57.4316023Z > self.t_env.create_temporary_view("source", t) > 2020-11-07T22:50:57.4316299Z table_with_retract_message = > self.t_env.sql_query( > 2020-11-07T22:50:57.4316615Z "select LAST_VALUE(b) as b, > LAST_VALUE(c) as c from source group by a") > 2020-11-07T22:50:57.4316919Z result = > table_with_retract_message.group_by(t.c) \ > 2020-11-07T22:50:57.4317197Z > .select(test_iterate(t.b).alias("a"), t.c) \ > 2020-11-07T22:50:57.4317619Z .select(col("a").get(0).alias("a"), > 2020-11-07T22:50:57.4318111Z col("a").get(1).alias("b"), > 2020-11-07T22:50:57.4318357Z 
col("a").get(2).alias("c"), > 2020-11-07T22:50:57.4318586Z col("a").get(3).alias("d"), > 2020-11-07T22:50:57.4318814Z t.c.alias("e")) > 2020-11-07T22:50:57.4319023Z assert_frame_equal( > 2020-11-07T22:50:57.4319208Z > result.to_pandas(), > 2020-11-07T22:50:57.4319408Z pd.DataFrame([ > 2020-11-07T22:50:57.4319872Z ["hello,hello2", "1,3", > 'hello:3,hello2:1', 2, "hello"], > 2020-11-07T22:50:57.4320398Z ["Hi,Hi2,Hi3", "1,2,3", > "Hi:3,Hi2:2,Hi3:1", 3, "hi"]], > 2020-11-07T22:50:57.4321047Z columns=['a', 'b', 'c', 'd', > 'e'])) > 2020-11-07T22:50:57.4321198Z > 2020-11-07T22:50:57.4321385Z pyflink/table/tests/test_aggregate.py:468: > 2020-11-07T22:50:57.4321648Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > 2020-11-07T22:50:57.4322040Z pyflink/table/table.py:807: in to_pandas > 2020-11-07T22:50:57.4322299Z .collectAsPandasDataFrame(self._j_table, > max_arrow_batch_size) > 2020-11-07T22:50:57.4322794Z > .tox/py35-cython/lib/python3.5/site-packages/py4j/java_gateway.py:1286: in > __call__ > 2020-11-07T22:50:57.4323103Z answer, self.gateway_client, self.target_id, > self.name) > 2020-11-07T22:50:57.4323351Z pyflink/util/exceptions.py:147: in deco > 2020-11-07T22:50:57.4323537Z return f(*a, **kw) > 2020-11-07T22:50:57.4323783Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > 2020-11-07T22:50:57.4323963Z > 2020-11-07T22:50:57.4324225Z answer = 'xro8653' > 2020-11-07T22:50:57.4324496Z gateway_client = > <py4j.java_gateway.GatewayClient object at 0x7fe5c619db70> > 2020-11-07T22:50:57.4324943Z target_id = > 'z:org.apache.flink.table.runtime.arrow.ArrowUtils' > 2020-11-07T22:50:57.4325312Z name = 'collectAsPandasDataFrame' > 2020-11-07T22:50:57.4325439Z > 2020-11-07T22:50:57.4325839Z def get_return_value(answer, gateway_client, > target_id=None, name=None): > 2020-11-07T22:50:57.4326420Z """Converts an answer received from the > Java gateway into a Python object. 
> 2020-11-07T22:50:57.4326648Z > 2020-11-07T22:50:57.4326881Z For example, string representation of > integers are converted to Python > 2020-11-07T22:50:57.4327193Z integer, string representation of > objects are converted to JavaObject > 2020-11-07T22:50:57.4327451Z instances, etc. > 2020-11-07T22:50:57.4327614Z > 2020-11-07T22:50:57.4327819Z :param answer: the string returned by > the Java gateway > 2020-11-07T22:50:57.4328157Z :param gateway_client: the gateway > client used to communicate with the Java > 2020-11-07T22:50:57.4329738Z Gateway. Only necessary if the > answer is a reference (e.g., object, > 2020-11-07T22:50:57.4330018Z list, map) > 2020-11-07T22:50:57.4330273Z :param target_id: the name of the object > from which the answer comes from > 2020-11-07T22:50:57.4330588Z (e.g., *object1* in > `object1.hello()`). Optional. > 2020-11-07T22:50:57.4330873Z :param name: the name of the member from > which the answer comes from > 2020-11-07T22:50:57.4331170Z (e.g., *hello* in > `object1.hello()`). Optional. > 2020-11-07T22:50:57.4331375Z """ > 2020-11-07T22:50:57.4331542Z if is_error(answer)[0]: > 2020-11-07T22:50:57.4331761Z if len(answer) > 1: > 2020-11-07T22:50:57.4331954Z type = answer[1] > 2020-11-07T22:50:57.4332222Z value = > OUTPUT_CONVERTER[type](answer[2:], gateway_client) > 2020-11-07T22:50:57.4332531Z if answer[1] == REFERENCE_TYPE: > 2020-11-07T22:50:57.4332757Z raise Py4JJavaError( > 2020-11-07T22:50:57.4333016Z "An error occurred while > calling {0}{1}{2}.\n". > 2020-11-07T22:50:57.4333303Z > format(target_id, ".", > name), value) > 2020-11-07T22:50:57.4333700Z E py4j.protocol.Py4JJavaError: > An error occurred while calling > z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame. > 2020-11-07T22:50:57.4334558Z E : > java.lang.RuntimeException: Could not remove element ',,,1,hi', should never > happen. 
> 2020-11-07T22:50:57.4335019Z E at > org.apache.flink.table.runtime.arrow.ArrowUtils.filterOutRetractRows(ArrowUtils.java:708) > 2020-11-07T22:50:57.4335479Z E at > org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:635) > 2020-11-07T22:50:57.4336238Z E at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2020-11-07T22:50:57.4336645Z E at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2020-11-07T22:50:57.4337099Z E at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2020-11-07T22:50:57.4337485Z E at > java.lang.reflect.Method.invoke(Method.java:498) > 2020-11-07T22:50:57.4337911Z E at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > 2020-11-07T22:50:57.4338410Z E at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > 2020-11-07T22:50:57.4338859Z E at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > 2020-11-07T22:50:57.4339324Z E at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > 2020-11-07T22:50:57.4339810Z E at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > 2020-11-07T22:50:57.4340260Z E at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > 2020-11-07T22:50:57.4340651Z E at > java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)