[
https://issues.apache.org/jira/browse/BEAM-3880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17546953#comment-17546953
]
Kenneth Knowles commented on BEAM-3880:
---------------------------------------
This issue has been migrated to https://github.com/apache/beam/issues/18764
> Streaming test hangs
> --------------------
>
> Key: BEAM-3880
> URL: https://issues.apache.org/jira/browse/BEAM-3880
> Project: Beam
> Issue Type: Bug
> Components: testing
> Affects Versions: 2.3.0
> Reporter: Lara
> Priority: P3
>
> More information on SO:
> [https://stackoverflow.com/questions/49266481/unit-test-hangs-forever-if-dofn-resets-event-timers]
> The test hangs when run, despite its fairly simple semantics. A WordCount
> test was modified with the code from the Stack Overflow question to produce
> a full example.
>
> {code:java}
> package com.example;
> import org.apache.beam.sdk.testing.TestStream;
> import java.util.Arrays;
> import java.util.List;
> import org.joda.time.Instant;
> import org.joda.time.Duration;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.values.TimestampedValue;
> import com.example.WordCount.CountWords;
> import com.example.WordCount.ExtractWordsFn;
> import com.example.WordCount.FormatAsTextFn;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.coders.StringUtf8Coder;
> import org.apache.beam.sdk.testing.PAssert;
> import org.apache.beam.sdk.testing.TestPipeline;
> import org.apache.beam.sdk.testing.ValidatesRunner;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.DoFnTester;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.PCollection;
> import org.hamcrest.CoreMatchers;
> import org.junit.Assert;
> import org.junit.Rule;
> import org.junit.Test;
> import org.junit.experimental.categories.Category;
> import org.junit.runner.RunWith;
> import org.junit.runners.JUnit4;
> /**
> * Tests of WordCount.
> *
> * As quoted in this issue, the class holds a single test that serves as the
> * reproduction case for the reported hang: a TestStream feeds four
> * timestamped strings through KeyElements and then TimerDoFn, and TimerDoFn
> * sets an event-time timer for every element it processes.
> */
> @RunWith(JUnit4.class)
> public class WordCountTest {
> static class KeyElements extends DoFn<String, KV<String, String>> { // turns "key:value" strings into KV pairs
> @ProcessElement
> public void processElement(ProcessContext context) {
> final String[] parts = context.element().split(":");
> if (parts.length == 2) { // elements that do not split into exactly two parts produce no output
> context.output(KV.of(parts[0], parts[1]));
> }
> }
> }
> static class TimerDoFn extends DoFn<KV<String, String>, KV<String, String>> { // sets the "expiry" timer per element and appends "_output" to the value
> @TimerId("expiry")
> private final TimerSpec timerSpec =
> TimerSpecs.timer(TimeDomain.EVENT_TIME);
> @ProcessElement
> public void processElement(ProcessContext context, @TimerId("expiry")
> Timer timer) {
> timer.set(context.timestamp().plus(Duration.standardHours(1))); // timer target: one hour after this element's timestamp
> final KV<String, String> e = context.element();
> context.output(KV.of(e.getKey(), e.getValue() + "_output"));
> }
> @OnTimer("expiry")
> public void onExpiry(OnTimerContext context) {
> // do nothing (the timer callback is left empty in this reproduction)
> }
> }
> @Rule
> public TestPipeline p = TestPipeline.create();
> /** Example test that tests a PTransform by using an in-memory input and
> inspecting the output.
>
> The input is a TestStream of "a:0" through "a:3" with timestamps
> Instant(0) through Instant(3), followed by advancing the watermark to
> infinity. The expected output is each value with an "_output" suffix under
> key "a", in any order. According to the issue report, this test hangs when
> run. */
> @Test
> @Category(ValidatesRunner.class)
> public void testCountWords() throws Exception {
> TestStream<String> stream = TestStream
> .create(StringUtf8Coder.of())
> .addElements(
> TimestampedValue.of("a:0", new Instant(0)),
> TimestampedValue.of("a:1", new Instant(1)),
> TimestampedValue.of("a:2", new Instant(2)),
> TimestampedValue.of("a:3", new Instant(3)))
> .advanceWatermarkToInfinity();
> PCollection<KV<String, String>> result = p
> .apply(stream)
> .apply(ParDo.of(new KeyElements()))
> .apply(ParDo.of(new TimerDoFn()));
> PAssert.that(result).containsInAnyOrder(
> KV.of("a", "0_output"),
> KV.of("a", "1_output"),
> KV.of("a", "2_output"),
> KV.of("a", "3_output"));
> p.run();
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)