Hi,

The RocksDB state backend uses the registered Kryo serializer both for normal read/write access and for checkpoint/restore. Moreover, since key-values are serialized before being stored in RocksDB, they are effectively deep-copied, which protects them from later unexpected modification.
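A minimal sketch of the deep-copy effect described above. It uses plain Java serialization as a stand-in for Kryo, which is an assumption made only so that the example runs on a bare JDK; the `Session` class and its fields are hypothetical. It shows that a value that has passed through a serializer comes back as a different instance, with the non-transient field preserved and the transient field empty.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class DeepCopyDemo {
    /** Hypothetical stand-in for a state class with one transient field. */
    static class Session implements Serializable {
        private static final long serialVersionUID = 1L;
        String fileName = "part-0.pcap";
        transient Object handle = new Object(); // plays the role of an open file handle
    }

    /** Writes the value to bytes and reads it back, as a serializing backend does on access. */
    static Session roundTrip(Session in) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Session) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Session original = new Session();
        Session copy = roundTrip(original);
        System.out.println("distinct instance: " + (copy != original));
        System.out.println("fileName kept:     " + copy.fileName);
        System.out.println("handle is null:    " + (copy.handle == null));
    }
}
```

The same reasoning would apply to any backend that hands back a deserialized copy: whatever was not written by the serializer has to be rebuilt by the caller.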
The FileSystem/HashMap state backend uses the registered Kryo serializer only for checkpoint/restore. Since the Java-heap-based state backends do not deep-copy key-values, for performance reasons, a stored object might be changed unexpectedly if it is misused, which might make the field reset to its default value.

Best,
Yun Tang

________________________________
From: Arvid Heise <ar...@apache.org>
Sent: Monday, October 18, 2021 20:30
To: Alex Drobinsky <alex.drobin...@gmail.com>
Cc: JING ZHANG <beyond1...@gmail.com>; Yun Tang <myas...@live.com>; User-Flink <user@flink.apache.org>
Subject: Re: Reset of transient variables in state to default values.

That's what I would try out, but I'm not sure if the state backend would pick that up. @Yun Tang <myas...@live.com>, do you know more?

On Mon, Oct 18, 2021 at 9:37 AM Alex Drobinsky <alex.drobin...@gmail.com> wrote:

Hi Arvid,

It sounds like a good direction. Do I need to register my state class with KryoSerializer, similar to this?

    env.getConfig().registerTypeWithKryoSerializer(IPSessionOrganizer.proto.SourceOutput.class, ProtobufSerializer.class);

On Mon, Oct 18, 2021 at 10:32, Arvid Heise <ar...@apache.org> wrote:

Hi Alex,

could you also log the identity hashcode (or something similar) of the related instance? I suspect that it's not the field that is set to null but that you get a clone where the field is null. In that case, you need to add a specific KryoSerializer to initialize it (or just go with a lazy access pattern all the way).

On Tue, Oct 12, 2021 at 2:55 PM Alex Drobinsky <alex.drobin...@gmail.com> wrote:

Hi Jing,

The job doesn't restart from the checkpoint; it's a brand new clean job, no exceptions happened during execution, no restarts :) The state is a Keyed State, so a new key means a new State. In this situation currentFile is equal to null, as expected, and this is handled without issues.
Before I even thought of asking my questions, the first thing I did was add log messages with the value of currentFile in every place it could be changed. So I checked that before I release control to Flink, currentFile has the correct value, and after I receive the value from state in the next iteration it's set to null. The checkpoints by themselves could be irrelevant to the problem; the only indication of a connection is my assumption, based on the observation that the interval between the first event and the first occurrence of nullification is exactly the same as the checkpoint interval.

Yun Tang - you are correct, it's a KryoSerializer. If I remove the "transient" qualifier from currentFile, it crashes inside of KryoSerializer because RandomAccessFile isn't serializable. This also points to the fact that serialization was actually executed at least once. I will try an alternative approach - I will add my own writeObject implementation, it should work :)

Best regards,
Alex

On Tue, Oct 12, 2021 at 15:07, JING ZHANG <beyond1...@gmail.com> wrote:

Hi Alex,

Since you use `FileSystemStateBackend`, I think that currentFile becoming null once in a while is not caused by the periodic checkpoint. If the job is running without failover or restore from a checkpoint, reading/writing value state on `FileSystemStateBackend` does not cause serialization and deserialization at all. I have already simplified your code and verified this point. If you are interested, you can find this simplified code in the attachment of the email.

Some possible reasons come to my mind; I hope this helps.

1. Does the job restore from a checkpoint/savepoint? This may be caused by failover or a user-triggered stop-with-savepoint.
2. If the job does not restore from a checkpoint or savepoint:
2.1 After reading the MultiStorePacketState from ValueState, is there somewhere in your program that updates the currentFile field to null again?
Because the state is stored on the heap, it may be changed if the program updates its value somewhere.
2.2 When currentFile became null, is it possible that the current key never appeared before? That is, it's the first time that the current key appears, so getting the state would return the default value (a new MultiStorePacketState instance with a null currentFile).

Best,
JING ZHANG

On Tue, Oct 12, 2021 at 4:41 PM, Yun Tang <myas...@live.com> wrote:

Hi Alex,

Since you use the customized MultiStorePacketState class as the value state type, it should use the Kryo serializer [1] to serialize your class when accessing RocksDB state or when checkpointing via FileSystemStateBackend, and I don't know whether Kryo would serialize your transient field. If you're not familiar with Flink's serialization stack, I think you could check the behaviors below:

1. Without any checkpoint restore, use FileSystemStateBackend to see whether the transient field can be read as expected; the answer should be yes.
2. After restoring from a checkpoint, check whether the transient field can be read back when using FileSystemStateBackend.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class

Best
Yun Tang

________________________________
From: Alex Drobinsky <alex.drobin...@gmail.com>
Sent: Monday, October 11, 2021 22:37
To: JING ZHANG <beyond1...@gmail.com>
Cc: User-Flink <user@flink.apache.org>
Subject: Re: Reset of transient variables in state to default values.

It would be difficult to provide even a semblance of the complete product; however, I could try to provide enough details to reproduce the problem.
A standard source would do:

    DataStream<byte[]> stream = env.addSource(
            new FlinkKafkaConsumer<>(topic, new AbstractDeserializationSchema<byte[]>() {
                @Override
                public byte[] deserialize(byte[] bytes) throws IOException {
                    return bytes;
                }
            }, properties)).name(topic);

The operator body is something like:

    public class MultiStorePacketFunction extends KeyedProcessFunction<String, SplitterToMultiStore, ClassifierOutput> implements Serializable {
        private transient ValueState<MultiStorePacketState> state;

        @Override
        public void processElement(SplitterToMultiStore packet, Context ctx, Collector<ClassifierOutput> out) throws Exception {
            if (packet.hasPackets()) {
                storedPackets.inc(packet.getPackets().getPacketsCount());
            }
            MultiStorePacketState so = state.value();
            if (process(packet, out, so, ctx)) {
                state.update(null);
                state.clear();
            } else {
                state.update(so);
            }
        }

        public String generateNextFilename(String sessionKey, int partNumber) {
            String path = DirectoryService.getInstance().bookDirectory();
            return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
        }

        private void storeContent(Collector<ClassifierOutput> collector, MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
            assert (packets != null);
            assert (packets.hasPackets());
            if (state.currentFile == null) {
                openFile(collector, state, packets);
            }
            Utils.continueWriteToPcap(state.currentFile, packets.getPackets().getPacketsList());
            state.fileOffset = state.currentFile.length();
            tryToCloseFile(collector, state);
        }

        static public String extractExportedFileName(String fileName) {
            String path[] = fileName.split("/");
            return path[path.length - 2] + "/" + path[path.length - 1];
        }

        private void openFile(Collector<ClassifierOutput> collector, MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
            state.fileIsOpened = true;
            state.fileName = generateNextFilename(state.sessionKey, state.partNumber);
            state.exportedFileName = extractExportedFileName(state.fileName);
            // -> Here RandomAccessFile created
            state.currentFile = Utils.startWriteToPcap(state.fileName, packets.getPackets().getPacketsList());
            state.fileOffset = state.currentFile.length();
            state.partNumber++;
        }

        private void tryToCloseFile(Collector<ClassifierOutput> collector, MultiStorePacketState state) throws IOException {
            if (state.currentFile.length() < StorePacketConfigurationParameters.partSizeLimit) {
                return;
            }
            closeFile(collector, state);
        }

        private void closeFile(Collector<ClassifierOutput> collector, MultiStorePacketState state) throws IOException {
            state.currentFile.close();
            state.currentFile = null;
            state.fileIsOpened = false;
            ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
            outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
            outputBuilder.setSessionType(SessionType.Multi);
            outputBuilder.setSessionKey(state.sessionKey);
            var classifierOutput = outputBuilder.build();
            state.sessionMetadata.add(classifierOutput);
            collector.collect(classifierOutput);
        }

        public boolean process(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so, Context context) throws Exception {
            // First message
            if (packet.hasClassificationResult()) {
                sendClassificationResult(packet, collector, so);
                return false;
            }
            // Last message
            if (packet.hasSessionClosure()) {
                if (so.isCoverageIncorrect) {
                    return true;
                }
                handleSessionClosure(packet, collector, so, context);
                return true;
            }
            if (so.isCoverageIncorrect) {
                return false;
            }
            storeContent(collector, so, packet);
            // File could be already closed, e.g. it reached the expected size.
            if (so.currentFile != null) {
                setupTimer(so, context.timerService());
            }
            return false;
        }

        private void handleSessionClosure(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so, Context context) throws IOException {
            if (so.currentFile != null) {
                closeFile(collector, so);
            }
            ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
            outputBuilder.setSessionKey(packet.getSessionKey());
            outputBuilder.setSessionType(packet.getSessionType());
            var messageBuilder = outputBuilder.getLastBuilder();
            messageBuilder.addAllAggregatedSession(so.sessionMetadata);
            outputBuilder.setLast(messageBuilder.build());
            var output = outputBuilder.build();
            collector.collect(output);
            context.timerService().deleteProcessingTimeTimer(so.timerValue);
            state.clear();
        }

        private void sendClassificationResult(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so) {
            var coverageResult = CoverageChecker.obtainCoverageInfo(packet.getClassificationResult().getLIID());
            so.isCoverageIncorrect = !coverageResult.getValue0();
            if (so.isCoverageIncorrect) {
                return;
            }
            ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
            outputBuilder.getFirstBuilder().setClassificationResult(packet.getClassificationResult().getClassificationResult());
            outputBuilder.getFirstBuilder().setLIID(packet.getClassificationResult().getLIID());
            outputBuilder.getFirstBuilder().setCIN(packet.getClassificationResult().getCIN());
            outputBuilder.getFirstBuilder().setOperatorId(packet.getClassificationResult().getOperatorId());
            outputBuilder.getFirstBuilder().setCoverage(coverageResult.getValue1());
            outputBuilder.setSessionKey(packet.getSessionKey());
            outputBuilder.setSessionType(packet.getSessionType());
            so.sessionKey = packet.getSessionKey();
            var classifierOutput = outputBuilder.build();
            so.sessionMetadata.add(classifierOutput);
            collector.collect(classifierOutput);
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<MultiStorePacketState> descriptor =
                    new ValueStateDescriptor<MultiStorePacketState>(
                            "MultiStorePacketState", // the state name
                            TypeInformation.of(new TypeHint<MultiStorePacketState>() {}), // type information
                            new MultiStorePacketState()); // default value of the state, if nothing was set
            state = getRuntimeContext().getState(descriptor);
            StorePacketConfigurationParameters.init();
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<ClassifierOutput> collector) throws Exception {
            MultiStorePacketState so = state.value();
            if (so.currentFile != null) {
                closeFile(collector, so);
            }
            ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
            state.update(so);
        }

        private void setupTimer(MultiStorePacketState so, TimerService timerService) {
            // Cancel previous timer
            timerService.deleteProcessingTimeTimer(so.timerValue);
            // Register new timer
            so.timerValue = (timerService.currentProcessingTime() + StorePacketConfigurationParameters.partAggregationTimeout)/1000*1000;
            timerService.registerProcessingTimeTimer(so.timerValue);
        }
    }

The state class looks like the following:

    public class MultiStorePacketState implements Serializable {
        private /*static*/ Logger logger = LoggerFactory.getLogger(MultiStorePacketState.class);
        public transient RandomAccessFile currentFile = null;
        public long timerValue;
        public String fileName;
        public String exportedFileName;
        public String sessionKey;
        public long fileOffset = 0;
        public int partNumber = 0;
        public boolean isCoverageIncorrect = false;
        public boolean fileIsOpened = false;
        public ArrayList<ClassifierOutput> sessionMetadata = new ArrayList<>();

        private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
            ois.defaultReadObject();
            logger.info("Deserialized MultiStorePacketState: " + this.toString());
            // No need to do anything in case of empty file
            if (fileName.isEmpty()) {
                return;
            }
            currentFile = new RandomAccessFile(fileName,"rw");
            currentFile.seek(fileOffset);
        }
    }

Input &
output are Proto files; you could replace the input with byte[], and remove output generation and calls to collector. The test should generate & store data for a while, so that a checkpoint is triggered at least once. I used a checkpoint interval of 5000 ms on a quite slow system. Every data chunk is about 1k.

Utils.startWriteToPcap - new RandomAccessFile()
Utils.writeToPcap - should be replaced with currentFile.write()

On Mon, Oct 11, 2021 at 16:50, JING ZHANG <beyond1...@gmail.com> wrote:

Hi Alex,

It is a little weird. Would you please provide a program which can reproduce the problem, including the DataStream job code and the related classes? I need to do some debugging to find out the reason.

Best,
JING ZHANG

On Mon, Oct 11, 2021 at 5:50 PM, Alex Drobinsky <alex.drobin...@gmail.com> wrote:

Hi Jing Zhang,

I'm using the FileSystem backend. I also implemented a readObject function to support a proper restart procedure:

    private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
        ois.defaultReadObject();
        logger.info("Deserialized MultiStorePacketState: " + this.toString());
        // No need to do anything in case of empty file
        if (fileName.isEmpty()) {
            return;
        }
        currentFile = new RandomAccessFile(fileName,"rw");
        currentFile.seek(fileOffset);
    }

However, according to the logs this function wasn't called.

Btw, it could be beneficial to add this kind of State object, e.g. a FileState which would encapsulate serialization / deserialization for RandomAccessFile, although the concept itself is a bit contradictory to regular state. Currently, I implemented and tested a workaround via the addition of the boolean variable isFileOpened; however, it's awkward because I need to check the state of the transient variable every time I use state.value(). So should it be expected that transient variables in state would be reset to default values?

On Mon, Oct 11, 2021 at 12:33, JING ZHANG <beyond1...@gmail.com> wrote:

Hi Alex,

What state backend do you choose? If you choose MemoryStateBackend or FsStateBackend, the `transient` keyword may not have any effect, because MemoryStateBackend does not serialize state for regular read/write accesses but keeps it as objects on the heap. If you choose RocksDBStateBackend, I think it is expected behavior, because RocksDBStateBackend stores all state as byte arrays in embedded RocksDB instances. Therefore, it de/serializes the state of a key for every read/write access. currentFile is null because the transient variable would not be serialized by default.

Best,
JING ZHANG

On Mon, Oct 11, 2021 at 4:33 PM, Alex Drobinsky <alex.drobin...@gmail.com> wrote:

Dear Flink community,

I have the following state class (irrelevant fields removed):

    public class MultiStorePacketState implements Serializable {
        public transient RandomAccessFile currentFile = null;
        public long timerValue;
        public String fileName;
        public String exportedFileName;
        public String sessionKey;
        public long fileOffset = 0;
    }

Once in a while, currentFile becomes null; this happens after I extract the state via

    MultiStorePacketState so = state.value();

The frequency of this behaviour is similar to the checkpoint interval (the checkpoint interval is defined as 5 seconds and the first occurrence of this problem is also at 5 seconds); otherwise I don't have any clues to a possible explanation. Is it an expected side effect of the checkpoint procedure?

Best regards,
Alex
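
A minimal sketch of the "lazy access pattern" suggested earlier in the thread, assuming the state keeps `fileName` and `fileOffset` as ordinary fields and treats the file handle as a cache that can always be rebuilt. The class name `LazyFileState` and its methods are hypothetical and do not come from the original job; no Flink or Kryo API is used, so the copy is simulated by hand.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;

public class LazyFileState implements Serializable {
    private static final long serialVersionUID = 1L;

    public String fileName;      // ordinary field, travels with the state
    public long fileOffset = 0;  // ordinary field, travels with the state
    private transient RandomAccessFile currentFile; // cache only, may be null at any time

    /** Returns the handle, reopening it at fileOffset when this instance has none. */
    public RandomAccessFile file() throws IOException {
        if (currentFile == null && fileName != null && !fileName.isEmpty()) {
            currentFile = new RandomAccessFile(fileName, "rw");
            currentFile.seek(fileOffset);
        }
        return currentFile;
    }

    /** Appends data through the lazily opened handle and records the new offset. */
    public void write(byte[] data) throws IOException {
        RandomAccessFile f = file();
        if (f == null) {
            throw new IllegalStateException("fileName is not set");
        }
        f.write(data);
        fileOffset = f.getFilePointer();
    }

    public void close() throws IOException {
        if (currentFile != null) {
            currentFile.close();
            currentFile = null;
        }
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("lazy-state", ".pcap");
        tmp.deleteOnExit();

        LazyFileState first = new LazyFileState();
        first.fileName = tmp.getPath();
        first.write(new byte[] {1, 2, 3});
        first.close();

        // Simulated copy: only the non-transient fields are carried over.
        LazyFileState copy = new LazyFileState();
        copy.fileName = first.fileName;
        copy.fileOffset = first.fileOffset;
        copy.write(new byte[] {4, 5}); // reopens the file and continues at the stored offset
        copy.close();

        System.out.println("bytes on disk: " + tmp.length());
    }
}
```

With this shape the caller never reads `currentFile` directly, so a null handle after `state.value()` stops being a special case. The sketch does not address what happens if the original instance still holds an open handle while a copy reopens the same file.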