You are right that a ValueState can keep only a single value at any point in time. It is, however, scoped to the current key of the operator, so it keeps a single value per key.
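A stdlib-only toy model may make the per-key scoping concrete. It is not Flink code: the class and method names are hypothetical, and the two HashMaps only mimic what the state backend does conceptually. A ValueState behaves like Map<CurrentKey, Value> and a MapState like Map<CurrentKey, Map<UserKey, Value>>, so if the user key is always the current key, each inner map holds at most one entry.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical toy model of how Flink scopes keyed state; not the Flink API. */
public class KeyedStateModel {
    // ValueState<V> seen from the state backend: one value per current key.
    private final Map<String, String> valueState = new HashMap<>();
    // MapState<UK, V> seen from the state backend: a map of maps, outer key = current key.
    private final Map<String, Map<String, String>> mapState = new HashMap<>();
    // In Flink the runtime sets this before each processElement1/2 or onTimer call.
    private String currentKey;

    void setCurrentKey(String key) { currentKey = key; }

    // Analogue of ValueState.update(v) and ValueState.value()
    void updateValue(String v) { valueState.put(currentKey, v); }
    String value() { return valueState.get(currentKey); }

    // Analogue of MapState.put(userKey, v)
    void putInMap(String userKey, String v) {
        mapState.computeIfAbsent(currentKey, k -> new HashMap<>()).put(userKey, v);
    }

    // Number of entries the current key can see in its MapState
    int mapSizeForCurrentKey() {
        return mapState.getOrDefault(currentKey, Collections.<String, String>emptyMap()).size();
    }

    public static void main(String[] args) {
        KeyedStateModel state = new KeyedStateModel();
        for (String key : new String[] {"order1|mid1", "order2|mid1"}) {
            state.setCurrentKey(key);
            // Like the original code: the user key is always the current key
            state.putInMap(key, "cart of " + key);
            state.updateValue("cart of " + key);
        }
        state.setCurrentKey("order1|mid1");
        System.out.println(state.mapSizeForCurrentKey()); // 1
        System.out.println(state.value());                // cart of order1|mid1
    }
}
```

Each key sees only its own slot, which is why a ValueState<CartPG> would hold the same information as the MapState in the code below.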
If your cartMessage.createJoinStringCondition()/pgMessage.createJoinStringCondition()/new CartJoinColumnsSelector()/new PGJoinColumnsSelector() are basically the same thing, a ValueState should be enough. It will always be scoped to the result of new CartJoinColumnsSelector()/new PGJoinColumnsSelector(). I assumed they are the same because you always use ctx.getCurrent in the onTimer method. See the example in [1]: even though a ValueState is used there, the counts are calculated per key.

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example

On 25/05/2020 11:17, Jaswin Shah wrote:
> If I understand correctly, you are saying that I should have
> a ValueState of Map?
> ------------------------------------------------------------------------
> *From:* Jaswin Shah
> *Sent:* 25 May 2020 14:43
> *To:* Dawid Wysakowicz <dwysakow...@apache.org>; user@flink.apache.org;
> ankit.sing...@paytm.com; isha.sing...@paytm.com
> *Subject:* Re: Timeout Callbacks issue -Flink
>
> Thanks for responding, Dawid.
> I would like to know more about the MapState solution you talked about. As
> per my understanding, a ValueState maintains a single value at any point
> in time. What I want to maintain here is the first stream's
> information until a matching event is found in the second stream. So,
> in that case, how could a ValueState benefit me? Can you please explain
> that? Maybe I have misunderstood what you are trying to convey here.
>
> Thanks,
> Jaswin
>
> ------------------------------------------------------------------------
> *From:* Dawid Wysakowicz
> *Sent:* Monday, May 25, 2020 14:23
> *To:* Jaswin Shah; user@flink.apache.org; ankit.sing...@paytm.com;
> isha.sing...@paytm.com
> *Subject:* Re: Timeout Callbacks issue -Flink
>
> Hi Jaswin,
>
> I can't see any obvious problems in your code.
> It looks rather correct. What exactly do you mean by "callback is coming
> earlier than registered callback timeout"? Could you explain that with
> some examples?
>
> As for the different time zones: Flink does not make any assumptions about
> the timestamp. It simply treats it as a long. I'd suggest revisiting your
> timestamp extraction logic to make sure it performs the extraction
> correctly. I don't know how your data encodes the timestamps, but I
> think you have a bug or two there ;)
>
> The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" pattern gives me the impression that this
> field has timestamps in UTC, but you are parsing it in your JVM's local
> time zone. You treat the 'Z' as a literal
> (https://stackoverflow.com/questions/19112357/java-simpledateformatyyyy-mm-ddthhmmssz-gives-timezone-as-ist).
> I don't know what the other field represents, but you are also parsing
> it in the local time zone. If the field represents a local date, that is
> probably correct.
>
> To mitigate those problems I'd strongly recommend using the java.time
> API. For extractCartTimeStamp you could use
> Instant.parse("...").toEpochMilli(). It expects the format you are
> receiving. For extractPGTimeStamp you could use
> LocalDateTime.parse("..."); by default it uses the format you are
> receiving. Then you should convert the local date time to an instant:
> LocalDateTime.parse("...").atZone(/* the zone which this date
> represents */).toInstant().toEpochMilli(); This has nothing to do with
> Flink though ;)
>
> BTW, one Flink issue I can see is that I think you don't need to use a
> MapState there. Any kind of state in a KeyedCoProcessFunction is
> always scoped to the current key. Therefore, if you only ever put items
> under the currentKey, you will have at most a single element in your map.
> Think of the MapState as a map of maps: MapState<UserKey, Value> =
> Map<CurrentKey, Map<UserKey, Value>>. In your case a ValueState should
> be enough imo.
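A minimal sketch of the java.time approach suggested above. It assumes the cart field is UTC and, as the original message suggests, that the PG field holds wall-clock time at UTC+05:30, modelled here with the assumed zone ZoneId.of("Asia/Kolkata"). The sample strings are made up and the class and method names are illustrative. It also reproduces the SimpleDateFormat pitfall: with 'Z' quoted as a literal, a JVM running at UTC+05:30 shifts the cart timestamp by 5h30m.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.TimeZone;

public class TimestampParsing {
    // Assumption: the PG field is wall-clock time in India (UTC+05:30).
    static final ZoneId PG_ZONE = ZoneId.of("Asia/Kolkata");

    /** Cart field such as "2020-05-23T09:09:00.000Z": the trailing Z means UTC. */
    static long cartMillis(String s) {
        return Instant.parse(s).toEpochMilli();
    }

    /** PG field such as "2020-05-23T14:39:00": no offset, so the zone must be supplied. */
    static long pgMillis(String s) {
        return LocalDateTime.parse(s).atZone(PG_ZONE).toInstant().toEpochMilli();
    }

    /** The original approach: 'Z' is only a literal, so the formatter's zone is used instead of UTC. */
    static long legacyCartMillis(String s, TimeZone jvmZone) {
        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        f.setTimeZone(jvmZone);
        try {
            return f.parse(s).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable cart timestamp: " + s, e);
        }
    }

    public static void main(String[] args) {
        // The same instant, written once in UTC and once as UTC+05:30 wall-clock time
        long cart = cartMillis("2020-05-23T09:09:00.000Z");
        long pg = pgMillis("2020-05-23T14:39:00");
        System.out.println(cart == pg); // true

        // On a JVM whose default zone is UTC+05:30, the legacy parser is 5h30m early
        long legacy = legacyCartMillis("2020-05-23T09:09:00.000Z", TimeZone.getTimeZone("Asia/Kolkata"));
        System.out.println(cart - legacy); // 19800000
    }
}
```

Unlike SimpleDateFormat, the java.time classes are immutable and thread-safe, so they can also replace the shared static formatters in the code below.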
>
> Best,
>
> Dawid
>
> On 23/05/2020 14:39, Jaswin Shah wrote:
>> ++
>> Here, I am registering the callback time for an event with processing
>> time and calculating the time value as event time + expiryTimeout value.
>>
>> Can this be the issue here, due to the hybrid timing usage?
>> Also, do we need any special handling if we use event-time semantics
>> for callback timeout registrations?
>>
>> Thanks,
>> Jaswin
>> ------------------------------------------------------------------------
>> *From:* Jaswin Shah <jaswin.s...@outlook.com>
>> *Sent:* 23 May 2020 17:18
>> *To:* user@flink.apache.org; Arvid Heise <ar...@ververica.com>;
>> Yun Tang <myas...@live.com>
>> *Subject:* Timeout Callbacks issue -Flink
>>
>> Hi,
>> I am running a Flink job with the following functionality:
>>
>> 1. I consume stream1 and stream2 from two Kafka topics and assign
>>    watermarks to the events of the two streams by extracting the
>>    timestamps from the events.
>> 2. Then, I connect the two streams and call a
>>    KeyedCoProcessFunction on the connected stream.
>> 3. The processElement1 and processElement2 methods receive the
>>    events of streams 1 and 2 and do the join logic shown in the
>>    code snippet below.
>> 4. I have a shared MapState for the two streams.
>> 5. When an event arrives in a processElement method, I register a
>>    callback time for that message, so that if the corresponding
>>    matching message has not arrived from the other stream, I send
>>    the message to a side output when the callback method, i.e.
>>    onTimer, is invoked.
>>
>> Something is going wrong in the callback time registrations for
>> events, because for many messages of stream2 the callback is
>> coming earlier than the registered callback timeout.
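On the question about mixing time domains: registerProcessingTimeTimer compares the given timestamp against the machine's wall clock, so eventTimestamp + timeout only behaves as intended if event time tracks wall-clock time closely. The stdlib-only toy below (hypothetical names and numbers, not Flink code) models the delay until such a timer fires, assuming, as in Flink, that a processing-time timer whose timestamp is already in the past fires almost immediately. In Flink terms, the two consistent options would be registerProcessingTimeTimer(context.timerService().currentProcessingTime() + timeout) or registerEventTimeTimer(context.timestamp() + timeout), the latter firing only once the watermark passes that timestamp.

```java
public class TimerArithmetic {
    /**
     * Toy model, not Flink code: a processing-time timer fires once the wall clock
     * reaches its timestamp, so a timestamp in the past means a delay of ~0 ms.
     */
    static long delayUntilFire(long nowMillis, long timerTimestamp) {
        return Math.max(0L, timerTimestamp - nowMillis);
    }

    public static void main(String[] args) {
        long timeout = 15 * 60 * 1000L;                   // hypothetical max wait: 15 minutes
        long now = 1_590_225_000_000L;                    // hypothetical wall-clock "now"
        long eventTs = now - (5 * 60 + 30) * 60 * 1000L;  // event timestamp that looks 5h30m old,
                                                          // e.g. because of a time zone parsing bug

        // What the original code does: registerProcessingTimeTimer(eventTs + timeout)
        System.out.println(delayUntilFire(now, eventTs + timeout)); // 0 -> fires immediately

        // Processing-time alternative: base the deadline on the current processing time
        System.out.println(delayUntilFire(now, now + timeout));     // 900000 (15 minutes)
    }
}
```

The same early firing would appear without any parsing bug whenever the job processes a backlog whose events are older than the timeout.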
>> Also, the events from stream 2 are based on GMT+5:30 times, as I can
>> see in the time value in the event message; for stream1 it's the normal
>> TZ only. I am weak at analysing time formats, though, so my analysis
>> could be wrong on this side.
>>
>> Below are the code snippets I have implemented for the KeyedCoProcessFunction,
>> the timestamp calculations and the watermark registrations.
>>
>> /**
>>  * CoProcessFunction to process cart and pg messages connected using connect operator.
>>  * @author jaswin.shah
>>  * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
>>  */
>> public class CartPGCoprocessFunction extends
>>         KeyedCoProcessFunction<String, CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {
>>
>>     private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);
>>
>>     /**
>>      * Map state for cart messages, orderId+mid is key and cartMessage is value.
>>      */
>>     private static MapState<String, CartPG> cartPgState = null;
>>
>>     /**
>>      * Initializations for cart and pg mapStates
>>      *
>>      * @param config
>>      */
>>     @Override
>>     public void open(Configuration config) {
>>         MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<>(
>>                 Constants.CART_DATA, TypeInformation.of(String.class), TypeInformation.of(CartPG.class));
>>         cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
>>     }
>>
>>     /**
>>      *
>>      * @return
>>      */
>>     @Override
>>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
>>         logger.info("On timer called key is {}", ctx.getCurrentKey());
>>         String searchKey = ctx.getCurrentKey();
>>         CartPG cartPg = cartPgState.get(searchKey);
>>         if (Objects.nonNull(cartPg)) {
>>             ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
>>             cartPgState.remove(searchKey);
>>         }
>>     }
>>
>>     /**
>>      * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
>>      * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
>>      * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
>>      * @param cartMessage
>>      * @param context
>>      * @param collector
>>      * @throws Exception
>>      */
>>     @Override
>>     public void processElement1(CartMessage cartMessage, Context context,
>>             Collector<ResultMessage> collector) throws Exception {
>>         Long cartEventTimeStamp = context.timestamp();
>>         logger.info("cart time : {} ", cartEventTimeStamp);
>>         context.timerService().registerProcessingTimeTimer(
>>                 cartEventTimeStamp + ConfigurationsManager.getMaxWaitTimeForPGMessage());
>>         String searchKey = cartMessage.createJoinStringCondition();
>>         CartPG cartPG = cartPgState.get(searchKey);
>>         if (Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
>>             generateResultMessage(cartMessage, cartPG.getPgMessage(), collector);
>>             cartPgState.remove(searchKey);
>>         } else {
>>             cartPG = new CartPG();
>>             cartPG.setCartMessage(cartMessage);
>>             cartPgState.put(searchKey, cartPG);
>>         }
>>     }
>>
>>     /**
>>      * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
>>      * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
>>      * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
>>      * @param pgMessage
>>      * @param context
>>      * @param collector
>>      * @throws Exception
>>      */
>>     @Override
>>     public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context,
>>             Collector<ResultMessage> collector) throws Exception {
>>         Long pgEventTimeStamp = context.timestamp();
>>         logger.info("pg time : {} ", pgEventTimeStamp);
>>         context.timerService().registerProcessingTimeTimer(
>>                 pgEventTimeStamp + ConfigurationsManager.getMaxWaitTimeForCartMessage());
>>         String searchKey = pgMessage.createJoinStringCondition();
>>         CartPG cartPG = cartPgState.get(searchKey);
>>         if (Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
>>             generateResultMessage(cartPG.getCartMessage(), pgMessage, collector);
>>             cartPgState.remove(searchKey);
>>         } else {
>>             cartPG = new CartPG();
>>             cartPG.setPgMessage(pgMessage);
>>             cartPgState.put(searchKey, cartPG);
>>         }
>>     }
>>
>>     /**
>>      * Create ResultMessage from cart and pg messages.
>>      *
>>      * @param cartMessage
>>      * @param pgMessage
>>      * @return
>>      */
>>     private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,
>>             Collector<ResultMessage> collector) {
>>         ResultMessage resultMessage = new ResultMessage();
>>         Payment payment = null;
>>         //Logic should be in cart: check
>>         for (Payment pay : cartMessage.getPayments()) {
>>             if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
>>                     && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
>>                 payment = pay;
>>                 break;
>>             }
>>         }
>>         if (Objects.isNull(payment)) {
>>             return;
>>         }
>>
>>         resultMessage.setOrderId(cartMessage.getId());
>>         resultMessage.setMid(payment.getMid());
>>         resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
>>         resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());
>>         resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
>>         resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());
>>         resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
>>         resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));
>>         resultMessage.setCartPaymethod(payment.getPayment_method());
>>         resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());
>>         checkDescripancyAndCollectResult(resultMessage, collector);
>>     }
>>
>>     /**
>>      * Evaluate if there is a discrepancy in any fields between the messages from two different systems.
>>      * Write all the discrepancy logic here.
>>      *
>>      * @param resultMessage
>>      */
>>     private void checkDescripancyAndCollectResult(ResultMessage resultMessage,
>>             Collector<ResultMessage> collector) {
>>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
>>             resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
>>             collector.collect(resultMessage.clone());
>>         }
>>
>>         if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
>>             resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
>>             collector.collect(resultMessage.clone());
>>         }
>>
>>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
>>             resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
>>             collector.collect(resultMessage.clone());
>>         }
>>     }
>> }
>>
>> /**
>>  * Connect to cart and pg streams and process
>>  *
>>  * @param cartStream
>>  * @param pgStream
>>  * @return
>>  */
>> private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(
>>         SingleOutputStreamOperator<CartMessage> cartStream,
>>         SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
>>     return cartStream.connect(pgStream).keyBy(new CartJoinColumnsSelector(), new PGJoinColumnsSelector())
>>             .process(new CartPGCoprocessFunction());
>> }
>>
>> private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
>> private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
>>
>> public static Long extractCartTimeStamp(CartMessage cartMessage) {
>>     try {
>>         Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
>>         return orderTimeStamp.getTime();
>>     } catch (ParseException e) {
>>         logger.error("Exception in converting cart message timeStamp..", e);
>>     }
>>     return Instant.now().toEpochMilli();
>> }
>>
>> public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage) {
>>     try {
>>         Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
>>         return orderTimeStamp.getTime();
>>     } catch (ParseException e) {
>>         logger.error("Exception in converting pg message timeStamp..", e);
>>     }
>>     return Instant.now().toEpochMilli();
>> }
>>
>> private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter,
>>         StreamExecutionEnvironment executionEnvironment) {
>>     //1. Consume cartStream
>>     SingleOutputStreamOperator<CartMessage> cartStream = executionEnvironment.addSource(createCartConsumer());
>>     cartStream.name(Constants.CART_SYSTEM);
>>     //2. Filter cart messages
>>     SingleOutputStreamOperator<CartMessage> filteredCartStream = cartStream.filter(new CartFilterFunction());
>>     //3. Map carts data
>>     filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);
>>     //4. Assign timestamps and watermarks
>>     filteredCartStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(
>>             Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
>>         @Override
>>         public long extractTimestamp(CartMessage cartMessage) {
>>             return DateTimeUtils.extractCartTimeStamp(cartMessage);
>>         }
>>     });
>>     return filteredCartStream;
>> }
>>
>> private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter,
>>         StreamExecutionEnvironment executionEnvironment) {
>>     //1. Consume pg streams
>>     SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream =
>>             executionEnvironment.addSource(createPGConsumer());
>>     pgStream.name(Constants.PG_SYSTEM);
>>     //2. Assign timestamps and watermarks to pg messages
>>     pgStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(
>>             Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
>>         @Override
>>         public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
>>             return DateTimeUtils.extractPGTimeStamp(pgMessage);
>>         }
>>     });
>>     return pgStream;
>> }
>>
>> Can anyone please help with what the issue could be here, and whether
>> some time values are handled wrongly in the code?
>>
>> Help will be highly appreciated.
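To illustrate the event-time variant asked about in the thread, here is a stdlib-only simulation of the same join (hypothetical class and method names, not the Flink API). Each key holds at most one buffered message, playing the role of a ValueState; a timer at eventTs + maxWait is kept in a TreeMap and fires only when an explicitly advanced watermark reaches it, the way an event-time timer would. In a real KeyedCoProcessFunction the corresponding calls would be ValueState.value()/update()/clear() and context.timerService().registerEventTimeTimer(...).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical simulation of a keyed two-input join with event-time timeouts. */
public class EventTimeJoinModel {
    private final long maxWaitMillis;
    // Per-key buffered message, standing in for a ValueState<String>.
    private final Map<String, String> pending = new HashMap<>();
    // Registered event-time timers ordered by timestamp: timestamp -> keys.
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();
    final List<String> joined = new ArrayList<>();
    final List<String> timedOut = new ArrayList<>(); // stands in for the side output

    EventTimeJoinModel(long maxWaitMillis) { this.maxWaitMillis = maxWaitMillis; }

    /** Handles an element of either input; side is "cart" or "pg". */
    void process(String key, String side, long eventTs) {
        String buffered = pending.get(key);
        if (buffered != null && !buffered.startsWith(side)) {
            // The other side is already waiting: emit the match and clear the key's state.
            joined.add(key + ": " + buffered + " + " + side);
            pending.remove(key);
        } else {
            // Buffer this side and schedule a timeout in event time.
            pending.put(key, side + "@" + eventTs);
            timers.computeIfAbsent(eventTs + maxWaitMillis, t -> new ArrayList<>()).add(key);
        }
    }

    /** Fires every timer whose timestamp is <= the new watermark, like an event-time onTimer. */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (String key : timers.pollFirstEntry().getValue()) {
                String buffered = pending.remove(key);
                if (buffered != null) {
                    timedOut.add(key + ": " + buffered);
                }
            }
        }
    }

    public static void main(String[] args) {
        EventTimeJoinModel join = new EventTimeJoinModel(1_000L);
        join.process("o1|m1", "cart", 100L);
        join.process("o2|m1", "cart", 200L);
        join.process("o1|m1", "pg", 900L);  // matches o1 within the wait
        join.advanceWatermark(1_100L);      // o1's timer fires, but nothing is pending any more
        join.advanceWatermark(1_200L);      // o2's timer fires: no pg message arrived
        System.out.println(join.joined);    // [o1|m1: cart@100 + pg]
        System.out.println(join.timedOut);  // [o2|m1: cart@200]
    }
}
```

Because the timeout is measured against the watermark rather than the wall clock, a lagging or replaying job does not flush entries early. The flip side is that timers fire only as far as watermarks advance, and for connected streams the operator's watermark is the minimum of the two inputs' watermarks.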