I figured out my issue. I needed to assign timestamps and watermarks (e.g. with assignTimestampsAndWatermarks) right after the fromElements call. I couldn't figure out how the auto-generated code worked, so I hooked up a debugger and guessed at the issue.
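For reference, here is a minimal sketch of that fix, assuming Flink 1.11/1.12-era APIs (WatermarkStrategy, Expressions.$, and the bridge StreamTableEnvironment). UserEvent, watermarkStrategy, registerView and the 5-second out-of-orderness bound are hypothetical stand-ins for BatchLog/ToUsers, not code from the original job. The appended $("userTime").rowtime() field is populated from each record's timestamp, and fromElements does not assign one, which is likely why the generated SourceConversion operator hit a null.

```java
import static org.apache.flink.table.api.Expressions.$;

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class WatermarkedViewExample {

    // Hypothetical stand-in for the output of ToUsers. Flink treats it as a POJO
    // (public class, public no-arg constructor, public fields).
    public static class UserEvent {
        public String userId;
        public long timeEpochMillis;

        public UserEvent() {}

        public UserEvent(String userId, long timeEpochMillis) {
            this.userId = userId;
            this.timeEpochMillis = timeEpochMillis;
        }
    }

    // Takes the event time from the record itself. The 5-second bound is an
    // assumed value, not something from the original job.
    public static WatermarkStrategy<UserEvent> watermarkStrategy() {
        return WatermarkStrategy.<UserEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, recordTimestamp) -> event.timeEpochMillis);
    }

    public static void registerView(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) {
        DataStream<UserEvent> users =
                env.fromElements(new UserEvent("u1", 1_000L), new UserEvent("u2", 2_000L))
                        // Without this call the records carry no timestamp, so the
                        // rowtime field appended below has nothing to read from.
                        .assignTimestampsAndWatermarks(watermarkStrategy());

        tableEnv.createTemporaryView(
                "input_user",
                users,
                $("userId"),
                $("timeEpochMillis"),
                $("userTime").rowtime()); // appended rowtime attribute backed by the record timestamp
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Required on Flink 1.11 (default is ProcessingTime); deprecated from 1.12,
        // where EventTime became the default.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        registerView(env, tableEnv);
        // Prints the view's schema without executing a job.
        tableEnv.from("input_user").printSchema();
    }
}
```

Running main should print a schema in which userTime is marked as a rowtime attribute. In the original pipeline the call would go right after fromElements on the BatchLog stream, with an extractor that reads whichever event-time field BatchLog carries.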
On Thu, Oct 8, 2020 at 11:09 PM Dan Hill <quietgol...@gmail.com> wrote:

> *Summary*
> I'm hitting an error when running a test that is related to using
> createTemporaryView to convert a Protobuf input stream to Flink Table API.
> I'm not sure how to debug "SourceConversion$5.processElement(Unknown
> Source)" line. Is this generated code? How can I debug this?
>
> Any help would be appreciated. Thanks! - Dan
>
> *Details*
> My current input is a protocol buffer stream. I convert it to the Table
> API spec using createTemporaryView. The code is hacky. I want to get some
> tests implemented before cleaning it up.
>
>     KeyedStream<BatchLog, String> batchLogStream =
>         env.<BatchLog>fromElements(BatchLog.class, new LogGenerator.BatchLogIterator().next())
>             .keyBy((logRequest) -> logRequest.getUserId());
>
>     tableEnv.createTemporaryView(
>         "input_user",
>         batchLogStream.flatMap(new ToUsers()),
>         $("userId"),
>         $("timeEpochMillis"),
>         $("userTime").rowtime());
>
> This appears to work in my prototype (maybe serialization is broken). In
> a Flink test, I hit the following error.
>
> org.apache.flink.runtime.taskmanager.Task: Flat Map -> Map ->
> SourceConversion(table=[default.mydb.input_user], fields=[userId,
> timeEpochMillis, userTime]) -> Calc(select=[userId, timeEpochMillis]) ->
> StreamingFileWriter (2/7) (ae67114dd4175c6fd87063f73706c8ec) switched from
> RUNNING to FAILED.
> java.lang.NullPointerException
>     at SourceConversion$5.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>     at ai.promoted.metrics.logprocessor.common.functions.ToUsers.flatMap(ToUsers.java:18)
>     at ai.promoted.metrics.logprocessor.common.functions.ToUsers.flatMap(ToUsers.java:11)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.base/java.lang.Thread.run(Thread.java:834)
>
> I wasn't able to find this exact stacktrace when looking on Google.