Hi Alex,

Since you use a custom class, MultiStorePacketState, as the value state type, Flink should use the Kryo serializer [1] to serialize it, either on every state access with the RocksDB state backend or when taking a checkpoint with FileSystemStateBackend, and I don't know whether Kryo would serialize your transient field. If you're not familiar with Flink's serialization stack, I think you could check the behaviors below:
1. Without any checkpoint restore, use FileSystemStateBackend and check whether the transient field can be read as expected; the answer should be yes.
2. After restoring from a checkpoint, check whether the transient field can still be read back when using FileSystemStateBackend.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class

Best,
Yun Tang

________________________________
From: Alex Drobinsky <alex.drobin...@gmail.com>
Sent: Monday, October 11, 2021 22:37
To: JING ZHANG <beyond1...@gmail.com>
Cc: User-Flink <user@flink.apache.org>
Subject: Re: Reset of transient variables in state to default values.

It would be difficult to provide even a semblance of the complete product; however, I can try to provide enough details to reproduce the problem.

A standard source would do:

DataStream<byte[]> stream = env.addSource(
        new FlinkKafkaConsumer<>(topic, new AbstractDeserializationSchema<byte[]>() {
            @Override
            public byte[] deserialize(byte[] bytes) throws IOException {
                return bytes;
            }
        }, properties)).name(topic);

The operator body is something like:

public class MultiStorePacketFunction
        extends KeyedProcessFunction<String, SplitterToMultiStore, ClassifierOutput>
        implements Serializable {

    private transient ValueState<MultiStorePacketState> state;

    @Override
    public void processElement(SplitterToMultiStore packet, Context ctx,
            Collector<ClassifierOutput> out) throws Exception {
        if (packet.hasPackets()) {
            // storedPackets is presumably a metrics Counter; its declaration is not shown
            storedPackets.inc(packet.getPackets().getPacketsCount());
        }
        MultiStorePacketState so = state.value();
        if (process(packet, out, so, ctx)) {
            state.update(null);
            state.clear();
        } else {
            state.update(so);
        }
    }

    public String generateNextFilename(String sessionKey, int partNumber) {
        String path = DirectoryService.getInstance().bookDirectory();
        return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
    }

    private void storeContent(Collector<ClassifierOutput> collector, MultiStorePacketState state,
            SplitterToMultiStore packets) throws Exception {
        assert (packets != null);
        assert (packets.hasPackets());
        if (state.currentFile == null) {
            openFile(collector, state, packets);
        }
        Utils.continueWriteToPcap(state.currentFile, packets.getPackets().getPacketsList());
        state.fileOffset = state.currentFile.length();
        tryToCloseFile(collector, state);
    }

    public static String extractExportedFileName(String fileName) {
        String[] path = fileName.split("/");
        return path[path.length - 2] + "/" + path[path.length - 1];
    }

    private void openFile(Collector<ClassifierOutput> collector, MultiStorePacketState state,
            SplitterToMultiStore packets) throws Exception {
        state.fileIsOpened = true;
        state.fileName = generateNextFilename(state.sessionKey, state.partNumber);
        state.exportedFileName = extractExportedFileName(state.fileName);
        // -> Here the RandomAccessFile is created
        state.currentFile = Utils.startWriteToPcap(state.fileName, packets.getPackets().getPacketsList());
        state.fileOffset = state.currentFile.length();
        state.partNumber++;
    }

    private void tryToCloseFile(Collector<ClassifierOutput> collector, MultiStorePacketState state)
            throws IOException {
        if (state.currentFile.length() < StorePacketConfigurationParameters.partSizeLimit) {
            return;
        }
        closeFile(collector, state);
    }

    private void closeFile(Collector<ClassifierOutput> collector, MultiStorePacketState state)
            throws IOException {
        state.currentFile.close();
        state.currentFile = null;
        state.fileIsOpened = false;
        ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
        outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
        outputBuilder.setSessionType(SessionType.Multi);
        outputBuilder.setSessionKey(state.sessionKey);
        var classifierOutput = outputBuilder.build();
        state.sessionMetadata.add(classifierOutput);
        collector.collect(classifierOutput);
    }

    public boolean process(SplitterToMultiStore packet, Collector<ClassifierOutput> collector,
            MultiStorePacketState so, Context context) throws Exception {
        // First message
        if (packet.hasClassificationResult()) {
            sendClassificationResult(packet, collector, so);
            return false;
        }
        // Last message
        if (packet.hasSessionClosure()) {
            if (so.isCoverageIncorrect) {
                return true;
            }
            handleSessionClosure(packet, collector, so, context);
            return true;
        }
        if (so.isCoverageIncorrect) {
            return false;
        }
        storeContent(collector, so, packet);
        // The file could already be closed, e.g. because it reached the expected size.
        if (so.currentFile != null) {
            setupTimer(so, context.timerService());
        }
        return false;
    }

    private void handleSessionClosure(SplitterToMultiStore packet, Collector<ClassifierOutput> collector,
            MultiStorePacketState so, Context context) throws IOException {
        if (so.currentFile != null) {
            closeFile(collector, so);
        }
        ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
        outputBuilder.setSessionKey(packet.getSessionKey());
        outputBuilder.setSessionType(packet.getSessionType());
        var messageBuilder = outputBuilder.getLastBuilder();
        messageBuilder.addAllAggregatedSession(so.sessionMetadata);
        outputBuilder.setLast(messageBuilder.build());
        var output = outputBuilder.build();
        collector.collect(output);
        context.timerService().deleteProcessingTimeTimer(so.timerValue);
        state.clear();
    }

    private void sendClassificationResult(SplitterToMultiStore packet,
            Collector<ClassifierOutput> collector, MultiStorePacketState so) {
        var coverageResult = CoverageChecker.obtainCoverageInfo(packet.getClassificationResult().getLIID());
        so.isCoverageIncorrect = !coverageResult.getValue0();
        if (so.isCoverageIncorrect) {
            return;
        }
        ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
        outputBuilder.getFirstBuilder().setClassificationResult(packet.getClassificationResult().getClassificationResult());
        outputBuilder.getFirstBuilder().setLIID(packet.getClassificationResult().getLIID());
        outputBuilder.getFirstBuilder().setCIN(packet.getClassificationResult().getCIN());
        outputBuilder.getFirstBuilder().setOperatorId(packet.getClassificationResult().getOperatorId());
        outputBuilder.getFirstBuilder().setCoverage(coverageResult.getValue1());
        outputBuilder.setSessionKey(packet.getSessionKey());
        outputBuilder.setSessionType(packet.getSessionType());
        so.sessionKey = packet.getSessionKey();
        var classifierOutput = outputBuilder.build();
        so.sessionMetadata.add(classifierOutput);
        collector.collect(classifierOutput);
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<MultiStorePacketState> descriptor =
                new ValueStateDescriptor<MultiStorePacketState>(
                        "MultiStorePacketState", // the state name
                        TypeInformation.of(new TypeHint<MultiStorePacketState>() {}), // type information
                        new MultiStorePacketState()); // default value of the state, if nothing was set
        state = getRuntimeContext().getState(descriptor);
        StorePacketConfigurationParameters.init();
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ClassifierOutput> collector)
            throws Exception {
        MultiStorePacketState so = state.value();
        if (so.currentFile != null) {
            closeFile(collector, so);
        }
        ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
        state.update(so);
    }

    private void setupTimer(MultiStorePacketState so, TimerService timerService) {
        // Cancel previous timer
        timerService.deleteProcessingTimeTimer(so.timerValue);
        // Register new timer
        so.timerValue = (timerService.currentProcessingTime()
                + StorePacketConfigurationParameters.partAggregationTimeout) / 1000 * 1000;
        timerService.registerProcessingTimeTimer(so.timerValue);
    }
}

The state class looks like the following:

public class MultiStorePacketState implements Serializable {
    private /*static*/ Logger logger = LoggerFactory.getLogger(MultiStorePacketState.class);
    public transient RandomAccessFile currentFile = null;
    public long timerValue;
    public String fileName;
    public String exportedFileName;
    public String sessionKey;
    public long fileOffset = 0;
    public int partNumber = 0;
    public boolean isCoverageIncorrect = false;
    public boolean fileIsOpened = false;
    public ArrayList<ClassifierOutput>
            sessionMetadata = new ArrayList<>();

    private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
        ois.defaultReadObject();
        logger.info("Deserialized MultiStorePacketState: " + this.toString());
        // No need to do anything if no file was opened (fileName is null by default)
        if (fileName == null || fileName.isEmpty()) {
            return;
        }
        currentFile = new RandomAccessFile(fileName, "rw");
        currentFile.seek(fileOffset);
    }
}

Input and output are Proto files; you could replace the input with byte[] and remove the output generation and the calls to the collector. The test should generate and store data for a while, so that at least one checkpoint is triggered. I used a checkpoint interval of 5000 ms on a quite slow system. Every data chunk is about 1 KB.

Utils.startWriteToPcap - new RandomAccessFile()
Utils.continueWriteToPcap - can be replaced with currentFile.write()

On Mon, 11 Oct 2021 at 16:50, JING ZHANG <beyond1...@gmail.com> wrote:

Hi Alex,

It is a little weird. Would you please provide a program that reproduces the problem, including the DataStream job code and the related classes? I need to do some debugging to find out the reason.

Best,
JING ZHANG

On Mon, 11 Oct 2021 at 5:50 PM, Alex Drobinsky <alex.drobin...@gmail.com> wrote:

Hi Jing Zhang,

I'm using the FileSystem backend. I also implemented a readObject function to support a proper restart procedure:

    private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
        ois.defaultReadObject();
        logger.info("Deserialized MultiStorePacketState: " + this.toString());
        // No need to do anything if no file was opened (fileName is null by default)
        if (fileName == null || fileName.isEmpty()) {
            return;
        }
        currentFile = new RandomAccessFile(fileName, "rw");
        currentFile.seek(fileOffset);
    }

However, according to the logs, this function wasn't called.

Btw, it could be beneficial to add this kind of state object, e.g. a FileState that encapsulates serialization/deserialization of a RandomAccessFile, although the concept itself is a bit contradictory to regular state.
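A JDK-only sketch of why a readObject() hook like the one above may never show up in the logs: readObject() is a java.io serialization hook, so it runs only when the object goes through an ObjectInputStream. Kryo's default FieldSerializer reads and writes fields directly; as far as I know it does not call readObject(), and it skips transient fields by default. In the sketch, fieldCopy() is a hypothetical stand-in that imitates that behaviour with reflection (it is not Kryo), and State is a hypothetical, trimmed-down MultiStorePacketState with a StringBuilder in place of the RandomAccessFile.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class TransientDemo {

    // Hypothetical, trimmed-down stand-in for MultiStorePacketState.
    public static class State implements Serializable {
        private static final long serialVersionUID = 1L;
        public transient StringBuilder handle;     // plays the role of the RandomAccessFile
        public transient boolean readObjectCalled; // set only by the java.io hook below
        public String fileName;

        // Same idea as the readObject() in this thread: restore the transient handle.
        private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
            ois.defaultReadObject();
            readObjectCalled = true;
            if (fileName != null && !fileName.isEmpty()) {
                handle = new StringBuilder("reopened:" + fileName);
            }
        }
    }

    // Round trip through java.io serialization: readObject() is invoked.
    public static State javaRoundTrip(State s) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(s);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (State) in.readObject();
        }
    }

    // Reflection-based copy that skips static and transient fields and never calls readObject().
    // It only imitates a field-based serializer's default behaviour; it is NOT Kryo.
    public static State fieldCopy(State s) throws IllegalAccessException {
        State copy = new State();
        for (Field f : State.class.getDeclaredFields()) {
            int mod = f.getModifiers();
            if (Modifier.isStatic(mod) || Modifier.isTransient(mod)) {
                continue;
            }
            f.setAccessible(true);
            f.set(copy, f.get(s));
        }
        return copy;
    }

    public static void main(String[] args) throws Exception {
        State s = new State();
        s.fileName = "session-1.pcap";
        s.handle = new StringBuilder("open");

        State viaJava = javaRoundTrip(s);
        State viaCopy = fieldCopy(s);

        // prints: java: readObject=true, handle=reopened:session-1.pcap
        System.out.println("java: readObject=" + viaJava.readObjectCalled + ", handle=" + viaJava.handle);
        // prints: copy: readObject=false, handle=null
        System.out.println("copy: readObject=" + viaCopy.readObjectCalled + ", handle=" + viaCopy.handle);
    }
}
```

The java.io round trip restores the handle through the hook, while the field copy keeps fileName but leaves the transient handle null without ever touching readObject(), which is the same symptom described in this thread.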
Currently, I have implemented and tested a workaround by adding the boolean variable fileIsOpened; however, it's awkward, because I need to check the state of the transient variable every time I use state.value(). So, is it expected that transient variables in state are reset to their default values?

On Mon, 11 Oct 2021 at 12:33, JING ZHANG <beyond1...@gmail.com> wrote:

Hi Alex,

Which state backend do you use? If you use MemoryStateBackend or FsStateBackend, the `transient` keyword may have no effect, because these backends do not serialize state on regular read/write accesses but keep it as objects on the heap. If you use RocksDBStateBackend, I think this is expected behavior, because RocksDBStateBackend stores all state as byte arrays in embedded RocksDB instances and therefore de/serializes the state of a key on every read/write access. currentFile is then null because transient fields are not serialized by default.

Best,
JING ZHANG

On Mon, 11 Oct 2021 at 4:33 PM, Alex Drobinsky <alex.drobin...@gmail.com> wrote:

Dear Flink community,

I have the following state class (irrelevant fields removed):

public class MultiStorePacketState implements Serializable {
    public transient RandomAccessFile currentFile = null;
    public long timerValue;
    public String fileName;
    public String exportedFileName;
    public String sessionKey;
    public long fileOffset = 0;
}

Once in a while, currentFile becomes null; this happens after I extract the state via:

MultiStorePacketState so = state.value();

The frequency of this behaviour is similar to the checkpoint interval (the checkpoint interval is set to 5 seconds, and the problem first occurs after 5 seconds as well); otherwise I have no clue to a possible explanation. Is it an expected side effect of the checkpoint procedure?

Best regards,
Alex
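Alex's workaround (a boolean flag that has to be checked after every state.value()) can be folded into a single accessor on the state class, so callers never read the transient field directly. A minimal sketch, assuming the file can always be reopened from the serialized fileName and fileOffset: LazyFileState, openFile(), append() and copyWithoutHandle() are hypothetical names, not Flink API, and copyWithoutHandle() only imitates a copy that drops transient fields. It does not close a handle that an older copy of the state object may still hold, so it sketches the access pattern rather than solving resource cleanup.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;

// Hypothetical variant of MultiStorePacketState: the RandomAccessFile stays transient,
// but every access goes through openFile(), which reopens it from serialized fields.
public class LazyFileState implements Serializable {
    private static final long serialVersionUID = 1L;

    private transient RandomAccessFile currentFile; // lost whenever the object is copied/serialized
    public String fileName;                          // serialized: which file holds the data
    public long fileOffset = 0;                      // serialized: how many bytes were written

    // Returns an open handle, reopening the file at fileOffset if the transient field was reset.
    public RandomAccessFile openFile() throws IOException {
        if (currentFile == null) {
            if (fileName == null || fileName.isEmpty()) {
                throw new IllegalStateException("no file associated with this state");
            }
            currentFile = new RandomAccessFile(fileName, "rw");
            currentFile.seek(fileOffset);
        }
        return currentFile;
    }

    // Appends bytes and records the new end position in a serialized field.
    public void append(byte[] data) throws IOException {
        RandomAccessFile f = openFile();
        f.write(data);
        fileOffset = f.getFilePointer();
    }

    public boolean isOpen() {
        return currentFile != null;
    }

    public void close() throws IOException {
        if (currentFile != null) {
            currentFile.close();
            currentFile = null;
        }
    }

    // Imitates a copy that keeps the regular fields but loses the transient handle.
    public LazyFileState copyWithoutHandle() {
        LazyFileState copy = new LazyFileState();
        copy.fileName = fileName;
        copy.fileOffset = fileOffset;
        return copy;
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("session-", ".pcap");
        tmp.deleteOnExit();

        LazyFileState state = new LazyFileState();
        state.fileName = tmp.getAbsolutePath();
        state.append(new byte[] {1, 2, 3});

        LazyFileState copy = state.copyWithoutHandle(); // handle is gone, as after a copy
        state.close();                                  // release the old handle
        copy.append(new byte[] {4, 5});                 // reopens and continues at offset 3

        // prints: offset=5, length=5
        System.out.println("offset=" + copy.fileOffset + ", length=" + tmp.length());
        copy.close();
    }
}
```

The design choice is to treat the RandomAccessFile purely as a cache derived from fileName and fileOffset: as long as those two fields are kept up to date, it does not matter which serializer or copy path dropped the handle.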