[ https://issues.apache.org/jira/browse/BEAM-11971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Beam JIRA Bot reassigned BEAM-11971: ------------------------------------ Assignee: (was: Reuven Lax) > Direct Runner State is null while active timers exist > ----------------------------------------------------- > > Key: BEAM-11971 > URL: https://issues.apache.org/jira/browse/BEAM-11971 > Project: Beam > Issue Type: Bug > Components: runner-direct > Reporter: Reza ardeshir rokni > Priority: P3 > Labels: stale-assigned > Time Spent: 8h 50m > Remaining Estimate: 0h > > State is set to {{null}} while active timer is present, this issue does not > show in other runners. > The following example will reach the IllegalStateException within 10-20 times > of it being run. {{LOOP_COUNT}} does not seem to be a factor as it reproduces > with 100 or 100000 {{LOOP_COUNT}}. The number of keys is a factor as it did > not reproduce with only one key, have not tried with more than 3 keys to see > if it's easier to reproduce. > > {code} > package test; > import org.apache.beam.sdk.Pipeline; > import org.apache.beam.sdk.coders.BigEndianIntegerCoder; > import org.apache.beam.sdk.coders.KvCoder; > import org.apache.beam.sdk.state.StateSpec; > import org.apache.beam.sdk.state.StateSpecs; > import org.apache.beam.sdk.state.TimeDomain; > import org.apache.beam.sdk.state.Timer; > import org.apache.beam.sdk.state.TimerSpec; > import org.apache.beam.sdk.state.TimerSpecs; > import org.apache.beam.sdk.state.ValueState; > import org.apache.beam.sdk.testing.TestStream; > import org.apache.beam.sdk.transforms.DoFn; > import org.apache.beam.sdk.transforms.PTransform; > import org.apache.beam.sdk.transforms.ParDo; > import org.apache.beam.sdk.transforms.WithKeys; > import org.apache.beam.sdk.values.KV; > import org.apache.beam.sdk.values.PCollection; > import org.joda.time.Duration; > import org.joda.time.Instant; > import java.util.Optional; > > public class Test { > public static void main (String [] args) throws Exception{ > Test.testToFailure(); > } > public static void testToFailure() throws Exception { > int count = 0; > while (true) { > failingTest(); > System.out.println( > String.format("Got to Count %s", String.valueOf(count++))); > } > } > public static void failingTest() throws Exception { > Pipeline p = Pipeline.create(); > Instant now = Instant.now(); > TestStream<Integer> stream = > TestStream.create(BigEndianIntegerCoder.of()) > .addElements(1) > > .advanceWatermarkTo(now.plus(Duration.standardSeconds(1))) > .addElements(2) > > .advanceWatermarkTo(now.plus(Duration.standardSeconds(1))) > .addElements(3) > .advanceWatermarkToInfinity(); > p.apply(stream) > .apply(WithKeys.of(x -> x)) > .setCoder(KvCoder.of(BigEndianIntegerCoder.of(), > BigEndianIntegerCoder.of())) > .apply(new TestToFail()); > p.run(); > } > public static class TestToFail > extends PTransform<PCollection<KV<Integer, Integer>>, > PCollection<Integer>> { > @Override > public PCollection<Integer> expand(PCollection<KV<Integer, Integer>> > input) { > return input.apply(ParDo.of(new LoopingRead())); > } > } > public static class LoopingRead extends DoFn<KV<Integer, Integer>, > Integer> { > static int LOOP_COUNT = 100; > @StateId("value") > private final StateSpec<ValueState<Integer>> value = > StateSpecs.value(BigEndianIntegerCoder.of()); > @StateId("count") > private final StateSpec<ValueState<Integer>> count = > StateSpecs.value(BigEndianIntegerCoder.of()); > @TimerId("actionTimers") > private final TimerSpec timer = > TimerSpecs.timer(TimeDomain.EVENT_TIME); > @ProcessElement > public void processElement( > ProcessContext c, > @StateId("value") ValueState<Integer> value, > @TimerId("actionTimers") Timer timers) { > value.write(c.element().getValue()); > timers.set(c.timestamp().plus(Duration.millis(1000))); > } > /** */ > @OnTimer("actionTimers") > public void onTimer( > OnTimerContext c, > @StateId("value") ValueState<Integer> value, > @StateId("count") ValueState<Integer> count, > @TimerId("actionTimers") Timer timers) { > if (value.read() == null) { > throw new IllegalStateException("BINGO!"); > } > Integer counter = Optional.ofNullable(count.read()).orElse(0) + 1; > count.write(counter); > value.write(value.read() + counter); > if (counter < LOOP_COUNT) { > timers.set(c.timestamp().plus(Duration.standardSeconds(1))); > } > } > } > } > {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)