Folks,

I wrote a custom data source to test my CEP logic. The custom data source
looks like this:

public class CustomerDataSource extends RichParallelSourceFunction<CustomerMessage> {
    // volatile so that cancel(), called from another thread, is seen by run()
    private volatile boolean running = true;
    private final Random random;

    public CustomerDataSource() {
        this.random = new Random();
    }

    @Override
    public void run(SourceContext<CustomerMessage> ctx) throws Exception {
        while (running) {
            new CustomerDataGen().generateMessages().
                    forEach(element -> ctx.collect(element));

            Thread.sleep(10000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

public class CustomerDataGen {
    private final Random random;

    public CustomerDataGen() {
        this.random = new Random();
    }

    public List<CustomerMessage> generateMessages() throws InterruptedException {
        List<CustomerMessage> messages = new ArrayList<CustomerMessage>();

        messages.add(getMessage());
        return messages;
    }

    private CustomerMessage getMessage() {
        Instant time = Instant.now();
        Timestamp eventTimeStamp =
                Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(0).build();
        Timestamp creationTimeStamp =
                Timestamp.newBuilder().setSeconds(time.getEpochSecond() - 1).setNanos(0).build();
        return CustomerMessage.newBuilder().
                setName("SomeCustomer").
                setEventTimestamp(eventTimeStamp).
                setCustomerId("01234").
                addAllRegisteredPhoneNumbers(Arrays.asList("+9185216741972")).
                setEmail("custo...@foo.com").
                build();
    }
}

In my main program:

.........
env.getConfig().registerTypeWithKryoSerializer(CustomerMessage.class, ProtobufSerializer.class);
env.addSource(new CustomerDataSource());

env.execute();


When I run the program, I get the following exception:


Caused by: java.lang.NullPointerException
        at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
        at java.util.AbstractList.add(AbstractList.java:108)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
        at com.sample.datasources.custom.CustomertDataSource.lambda$run$0(CustomerDataSource.java:24)
        at java.util.ArrayList.forEach(ArrayList.java:1249)
        at com.sample.datasources.custom.CustomertDataSource.run(CustomerDataSource.java:24)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)

I am having a tough time figuring out why. Can someone help me figure out
where I am going wrong?
