[
https://issues.apache.org/jira/browse/BEAM-9803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sam Rohde closed BEAM-9803.
---------------------------
Fix Version/s: Not applicable
Resolution: Duplicate
> test_streaming_wordcount flaky
> ------------------------------
>
> Key: BEAM-9803
> URL: https://issues.apache.org/jira/browse/BEAM-9803
> Project: Beam
> Issue Type: Test
> Components: sdk-py-core, test-failures
> Reporter: Ning Kang
> Assignee: Sam Rohde
> Priority: Major
> Fix For: Not applicable
>
>
> {code:java}
> Regression
> apache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest.test_streaming_wordcount
> (from py37-cython)
> Failing for the past 1 build (Since #12462)
> Took 7.7 sec.
> Error Message
> AssertionError: DataFrame are different DataFrame shape mismatch [left]: (10, 4) [right]: (6, 4)
> Stacktrace
> self = <apache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest testMethod=test_streaming_wordcount>
> @unittest.skipIf(
> sys.version_info < (3, 5, 3),
> 'The tests require at least Python 3.6 to work.')
> def test_streaming_wordcount(self):
> class WordExtractingDoFn(beam.DoFn):
> def process(self, element):
> text_line = element.strip()
> words = text_line.split()
> return words
>
> # Add the TestStream so that it can be cached.
> ib.options.capturable_sources.add(TestStream)
> ib.options.capture_duration = timedelta(seconds=5)
>
> p = beam.Pipeline(
> runner=interactive_runner.InteractiveRunner(),
> options=StandardOptions(streaming=True))
>
> data = (
> p
> | TestStream()
> .advance_watermark_to(0)
> .advance_processing_time(1)
> .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
> .advance_watermark_to(20)
> .advance_processing_time(1)
> .add_elements(['that', 'is', 'the', 'question'])
> | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable
>
> counts = (
> data
> | 'split' >> beam.ParDo(WordExtractingDoFn())
> | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
> | 'group' >> beam.GroupByKey()
> | 'count' >> beam.Map(lambda wordones: (wordones[0],
> sum(wordones[1]))))
>
> # Watch the local scope for Interactive Beam so that referenced PCollections
> # will be cached.
> ib.watch(locals())
>
> # This is normally done in the interactive_utils when a transform is
> # applied but needs an IPython environment. So we manually run this here.
> ie.current_env().track_user_pipelines()
>
> # Create a fake limiter that cancels the BCJ once the main job receives the
> # expected amount of results.
> class FakeLimiter:
> def __init__(self, p, pcoll):
> self.p = p
> self.pcoll = pcoll
>
> def is_triggered(self):
> result = ie.current_env().pipeline_result(self.p)
> if result:
> try:
> results = result.get(self.pcoll)
> except ValueError:
> return False
> return len(results) >= 10
> return False
>
> # This sets the limiters to stop reading when the test receives 10 elements
> # or after 5 seconds have elapsed (to eliminate the possibility of hanging).
> ie.current_env().options.capture_control.set_limiters_for_test(
> [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))])
>
> # This tests that the data was correctly cached.
> pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0)
> expected_data_df = pd.DataFrame([
> ('to', 0, [IntervalWindow(0, 10)], pane_info),
> ('be', 0, [IntervalWindow(0, 10)], pane_info),
> ('or', 0, [IntervalWindow(0, 10)], pane_info),
> ('not', 0, [IntervalWindow(0, 10)], pane_info),
> ('to', 0, [IntervalWindow(0, 10)], pane_info),
> ('be', 0, [IntervalWindow(0, 10)], pane_info),
> ('that', 20000000, [IntervalWindow(20, 30)], pane_info),
> ('is', 20000000, [IntervalWindow(20, 30)], pane_info),
> ('the', 20000000, [IntervalWindow(20, 30)], pane_info),
> ('question', 20000000, [IntervalWindow(20, 30)], pane_info)
> ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable
>
> data_df = ib.collect(data, include_window_info=True)
> > pd.testing.assert_frame_equal(expected_data_df, data_df)
> E AssertionError: DataFrame are different
> E
> E DataFrame shape mismatch
> E [left]: (10, 4)
> E [right]: (6, 4)
> apache_beam/runners/interactive/interactive_runner_test.py:238: AssertionError
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)