That's what I would try, but I'm not sure whether the state backend would pick that up. @Yun Tang <myas...@live.com> do you know more?
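Below is a minimal, stdlib-only sketch of the two ideas in Arvid's earlier message quoted below: logging the identity hash code, and a lazy access pattern. It is an illustration under assumptions, not Flink code. `FileHolderState` and `CloneCheck` are hypothetical names, and plain Java serialization stands in for whatever copy the state backend or serializer might make. Flink itself goes through Kryo here, which this sketch does not model.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class CloneCheck {

    // Hypothetical, trimmed-down stand-in for MultiStorePacketState.
    static class FileHolderState implements Serializable {
        private static final long serialVersionUID = 1L;
        public transient RandomAccessFile currentFile; // skipped by serialization
        public String fileName;
        public long fileOffset;

        // Lazy access: (re)open the file on first use, e.g. after the instance was copied.
        RandomAccessFile file() {
            if (currentFile == null && fileName != null) {
                try {
                    currentFile = new RandomAccessFile(fileName, "rw");
                    currentFile.seek(fileOffset);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
            return currentFile;
        }
    }

    // Copies an object through Java serialization; transient fields come back as null.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("state-demo", ".pcap");
        tmp.deleteOnExit();

        FileHolderState original = new FileHolderState();
        original.fileName = tmp.getPath();
        original.file().write(new byte[] {1, 2, 3});
        original.fileOffset = original.currentFile.length();

        FileHolderState copy = roundTrip(original);
        // Identity hash codes of distinct objects usually differ, so logging them
        // helps tell "I got a copy" apart from "someone set the field to null".
        System.out.println("same instance: " + (original == copy));
        System.out.println("identity hashes: " + System.identityHashCode(original)
                + " vs " + System.identityHashCode(copy));
        System.out.println("copy.currentFile before access: " + copy.currentFile);
        System.out.println("reopened at offset: " + copy.file().getFilePointer());

        original.currentFile.close();
        copy.currentFile.close();
    }
}
```

Running `main` should print `same instance: false`, `copy.currentFile before access: null` and `reopened at offset: 3`. The two identity hash codes will normally differ. In the real job, the equivalent check would be logging `System.identityHashCode(so)` right after `state.value()` and right before `state.update(so)`.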
On Mon, Oct 18, 2021 at 9:37 AM Alex Drobinsky <alex.drobin...@gmail.com> wrote:
> Hi Arvid,
>
> It sounds like a good direction. Do I need to register my state class with a
> Kryo serializer, similar to this?
>
> env.getConfig().registerTypeWithKryoSerializer(IPSessionOrganizer.proto.SourceOutput.class,
>         ProtobufSerializer.class);
>
> On Mon, Oct 18, 2021 at 10:32 Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Alex,
>>
>> could you also log the identity hash code (or something similar) of the
>> related instance? I suspect that it's not the field that is set to null but
>> that you get a clone where the field is null. In that case, you need to add
>> a specific KryoSerializer to initialize it (or just go with a lazy access
>> pattern all the way).
>>
>> On Tue, Oct 12, 2021 at 2:55 PM Alex Drobinsky <alex.drobin...@gmail.com>
>> wrote:
>>
>>> Hi Jing,
>>>
>>> The job doesn't restart from a checkpoint; it's a brand-new, clean job. No
>>> exceptions happened during execution and there were no restarts :)
>>> The state is a keyed state, so a new key means a new state; in that
>>> situation currentFile is null, as expected, and this is handled without
>>> issues.
>>> Before I even thought of asking, the first thing I did was add log
>>> messages with the value of currentFile in every place where it could be
>>> changed. That way I verified that currentFile has the correct value right
>>> before I hand control back to Flink, and that after I read the value from
>>> state in the next iteration, it is null.
>>> The checkpoints themselves could be irrelevant to the problem; the only
>>> hint of a connection is my observation that the interval between the first
>>> event and the first occurrence of the nullification is exactly the
>>> checkpoint interval.
>>>
>>> Yun Tang, you are correct, it's a KryoSerializer: if I remove the
>>> "transient" qualifier from currentFile, it crashes inside KryoSerializer
>>> because RandomAccessFile isn't serializable.
>>> This also shows that serialization was actually executed at least once.
>>> I will try an alternative approach: I will add my own writeObject
>>> implementation, which should work :)
>>>
>>> Best regards,
>>> Alex
>>>
>>> On Tue, Oct 12, 2021 at 15:07 JING ZHANG <beyond1...@gmail.com> wrote:
>>>
>>>> Hi Alex,
>>>> Since you use `FileSystemStateBackend`, I don't think the occasional
>>>> nullification of currentFile is caused by the periodic checkpoint.
>>>>
>>>> If the job is running without a failover or a restore from a checkpoint,
>>>> reading/writing value state on `FileSystemStateBackend` does not cause any
>>>> serialization or deserialization at all. I have already simplified your
>>>> code and verified this point. If you are interested, you can find the
>>>> simplified code attached to this email.
>>>>
>>>> A few possible reasons come to mind; I hope this helps.
>>>> 1. Does the job restore from a checkpoint/savepoint? This may be caused by
>>>>    a failover or a user-triggered stop-with-savepoint.
>>>> 2. If the job does not restore from a checkpoint or savepoint:
>>>>    2.1 After reading the MultiStorePacketState from the ValueState, is
>>>>        there any place in your program that sets the currentFile field to
>>>>        null again? Because the state is stored on the heap, it may be
>>>>        changed if the program updates its value somewhere.
>>>>    2.2 When currentFile became null, is it possible that the current key
>>>>        had never appeared before?
>>>>        That is, it's the first time the current key appears, so getting the
>>>>        state would return the default value (a new MultiStorePacketState
>>>>        instance with a null currentFile).
>>>>
>>>> Best,
>>>> JING ZHANG
>>>>
>>>> On Tue, Oct 12, 2021 at 4:41 PM Yun Tang <myas...@live.com> wrote:
>>>>
>>>>> Hi Alex,
>>>>>
>>>>> Since you use a customized MultiStorePacketState class as the value
>>>>> state type, Flink should use the Kryo serializer [1] to serialize your
>>>>> class, both when accessing RocksDB state and when checkpointing with
>>>>> FileSystemStateBackend, and I don't know whether Kryo would serialize
>>>>> your transient field.
>>>>> If you're not familiar with Flink's serialization stack, I think you
>>>>> could check the behaviors below:
>>>>>
>>>>>    1. Without any checkpoint restore, use FileSystemStateBackend and
>>>>>    check whether the transient field can be read as expected; the answer
>>>>>    should be yes.
>>>>>    2. After restoring from a checkpoint, check whether the transient
>>>>>    field can be read back when using FileSystemStateBackend.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class
>>>>>
>>>>> Best,
>>>>> Yun Tang
>>>>>
>>>>> ------------------------------
>>>>> *From:* Alex Drobinsky <alex.drobin...@gmail.com>
>>>>> *Sent:* Monday, October 11, 2021 22:37
>>>>> *To:* JING ZHANG <beyond1...@gmail.com>
>>>>> *Cc:* User-Flink <user@flink.apache.org>
>>>>> *Subject:* Re: Reset of transient variables in state to default
>>>>> values.
>>>>>
>>>>> It would be difficult to provide even a semblance of the complete
>>>>> product; however, I can try to provide enough details to reproduce the
>>>>> problem.
>>>>> A standard source would do:
>>>>>
>>>>> DataStream<byte[]> stream = env.addSource(
>>>>>         new FlinkKafkaConsumer<>(topic, new AbstractDeserializationSchema<byte[]>() {
>>>>>             @Override
>>>>>             public byte[] deserialize(byte[] bytes) throws IOException {
>>>>>                 return bytes;
>>>>>             }
>>>>>         }, properties)).name(topic);
>>>>>
>>>>> The operator body is something like:
>>>>>
>>>>> public class MultiStorePacketFunction
>>>>>         extends KeyedProcessFunction<String, SplitterToMultiStore, ClassifierOutput>
>>>>>         implements Serializable {
>>>>>     private transient ValueState<MultiStorePacketState> state;
>>>>>
>>>>>     @Override
>>>>>     public void processElement(SplitterToMultiStore packet, Context ctx,
>>>>>                                Collector<ClassifierOutput> out) throws Exception {
>>>>>         if (packet.hasPackets()) {
>>>>>             storedPackets.inc(packet.getPackets().getPacketsCount());
>>>>>         }
>>>>>
>>>>>         MultiStorePacketState so = state.value();
>>>>>         if (process(packet, out, so, ctx)) {
>>>>>             state.update(null);
>>>>>             state.clear();
>>>>>         } else {
>>>>>             state.update(so);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     public String generateNextFilename(String sessionKey, int partNumber) {
>>>>>         String path = DirectoryService.getInstance().bookDirectory();
>>>>>         return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
>>>>>     }
>>>>>
>>>>>     private void storeContent(Collector<ClassifierOutput> collector,
>>>>>                               MultiStorePacketState state,
>>>>>                               SplitterToMultiStore packets) throws Exception {
>>>>>         assert (packets != null);
>>>>>         assert (packets.hasPackets());
>>>>>
>>>>>         if (state.currentFile == null) {
>>>>>             openFile(collector, state, packets);
>>>>>         }
>>>>>
>>>>>         Utils.continueWriteToPcap(state.currentFile, packets.getPackets().getPacketsList());
>>>>>         state.fileOffset = state.currentFile.length();
>>>>>
>>>>>         tryToCloseFile(collector, state);
>>>>>     }
>>>>>
>>>>>     public static String extractExportedFileName(String fileName) {
>>>>>         String[] path = fileName.split("/");
>>>>>         return path[path.length - 2] + "/" + path[path.length - 1];
>>>>>     }
>>>>>
>>>>>     private void openFile(Collector<ClassifierOutput> collector,
>>>>>                           MultiStorePacketState state,
>>>>>                           SplitterToMultiStore packets) throws Exception {
>>>>>         state.fileIsOpened = true;
>>>>>         state.fileName = generateNextFilename(state.sessionKey, state.partNumber);
>>>>>         state.exportedFileName = extractExportedFileName(state.fileName);
>>>>>
>>>>>         // -> Here RandomAccessFile created
>>>>>         state.currentFile = Utils.startWriteToPcap(state.fileName,
>>>>>                 packets.getPackets().getPacketsList());
>>>>>         state.fileOffset = state.currentFile.length();
>>>>>         state.partNumber++;
>>>>>     }
>>>>>
>>>>>     private void tryToCloseFile(Collector<ClassifierOutput> collector,
>>>>>                                 MultiStorePacketState state) throws IOException {
>>>>>         if (state.currentFile.length() < StorePacketConfigurationParameters.partSizeLimit) {
>>>>>             return;
>>>>>         }
>>>>>         closeFile(collector, state);
>>>>>     }
>>>>>
>>>>>     private void closeFile(Collector<ClassifierOutput> collector,
>>>>>                            MultiStorePacketState state) throws IOException {
>>>>>         state.currentFile.close();
>>>>>         state.currentFile = null;
>>>>>         state.fileIsOpened = false;
>>>>>         ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>>>         outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
>>>>>         outputBuilder.setSessionType(SessionType.Multi);
>>>>>         outputBuilder.setSessionKey(state.sessionKey);
>>>>>         var classifierOutput = outputBuilder.build();
>>>>>         state.sessionMetadata.add(classifierOutput);
>>>>>         collector.collect(classifierOutput);
>>>>>     }
>>>>>
>>>>>     public boolean process(SplitterToMultiStore packet, Collector<ClassifierOutput> collector,
>>>>>                            MultiStorePacketState so, Context context) throws Exception {
>>>>>
>>>>>         // First message
>>>>>         if (packet.hasClassificationResult()) {
>>>>>             sendClassificationResult(packet, collector, so);
>>>>>             return false;
>>>>>         }
>>>>>
>>>>>         // Last message
>>>>>         if (packet.hasSessionClosure()) {
>>>>>             if (so.isCoverageIncorrect) {
>>>>>                 return true;
>>>>>             }
>>>>>             handleSessionClosure(packet, collector, so, context);
>>>>>             return true;
>>>>>         }
>>>>>
>>>>>         if (so.isCoverageIncorrect) {
>>>>>             return false;
>>>>>         }
>>>>>         storeContent(collector, so, packet);
>>>>>
>>>>>         // File could already be closed, e.g. it reached the expected size.
>>>>>         if (so.currentFile != null) {
>>>>>             setupTimer(so, context.timerService());
>>>>>         }
>>>>>         return false;
>>>>>     }
>>>>>
>>>>>     private void handleSessionClosure(SplitterToMultiStore packet,
>>>>>                                       Collector<ClassifierOutput> collector,
>>>>>                                       MultiStorePacketState so,
>>>>>                                       Context context) throws IOException {
>>>>>         if (so.currentFile != null) {
>>>>>             closeFile(collector, so);
>>>>>         }
>>>>>         ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>>>         outputBuilder.setSessionKey(packet.getSessionKey());
>>>>>         outputBuilder.setSessionType(packet.getSessionType());
>>>>>         var messageBuilder = outputBuilder.getLastBuilder();
>>>>>         messageBuilder.addAllAggregatedSession(so.sessionMetadata);
>>>>>         outputBuilder.setLast(messageBuilder.build());
>>>>>         var output = outputBuilder.build();
>>>>>         collector.collect(output);
>>>>>         context.timerService().deleteProcessingTimeTimer(so.timerValue);
>>>>>         state.clear();
>>>>>     }
>>>>>
>>>>>     private void sendClassificationResult(SplitterToMultiStore packet,
>>>>>                                           Collector<ClassifierOutput> collector,
>>>>>                                           MultiStorePacketState so) {
>>>>>         var coverageResult =
>>>>>                 CoverageChecker.obtainCoverageInfo(packet.getClassificationResult().getLIID());
>>>>>         so.isCoverageIncorrect = !coverageResult.getValue0();
>>>>>         if (so.isCoverageIncorrect) {
>>>>>             return;
>>>>>         }
>>>>>
>>>>>         ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>>>         outputBuilder.getFirstBuilder().setClassificationResult(
>>>>>                 packet.getClassificationResult().getClassificationResult());
>>>>>         outputBuilder.getFirstBuilder().setLIID(packet.getClassificationResult().getLIID());
>>>>>         outputBuilder.getFirstBuilder().setCIN(packet.getClassificationResult().getCIN());
>>>>>         outputBuilder.getFirstBuilder().setOperatorId(
>>>>>                 packet.getClassificationResult().getOperatorId());
>>>>>         outputBuilder.getFirstBuilder().setCoverage(coverageResult.getValue1());
>>>>>         outputBuilder.setSessionKey(packet.getSessionKey());
>>>>>         outputBuilder.setSessionType(packet.getSessionType());
>>>>>         so.sessionKey = packet.getSessionKey();
>>>>>         var classifierOutput = outputBuilder.build();
>>>>>         so.sessionMetadata.add(classifierOutput);
>>>>>         collector.collect(classifierOutput);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void open(Configuration config) {
>>>>>         ValueStateDescriptor<MultiStorePacketState> descriptor =
>>>>>                 new ValueStateDescriptor<MultiStorePacketState>(
>>>>>                         "MultiStorePacketState", // the state name
>>>>>                         TypeInformation.of(new TypeHint<MultiStorePacketState>() {}), // type information
>>>>>                         new MultiStorePacketState()); // default value of the state, if nothing was set
>>>>>         state = getRuntimeContext().getState(descriptor);
>>>>>         StorePacketConfigurationParameters.init();
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void onTimer(long timestamp, OnTimerContext ctx,
>>>>>                         Collector<ClassifierOutput> collector) throws Exception {
>>>>>         MultiStorePacketState so = state.value();
>>>>>         if (so.currentFile != null) {
>>>>>             closeFile(collector, so);
>>>>>         }
>>>>>
>>>>>         ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
>>>>>         state.update(so);
>>>>>     }
>>>>>
>>>>>     private void setupTimer(MultiStorePacketState so, TimerService timerService) {
>>>>>         // Cancel previous timer
>>>>>         timerService.deleteProcessingTimeTimer(so.timerValue);
>>>>>         // Register new timer
>>>>>         so.timerValue = (timerService.currentProcessingTime()
>>>>>                 + StorePacketConfigurationParameters.partAggregationTimeout) / 1000 * 1000;
>>>>>         timerService.registerProcessingTimeTimer(so.timerValue);
>>>>>     }
>>>>> }
>>>>>
>>>>> The state class looks like the following:
>>>>>
>>>>> public class MultiStorePacketState implements Serializable {
>>>>>     private /*static*/ Logger logger = LoggerFactory.getLogger(MultiStorePacketState.class);
>>>>>     public transient RandomAccessFile currentFile = null;
>>>>>     public long timerValue;
>>>>>     public String fileName;
>>>>>     public String exportedFileName;
>>>>>     public String sessionKey;
>>>>>     public long fileOffset = 0;
>>>>>     public int partNumber = 0;
>>>>>     public boolean isCoverageIncorrect = false;
>>>>>     public boolean fileIsOpened = false;
>>>>>     public ArrayList<ClassifierOutput> sessionMetadata = new ArrayList<>();
>>>>>
>>>>>     private void readObject(ObjectInputStream ois)
>>>>>             throws ClassNotFoundException, IOException {
>>>>>         ois.defaultReadObject();
>>>>>         logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>>>>
>>>>>         // No need to do anything if no file was opened (fileName is null for a fresh state)
>>>>>         if (fileName == null || fileName.isEmpty()) {
>>>>>             return;
>>>>>         }
>>>>>         currentFile = new RandomAccessFile(fileName, "rw");
>>>>>         currentFile.seek(fileOffset);
>>>>>     }
>>>>> }
>>>>>
>>>>> Input and output are Protobuf messages; you could replace the input with
>>>>> byte[] and remove the output generation and the calls to the collector.
>>>>> The test should generate and store data for a while, so that at least
>>>>> one checkpoint is triggered. I used a checkpoint interval of 5000 ms on a
>>>>> fairly slow system.
>>>>> Every data chunk is about 1 KB.
>>>>> Utils.startWriteToPcap can be replaced with new RandomAccessFile().
>>>>> Utils.writeToPcap should be replaced with currentFile.write().
>>>>>
>>>>> On Mon, Oct 11, 2021 at 16:50 JING ZHANG <beyond1...@gmail.com> wrote:
>>>>>
>>>>> Hi Alex,
>>>>> It is a little weird.
>>>>> Would you please provide a program that reproduces the problem,
>>>>> including the DataStream job code and the related classes? I need to do
>>>>> some debugging to find out the reason.
>>>>>
>>>>> Best,
>>>>> JING ZHANG
>>>>>
>>>>> On Mon, Oct 11, 2021 at 5:50 PM Alex Drobinsky <alex.drobin...@gmail.com> wrote:
>>>>>
>>>>> Hi Jing Zhang,
>>>>>
>>>>> I'm using the FileSystem backend. I also implemented a readObject
>>>>> function to support a proper restart procedure:
>>>>>
>>>>> private void readObject(ObjectInputStream ois)
>>>>>         throws ClassNotFoundException, IOException {
>>>>>     ois.defaultReadObject();
>>>>>     logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>>>>
>>>>>     // No need to do anything if no file was opened (fileName is null for a fresh state)
>>>>>     if (fileName == null || fileName.isEmpty()) {
>>>>>         return;
>>>>>     }
>>>>>     currentFile = new RandomAccessFile(fileName, "rw");
>>>>>     currentFile.seek(fileOffset);
>>>>> }
>>>>>
>>>>> However, according to the logs, this function wasn't called.
>>>>>
>>>>> Btw, it could be beneficial to add a dedicated kind of state object,
>>>>> e.g. a FileState that encapsulates serialization/deserialization of a
>>>>> RandomAccessFile, although the concept itself is a bit at odds with
>>>>> regular state.
>>>>>
>>>>> Currently, I have implemented and tested a workaround by adding the
>>>>> boolean variable fileIsOpened; however, it's awkward because I need to
>>>>> check the state of the transient variable every time I use state.value().
>>>>>
>>>>> So should I expect transient variables in state to be reset to their
>>>>> default values?
>>>>>
>>>>> On Mon, Oct 11, 2021 at 12:33 JING ZHANG <beyond1...@gmail.com> wrote:
>>>>>
>>>>> Hi, Alex
>>>>> Which state backend do you use?
>>>>> If you use MemoryStateBackend or FsStateBackend, the `transient` keyword
>>>>> may have no effect, because these backends do not serialize state for
>>>>> regular read/write accesses but keep it as objects on the heap.
>>>>> If you use RocksDBStateBackend, I think this is expected behavior,
>>>>> because RocksDBStateBackend stores all state as byte arrays in embedded
>>>>> RocksDB instances.
>>>>> Therefore, it de/serializes the state of a key on every read/write
>>>>> access, and currentFile is null because transient fields are not
>>>>> serialized by default.
>>>>>
>>>>> Best,
>>>>> JING ZHANG
>>>>>
>>>>> On Mon, Oct 11, 2021 at 4:33 PM Alex Drobinsky <alex.drobin...@gmail.com> wrote:
>>>>>
>>>>> Dear Flink community,
>>>>>
>>>>> I have the following state class (irrelevant fields removed):
>>>>>
>>>>> public class MultiStorePacketState implements Serializable {
>>>>>
>>>>>     public transient RandomAccessFile currentFile = null;
>>>>>     public long timerValue;
>>>>>     public String fileName;
>>>>>     public String exportedFileName;
>>>>>     public String sessionKey;
>>>>>     public long fileOffset = 0;
>>>>>
>>>>> }
>>>>>
>>>>> Once in a while, currentFile becomes *nullified*; this happens after I
>>>>> extract the state via
>>>>>
>>>>> MultiStorePacketState so = state.value();
>>>>>
>>>>> The frequency of this behaviour matches the checkpoint interval (the
>>>>> checkpoint interval is set to 5 seconds, and the first occurrence of the
>>>>> problem is also after 5 seconds); otherwise I have no clue what could
>>>>> explain it.
>>>>>
>>>>> Is it an expected side effect of the checkpoint procedure?
>>>>>
>>>>> Best regards,
>>>>> Alex
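The FileState idea above can be sketched with plain Java serialization hooks. This is a hypothetical sketch (`FileState` and `ReopenOnDeserialize` are made-up names): only the path and offset are persisted, and `readObject` reopens the file at the saved offset. It assumes the copy is produced by `ObjectInputStream`. As the logs in this thread showed, `readObject` was not invoked in the actual job, where the state went through Kryo. The hooks would therefore only help if the class's serializer actually used Java serialization, which is not shown here and would need to be verified.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ReopenOnDeserialize {

    // Hypothetical "FileState": persists only the path and offset, never the open handle.
    static class FileState implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String fileName;
        private long fileOffset;
        private transient RandomAccessFile file;

        FileState(String fileName) {
            this.fileName = fileName;
            reopen();
        }

        // Appends data and remembers the new end-of-data position.
        void append(byte[] data) {
            try {
                file.write(data);
                fileOffset = file.getFilePointer();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        RandomAccessFile file() { return file; }

        long offset() { return fileOffset; }

        void close() {
            try {
                file.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        private void reopen() {
            try {
                file = new RandomAccessFile(fileName, "rw");
                file.seek(fileOffset);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        // Invoked by ObjectInputStream: restore fileName/fileOffset, then reopen the file.
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            reopen();
        }
    }

    // Copies an object through Java serialization, which calls readObject above.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("filestate-demo", ".pcap");
        tmp.deleteOnExit();

        FileState original = new FileState(tmp.getPath());
        original.append(new byte[] {1, 2, 3, 4});

        FileState copy = roundTrip(original);
        System.out.println("copy reopened: " + (copy.file() != null));
        System.out.println("position: " + copy.file().getFilePointer());

        original.close();
        copy.close();
    }
}
```

Running `main` should print `copy reopened: true` and `position: 4`. Only `readObject` is customized. The default mechanism already writes `fileName` and `fileOffset`, so this particular sketch does not need a custom `writeObject`.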