Hi Jaswin, if I understand correctly, you could add the logic in the onTimer callback. In this callback, OnTimerContext.output(xx, outputTag) can be used to emit data to a specific side output. Besides, you would need a new state to store the elements to be output in the onTimer callback. A similar example can be found in [1].
Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example
------------------原始邮件 ------------------
发件人:Jaswin Shah <jaswin.s...@outlook.com>
发送时间:Fri May 22 23:00:43 2020
收件人:user@flink.apache.org <user@flink.apache.org>, Arvid Heise <ar...@ververica.com>, Yun Tang <myas...@live.com>
主题:onTimer method in CoProcessFunction in flink
How can I identify the type of element for which onTime is called in flink? I want to store the objects for which onTimer is called to sideOutputs and then streamout the sideoutput data to kafka topic. I am not understanding how to stream out the sideoutput data like where should I write that processing logic. Below is the code snippet I have done so far

/**
 * CoProcessFunction to process cart and pg messages connected using connect operator.
 *
 * <p>Each side looks up its counterpart in the other side's map state under the
 * join key produced by {@code createJoinStringCondition()}. On a hit the pair is
 * compared and any discrepancies are emitted; on a miss the element is parked in
 * its own side's map state.
 *
 * <p>The state handles are instance fields, not static fields: a static field is
 * shared by every instance of this function loaded in the same JVM, so each
 * instance's {@link #open(Configuration)} would overwrite the handles of the others.
 *
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction
        extends KeyedCoProcessFunction<String, CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /** Offset in milliseconds added to an element's timestamp when registering its event-time timer. */
    private static final long MATCH_TIMEOUT_MS = 3600000L;

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     * Initialized in {@link #open(Configuration)}.
     */
    private transient MapState<String, CartMessage> cartState;

    /**
     * Map state for pg messages, orderId+mid is key and pgMessage is value.
     * Initialized in {@link #open(Configuration)}.
     */
    private transient MapState<String, PaymentNotifyRequestWrapper> pgState;

    /**
     * Initializes the cart and pg map states.
     *
     * @param config configuration passed in by the runtime (not read here)
     */
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<>(
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartMessage.class)
        );
        cartState = getRuntimeContext().getMapState(cartStateDescriptor);

        MapStateDescriptor<String, PaymentNotifyRequestWrapper> pgStateDescriptor = new MapStateDescriptor<>(
            Constants.PG_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(PaymentNotifyRequestWrapper.class)
        );
        pgState = getRuntimeContext().getMapState(pgStateDescriptor);
    }

    /**
     * Called when a timer registered by {@code processElement1}/{@code processElement2} fires.
     * Currently does nothing.
     *
     * @param timestamp timestamp of the firing timer
     * @param ctx       timer context
     * @param out       collector for main-output results
     * @throws Exception declared by the overridden method
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        // NOTE(review): timers are registered for every element, but nothing is
        // done here, so entries that never find a counterpart remain in
        // cartState/pgState indefinitely. Expiring them correctly needs the
        // insertion timestamp of each entry (extra state) so that only entries
        // older than the timeout are removed and, if desired, forwarded via
        // ctx.output(outputTag, value) to a side output.
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     *
     * @param cartMessage incoming cart message
     * @param context     per-element context; its timestamp is used to register the timer
     * @param collector   collector for discrepancy results
     * @throws Exception if state access fails
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector)
            throws Exception {
        context.timerService().registerEventTimeTimer(context.timestamp() + MATCH_TIMEOUT_MS);

        String searchKey = cartMessage.createJoinStringCondition();
        PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey);
        if (Objects.nonNull(paymentNotifyObject)) {
            generateResultMessage(cartMessage, paymentNotifyObject, collector);
            pgState.remove(searchKey);
        } else {
            cartState.put(searchKey, cartMessage);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and pg object as value in pgMapState.
     *
     * @param pgMessage incoming pg message
     * @param context   per-element context; its timestamp is used to register the timer
     * @param collector collector for discrepancy results
     * @throws Exception if state access fails
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context,
                                Collector<ResultMessage> collector) throws Exception {
        context.timerService().registerEventTimeTimer(context.timestamp() + MATCH_TIMEOUT_MS);

        String searchKey = pgMessage.createJoinStringCondition();
        CartMessage cartMessage = cartState.get(searchKey);
        if (Objects.nonNull(cartMessage)) {
            generateResultMessage(cartMessage, pgMessage, collector);
            cartState.remove(searchKey);
        } else {
            pgState.put(searchKey, pgMessage);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages and hand it to the discrepancy check.
     * Nothing is emitted when the cart has no payment whose mapped type is
     * {@code Constants.FORWARD_PAYMENT} and whose provider is {@code Constants.PAYTM_NEW_PROVIDER}.
     *
     * @param cartMessage matched cart message
     * @param pgMessage   matched pg message
     * @param collector   collector for discrepancy results
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,
                                       Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
                    && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if (Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(Double.valueOf(pgMessage.getOrderAmount().getValue()));
        resultMessage.setCartOrderAmount(cartMessage.getGrandtotal());
        logger.info("cart amount {} pg amount {} ", resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount());

        resultMessage.setCartPaymethod(payment.getPayment_method());
        // NOTE(review): assumes the pg message always carries at least one pay option - confirm.
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage, collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * <p>One clone of {@code resultMessage} is emitted per detected discrepancy, each tagged
     * with its own discrepancy type. The amount comparison is null-safe: a missing amount on
     * exactly one side is reported as a discrepancy instead of throwing.
     *
     * @param resultMessage result populated from the matched cart and pg messages
     * @param collector     collector the discrepancy records are emitted to
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {
        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!Objects.equals(resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}

Help will be highly appreciated.