Hi Jaswin,

     If I understand correctly, you could put this logic in the onTimer 
callback. There, OnTimerContext.output(outputTag, value) emits a record to a 
specific side output. Note that a timer carries only its timestamp and key, not 
the element that registered it, so you will also need state that stores the 
elements to emit when the timer fires. A similar example is in [1].
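
As a rough, Flink-free sketch of that idea (the class and method names below are 
hypothetical, and plain maps and lists stand in for the keyed MapState and the 
side outputs; it also assumes the join key is the key the timer fires for):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the pattern, not Flink code: unmatched elements are
// buffered per input side, and on timer expiry whatever is still buffered is
// routed to a per-side "side output".
class TimerSideOutputSketch {

    // Stand-ins for the two keyed states (cartState / pgState in the question).
    private final Map<String, String> cartBuffer = new HashMap<>();
    private final Map<String, String> pgBuffer = new HashMap<>();

    // Stand-ins for the main output and for two side outputs.
    final List<String> matched = new ArrayList<>();
    final List<String> unmatchedCart = new ArrayList<>();
    final List<String> unmatchedPg = new ArrayList<>();

    // Mirrors processElement1: join with a buffered pg message, else buffer.
    void onCart(String key, String cart) {
        String pg = pgBuffer.remove(key);
        if (pg != null) {
            matched.add(cart + "|" + pg);
        } else {
            cartBuffer.put(key, cart);
        }
    }

    // Mirrors processElement2: join with a buffered cart message, else buffer.
    void onPg(String key, String pg) {
        String cart = cartBuffer.remove(key);
        if (cart != null) {
            matched.add(cart + "|" + pg);
        } else {
            pgBuffer.put(key, pg);
        }
    }

    // Mirrors onTimer: the timer carries no element, so the element's type is
    // inferred from which buffer still holds an entry for the key.
    void onTimer(String key) {
        String cart = cartBuffer.remove(key);
        if (cart != null) {
            unmatchedCart.add(cart); // in Flink: ctx.output(cartTag, cart)
        }
        String pg = pgBuffer.remove(key);
        if (pg != null) {
            unmatchedPg.add(pg);     // in Flink: ctx.output(pgTag, pg)
        }
    }

    public static void main(String[] args) {
        TimerSideOutputSketch sketch = new TimerSideOutputSketch();
        sketch.onCart("o1", "cart-1");
        sketch.onPg("o1", "pg-1");   // matches cart-1
        sketch.onCart("o2", "cart-2"); // never matched
        sketch.onTimer("o1");
        sketch.onTimer("o2");
        System.out.println(sketch.matched);
        System.out.println(sketch.unmatchedCart);
    }
}
```

In the actual job, the two lists would presumably become ctx.output(...) calls on 
two OutputTag instances, and each side stream could then be read with 
getSideOutput(...) on the operator returned by process(...) and connected to a 
Kafka sink.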

Best,
Yun

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example




------------------ Original Message ------------------
From: Jaswin Shah <jaswin.s...@outlook.com>
Sent: Fri May 22 23:00:43 2020
To: user@flink.apache.org <user@flink.apache.org>, Arvid Heise <ar...@ververica.com>, Yun Tang <myas...@live.com>
Subject: onTimer method in CoProcessFunction in flink

How can I identify the type of element for which onTimer is called in Flink?
I want to send the objects for which onTimer fires to side outputs and then 
stream the side-output data to a Kafka topic. I do not understand where the 
logic for streaming out the side-output data should go. 
Below is the code I have so far:


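import java.util.Objects;

import org.apache.commons.lang3.StringUtils; // assumed: Apache Commons Lang 3
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger; // assumed: SLF4J
import org.slf4j.LoggerFactory;

/**
 * CoProcessFunction to process cart and pg messages connected using the connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction
        extends KeyedCoProcessFunction<String, CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {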

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     * Held per function instance (not static), since state handles are bound to the runtime context.
     */
    private transient MapState<String, CartMessage> cartState;

    /**
     * Map state for pg messages, orderId+mid is key and pgMessage is value.
     */
    private transient MapState<String, PaymentNotifyRequestWrapper> pgState;


    /**
     * Initializations for cart and pg mapStates.
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<>(
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartMessage.class)
        );
        cartState = getRuntimeContext().getMapState(cartStateDescriptor);

        MapStateDescriptor<String, PaymentNotifyRequestWrapper> pgStateDescriptor = new MapStateDescriptor<>(
            Constants.PG_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(PaymentNotifyRequestWrapper.class)
        );
        pgState = getRuntimeContext().getMapState(pgStateDescriptor);
    }

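    /**
     * Called when a timer registered in processElement1/processElement2 fires.
     *
     * @param timestamp timestamp of the firing timer
     * @param ctx       context giving access to the timer service and side outputs
     * @param out       collector for the main output
     * @throws Exception
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {

    }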

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        context.timerService().registerEventTimeTimer(context.timestamp() + 3600000);

        String searchKey = cartMessage.createJoinStringCondition();
        PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey);
        if (Objects.nonNull(paymentNotifyObject)) {
            generateResultMessage(cartMessage, paymentNotifyObject, collector);
            pgState.remove(searchKey);
        } else {
            cartState.put(searchKey, cartMessage);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and pg object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        context.timerService().registerEventTimeTimer(context.timestamp() + 3600000);
        String searchKey = pgMessage.createJoinStringCondition();
        CartMessage cartMessage = cartState.get(searchKey);
        if (Objects.nonNull(cartMessage)) {
            generateResultMessage(cartMessage, pgMessage, collector);
            cartState.remove(searchKey);
        } else {
            pgState.put(searchKey, pgMessage);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages and pass it to the discrepancy check.
     *
     * @param cartMessage
     * @param pgMessage
     * @param collector
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage, Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        // Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
                    && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        // No matching payment in the cart message: nothing to compare.
        if (Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(Double.valueOf(pgMessage.getOrderAmount().getValue()));
        resultMessage.setCartOrderAmount(cartMessage.getGrandtotal());

        logger.info("cart amount {} pg amount {} ", resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount());
        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage, collector);
    }

    /**
     * Evaluate whether any fields differ between the messages from the two systems.
     * Write all the discrepancy logic here.
     *
     * @param resultMessage
     * @param collector
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}


Help will be highly appreciated.
