You are right that a ValueState can keep a single value at any point of
time. It is scoped to the current key of the operator though. So it
keeps a single value for a key.

If your
CartJoinColumnsSelector()/new PGJoinColumnsSelector() are basically the
same thing a ValueState should be enough. It will always be scoped to
the result of new CartJoinColumnsSelector()/new PGJoinColumnsSelector().
I assumed it is the same because you are always using the ctx.getCurrent
in the onTimer method.

See this example [1]. There even though a ValueState is used, we
calculate counts per key.




> If I understand correctly, you are trying to tell that I should have
> valueState of Map?
> Thanks for responding Dawid.
> I would like to know more about MapState solution you talked about. As
> per my understanding valueState maintains a single value at any point
> of time. So, here what I want to maintain is the first streams
> information until matching event have not found in second stream. So,
> in that case how valueState could benefit me? Can you please explain
> me that, might be I have understood it incorrectly what you are trying
> to convey here.
> Hi Jaswin,
> I can't see any obvious problems in your code. It looks rather
> correct. What exactly do you mean that "callback is coming earlier
> than registered callback timeout"? Could you explain that with some
> examples?
> As for the different timezones. Flink does not make any assumptions on
> the timestamp. It uses it simply as longs. I'd suggest revisiting your
> timestamp extraction logic to make sure it performs the extraction
> correctly. I don't know how your data encodes the timestamps, but I
> think you have a bug or two there ;)
> The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" gives me an impression that this
> field has timestamps in UTC, but you are parsing it in your JVM local
> time zone. You treat the 'Z' as a literal
> (
> I don't know what the other field represents but you are also parsing
> it in a local timezone. If the field represents local date it is
> probably correct.
> To mitigate those problems I'd strongly recommend using the java.time
> API. For the extractCartTimeStamp you could use
> Instant.parse("...").toEpochMilli. It expects the format you are
> receiving. For the extractPGTimeStamp you could use
> LocalDateTime.parse("..."), by default it uses the format you are
> receiving. Then you should convert the local date time to an instant
> LocalDateTime.parse("...").atZone(/* the zone which this date
> represents */).toInstant().toEpochMilli(); This has nothing to do with
> Flink though ;)
> BTW one Flink issue I can see is that I think you don't need to use a
> MapState there. Any kind of state in a KeyedCoProcessFunction is
> always scoped to the current key. Therefore if you only ever put items
> under the currentKey you will have at most single element in your map.
> Think of the MapState as a map of maps MapState<UserKey, Value> =
> Map<CurrentKey, Map<UserKey, Value>>. In your case a ValueState should
> be enough imo.
> Best,
> Dawid
>> Here, I am registering the callback time for an even with processing
>> time and calculating the time value as events time + expiryTimeout value.
>> Can this be the issue here due to hybrid timings usage?
>> Also, do we need any special handling if we use event time semantics
>> for callback timeouts registrations?
>> Thanks,
>> Jaswin
>> Hi,
>> I am running flink job with following functionality:
>>  1. I consume stream1 and stream2 from two kafka topics and assign
>>     the watermarks to the events of two streams by extracting the
>>     timestamps from the events in streams.
>>  2. Then, I am connecting two streams and calling
>>     KeyedCoProcessFunction on connectedStream.
>>  3. I have processElement1 method and processElement2 methods which
>>     receive the events of two streams 1 and 2 and do the join logic
>>     as shown in below code snippet.
>>  4. I have shared mapstate for two streams.
>>  5. When an event comes to processElement method, I register the
>>     callback time for that message to ensure if corresponding
>>     matching message is not arrived from other stream, I will send
>>     the message to sideOutput on invocation of callback method i.e.
>>     onTimer.
>> Something is getting wrong in the callback times registrations for
>> events due to which for many messages of stream2 the callback is
>> coming earlier than registered callback timeout.
>> Also, the events from stream 2 are based on GMT times +5:30 as I can
>> see in the timevalue in event message, for stream1 it;s normal TZ
>> only. Though I am weak in analysing the timeout formats so could be
>> wrong in analysis this side.
>> Below is code snippets I have implemented for KeyedCoProcessFunctions
>> and timestamp calculations and watermarks registrations.
>> /** * CoProcessFuntion to process cart and pg messages connected
>> using connect operator. * @author jaswin.shah * @version $Id:
>>, v 0.1 2020-05-15 11:52 PM jaswin.shah
>> Exp $$ */ public class CartPGCoprocessFunction extends 
>> KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, 
>> ResultMessage> {
>>     private static final Logger logger = 
>> LoggerFactory.getLogger(CartPGCoprocessFunction.class); /** * Map state for 
>> cart messages, orderId+mid is key and
>> cartMessage is value. */ private static MapState<String, CartPG> cartPgState 
>> = null; /** * Intializations for cart and pg mapStates * * @param
>> config */ @Override public void open(Configuration config) {
>>         MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new 
>> MapStateDescriptor<> (
>>             Constants.CART_DATA, TypeInformation.of(String.class), 
>> TypeInformation.of(CartPG.class)
>>         ); cartPgState = 
>> getRuntimeContext().getMapState(cartPgMapStateDescriptor); }
>>     /** * * @return */ @Override public void onTimer(long timestamp, 
>> OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
>>"On timer called key is {}",ctx.getCurrentKey()); String 
>> searchKey = ctx.getCurrentKey(); CartPG  cartPg = 
>> cartPgState.get(searchKey); if(Objects.nonNull(cartPg)) {
>>             ctx.output(CartPGSideOutput.getOutputTag(), 
>> cartPgState.get(ctx.getCurrentKey())); cartPgState.remove(searchKey); }
>>     }
>>     /** * 1. Get orderId+mid from cartMessage and check in PGMapState if
>> an entry is present. * 2. If present, match, checkDescripancy,
>> process and delete entry from pgMapState. * 3. If not present, add
>> orderId+mid as key and cart object as value in cartMapState. * @param
>> cartMessage * @param context * @param collector * @throws Exception
>> */ @Override public void processElement1(CartMessage cartMessage, Context 
>> context, Collector<ResultMessage> collector) throws Exception {
>>         Long cartEventTimeStamp = context.timestamp();"cart 
>> time : {} ",cartEventTimeStamp); 
>> context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+ 
>> ConfigurationsManager.getMaxWaitTimeForPGMessage()); String searchKey = 
>> cartMessage.createJoinStringCondition(); CartPG cartPG = 
>> cartPgState.get(searchKey); if(Objects.nonNull(cartPG) && 
>> Objects.nonNull(cartPG.getPgMessage())) {
>> generateResultMessage(cartMessage,cartPG.getPgMessage(),collector); 
>> cartPgState.remove(searchKey); } else {
>>             cartPG = new CartPG(); cartPG.setCartMessage(cartMessage); 
>> cartPgState.put(searchKey,cartPG); }
>>     }
>>     /** * 1. Get orderId+mid from pgMessage and check in cartMapState if
>> an entry is present. * 2. If present, match, checkDescripancy,
>> process and delete entry from cartMapState. * 3. If not present, add
>> orderId+mid as key and cart object as value in pgMapState. * @param
>> pgMessage * @param context * @param collector * @throws Exception */
>> @Override public void processElement2(PaymentNotifyRequestWrapper pgMessage, 
>> Context context, Collector<ResultMessage> collector) throws Exception {
>>         Long pgEventTimeStamp = context.timestamp();"pg time : 
>> {} ",pgEventTimeStamp); 
>> context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());
>>  String searchKey = pgMessage.createJoinStringCondition(); CartPG cartPG = 
>> cartPgState.get(searchKey); if(Objects.nonNull(cartPG) && 
>> Objects.nonNull(cartPG.getCartMessage())) {
>> generateResultMessage(cartPG.getCartMessage(),pgMessage,collector); 
>> cartPgState.remove(searchKey); } else {
>>             cartPG = new CartPG(); cartPG.setPgMessage(pgMessage); 
>> cartPgState.put(searchKey,cartPG); }
>>     }
>>     /** * Create ResultMessage from cart and pg messages. * * @param
>> cartMessage * @param pgMessage * @return */ private void
>> generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper 
>> pgMessage,Collector<ResultMessage> collector) {
>>         ResultMessage resultMessage = new ResultMessage(); Payment payment = 
>> null; //Logic should be in cart: check for (Payment pay : 
>> cartMessage.getPayments()) {
>>             if (StringUtils.equals(Constants.FORWARD_PAYMENT, 
>> pay.mapToPaymentTypeInPG()) && 
>> StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
>>                 payment = pay; break; }
>>         }
>>         if(Objects.isNull(payment)) {
>>             return; }
>>         resultMessage.setOrderId(cartMessage.getId()); 
>> resultMessage.setMid(payment.getMid()); 
>> resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode()); 
>> resultMessage.setPgOrderStatus(pgMessage.getOrderStatus()); 
>> resultMessage.setCartOrderCompletionTime(payment.getUpdated_at()); 
>> resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime()); 
>> resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue()); 
>> resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));
>>  resultMessage.setCartPaymethod(payment.getPayment_method()); 
>> resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());
>>  checkDescripancyAndCollectResult(resultMessage,collector); }
>>     /** * Evaluate if there is descripancy of any fields between the
>> messages from two different systems. * Write all the descripancy
>> logic here. * * @param resultMessage */ private void
>> checkDescripancyAndCollectResult(ResultMessage resultMessage, 
>> Collector<ResultMessage> collector) {
>>         if 
>> (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), 
>> resultMessage.getPgOrderStatus())) {
>> resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
>>  collector.collect(resultMessage.clone()); }
>>         if 
>> (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount()))
>>  {
>> resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
>>  collector.collect(resultMessage.clone()); }
>>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), 
>> resultMessage.getPgPaymethod())) {
>> resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
>>  collector.collect(resultMessage.clone()); }
>>     }
>> }
>> /** * Connect to cart and pg streams and process * * @param
>> cartStream * @param pgStream * @return */ private 
>> SingleOutputStreamOperator<ResultMessage> 
>> connectCartPGStreamsAndProcess(SingleOutputStreamOperator<CartMessage> 
>> cartStream, SingleOutputStreamOperator<PaymentNotifyRequestWrapper> 
>> pgStream) {
>>     return cartStream.connect(pgStream).keyBy(new 
>> CartJoinColumnsSelector(),new PGJoinColumnsSelector())
>>         .process(new CartPGCoprocessFunction()); }
>> private final static SimpleDateFormat cartInputFormat = new 
>> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); private final static 
>> SimpleDateFormat pgInputFormat = new 
>> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); public static Long 
>> extractCartTimeStamp(CartMessage cartMessage){
>>     try {
>>         Date orderTimeStamp = 
>> cartInputFormat.parse(cartMessage.fetchOrderCompletionTime()); return 
>> orderTimeStamp.getTime(); } catch (ParseException e) {
>>         logger.error("Exception in converting cart message timeStamp..",e); }
>>     return; }
>> public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
>>     try {
>>         Date orderTimeStamp = 
>> pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime()); return 
>> orderTimeStamp.getTime(); } catch (ParseException e) {
>>         logger.error("Exception in converting pg message timeStamp..",e); }
>>     return; }
>> private SingleOutputStreamOperator<CartMessage> 
>> processCartStream(ParameterTool parameter, StreamExecutionEnvironment 
>> executionEnvironment) {
>>     //1. Consume cartStream SingleOutputStreamOperator<CartMessage> 
>> cartStream = executionEnvironment.addSource(createCartConsumer()); 
>>; //2. Filter cart messages 
>> SingleOutputStreamOperator<CartMessage> filteredCartStream = 
>> cartStream.filter(new CartFilterFunction())    ; //3. Map carts data 
>> filteredCartStream = CartMappingService.mapCartsData(filteredCartStream); 
>> //4. Assign timestamps and watermarks 
>> filteredCartStream.assignTimestampsAndWatermarks(new 
>> BoundedOutOfOrdernessTimestampExtractor<CartMessage>(Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS))))
>>  {
>>         @Override public long extractTimestamp(CartMessage cartMessage) {
>>             return DateTimeUtils.extractCartTimeStamp(cartMessage); }
>>     }); return filteredCartStream; }
>> private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> 
>> processPgStream(ParameterTool parameter, StreamExecutionEnvironment 
>> executionEnvironment) {
>>     //1. Consume pg streams 
>> SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream = 
>> executionEnvironment.addSource(createPGConsumer()); 
>>; //2. Assign timestamps and watermarks to 
>> pg messages pgStream.assignTimestampsAndWatermarks(new 
>> BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS))))
>>  {
>>         @Override public long extractTimestamp(PaymentNotifyRequestWrapper 
>> pgMessage) {
>>             return DateTimeUtils.extractPGTimeStamp(pgMessage); }
>>     }); return pgStream; }
>> Can anyone please help what can be the issue here and if there is
>> somewrong time values handled in the code here.
>> Help will be highly appreciated.

