Thanks for reporting the issue, Chris! Would you mind opening a JIRA issue [1] to track the bug for Flink 1.4?

Thank you,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK

2018-04-25 21:11 GMT+02:00 Chris Schneider <cschnei...@scaleunlimited.com>:

> Hi Gang,
>
> FWIW, the code below works just fine using Flink 1.5-SNAPSHOT. I also
> tried cherry-picking the commit that fixed FLINK-8268
> <https://github.com/apache/flink/pull/5193/commits/ba676d7de5536e32e0c48c3db511bec1758f4e80>
> to Flink 1.4.0, but that resulted in the same failure mode.
>
> I guess the takeaway is that this streaming test code harness support
> (which everyone should note is not yet part of the public Flink API) was
> apparently fragile in 1.4.0.
>
> FYI,
>
> - Chris
>
>
> On Apr 18, 2018, at 8:07 PM, Chris Schneider <cschnei...@scaleunlimited.com> wrote:
>
> Hi Ted,
>
> I should have written that we’re using Flink 1.4.0.
>
> Thanks for the suggestion re: FLINK-8268
> <https://issues.apache.org/jira/browse/FLINK-8268>; it could well be the
> issue (though the pull request
> <https://github.com/apache/flink/pull/5193/files> appears fairly complex
> so I’ll need some time to study it).
>
> Best Regards,
>
> - Chris
>
> On Apr 18, 2018, at 6:33 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> Which release are you using ?
>
> See if the work around from FLINK-8268 helps.
>
> Cheers
>
> On Wed, Apr 18, 2018 at 6:26 PM, Chris Schneider <cschnei...@scaleunlimited.com> wrote:
>
>> Hi Gang,
>>
>> I’m having trouble getting my streaming unit test to work.
>> The following code:
>>
>> @Test
>> public void testDemo() throws Throwable {
>>     OneInputStreamOperatorTestHarness<CrawlStateUrl, CrawlStateUrl> testHarness =
>>         new KeyedOneInputStreamOperatorTestHarness<String, CrawlStateUrl, CrawlStateUrl>(
>>             new StreamFlatMap<>(new DomainDBFunction()),
>>             new PldKeySelector<CrawlStateUrl>(),
>>             BasicTypeInfo.STRING_TYPE_INFO,
>>             1,
>>             1,
>>             0);
>>     testHarness.setup();
>>     testHarness.open();
>>
>>     for (int i = 0; i < 10; i++) {
>>         String urlString = String.format("https://domain-%d.com/page1", i);
>>         CrawlStateUrl url = new CrawlStateUrl(new RawUrl(urlString));
>>         testHarness.processElement(new StreamRecord<>(url));
>>     }
>>     testHarness.snapshot(0L, 0L);
>> }
>>
>>
>> Generates the following exception:
>>
>> DomainDBFunctionTest.testDemo
>> testDemo(com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest)
>> java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>>     at com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest.testDemo(DomainDBFunctionTest.java:51)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>     at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>>     at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
>>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
>>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
>>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
>> Caused by: java.lang.NullPointerException
>>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>>     ... 26 more
>>
>> I tried explicitly calling testHarness.setStateBackend(new
>> MemoryStateBackend()), but that didn’t seem to help.
>> I could provide more of my code (e.g., PldKeySelector, DomainDBFunction,
>> CrawlStateUrl, RawUrl, etc.), but that doesn’t seem like it would have
>> much to do with the problem.
>>
>> Any advice would be most welcome.
>>
>> Thanks,
>>
>> - Chris
>>
>> -----------------------------------------
>> Chris Schneider
>> http://www.scaleunlimited.com
>> custom big data solutions
>> -----------------------------------------