[ 
https://issues.apache.org/jira/browse/FLINK-9262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Schneider updated FLINK-9262:
-----------------------------------
    Description: 
Although KeyedOneInputStreamOperatorTestHarness and other 
AbstractStreamOperatorTestHarness subclasses are not yet part of the public 
Flink API, we have been trying to make use of them for unit testing our map 
functions. The following code throws NPE from the attempt to collect a snapshot 
on Flink 1.4.0 (even after applying [the 
fix|https://github.com/apache/flink/pull/5193/commits/ba676d7de5536e32e0c48c3db511bec1758f4e80]
 for FLINK-8268), but appears to work properly on Flink 1.5-SNAPSHOT:
{code:java}
package com.scaleunlimited.flinkcrawler.functions;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Test;

public class FlinkIssueTest {
    
    @SuppressWarnings("serial")
    private static class MyProcessFunction extends RichFlatMapFunction<String, 
String> {

        @Override
        public void flatMap(String input, Collector<String> collector) throws 
Exception {
            collector.collect(input);
        }
    }
    
    @SuppressWarnings({
            "serial", "hiding"
    })
    private static class MyKeySelector<String> implements KeySelector<String, 
String> {

        @Override
        public String getKey(String input) throws Exception {
            return input;
        }
    }

    @Test
    public void test() throws Throwable {
        KeyedOneInputStreamOperatorTestHarness<String, String, String> 
testHarness =
            new KeyedOneInputStreamOperatorTestHarness<String, String, String>(
                new StreamFlatMap<>(new MyProcessFunction()),
                new MyKeySelector<String>(),
                BasicTypeInfo.STRING_TYPE_INFO,
                1,
                1,
                0);
        testHarness.setup();
        testHarness.open();
        
        for (int i = 0; i < 10; i++) {
            String urlString = String.format("https://domain-%d.com/page1", i);
            testHarness.processElement(new StreamRecord<>(urlString));
        }
        testHarness.snapshot(0L, 0L);
    }
}

{code}
Output:
{noformat}
java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
    at 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
    at 
com.scaleunlimited.flinkcrawler.functions.FlinkIssueTest.test(FlinkIssueTest.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at 
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
    at 
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
    at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
    at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
    at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
Caused by: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
    at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
    ... 25 more

{noformat}
 

  was:
Although KeyedOneInputStreamOperatorTestHarness and other 
AbstractStreamOperatorTestHarness subclasses are not yet part of the public 
Flink API, we have been trying to make use of them for unit testing our map 
functions. The following code throws NPE from the attempt to collect a snapshot 
on Flink 1.4.0 (even after applying [the 
fix|https://github.com/apache/flink/pull/5193/commits/ba676d7de5536e32e0c48c3db511bec1758f4e80]
 for [FLINK-8268|https://issues.apache.org/jira/browse/FLINK-8268]), but 
appears to work properly on Flink 1.5-SNAPSHOT:

 

 
{code:java}
package com.scaleunlimited.flinkcrawler.functions;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Test;

public class FlinkIssueTest {
    
    @SuppressWarnings("serial")
    private static class MyProcessFunction extends RichFlatMapFunction<String, 
String> {

        @Override
        public void flatMap(String input, Collector<String> collector) throws 
Exception {
            collector.collect(input);
        }
    }
    
    @SuppressWarnings({
            "serial", "hiding"
    })
    private static class MyKeySelector<String> implements KeySelector<String, 
String> {

        @Override
        public String getKey(String input) throws Exception {
            return input;
        }
    }

    @Test
    public void test() throws Throwable {
        KeyedOneInputStreamOperatorTestHarness<String, String, String> 
testHarness =
            new KeyedOneInputStreamOperatorTestHarness<String, String, String>(
                new StreamFlatMap<>(new MyProcessFunction()),
                new MyKeySelector<String>(),
                BasicTypeInfo.STRING_TYPE_INFO,
                1,
                1,
                0);
        testHarness.setup();
        testHarness.open();
        
        for (int i = 0; i < 10; i++) {
            String urlString = String.format("https://domain-%d.com/page1", i);
            testHarness.processElement(new StreamRecord<>(urlString));
        }
        testHarness.snapshot(0L, 0L);
    }
}

{code}
Output:

 

 
{noformat}
java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
    at 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
    at 
com.scaleunlimited.flinkcrawler.functions.FlinkIssueTest.test(FlinkIssueTest.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at 
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
    at 
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
    at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
    at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
    at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
Caused by: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
    at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
    ... 25 more

{noformat}
 


> KeyedOneInputStreamOperatorTestHarness throws NPE creating snapshot
> -------------------------------------------------------------------
>
>                 Key: FLINK-9262
>                 URL: https://issues.apache.org/jira/browse/FLINK-9262
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming, Tests
>    Affects Versions: 1.4.0
>         Environment: macOS X High Sierra 10.13.4
> (ancient) Eclipse Luna v.4.4.1 
> JRE System Library [Java SE 8 [1.8.0_131]]
> Java 8 Update 171 build 11
>            Reporter: Chris Schneider
>            Priority: Major
>
> Although KeyedOneInputStreamOperatorTestHarness and other 
> AbstractStreamOperatorTestHarness subclasses are not yet part of the public 
> Flink API, we have been trying to make use of them for unit testing our map 
> functions. The following code throws NPE from the attempt to collect a 
> snapshot on Flink 1.4.0 (even after applying [the 
> fix|https://github.com/apache/flink/pull/5193/commits/ba676d7de5536e32e0c48c3db511bec1758f4e80]
>  for FLINK-8268), but appears to work properly on Flink 1.5-SNAPSHOT:
> {code:java}
> package com.scaleunlimited.flinkcrawler.functions;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.operators.StreamFlatMap;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
> import org.apache.flink.util.Collector;
> import org.junit.Test;
> public class FlinkIssueTest {
>     
>     @SuppressWarnings("serial")
>     private static class MyProcessFunction extends 
> RichFlatMapFunction<String, String> {
>         @Override
>         public void flatMap(String input, Collector<String> collector) throws 
> Exception {
>             collector.collect(input);
>         }
>     }
>     
>     @SuppressWarnings({
>             "serial", "hiding"
>     })
>     private static class MyKeySelector<String> implements KeySelector<String, 
> String> {
>         @Override
>         public String getKey(String input) throws Exception {
>             return input;
>         }
>     }
>     @Test
>     public void test() throws Throwable {
>         KeyedOneInputStreamOperatorTestHarness<String, String, String> 
> testHarness =
>             new KeyedOneInputStreamOperatorTestHarness<String, String, 
> String>(
>                 new StreamFlatMap<>(new MyProcessFunction()),
>                 new MyKeySelector<String>(),
>                 BasicTypeInfo.STRING_TYPE_INFO,
>                 1,
>                 1,
>                 0);
>         testHarness.setup();
>         testHarness.open();
>         
>         for (int i = 0; i < 10; i++) {
>             String urlString = String.format("https://domain-%d.com/page1", 
> i);
>             testHarness.processElement(new StreamRecord<>(urlString));
>         }
>         testHarness.snapshot(0L, 0L);
>     }
> }
> {code}
> Output:
> {noformat}
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>     at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>     at 
> com.scaleunlimited.flinkcrawler.functions.FlinkIssueTest.test(FlinkIssueTest.java:51)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at 
> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>     at 
> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>     at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
>     at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
>     at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
>     at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
>     at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
>     at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>     ... 25 more
> {noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to