Hi Jaswin,
    I think event-time timers and processing-time timers in Flink should be 
treated as fully decoupled: an event-time timer is triggered by the watermarks 
the operator receives, while a processing-time timer is triggered by the 
physical clock. You can think of them as two separate timelines with no 
guarantee on their relative speed. Therefore, the result of computing a 
deadline from event time and registering it as a processing-time timer is 
nondeterministic: it depends on the gap between event time and processing time.
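
To make the two timelines concrete, here is a toy model in plain Java. It is 
not Flink code: the class TimerTimelines and all of its methods are made up 
for illustration, and it only assumes that a timer fires once its timeline 
reaches the timer's timestamp. It registers the same deadline (event timestamp 
plus one hour) on both timelines while the wall clock is far ahead of event 
time, as happens when replaying old data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model (NOT the Flink API) of two independent timer timelines.
final class TimerTimelines {
    // Event-time timers: fire only when the watermark reaches them.
    private final TreeSet<Long> eventTimeTimers = new TreeSet<>();
    // Processing-time timers: fire only when the wall clock reaches them.
    private final TreeSet<Long> processingTimeTimers = new TreeSet<>();
    // Log of fired timers, in firing order.
    final List<String> fired = new ArrayList<>();

    void registerEventTimeTimer(long timestamp) {
        eventTimeTimers.add(timestamp);
    }

    void registerProcessingTimeTimer(long timestamp) {
        processingTimeTimers.add(timestamp);
    }

    // Advancing the watermark never touches processing-time timers.
    void advanceWatermark(long watermark) {
        fire(eventTimeTimers, watermark, "event");
    }

    // Advancing the wall clock never touches event-time timers.
    void advanceWallClock(long now) {
        fire(processingTimeTimers, now, "processing");
    }

    private void fire(TreeSet<Long> timers, long upTo, String kind) {
        while (!timers.isEmpty() && timers.first() <= upTo) {
            fired.add(kind + "@" + timers.pollFirst());
        }
    }

    public static void main(String[] args) {
        TimerTimelines timelines = new TimerTimelines();
        long eventTimestamp = 1_000L;      // timestamp carried by an old record
        long wallClockNow = 10_000_000L;   // the physical clock is far ahead
        long deadline = eventTimestamp + 3_600_000L;

        timelines.registerEventTimeTimer(deadline);
        timelines.registerProcessingTimeTimer(deadline);

        // The deadline is already in the past on the wall-clock timeline.
        timelines.advanceWallClock(wallClockNow);
        System.out.println(timelines.fired); // [processing@3601000]

        // The watermark has not reached the deadline yet: nothing new fires.
        timelines.advanceWatermark(2_000L);
        System.out.println(timelines.fired); // [processing@3601000]

        // Only now does the event-time timer fire.
        timelines.advanceWatermark(deadline);
        System.out.println(timelines.fired); // [processing@3601000, event@3601000]
    }
}
```

In this model the processing-time timer fires immediately, which is one way 
callbacks can arrive "too early", while the event-time timer waits until the 
watermark catches up with the deadline.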

Best,
 Yun



 ------------------ Original Message ------------------
From: Jaswin Shah <jaswin.s...@outlook.com>
Sent: Sat May 23 22:08:57 2020
To: user@flink.apache.org <user@flink.apache.org>, Arvid Heise 
<ar...@ververica.com>, Yun Tang <myas...@live.com>
Subject: Re: onTimer method in CoProcessFunction in flink

Hi Yun,
Actually, this problem is solved now. I am now stuck on another problem with 
timeout callbacks: I am receiving the callbacks too early, and the event-time 
timer registrations were somehow failing; maybe they need some special 
handling. I need to know whether this callback registration is wrong or 
whether something else is wrong.
Do we need some special handling when using event-time semantics?
Thanks,
Jaswin

From: Jaswin Shah <jaswin.s...@outlook.com>
Sent: 22 May 2020 20:30
To: user@flink.apache.org <user@flink.apache.org>; Arvid Heise 
<ar...@ververica.com>; Yun Tang <myas...@live.com>
Subject: onTimer method in CoProcessFunction in flink
How can I identify the type of element for which onTimer is called in Flink?
I want to write the objects for which onTimer is called to side outputs and 
then stream the side-output data out to a Kafka topic. I do not understand how 
to stream out the side-output data, i.e., where I should write that processing 
logic. Below is the code snippet I have written so far:


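// Imports were not shown in the original mail. The Flink, SLF4J and JDK ones
// below are the standard locations; StringUtils is assumed to come from
// Apache Commons Lang 3. Project classes (CartMessage, Constants, ...) are
// assumed to be in the same package.
import java.util.Objects;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * CoProcessFunction to process cart and pg messages connected using the connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */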
public class CartPGCoprocessFunction
        extends KeyedCoProcessFunction<String, CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     * Kept as a non-static field: a static handle would be shared by all
     * parallel instances of this function running in the same JVM.
     */
    private transient MapState<String, CartMessage> cartState;

    /**
     * Map state for pg messages, orderId+mid is key and pgMessage is value.
     */
    private transient MapState<String, PaymentNotifyRequestWrapper> pgState;


    /**
     * Initializes the cart and pg map states.
     *
     * @param config the function's configuration
     */
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<>(
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartMessage.class)
        );
        cartState = getRuntimeContext().getMapState(cartStateDescriptor);

        MapStateDescriptor<String, PaymentNotifyRequestWrapper> pgStateDescriptor = new MapStateDescriptor<>(
            Constants.PG_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(PaymentNotifyRequestWrapper.class)
        );
        pgState = getRuntimeContext().getMapState(pgStateDescriptor);
    }

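    /**
     * Called when a timer registered in processElement1 or processElement2 fires.
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        // Not implemented yet; this is the part the question above is about.
    }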

    /**
     * 1. Get orderId+mid from cartMessage and check in pgMapState if an entry is present.
     * 2. If present, match, check for discrepancies, process and delete the entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        // Timeout of one hour (3600000 ms) in event time. Note that context.timestamp()
        // is null unless timestamps are assigned to the records upstream.
        context.timerService().registerEventTimeTimer(context.timestamp() + 3600000);

        String searchKey = cartMessage.createJoinStringCondition();
        PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey);
        if (Objects.nonNull(paymentNotifyObject)) {
            generateResultMessage(cartMessage, paymentNotifyObject, collector);
            pgState.remove(searchKey);
        } else {
            cartState.put(searchKey, cartMessage);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, check for discrepancies, process and delete the entry from cartMapState.
     * 3. If not present, add orderId+mid as key and pg object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        // Same one-hour event-time timeout as in processElement1.
        context.timerService().registerEventTimeTimer(context.timestamp() + 3600000);

        String searchKey = pgMessage.createJoinStringCondition();
        CartMessage cartMessage = cartState.get(searchKey);
        if (Objects.nonNull(cartMessage)) {
            generateResultMessage(cartMessage, pgMessage, collector);
            cartState.remove(searchKey);
        } else {
            pgState.put(searchKey, pgMessage);
        }
    }

    /**
     * Create a ResultMessage from the cart and pg messages and pass it on
     * for the discrepancy checks.
     *
     * @param cartMessage
     * @param pgMessage
     * @param collector
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage, Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
                    && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if (Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(Double.valueOf(pgMessage.getOrderAmount().getValue()));
        resultMessage.setCartOrderAmount(cartMessage.getGrandtotal());

        logger.info("cart amount {} pg amount {}", resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount());
        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage, collector);
    }

    /**
     * Evaluate whether any fields differ between the messages from the two systems.
     * Write all the discrepancy logic here.
     *
     * @param resultMessage
     * @param collector
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}


Help will be highly appreciated.
