[ https://issues.apache.org/jira/browse/BEAM-13234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ryan Thompson updated BEAM-13234: --------------------------------- Fix Version/s: Not applicable Resolution: Duplicate Status: Resolved (was: Open) This is a duplicate of BEAM-12673 > Flake in StreamingWordCountIT.test_streaming_wordcount_it > --------------------------------------------------------- > > Key: BEAM-13234 > URL: https://issues.apache.org/jira/browse/BEAM-13234 > Project: Beam > Issue Type: Bug > Components: test-failures > Reporter: Valentyn Tymofieiev > Assignee: Ryan Thompson > Priority: P1 > Labels: flake > Fix For: Not applicable > > > https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/20516/consoleFull > {noformat} > 22:50:10 =================================== FAILURES > =================================== > 22:50:10 _______________ StreamingWordCountIT.test_streaming_wordcount_it > _______________ > 22:50:10 [gw0] linux -- Python 3.7.10 > /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/build/gradleenv/-1734967052/bin/python3.7 > 22:50:10 > 22:50:10 self = > <apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT > testMethod=test_streaming_wordcount_it> > 22:50:10 > 22:50:10 @pytest.mark.it_postcommit > 22:50:10 def test_streaming_wordcount_it(self): > 22:50:10 # Build expected dataset. > 22:50:10 expected_msg = [('%d: 1' % num).encode('utf-8') > 22:50:10 for num in range(DEFAULT_INPUT_NUMBERS)] > 22:50:10 > 22:50:10 # Set extra options to the pipeline for test purpose > 22:50:10 state_verifier = PipelineStateMatcher(PipelineState.RUNNING) > 22:50:10 pubsub_msg_verifier = PubSubMessageMatcher( > 22:50:10 self.project, self.output_sub.name, expected_msg, > timeout=400) > 22:50:10 extra_opts = { > 22:50:10 'input_subscription': self.input_sub.name, > 22:50:10 'output_topic': self.output_topic.name, > 22:50:10 'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION, > 22:50:10 'on_success_matcher': all_of(state_verifier, > pubsub_msg_verifier) > 22:50:10 } > 22:50:10 > 22:50:10 # Generate input data and inject to PubSub. > 22:50:10 self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS) > 22:50:10 > 22:50:10 # Get pipeline options from command argument: > --test-pipeline-options, > 22:50:10 # and start pipeline job by calling pipeline main function. > 22:50:10 streaming_wordcount.run( > 22:50:10 self.test_pipeline.get_full_options_as_args(**extra_opts), > 22:50:10 > save_main_session=False) > 22:50:10 > 22:50:10 apache_beam/examples/streaming_wordcount_it_test.py:104: > 22:50:10 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ > 22:50:10 apache_beam/examples/streaming_wordcount.py:103: in run > 22:50:10 output | beam.io.WriteToPubSub(known_args.output_topic) > 22:50:10 apache_beam/pipeline.py:596: in __exit__ > 22:50:10 self.result = self.run() > 22:50:10 apache_beam/pipeline.py:573: in run > 22:50:10 return self.runner.run_pipeline(self, self._options) > 22:50:10 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ > 22:50:10 > 22:50:10 self = > <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner object > at 0x7f1ac01efc90> > 22:50:10 pipeline = <apache_beam.pipeline.Pipeline object at 0x7f1afd515190> > 22:50:10 options = <apache_beam.options.pipeline_options.PipelineOptions > object at 0x7f1ac0298490> > 22:50:10 > 22:50:10 def run_pipeline(self, pipeline, options): > 22:50:10 """Execute test pipeline and verify test matcher""" > 22:50:10 test_options = options.view_as(TestOptions) > 22:50:10 on_success_matcher = test_options.on_success_matcher > 22:50:10 wait_duration = test_options.wait_until_finish_duration > 22:50:10 is_streaming = options.view_as(StandardOptions).streaming > 22:50:10 > 22:50:10 # [BEAM-1889] Do not send this to remote workers also, there > is no need to > 22:50:10 # send this option to remote executors. > 22:50:10 test_options.on_success_matcher = None > 22:50:10 > 22:50:10 self.result = super().run_pipeline(pipeline, options) > 22:50:10 if self.result.has_job: > 22:50:10 # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't > show logs > 22:50:10 # in some cases. > 22:50:10 print('Worker logs: %s' % self.build_console_url(options)) > 22:50:10 > 22:50:10 try: > 22:50:10 self.wait_until_in_state(PipelineState.RUNNING) > 22:50:10 > 22:50:10 if is_streaming and not wait_duration: > 22:50:10 _LOGGER.warning('Waiting indefinitely for streaming job.') > 22:50:10 self.result.wait_until_finish(duration=wait_duration) > 22:50:10 > 22:50:10 if on_success_matcher: > 22:50:10 from hamcrest import assert_that as hc_assert_that > 22:50:10 > hc_assert_that(self.result, > pickler.loads(on_success_matcher)) > 22:50:10 E AssertionError: > 22:50:10 E Expected: (Test pipeline expected terminated in state: > RUNNING and Expected 500 messages.) > 22:50:10 E but: Expected 500 messages. Got 528 messages. Diffs > (item, count): > 22:50:10 E Expected but not in actual: dict_items([(b'18: 1', 1), > (b'23: 1', 1), (b'152: 1', 1), (b'162: 1', 1), (b'168: 1', 1), (b'184: 1', > 1), (b'206: 1', 1), (b'208: 1', 1), (b'215: 1', 1), (b'247: 1', 1), (b'255: > 1', 1), (b'265: 1', 1), (b'276: 1', 1), (b'278: 1', 1), (b'294: 1', 1), > (b'350: 1', 1), (b'356: 1', 1), (b'395: 1', 1), (b'428: 1', 1), (b'450: 1', > 1), (b'474: 1', 1)]) > 22:50:10 E Unexpected: dict_items([(b'384: 1', 1), (b'237: 1', 1), > (b'166: 1', 1), (b'262: 1', 1), (b'5: 1', 1), (b'13: 1', 1), (b'437: 1', 1), > (b'263: 1', 1), (b'423: 1', 1), (b'317: 1', 1), (b'447: 1', 1), (b'125: 1', > 1), (b'270: 1', 1), (b'116: 1', 1), (b'102: 1', 1), (b'326: 1', 1), (b'21: > 1', 1), (b'244: 1', 1), (b'400: 1', 1), (b'117: 1', 1), (b'393: 1', 1), > (b'225: 1', 1), (b'187: 1', 1), (b'210: 1', 1), (b'258: 1', 1), (b'226: 1', > 1), (b'127: 1', 1), (b'84: 1', 1), (b'182: 1', 1), (b'373: 1', 1), (b'104: > 1', 1), (b'382: 1', 1), (b'295: 1', 1), (b'325: 1', 1), (b'113: 1', 1), > (b'470: 1', 1), (b'14: 1', 1), (b'353: 1', 1), (b'333: 1', 1), (b'413: 1', > 1), (b'445: 1', 1), (b'115: 1', 1), (b'109: 1', 1), (b'386: 1', 1), (b'274: > 1', 1), (b'303: 1', 1), (b'77: 1', 1), (b'455: 1', 1), (b'223: 1', 1)]) > 22:50:10 > 22:50:10 apache_beam/runners/dataflow/test_dataflow_runner.py:68: > AssertionError > 22:50:10 ------------------------------ Captured log call > ------------------------------- > {noformat} -- This message was sent by Atlassian Jira (v8.20.7#820007)