[ 
https://issues.apache.org/jira/browse/BEAM-3880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17546953#comment-17546953
 ] 

Kenneth Knowles commented on BEAM-3880:
---------------------------------------

This issue has been migrated to https://github.com/apache/beam/issues/18764

> Streaming test hangs
> --------------------
>
>                 Key: BEAM-3880
>                 URL: https://issues.apache.org/jira/browse/BEAM-3880
>             Project: Beam
>          Issue Type: Bug
>          Components: testing
>    Affects Versions: 2.3.0
>            Reporter: Lara
>            Priority: P3
>
> More information on SO:
> [https://stackoverflow.com/questions/49266481/unit-test-hangs-forever-if-dofn-resets-event-timers]
> Test hangs when run despite fairly simple semantics. Modified a wordcount 
> test with the code in the SO to generate a full example.
>  
> {code:java}
> package com.example;
> import org.apache.beam.sdk.testing.TestStream;
> import java.util.Arrays;
> import java.util.List;
> import org.joda.time.Instant;
> import org.joda.time.Duration;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.values.TimestampedValue;
> import com.example.WordCount.CountWords;
> import com.example.WordCount.ExtractWordsFn;
> import com.example.WordCount.FormatAsTextFn;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.coders.StringUtf8Coder;
> import org.apache.beam.sdk.testing.PAssert;
> import org.apache.beam.sdk.testing.TestPipeline;
> import org.apache.beam.sdk.testing.ValidatesRunner;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.DoFnTester;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.PCollection;
> import org.hamcrest.CoreMatchers;
> import org.junit.Assert;
> import org.junit.Rule;
> import org.junit.Test;
> import org.junit.experimental.categories.Category;
> import org.junit.runner.RunWith;
> import org.junit.runners.JUnit4;
> /**
> * Tests of WordCount.
> */
> @RunWith(JUnit4.class)
> public class WordCountTest {
> static class KeyElements extends DoFn<String, KV<String, String>> {
>    @ProcessElement
>    public void processElement(ProcessContext context) {
>        final String[] parts = context.element().split(":");
>        if (parts.length == 2) {
>            context.output(KV.of(parts[0], parts[1]));
>        }
>    }
> }
> static class TimerDoFn extends DoFn<KV<String, String>, KV<String, String>> {
>    @TimerId("expiry")
>    private final TimerSpec timerSpec = 
> TimerSpecs.timer(TimeDomain.EVENT_TIME);
>    @ProcessElement
>    public void processElement(ProcessContext context, @TimerId("expiry") 
> Timer timer) {
>        timer.set(context.timestamp().plus(Duration.standardHours(1)));
>        final KV<String, String> e = context.element();
>        context.output(KV.of(e.getKey(), e.getValue() + "_output"));
>    }
>    @OnTimer("expiry")
>    public void onExpiry(OnTimerContext context) {
>        // do nothing
>    }
> }
>  @Rule
>  public TestPipeline p = TestPipeline.create();
>  /** Example test that tests a PTransform by using an in-memory input and 
> inspecting the output. */
>  @Test
>  @Category(ValidatesRunner.class)
>  public void testCountWords() throws Exception {
> TestStream<String> stream = TestStream
>            .create(StringUtf8Coder.of())
>            .addElements(
>                    TimestampedValue.of("a:0", new Instant(0)),
>                    TimestampedValue.of("a:1", new Instant(1)),
>                    TimestampedValue.of("a:2", new Instant(2)),
>                    TimestampedValue.of("a:3", new Instant(3)))
>            .advanceWatermarkToInfinity();
>    PCollection<KV<String, String>> result = p
>            .apply(stream)
>            .apply(ParDo.of(new KeyElements()))
>            .apply(ParDo.of(new TimerDoFn()));
>    PAssert.that(result).containsInAnyOrder(
>            KV.of("a", "0_output"),
>            KV.of("a", "1_output"),
>            KV.of("a", "2_output"),
>            KV.of("a", "3_output"));
>    p.run();  
> }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to