Hi

Flink stores state in a state backend. There are two kinds of state backends: heap-based ones (MemoryStateBackend and FsStateBackend in Flink 1.10), which keep working state on the JVM heap, and RocksDBStateBackend, which keeps state in RocksDB.
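Here is a minimal sketch of selecting RocksDB in job code. It assumes Flink 1.10 with the `flink-statebackend-rocksdb_2.11` dependency on the classpath. The class name `RocksDbBackendSetup`, the checkpoint URI `file:///tmp/flink-checkpoints` and the 60-second checkpoint interval are placeholders for illustration. In a cluster, the URI should point at durable shared storage such as HDFS or S3.

```java
import java.io.IOException;

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical setup class for illustration only.
class RocksDbBackendSetup {

    // Placeholder checkpoint location; use HDFS/S3 or another shared file system in a cluster.
    static final String CHECKPOINT_URI = "file:///tmp/flink-checkpoints";

    // Working state lives in local RocksDB instances; checkpoints are written to CHECKPOINT_URI.
    // The second constructor argument enables incremental checkpoints.
    static RocksDBStateBackend createBackend() throws IOException {
        return new RocksDBStateBackend(CHECKPOINT_URI, true);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(createBackend());
        env.enableCheckpointing(60_000); // checkpoint every 60 s (assumed interval)

        // Trivial pipeline so the job has an operator to run; replace with the real job.
        env.fromElements("a", "b", "c").print();
        env.execute("rocksdb-state-backend-sketch");
    }
}
```

The same setup can be expressed in `flink-conf.yaml` with `state.backend: rocksdb`, `state.checkpoints.dir: <URI>` and `state.backend.incremental: true`. A backend set in code takes precedence over the configuration file.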
You can enable RocksDB in either of the following ways [1]:
1. call `env.setStateBackend(...);` in your job code, or
2. set `state.backend: rocksdb` in `flink-conf.yaml`.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#configuring-a-state-backend

Best,
Congxian


On Tue, May 19, 2020 at 3:59 PM Jaswin Shah <jaswin.s...@outlook.com> wrote:
> ++
> ------------------------------
> *From:* Yun Tang <myas...@live.com>
> *Sent:* 18 May 2020 23:47
> *To:* Arvid Heise <ar...@ververica.com>; Jaswin Shah <jaswin.s...@outlook.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Rocksdb implementation
>
> Hi Jaswin
>
> As Arvid suggested, querying the internal RocksDB directly is discouraged. Apart from Arvid's solution, I think queryable state [1] might also help you. I think you just want to know which entries are left in both map states after several days, and querying the state should meet that need. Please refer to the official doc and this example [2] for more details.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
> [2] https://github.com/apache/flink/blob/def4cbb762f849ce5b5c9c6bd367fd62bc8f45de/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java#L108-L122
>
> Best
> Yun Tang
> ------------------------------
> *From:* Arvid Heise <ar...@ververica.com>
> *Sent:* Monday, May 18, 2020 23:40
> *To:* Jaswin Shah <jaswin.s...@outlook.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Rocksdb implementation
>
> Hi Jaswin,
>
> I'd discourage using RocksDB directly. It's more of an implementation detail of Flink. I'd also discourage writing to Kafka directly instead of using our Kafka sink, because you will receive duplicates upon recovery.
>
> If you run the KeyedCoProcessFunction continuously anyway, I'd add a timer (2 days?)
> [1] for all unmatched records. When the timer fires, output the record through a side output [2], where you do your batch logic. Then you don't need a separate batch job to clean that up. If you actually want to output to Kafka for some other application, you just need to stream the side output to Flink's Kafka producer sink.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timers
> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>
> On Mon, May 18, 2020 at 10:30 AM Jaswin Shah <jaswin.s...@outlook.com> wrote:
>
> /**
>  * Alipay.com Inc.
>  * Copyright (c) 2004-2020 All Rights Reserved.
>  */
> package com.paytm.reconsys.functions.processfunctions;
>
> import com.paytm.reconsys.Constants;
> import com.paytm.reconsys.configs.ConfigurationsManager;
> import com.paytm.reconsys.enums.DescripancyTypeEnum;
> import com.paytm.reconsys.exceptions.MissingConfigurationsException;
> import com.paytm.reconsys.messages.ResultMessage;
> import com.paytm.reconsys.messages.cart.CartMessage;
> import com.paytm.reconsys.messages.cart.Payment;
> import com.paytm.reconsys.messages.pg.PGMessage;
> import org.apache.commons.lang3.StringUtils;
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
> import org.apache.flink.util.Collector;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.KafkaException;
> import org.apache.kafka.common.errors.AuthorizationException;
> import org.apache.kafka.common.errors.OutOfOrderSequenceException;
> import org.apache.kafka.common.errors.ProducerFencedException;
> import org.apache.kafka.common.serialization.StringSerializer;
> import java.util.Properties;
>
> /**
>  * CoProcessFunction to process cart and pg messages connected using the connect operator.
>  * @author jaswin.shah
>  * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
>  */
> public class CartPGCoprocessFunction extends
>         KeyedCoProcessFunction<String, CartMessage, PGMessage, ResultMessage> {
>
>     /**
>      * Map state for cart messages: orderId+mid is the key and cartMessage is the value.
>      */
>     private MapState<String, CartMessage> cartState = null;
>
>     /**
>      * Map state for pg messages: orderId+mid is the key and pgMessage is the value.
>      */
>     private MapState<String, PGMessage> pgState = null;
>
>     /**
>      * Initializations for the cart and pg map states.
>      *
>      * @param config
>      */
>     @Override
>     public void open(Configuration config) {
>         MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<>(
>                 "cartData",
>                 TypeInformation.of(String.class),
>                 TypeInformation.of(CartMessage.class)
>         );
>         cartState = getRuntimeContext().getMapState(cartStateDescriptor);
>
>         MapStateDescriptor<String, PGMessage> pgStateDescriptor = new MapStateDescriptor<>(
>                 "pgData",
>                 TypeInformation.of(String.class),
>                 TypeInformation.of(PGMessage.class)
>         );
>         pgState = getRuntimeContext().getMapState(pgStateDescriptor);
>     }
>
>     /**
>      * 1. Get orderId+mid from cartMessage and check whether pgMapState contains an entry for it.
>      * 2. If present, match, checkDescripancy, process and delete the entry from pgMapState.
>      * 3. If not present, add orderId+mid as key and the cart object as value to cartMapState.
>      * @param cartMessage
>      * @param context
>      * @param collector
>      * @throws Exception
>      */
>     @Override
>     public void processElement1(CartMessage cartMessage, Context context,
>             Collector<ResultMessage> collector) throws Exception {
>         String searchKey = cartMessage.createJoinStringCondition();
>         if (pgState.contains(searchKey)) {
>             // Emit the joined result downstream instead of discarding it.
>             collector.collect(generateResultMessage(cartMessage, pgState.get(searchKey)));
>             pgState.remove(searchKey);
>         } else {
>             cartState.put(searchKey, cartMessage);
>         }
>     }
>
>     /**
>      * 1. Get orderId+mid from pgMessage and check whether cartMapState contains an entry for it.
>      * 2. If present, match, checkDescripancy, process and delete the entry from cartMapState.
>      * 3. If not present, add orderId+mid as key and the pg object as value to pgMapState.
>      * @param pgMessage
>      * @param context
>      * @param collector
>      * @throws Exception
>      */
>     @Override
>     public void processElement2(PGMessage pgMessage, Context context,
>             Collector<ResultMessage> collector) throws Exception {
>         String searchKey = pgMessage.createJoinStringCondition();
>         if (cartState.contains(searchKey)) {
>             // Emit the joined result downstream instead of discarding it.
>             collector.collect(generateResultMessage(cartState.get(searchKey), pgMessage));
>             cartState.remove(searchKey);
>         } else {
>             pgState.put(searchKey, pgMessage);
>         }
>     }
>
>     /**
>      * Create a ResultMessage from the cart and pg messages.
>      *
>      * @param cartMessage
>      * @param pgMessage
>      * @return
>      */
>     private ResultMessage generateResultMessage(CartMessage cartMessage, PGMessage pgMessage) {
>         ResultMessage resultMessage = new ResultMessage();
>         Payment payment = null;
>
>         //Logic should be in cart: check
>         for (Payment pay : cartMessage.getPayments()) {
>             if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) &&
>                     StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
>                 payment = pay;
>                 break;
>             }
>         }
>         resultMessage.setOrderId(cartMessage.getId());
>         resultMessage.setMid(payment.getMid());
>
>         resultMessage.setCartOrderStatus(payment.mapToOrderStatus().getCode());
>         resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());
>
>         resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
>         resultMessage.setPgOrderCompletionTime(pgMessage.getOrderCompletionTime());
>
>         resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
>         resultMessage.setCartOrderAmount(cartMessage.getGrandtotal());
>
>         resultMessage.setCartPaymethod(payment.getPayment_method());
>         resultMessage.setPgPaymethod(pgMessage.getPayMethod());
>
>         checkDescripancyAndTriggerAlert(resultMessage);
>
>         return resultMessage;
>     }
>
>     /**
>      * Evaluate whether any fields differ between the messages from the two systems.
>      * Write all the discrepancy logic here.
>      *
>      * @param resultMessage
>      */
>     private void checkDescripancyAndTriggerAlert(ResultMessage resultMessage) {
>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(),
>                 resultMessage.getPgOrderStatus())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
>             //Send message to kafka queue for order status discrepancy.
>             sendMessageToKafkaTopic(resultMessage.toString());
>         }
>
>         if (!StringUtils.equals(resultMessage.getCartOrderAmount(),
>                 resultMessage.getPgOrderAmount())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
>             //Send message to kafka queue for pay amount discrepancy.
>             sendMessageToKafkaTopic(resultMessage.toString());
>         }
>
>         if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(),
>                 resultMessage.getPgPaymethod())) {
>             resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
>             //Send message to kafka queue for pay method discrepancy.
>             sendMessageToKafkaTopic(resultMessage.toString());
>         }
>     }
>
>     /**
>      * Send a message to the Kafka topic.
>      *
>      * @param message
>      */
>     private void sendMessageToKafkaTopic(String message) {
>         Properties kafkaProperties = ConfigurationsManager.getResultSystemKafkaProperties();
>         //kafkaProperties.put("transactional.id","trans123");
>         Producer<String, String> producer = new KafkaProducer<>(kafkaProperties,
>                 new StringSerializer(), new StringSerializer());
>         //producer.initTransactions();
>         try {
>             //producer.beginTransaction();
>             producer.send(new ProducerRecord<>(ConfigurationsManager.getResultTopicName(), message));
>             //producer.commitTransaction();
>         } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
>             // We can't recover from these exceptions, so our only option is to close the producer and exit.
>             producer.close();
>         } catch (KafkaException e) {
>             producer.abortTransaction();
>         } catch (MissingConfigurationsException e) {
>             e.printStackTrace();
>         }
>         producer.close();
>     }
> }
>
> This is a snapshot of the implementation I have done.
> ------------------------------
> *From:* Jaswin Shah <jaswin.s...@outlook.com>
> *Sent:* 18 May 2020 13:55
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Rocksdb implementation
>
> Hi,
> I have implemented the Flink job with MapStates.
> The functionality is as follows:
>
>    1. I have two DataStreams, which I connect with the connect operator and then process with a CoProcessFunction.
>    2. For an element of the first DataStream, processElement1 is called, and for an element of the second DataStream, processElement2 is called.
>    3. The CoProcessFunction holds two MapStates, one for each stream.
>    4. When processElement1 is called, it checks whether MapState2 contains a corresponding element with the given id. If present, I match and delete it. If not present, I add the object to MapState1.
>    5. When processElement2 is called, it checks whether MapState1 contains a corresponding element with the given id. If present, I match and delete it. If not present, I add the object to MapState2.
>    6. Now, I want all the state data to be stored in RocksDB.
>    7. After a few days, I want to run a batch job on the RocksDB data to find any objects that never found a match and create a report of them.
>
> I need help with how to store this state data in RocksDB, and with the setup, configuration and code required, which I don't understand. Also, is it possible to run a batch job on RocksDB data?
>
> Help will be highly appreciated.
>
> Thanks,
> Jaswin
>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
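Arvid's suggestion (a timer for unmatched records that fires into a side output, instead of a later batch job over RocksDB) could look roughly like the sketch below. It rests on these assumptions:

- Both streams are keyed by the same orderId+mid join key.
- Each key sees at most one record per side, so a ValueState can stand in for the MapStates.
- Plain `String` payloads replace `CartMessage`/`PGMessage`.
- The timeout is a hypothetical 2 days, measured in processing time.

`MatchWithTimeoutFunction` and the `"unmatched"` tag name are illustrative, not part of Flink.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Hypothetical function: joins cart and pg records per key and reports records
// that stay unmatched for TIMEOUT_MS through a side output.
class MatchWithTimeoutFunction extends KeyedCoProcessFunction<String, String, String, String> {

    // Hypothetical timeout of 2 days, measured in processing time.
    static final long TIMEOUT_MS = 2L * 24 * 60 * 60 * 1000;

    // Anonymous subclass so Flink can capture the element type String.
    static final OutputTag<String> UNMATCHED = new OutputTag<String>("unmatched") {};

    private transient ValueState<String> cartState;
    private transient ValueState<String> pgState;
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        cartState = getRuntimeContext().getState(new ValueStateDescriptor<>("cart", String.class));
        pgState = getRuntimeContext().getState(new ValueStateDescriptor<>("pg", String.class));
        timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement1(String cart, Context ctx, Collector<String> out) throws Exception {
        String pg = pgState.value();
        if (pg != null) {
            out.collect("MATCHED " + ctx.getCurrentKey() + ": " + cart + " | " + pg);
            clearAll(ctx);
        } else {
            cartState.update(cart);
            registerTimeoutIfAbsent(ctx);
        }
    }

    @Override
    public void processElement2(String pg, Context ctx, Collector<String> out) throws Exception {
        String cart = cartState.value();
        if (cart != null) {
            out.collect("MATCHED " + ctx.getCurrentKey() + ": " + cart + " | " + pg);
            clearAll(ctx);
        } else {
            pgState.update(pg);
            registerTimeoutIfAbsent(ctx);
        }
    }

    // Only reached if no match arrived before the deadline (clearAll deletes the timer).
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        if (cartState.value() != null) {
            ctx.output(UNMATCHED, "CART_ONLY " + ctx.getCurrentKey() + ": " + cartState.value());
        }
        if (pgState.value() != null) {
            ctx.output(UNMATCHED, "PG_ONLY " + ctx.getCurrentKey() + ": " + pgState.value());
        }
        cartState.clear();
        pgState.clear();
        timerState.clear();
    }

    // Registers one processing-time timer per key, the first time a record waits.
    private void registerTimeoutIfAbsent(Context ctx) throws Exception {
        if (timerState.value() == null) {
            long deadline = ctx.timerService().currentProcessingTime() + TIMEOUT_MS;
            ctx.timerService().registerProcessingTimeTimer(deadline);
            timerState.update(deadline);
        }
    }

    // Drops all state for the current key and cancels its pending timer.
    private void clearAll(Context ctx) throws Exception {
        Long deadline = timerState.value();
        if (deadline != null) {
            ctx.timerService().deleteProcessingTimeTimer(deadline);
        }
        cartState.clear();
        pgState.clear();
        timerState.clear();
    }
}
```

To wire it up, assume `carts` and `pgs` are `DataStream<String>` and `cartKey`/`pgKey` are hypothetical `KeySelector`s that extract orderId+mid. Then `carts.connect(pgs).keyBy(cartKey, pgKey).process(new MatchWithTimeoutFunction())` returns a `SingleOutputStreamOperator<String>`. Its `getSideOutput(MatchWithTimeoutFunction.UNMATCHED)` is the stream of unmatched records, which can feed the report directly.

Because the pending records live in keyed state, they are stored in whichever backend is configured (RocksDB included) and restored from checkpoints together with the timers. Nothing needs to read RocksDB from outside the job.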
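For the Kafka part of Arvid's advice, the sketch below routes a stream of report lines into Flink's Kafka connector instead of creating a raw `KafkaProducer` per message. It assumes Flink 1.10 with the universal `flink-connector-kafka_2.11` dependency. The helper class `UnmatchedKafkaSink` and its method names are hypothetical.

`EXACTLY_ONCE` ties Kafka transactions to Flink checkpoints, so it has two requirements:

- Checkpointing must be enabled.
- The producer's `transaction.timeout.ms` must not exceed the broker's `transaction.max.timeout.ms`.

The 15-minute value below is an assumption matching Kafka's default broker limit. Consumers of the topic should read with `isolation.level=read_committed`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical helper: sends a stream of report lines to Kafka through Flink's sink.
final class UnmatchedKafkaSink {

    private UnmatchedKafkaSink() {}

    // Serializes each String as the UTF-8 record value for the given topic (no record key).
    static KafkaSerializationSchema<String> stringSchema(String topic) {
        return (element, timestamp) ->
                new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }

    // Attaches a transactional Kafka sink to the given stream.
    static void attach(DataStream<String> reports, String topic, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Must not exceed the broker's transaction.max.timeout.ms (15 minutes by default).
        props.setProperty("transaction.timeout.ms", "900000");

        FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
                topic,
                stringSchema(topic),
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        reports.addSink(sink).name("reports-to-kafka");
    }
}
```

A single sink instance is created when the job graph is built and reused for every record. Per-message producer creation, as in the quoted `sendMessageToKafkaTopic`, is not needed.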
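Yun Tang's queryable-state suggestion can be sketched as a small external client that reads one key's MapState from a running job. This sketch assumes:

- Flink 1.10 with queryable state enabled on the cluster. Per the queryable state docs, that means copying `flink-queryable-state-runtime` from `opt/` to `lib/` and setting `queryable-state.enable: true`.
- The `flink-queryable-state-client-java` dependency in the client.
- String map values for illustration.

The queryable name `"cart-state"`, the class name and the command-line arguments are hypothetical; 9069 is the default proxy port. Queryable state is a point lookup: the client must already know each key it asks for, so on its own it cannot enumerate all unmatched entries.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

// Hypothetical client for illustration only.
class QueryableCartState {

    // Hypothetical queryable name; the job must register its descriptor under the same name.
    static final String QUERYABLE_NAME = "cart-state";

    // Use this descriptor both inside the job (getRuntimeContext().getMapState(...)) and in the client.
    static MapStateDescriptor<String, String> cartDescriptor() {
        MapStateDescriptor<String, String> descriptor =
                new MapStateDescriptor<>("cartData", String.class, String.class);
        descriptor.setQueryable(QUERYABLE_NAME);
        return descriptor;
    }

    // args: <job id in hex> <queryable state proxy host> <stream key, e.g. orderId+mid>
    public static void main(String[] args) throws Exception {
        JobID jobId = JobID.fromHexString(args[0]);
        QueryableStateClient client = new QueryableStateClient(args[1], 9069);
        try {
            CompletableFuture<MapState<String, String>> future = client.getKvState(
                    jobId, QUERYABLE_NAME, args[2], BasicTypeInfo.STRING_TYPE_INFO, cartDescriptor());
            MapState<String, String> state = future.get();
            // Print every map entry stored under the requested key.
            for (Map.Entry<String, String> entry : state.entries()) {
                System.out.println(entry.getKey() + " -> " + entry.getValue());
            }
        } finally {
            client.shutdownAndWait();
        }
    }
}
```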