It would be difficult to provide even a semblance of the complete product; however, I can try to provide enough details to reproduce the problem. A standard source would do:
DataStream<byte[]> stream = env.addSource(
        new FlinkKafkaConsumer<>(topic, new AbstractDeserializationSchema<byte[]>() {
            @Override
            public byte[] deserialize(byte[] bytes) throws IOException {
                return bytes;
            }
        }, properties)).name(topic);

The operator body is something like:

public class MultiStorePacketFunction extends KeyedProcessFunction<String, SplitterToMultiStore, ClassifierOutput> implements Serializable {
    private transient ValueState<MultiStorePacketState> state;
    private transient Counter storedPackets; // metric counter; registration omitted as irrelevant

    @Override
    public void processElement(SplitterToMultiStore packet, Context ctx, Collector<ClassifierOutput> out) throws Exception {
        if (packet.hasPackets()) {
            storedPackets.inc(packet.getPackets().getPacketsCount());
        }
        MultiStorePacketState so = state.value();
        if (process(packet, out, so, ctx)) {
            state.update(null);
            state.clear();
        } else {
            state.update(so);
        }
    }

    public String generateNextFilename(String sessionKey, int partNumber) {
        String path = DirectoryService.getInstance().bookDirectory();
        return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
    }

    private void storeContent(Collector<ClassifierOutput> collector, MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
        assert (packets != null);
        assert (packets.hasPackets());

        if (state.currentFile == null) {
            openFile(collector, state, packets);
        }

        Utils.continueWriteToPcap(state.currentFile, packets.getPackets().getPacketsList());
        state.fileOffset = state.currentFile.length();

        tryToCloseFile(collector, state);
    }

    public static String extractExportedFileName(String fileName) {
        String[] path = fileName.split("/");
        return path[path.length - 2] + "/" + path[path.length - 1];
    }

    private void openFile(Collector<ClassifierOutput> collector, MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
        state.fileIsOpened = true;
        state.fileName = generateNextFilename(state.sessionKey, state.partNumber);
        state.exportedFileName = extractExportedFileName(state.fileName);
        // -> Here the RandomAccessFile is created
        state.currentFile = Utils.startWriteToPcap(state.fileName, packets.getPackets().getPacketsList());
        state.fileOffset = state.currentFile.length();
        state.partNumber++;
    }

    private void tryToCloseFile(Collector<ClassifierOutput> collector, MultiStorePacketState state) throws IOException {
        if (state.currentFile.length() < StorePacketConfigurationParameters.partSizeLimit) {
            return;
        }
        closeFile(collector, state);
    }

    private void closeFile(Collector<ClassifierOutput> collector, MultiStorePacketState state) throws IOException {
        state.currentFile.close();
        state.currentFile = null;
        state.fileIsOpened = false;
        ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
        outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
        outputBuilder.setSessionType(SessionType.Multi);
        outputBuilder.setSessionKey(state.sessionKey);
        var classifierOutput = outputBuilder.build();
        state.sessionMetadata.add(classifierOutput);
        collector.collect(classifierOutput);
    }

    public boolean process(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so, Context context) throws Exception {
        // First message
        if (packet.hasClassificationResult()) {
            sendClassificationResult(packet, collector, so);
            return false;
        }

        // Last message
        if (packet.hasSessionClosure()) {
            if (so.isCoverageIncorrect) {
                return true;
            }
            handleSessionClosure(packet, collector, so, context);
            return true;
        }

        if (so.isCoverageIncorrect) {
            return false;
        }

        storeContent(collector, so, packet);

        // The file could already be closed, e.g. if it reached the expected size.
        if (so.currentFile != null) {
            setupTimer(so, context.timerService());
        }
        return false;
    }

    private void handleSessionClosure(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so, Context context) throws IOException {
        if (so.currentFile != null) {
            closeFile(collector, so);
        }
        ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
        outputBuilder.setSessionKey(packet.getSessionKey());
        outputBuilder.setSessionType(packet.getSessionType());
        var messageBuilder = outputBuilder.getLastBuilder();
        messageBuilder.addAllAggregatedSession(so.sessionMetadata);
        outputBuilder.setLast(messageBuilder.build());
        var output = outputBuilder.build();
        collector.collect(output);
        context.timerService().deleteProcessingTimeTimer(so.timerValue);
        state.clear();
    }

    private void sendClassificationResult(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so) {
        var coverageResult = CoverageChecker.obtainCoverageInfo(packet.getClassificationResult().getLIID());
        so.isCoverageIncorrect = !coverageResult.getValue0();
        if (so.isCoverageIncorrect) {
            return;
        }
        ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
        outputBuilder.getFirstBuilder().setClassificationResult(packet.getClassificationResult().getClassificationResult());
        outputBuilder.getFirstBuilder().setLIID(packet.getClassificationResult().getLIID());
        outputBuilder.getFirstBuilder().setCIN(packet.getClassificationResult().getCIN());
        outputBuilder.getFirstBuilder().setOperatorId(packet.getClassificationResult().getOperatorId());
        outputBuilder.getFirstBuilder().setCoverage(coverageResult.getValue1());
        outputBuilder.setSessionKey(packet.getSessionKey());
        outputBuilder.setSessionType(packet.getSessionType());
        so.sessionKey = packet.getSessionKey();
        var classifierOutput = outputBuilder.build();
        so.sessionMetadata.add(classifierOutput);
        collector.collect(classifierOutput);
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<MultiStorePacketState> descriptor =
                new ValueStateDescriptor<MultiStorePacketState>(
                        "MultiStorePacketState", // the state name
                        TypeInformation.of(new TypeHint<MultiStorePacketState>() {}), // type information
                        new MultiStorePacketState()); // default value of the state, if nothing was set
        state = getRuntimeContext().getState(descriptor);
        StorePacketConfigurationParameters.init();
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ClassifierOutput> collector) throws Exception {
        MultiStorePacketState so = state.value();
        if (so.currentFile != null) {
            closeFile(collector, so);
        }
        ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
        state.update(so);
    }

    private void setupTimer(MultiStorePacketState so, TimerService timerService) {
        // Cancel the previous timer
        timerService.deleteProcessingTimeTimer(so.timerValue);
        // Register a new timer, rounded down to a whole second
        so.timerValue = (timerService.currentProcessingTime() + StorePacketConfigurationParameters.partAggregationTimeout) / 1000 * 1000;
        timerService.registerProcessingTimeTimer(so.timerValue);
    }
}
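For reference, the Utils helpers just wrap RandomAccessFile, as noted at the end of this mail; for a reproduction they could be stubbed roughly like this (my sketch: the signatures are inferred from the call sites above, and Packet stands in for the generated protobuf message type):

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.List;

public class Utils {
    public static RandomAccessFile startWriteToPcap(String fileName, List<Packet> packets) throws IOException {
        // Essentially just "new RandomAccessFile()" plus the first write
        RandomAccessFile file = new RandomAccessFile(fileName, "rw");
        continueWriteToPcap(file, packets);
        return file;
    }

    public static void continueWriteToPcap(RandomAccessFile file, List<Packet> packets) throws IOException {
        // The real code writes pcap framing; a plain write is enough for the repro
        for (Packet packet : packets) {
            file.write(packet.toByteArray());
        }
    }
}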
The state class looks like the following:

public class MultiStorePacketState implements Serializable {
    private /*static*/ Logger logger = LoggerFactory.getLogger(MultiStorePacketState.class);

    public transient RandomAccessFile currentFile = null;
    public long timerValue;
    public String fileName;
    public String exportedFileName;
    public String sessionKey;
    public long fileOffset = 0;
    public int partNumber = 0;
    public boolean isCoverageIncorrect = false;
    public boolean fileIsOpened = false;
    public ArrayList<ClassifierOutput> sessionMetadata = new ArrayList<>();

    private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
        ois.defaultReadObject();
        logger.info("Deserialized MultiStorePacketState: " + this.toString());

        // No need to do anything in case of an empty file
        if (fileName.isEmpty()) {
            return;
        }
        currentFile = new RandomAccessFile(fileName, "rw");
        currentFile.seek(fileOffset);
    }
}
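As mentioned in my earlier mail below, my current workaround checks the transient field after every state.value() and lazily re-opens the file; roughly like this (a simplified sketch, the helper name is mine):

private void ensureFileOpen(MultiStorePacketState so) throws IOException {
    // After state.value(), the transient currentFile may have been reset to
    // null by state de/serialization; restore it from the persisted fields.
    if (so.fileIsOpened && so.currentFile == null) {
        so.currentFile = new RandomAccessFile(so.fileName, "rw");
        so.currentFile.seek(so.fileOffset);
    }
}

It is called right after MultiStorePacketState so = state.value(); in processElement() and onTimer().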
Input & output are Protobuf messages; you could replace the input with byte[] and remove the output generation and the calls to the collector. The test should generate & store data for a while, so that at least one checkpoint is triggered. I used a checkpoint interval of 5000 ms on a quite slow system. Every data chunk is about 1 KB.

Utils.startWriteToPcap - essentially new RandomAccessFile()
Utils.continueWriteToPcap - can be replaced with currentFile.write()
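To put it together, the test driver could look roughly like this (a sketch under the setup described above: FileSystem state backend and a 5000 ms checkpoint interval; ByteArraySource and the checkpoint path are placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // the interval at which the problem reproduces
env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints")); // FileSystem backend

env.addSource(new ByteArraySource()) // the Kafka source above, or any synthetic byte[] generator
        .map(SplitterToMultiStore::parseFrom) // protobuf input, ~1 KB per chunk
        .keyBy(SplitterToMultiStore::getSessionKey)
        .process(new MultiStorePacketFunction())
        .print();

env.execute("MultiStorePacket reproduction");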
Mon, 11 Oct 2021 at 16:50, JING ZHANG <beyond1...@gmail.com>:

> Hi Alex,
> It is a little weird.
> Would you please provide a program which could reproduce the problem,
> including the DataStream job code and the related classes? I need to do
> some debugging to find out the reason.
>
> Best,
> JING ZHANG
>
>
> Alex Drobinsky <alex.drobin...@gmail.com> wrote on Mon, 11 Oct 2021 at 17:50:
>
>> Hi Jing Zhang,
>>
>> I'm using the FileSystem backend. I also implemented a readObject function
>> to support a proper restart procedure:
>>
>> private void readObject(ObjectInputStream ois)
>>         throws ClassNotFoundException, IOException {
>>     ois.defaultReadObject();
>>     logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>
>>     // No need to do anything in case of an empty file
>>     if (fileName.isEmpty()) {
>>         return;
>>     }
>>     currentFile = new RandomAccessFile(fileName, "rw");
>>     currentFile.seek(fileOffset);
>> }
>>
>> However, according to the logs, this function wasn't called.
>>
>> Btw, it could be beneficial to add this kind of state object, e.g. a
>> FileState which would encapsulate serialization / deserialization of a
>> RandomAccessFile, although the concept itself is a bit contradictory to
>> regular state.
>>
>> Currently, I have implemented and tested a workaround via the addition of
>> a boolean variable isFileOpened; however, it's awkward because I need to
>> check the state of the transient variable every time I use state.value().
>>
>> So, should it be expected that transient variables in state are reset to
>> their default values?
>>
>>
>> Mon, 11 Oct 2021 at 12:33, JING ZHANG <beyond1...@gmail.com>:
>>
>>> Hi, Alex
>>> What state backend do you choose?
>>> If you choose MemoryStateBackend or FsStateBackend, the `transient`
>>> keyword may have no effect, because MemoryStateBackend does not serialize
>>> state for regular read/write accesses but keeps it as objects on the heap.
>>> If you choose RocksDBStateBackend, I think it is expected behavior,
>>> because RocksDBStateBackend stores all state as byte arrays in embedded
>>> RocksDB instances. Therefore, it de/serializes the state of a key for
>>> every read/write access. currentFile is null because transient variables
>>> are not serialized by default.
>>>
>>> Best,
>>> JING ZHANG
>>>
>>>
>>> Alex Drobinsky <alex.drobin...@gmail.com> wrote on Mon, 11 Oct 2021 at 16:33:
>>>
>>>> Dear Flink community,
>>>>
>>>> I have the following state class (irrelevant fields removed):
>>>>
>>>> public class MultiStorePacketState implements Serializable {
>>>>
>>>>     public transient RandomAccessFile currentFile = null;
>>>>     public long timerValue;
>>>>     public String fileName;
>>>>     public String exportedFileName;
>>>>     public String sessionKey;
>>>>     public long fileOffset = 0;
>>>>
>>>> }
>>>>
>>>> Once in a while, currentFile becomes *nullified*; this happens after I
>>>> extract the state via
>>>>
>>>> MultiStorePacketState so = state.value();
>>>>
>>>> The frequency of this behaviour matches the checkpoint interval (the
>>>> checkpoint interval is defined as 5 seconds, and the first occurrence of
>>>> this problem is also at 5 seconds); otherwise I don't have any clues
>>>> towards a possible explanation.
>>>>
>>>> Is it an expected side effect of the checkpoint procedure?
>>>>
>>>> Best regards,
>>>> Alex