Hi Flink community,

I have a custom source that emits an user-defined data type, BaseEvent.  The 
following code works fine when BaseEvent is not POJO.
But, when I changed it to POJO by adding a default constructor, I'm getting 
"Buffer Pool is destroyed" runtime exception on the Collect method.

            DataStream<BaseEvent> eventStream = see.addSource(new 
AgoraSource(configFile, instance));

            DataStream<Tuple4<String, Long, Double, String>> result_order = 
eventStream
                    .filter(e -> e instanceof OrderEvent)
                    .map(e -> (OrderEvent)e)
                    .map(e -> new Tuple3<>(e.SecurityID, Long.valueOf(1), 
Double.valueOf(e.OriginalQuantity))).returns(info_tuple3)
                    .keyBy(e -> e.f0)
                    .timeWindow(Time.seconds(5))
                    .reduce((a, b) -> new Tuple3<>(a.f0, a.f1 + b.f1, a.f2 + 
b.f2))
                    .map(e -> new Tuple4<>(e.f0, e.f1, e.f2, 
"Order")).returns(info_tuple4);

Any idea?

Shuang

=============================================================================== 
Please access the attached hyperlink for an important electronic communications 
disclaimer: 
http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html 
=============================================================================== 

Reply via email to