Hi Flink community, I have a custom source that emits an user-defined data type, BaseEvent. The following code works fine when BaseEvent is not POJO. But, when I changed it to POJO by adding a default constructor, I'm getting "Buffer Pool is destroyed" runtime exception on the Collect method.
DataStream<BaseEvent> eventStream = see.addSource(new AgoraSource(configFile, instance)); DataStream<Tuple4<String, Long, Double, String>> result_order = eventStream .filter(e -> e instanceof OrderEvent) .map(e -> (OrderEvent)e) .map(e -> new Tuple3<>(e.SecurityID, Long.valueOf(1), Double.valueOf(e.OriginalQuantity))).returns(info_tuple3) .keyBy(e -> e.f0) .timeWindow(Time.seconds(5)) .reduce((a, b) -> new Tuple3<>(a.f0, a.f1 + b.f1, a.f2 + b.f2)) .map(e -> new Tuple4<>(e.f0, e.f1, e.f2, "Order")).returns(info_tuple4); Any idea? Shuang =============================================================================== Please access the attached hyperlink for an important electronic communications disclaimer: http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html ===============================================================================