That's what I would try out, but I'm not sure if the state backend would
pick that up. @Yun Tang <myas...@live.com> do you know more?

On Mon, Oct 18, 2021 at 9:37 AM Alex Drobinsky <alex.drobin...@gmail.com>
wrote:

> Hi Arvid,
>
> That sounds like a good direction. Do I need to register my state class
> with KryoSerializer, similar to this?
>
> env.getConfig().registerTypeWithKryoSerializer(
>         IPSessionOrganizer.proto.SourceOutput.class, ProtobufSerializer.class);
>
>
>
> Mon, 18 Oct 2021 at 10:32, Arvid Heise <ar...@apache.org>:
>
>> Hi Alex,
>>
>> could you also log the identity hash code (or something similar) of the
>> related instance? I suspect that it's not the field that is set to null, but
>> that you get a clone where the field is null. In that case, you need to add
>> a specific KryoSerializer to initialize it (or just go with a lazy access
>> pattern all the way).
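>>
>> A minimal sketch of what I mean (method name and logger are only
>> illustrative, and it assumes the MultiStorePacketState fields shown later
>> in this thread):
>>
>> // Lazy access: log which object instance we got, and never rely on the
>> // transient field having survived a de/serialization cycle.
>> private RandomAccessFile currentFile(MultiStorePacketState so) throws IOException {
>>     LOG.info("state identity={} currentFile={}",
>>             System.identityHashCode(so), so.currentFile);
>>     if (so.currentFile == null && so.fileName != null && !so.fileName.isEmpty()) {
>>         // Re-open from the serializable fields instead of trusting the
>>         // transient field.
>>         so.currentFile = new RandomAccessFile(so.fileName, "rw");
>>         so.currentFile.seek(so.fileOffset);
>>     }
>>     return so.currentFile;
>> }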
>>
>> On Tue, Oct 12, 2021 at 2:55 PM Alex Drobinsky <alex.drobin...@gmail.com>
>> wrote:
>>
>>> Hi Jing,
>>>
>>> The job doesn't restart from a checkpoint, it's a brand new, clean job; no
>>> exceptions happened during execution, no restarts :)
>>> The state is keyed state, so a new key means a new state - in that
>>> situation currentFile is equal to null - as expected, and it's handled
>>> without issues.
>>> Before I even thought to inquire about my questions, the first thing I
>>> did was add log messages with the value of currentFile in every place it
>>> could be changed.
>>> So I checked that before I release control to Flink, currentFile has
>>> the correct value, and after I receive the value from state in the next
>>> iteration it is set to null.
>>> The checkpoints by themselves could be irrelevant to the problem; the
>>> only indication of a connection is my observation that the interval
>>> between the first event and the first occurrence of the nullification is
>>> exactly the same as the checkpoint interval.
>>>
>>> Yun Tang - you are correct, it's a KryoSerializer: if I remove the
>>> "transient" qualifier from currentFile, it crashes inside the KryoSerializer
>>> because RandomAccessFile isn't serializable.
>>> This also points to the fact that serialization was actually executed at
>>> least once. I will try an alternative approach - I will add my own
>>> writeObject implementation, it should work :)
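>>>
>>> One caveat I need to verify first: Kryo's default FieldSerializer
>>> apparently does not invoke writeObject/readObject at all, which would
>>> also explain why my readObject was never called. If that is the case,
>>> the type has to be bound to a Java-serialization-based serializer
>>> first. A sketch, assuming Flink's bundled JavaSerializer:
>>>
>>> import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
>>>
>>> // Force Kryo to fall back to java.io serialization for this one type,
>>> // so that writeObject/readObject actually run.
>>> env.getConfig().registerTypeWithKryoSerializer(
>>>         MultiStorePacketState.class, JavaSerializer.class);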
>>>
>>> Best regards,
>>> Alex
>>>
>>>
>>>
>>>
>>>
>>>
>>> Tue, 12 Oct 2021 at 15:07, JING ZHANG <beyond1...@gmail.com>:
>>>
>>>> Hi Alex,
>>>> Since you use `FileSystemStateBackend`, I think currentFile becoming
>>>> nullified once in a while is not caused by the periodic checkpoint.
>>>>
>>>> If the job runs without failover or restoring from a checkpoint,
>>>> reading/writing value state on `FileSystemStateBackend` does not cause
>>>> serialization or deserialization at all. I have already simplified your
>>>> code and verified this point. If you are interested, you can find the
>>>> simplified code in the attachment of this email.
>>>>
>>>> Some possible reasons come to mind; I hope this helps.
>>>> 1. Does the job restore from a checkpoint/savepoint? This may be caused
>>>> by a failover or a user-triggered stop-with-savepoint.
>>>> 2. If the job does not restore from a checkpoint or savepoint:
>>>>      2.1 After reading the MultiStorePacketState from ValueState, is
>>>> there somewhere in your program that updates the currentFile field to null
>>>> again? Because the state is stored on the heap, it may be changed if the
>>>> program updates its value somewhere.
>>>>      2.2 When currentFile became null, is it possible that the
>>>> current key never appeared before? That is, it's the first time the
>>>> current key appears, so getting the state would return the default value
>>>> (a new MultiStorePacketState instance with a null currentFile). A quick
>>>> check for this case is sketched below.
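>>>>
>>>> A sketch of checking 2.2 inside processElement (LOG is assumed to be
>>>> your operator's logger):
>>>>
>>>> MultiStorePacketState so = state.value();
>>>> if (so.currentFile == null) {
>>>>     // For a first-seen key this null is simply the descriptor's
>>>>     // default value, not a reset; fileName would then also be null.
>>>>     LOG.info("null currentFile for key={} fileName={}",
>>>>             ctx.getCurrentKey(), so.fileName);
>>>> }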
>>>>
>>>> Best,
>>>> JING ZHANG
>>>>
>>>> Yun Tang <myas...@live.com> wrote on Tue, Oct 12, 2021 at 4:41 PM:
>>>>
>>>>> Hi Alex,
>>>>>
>>>>> Since you use the customized MultiStorePacketState class as the value
>>>>> state type, Flink falls back to the Kryo serializer [1] to serialize your
>>>>> class when accessing RocksDB state or when writing checkpoints via
>>>>> FileSystemStateBackend, and I don't know whether Kryo serializes your
>>>>> transient field.
>>>>> If you're not familiar with Flink's serialization stack, I think you
>>>>> could check the behaviors below:
>>>>>
>>>>>    1. Without any checkpoint restore, use FileSystemStateBackend to
>>>>>    see whether the transient field can be read as expected; the answer
>>>>>    should be yes.
>>>>>    2. After restoring from a checkpoint, check whether the transient
>>>>>    field can be read back when using FileSystemStateBackend.
>>>>>
>>>>>
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class
>>>>>
>>>>> Best
>>>>> Yun Tang
>>>>>
>>>>>
>>>>> ------------------------------
>>>>> *From:* Alex Drobinsky <alex.drobin...@gmail.com>
>>>>> *Sent:* Monday, October 11, 2021 22:37
>>>>> *To:* JING ZHANG <beyond1...@gmail.com>
>>>>> *Cc:* User-Flink <user@flink.apache.org>
>>>>> *Subject:* Re: Reset of transient variables in state to default
>>>>> values.
>>>>>
>>>>> It would be difficult to provide even a semblance of the complete
>>>>> product; however, I can try to provide enough details to reproduce the
>>>>> problem.
>>>>> A standard source will do:
>>>>>
>>>>> DataStream<byte[]> stream = env.addSource(
>>>>>         new FlinkKafkaConsumer<>(topic, new AbstractDeserializationSchema<byte[]>() {
>>>>>             @Override
>>>>>             public byte[] deserialize(byte[] bytes) throws IOException {
>>>>>                 return bytes;
>>>>>             }
>>>>>         }, properties)).name(topic);
>>>>>
>>>>>
>>>>> The operator body is something like this:
>>>>>
>>>>> public class MultiStorePacketFunction
>>>>>       extends KeyedProcessFunction<String, SplitterToMultiStore, ClassifierOutput>
>>>>>       implements Serializable {
>>>>>    private transient ValueState<MultiStorePacketState> state;
>>>>>
>>>>>    @Override
>>>>>    public void processElement(SplitterToMultiStore packet, Context ctx,
>>>>>          Collector<ClassifierOutput> out) throws Exception {
>>>>>       if (packet.hasPackets()) {
>>>>>          // storedPackets is a metrics counter, presumably initialized elsewhere (not shown)
>>>>>          storedPackets.inc(packet.getPackets().getPacketsCount());
>>>>>       }
>>>>>
>>>>>       MultiStorePacketState so = state.value();
>>>>>       if (process(packet, out, so, ctx)) {
>>>>>          state.update(null);
>>>>>          state.clear();
>>>>>       } else {
>>>>>          state.update(so);
>>>>>       }
>>>>>    }
>>>>>
>>>>>    public String generateNextFilename(String sessionKey, int partNumber) {
>>>>>       String path = DirectoryService.getInstance().bookDirectory();
>>>>>       return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
>>>>>    }
>>>>>
>>>>>    private void storeContent(Collector<ClassifierOutput> collector,
>>>>>          MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
>>>>>       assert (packets != null);
>>>>>       assert (packets.hasPackets());
>>>>>
>>>>>       if (state.currentFile == null) {
>>>>>          openFile(collector, state, packets);
>>>>>       }
>>>>>
>>>>>       Utils.continueWriteToPcap(state.currentFile, packets.getPackets().getPacketsList());
>>>>>       state.fileOffset = state.currentFile.length();
>>>>>
>>>>>       tryToCloseFile(collector, state);
>>>>>    }
>>>>>
>>>>>    public static String extractExportedFileName(String fileName) {
>>>>>       String[] path = fileName.split("/");
>>>>>       return path[path.length - 2] + "/" + path[path.length - 1];
>>>>>    }
>>>>>
>>>>>    private void openFile(Collector<ClassifierOutput> collector,
>>>>>          MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
>>>>>       state.fileIsOpened = true;
>>>>>       state.fileName = generateNextFilename(state.sessionKey, state.partNumber);
>>>>>       state.exportedFileName = extractExportedFileName(state.fileName);
>>>>>
>>>>>       // -> Here the RandomAccessFile is created
>>>>>       state.currentFile = Utils.startWriteToPcap(state.fileName,
>>>>>             packets.getPackets().getPacketsList());
>>>>>       state.fileOffset = state.currentFile.length();
>>>>>       state.partNumber++;
>>>>>    }
>>>>>
>>>>>    private void tryToCloseFile(Collector<ClassifierOutput> collector,
>>>>>          MultiStorePacketState state) throws IOException {
>>>>>       if (state.currentFile.length() < StorePacketConfigurationParameters.partSizeLimit) {
>>>>>          return;
>>>>>       }
>>>>>       closeFile(collector, state);
>>>>>    }
>>>>>
>>>>>    private void closeFile(Collector<ClassifierOutput> collector,
>>>>>          MultiStorePacketState state) throws IOException {
>>>>>       state.currentFile.close();
>>>>>       state.currentFile = null;
>>>>>       state.fileIsOpened = false;
>>>>>       ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>>>       outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
>>>>>       outputBuilder.setSessionType(SessionType.Multi);
>>>>>       outputBuilder.setSessionKey(state.sessionKey);
>>>>>       var classifierOutput = outputBuilder.build();
>>>>>       state.sessionMetadata.add(classifierOutput);
>>>>>       collector.collect(classifierOutput);
>>>>>    }
>>>>>
>>>>>    public boolean process(SplitterToMultiStore packet, Collector<ClassifierOutput> collector,
>>>>>          MultiStorePacketState so, Context context) throws Exception {
>>>>>       // First message
>>>>>       if (packet.hasClassificationResult()) {
>>>>>          sendClassificationResult(packet, collector, so);
>>>>>          return false;
>>>>>       }
>>>>>
>>>>>       // Last message
>>>>>       if (packet.hasSessionClosure()) {
>>>>>          if (so.isCoverageIncorrect) {
>>>>>             return true;
>>>>>          }
>>>>>          handleSessionClosure(packet, collector, so, context);
>>>>>          return true;
>>>>>       }
>>>>>
>>>>>       if (so.isCoverageIncorrect) {
>>>>>          return false;
>>>>>       }
>>>>>       storeContent(collector, so, packet);
>>>>>
>>>>>       // The file could already be closed, e.g. if it reached the expected size.
>>>>>       if (so.currentFile != null) {
>>>>>          setupTimer(so, context.timerService());
>>>>>       }
>>>>>       return false;
>>>>>    }
>>>>>
>>>>>    private void handleSessionClosure(SplitterToMultiStore packet,
>>>>>          Collector<ClassifierOutput> collector, MultiStorePacketState so,
>>>>>          Context context) throws IOException {
>>>>>       if (so.currentFile != null) {
>>>>>          closeFile(collector, so);
>>>>>       }
>>>>>       ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>>>       outputBuilder.setSessionKey(packet.getSessionKey());
>>>>>       outputBuilder.setSessionType(packet.getSessionType());
>>>>>       var messageBuilder = outputBuilder.getLastBuilder();
>>>>>       messageBuilder.addAllAggregatedSession(so.sessionMetadata);
>>>>>       outputBuilder.setLast(messageBuilder.build());
>>>>>       var output = outputBuilder.build();
>>>>>       collector.collect(output);
>>>>>       context.timerService().deleteProcessingTimeTimer(so.timerValue);
>>>>>       state.clear();
>>>>>    }
>>>>>
>>>>>    private void sendClassificationResult(SplitterToMultiStore packet,
>>>>>          Collector<ClassifierOutput> collector, MultiStorePacketState so) {
>>>>>       var coverageResult =
>>>>>             CoverageChecker.obtainCoverageInfo(packet.getClassificationResult().getLIID());
>>>>>       so.isCoverageIncorrect = !coverageResult.getValue0();
>>>>>       if (so.isCoverageIncorrect) {
>>>>>          return;
>>>>>       }
>>>>>
>>>>>       ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>>>       outputBuilder.getFirstBuilder().setClassificationResult(
>>>>>             packet.getClassificationResult().getClassificationResult());
>>>>>       outputBuilder.getFirstBuilder().setLIID(packet.getClassificationResult().getLIID());
>>>>>       outputBuilder.getFirstBuilder().setCIN(packet.getClassificationResult().getCIN());
>>>>>       outputBuilder.getFirstBuilder().setOperatorId(
>>>>>             packet.getClassificationResult().getOperatorId());
>>>>>       outputBuilder.getFirstBuilder().setCoverage(coverageResult.getValue1());
>>>>>       outputBuilder.setSessionKey(packet.getSessionKey());
>>>>>       outputBuilder.setSessionType(packet.getSessionType());
>>>>>       so.sessionKey = packet.getSessionKey();
>>>>>       var classifierOutput = outputBuilder.build();
>>>>>       so.sessionMetadata.add(classifierOutput);
>>>>>       collector.collect(classifierOutput);
>>>>>    }
>>>>>
>>>>>    @Override
>>>>>    public void open(Configuration config) {
>>>>>       ValueStateDescriptor<MultiStorePacketState> descriptor =
>>>>>             new ValueStateDescriptor<>(
>>>>>                   "MultiStorePacketState", // the state name
>>>>>                   TypeInformation.of(new TypeHint<MultiStorePacketState>() {}), // type information
>>>>>                   new MultiStorePacketState()); // default value of the state, if nothing was set
>>>>>       state = getRuntimeContext().getState(descriptor);
>>>>>       StorePacketConfigurationParameters.init();
>>>>>    }
>>>>>
>>>>>    @Override
>>>>>    public void onTimer(long timestamp, OnTimerContext ctx,
>>>>>          Collector<ClassifierOutput> collector) throws Exception {
>>>>>       MultiStorePacketState so = state.value();
>>>>>       if (so.currentFile != null) {
>>>>>          closeFile(collector, so);
>>>>>       }
>>>>>
>>>>>       ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
>>>>>       state.update(so);
>>>>>    }
>>>>>
>>>>>    private void setupTimer(MultiStorePacketState so, TimerService timerService) {
>>>>>       // Cancel the previous timer
>>>>>       timerService.deleteProcessingTimeTimer(so.timerValue);
>>>>>       // Register a new timer, rounded down to a whole second
>>>>>       so.timerValue = (timerService.currentProcessingTime()
>>>>>             + StorePacketConfigurationParameters.partAggregationTimeout) / 1000 * 1000;
>>>>>       timerService.registerProcessingTimeTimer(so.timerValue);
>>>>>    }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> The state class looks like the following:
>>>>>
>>>>> public class MultiStorePacketState implements Serializable {
>>>>>     private /*static*/ Logger logger = LoggerFactory.getLogger(MultiStorePacketState.class);
>>>>>     public transient RandomAccessFile currentFile = null;
>>>>>     public long timerValue;
>>>>>     public String fileName;
>>>>>     public String exportedFileName;
>>>>>     public String sessionKey;
>>>>>     public long fileOffset = 0;
>>>>>     public int partNumber = 0;
>>>>>     public boolean isCoverageIncorrect = false;
>>>>>     public boolean fileIsOpened = false;
>>>>>     public ArrayList<ClassifierOutput> sessionMetadata = new ArrayList<>();
>>>>>
>>>>>     private void readObject(ObjectInputStream ois)
>>>>>             throws ClassNotFoundException, IOException {
>>>>>         ois.defaultReadObject();
>>>>>         logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>>>>
>>>>>         // No need to do anything in case of an empty file
>>>>>         if (fileName.isEmpty()) {
>>>>>             return;
>>>>>         }
>>>>>         currentFile = new RandomAccessFile(fileName, "rw");
>>>>>         currentFile.seek(fileOffset);
>>>>>     }
>>>>> }
>>>>>
>>>>> Input & output are Proto messages; you could replace the input with
>>>>> byte[] and remove the output generation and the calls to the collector.
>>>>> The test should generate & store data for a while, so that at least one
>>>>> checkpoint is triggered. I used a checkpoint interval of 5000 ms on a
>>>>> quite slow system.
>>>>> Every data chunk is about 1 KB.
>>>>> Utils.startWriteToPcap - new RandomAccessFile()
>>>>> Utils.continueWriteToPcap - should be replaced with currentFile.write()
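>>>>>
>>>>> A minimal stand-in for the Utils calls, if that helps reproduction
>>>>> (assumed shape, after replacing the proto payload with byte[]):
>>>>>
>>>>> import java.io.IOException;
>>>>> import java.io.RandomAccessFile;
>>>>> import java.util.List;
>>>>>
>>>>> public class Utils {
>>>>>     // Open the file and write the first batch of packets.
>>>>>     public static RandomAccessFile startWriteToPcap(
>>>>>             String fileName, List<byte[]> packets) throws IOException {
>>>>>         RandomAccessFile file = new RandomAccessFile(fileName, "rw");
>>>>>         continueWriteToPcap(file, packets);
>>>>>         return file;
>>>>>     }
>>>>>
>>>>>     // Append further packets to an already-open file.
>>>>>     public static void continueWriteToPcap(
>>>>>             RandomAccessFile file, List<byte[]> packets) throws IOException {
>>>>>         for (byte[] packet : packets) {
>>>>>             file.write(packet);
>>>>>         }
>>>>>     }
>>>>> }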
>>>>>
>>>>> Mon, 11 Oct 2021 at 16:50, JING ZHANG <beyond1...@gmail.com>:
>>>>>
>>>>> Hi Alex,
>>>>> It is a little weird.
>>>>> Would you please provide a program that reproduces the problem,
>>>>> including the DataStream job code and the related classes' code? I need
>>>>> to do some debugging to find out the reason.
>>>>>
>>>>> Best,
>>>>> JING ZHANG
>>>>>
>>>>>
>>>>> Alex Drobinsky <alex.drobin...@gmail.com> wrote on Mon, Oct 11, 2021 at 5:50 PM:
>>>>>
>>>>> Hi Jing Zhang,
>>>>>
>>>>> I'm using the FileSystem backend. I also implemented a readObject
>>>>> function to support a proper restart procedure:
>>>>>
>>>>> private void readObject(ObjectInputStream ois)
>>>>>         throws ClassNotFoundException, IOException {
>>>>>     ois.defaultReadObject();
>>>>>     logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>>>>
>>>>>     // No need to do anything in case of empty file
>>>>>     if (fileName.isEmpty()) {
>>>>>         return;
>>>>>     }
>>>>>     currentFile = new RandomAccessFile(fileName,"rw");
>>>>>     currentFile.seek(fileOffset);
>>>>> }
>>>>>
>>>>> However, according to the logs, this function was never called.
>>>>>
>>>>> Btw, it could be beneficial to add this kind of state object, e.g. a
>>>>> FileState that encapsulates the serialization / deserialization of a
>>>>> RandomAccessFile, although the concept itself is a bit contradictory to
>>>>> regular state.
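>>>>>
>>>>> Roughly like this (name and shape invented, just to sketch the idea):
>>>>>
>>>>> public class FileState implements Serializable {
>>>>>     private String fileName;
>>>>>     private long offset;
>>>>>     private transient RandomAccessFile file;
>>>>>
>>>>>     // Re-open lazily from the serializable fields, so the handle
>>>>>     // survives any de/serialization cycle the backend performs.
>>>>>     public RandomAccessFile get() throws IOException {
>>>>>         if (file == null && fileName != null) {
>>>>>             file = new RandomAccessFile(fileName, "rw");
>>>>>             file.seek(offset);
>>>>>         }
>>>>>         return file;
>>>>>     }
>>>>> }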
>>>>>
>>>>> Currently, I implemented and tested a workaround via the addition of a
>>>>> boolean variable, isFileOpened; however, it's awkward because I need to
>>>>> check the state of the transient variable every time I use state.value().
>>>>>
>>>>> So, should it be expected that transient variables in state are reset
>>>>> to their default values?
>>>>>
>>>>>
>>>>> Mon, 11 Oct 2021 at 12:33, JING ZHANG <beyond1...@gmail.com>:
>>>>>
>>>>> Hi, Alex
>>>>> Which state backend did you choose?
>>>>> If you chose MemoryStateBackend or FsStateBackend, the `transient`
>>>>> keyword may have no effect, because those backends do not serialize
>>>>> state for regular read/write accesses but keep it as objects on the heap.
>>>>> If you chose RocksDBStateBackend, I would say it is expected behavior,
>>>>> because RocksDBStateBackend stores all state as byte arrays in embedded
>>>>> RocksDB instances. Therefore, it de/serializes the state of a key for every
>>>>> read/write access. currentFile is null because a transient variable is
>>>>> not serialized by default. The difference is sketched below.
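>>>>>
>>>>> For comparison, the two configurations look like this (paths are just
>>>>> examples):
>>>>>
>>>>> // Heap-based backend: state.value() returns the live object, no
>>>>> // per-access serialization, so transient fields keep their values.
>>>>> env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
>>>>>
>>>>> // RocksDB backend: every read/write de/serializes the value, so a
>>>>> // transient field comes back at its default (null) on each access.
>>>>> env.setStateBackend(new RocksDBStateBackend("file:///tmp/checkpoints"));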
>>>>>
>>>>> Best,
>>>>> JING ZHANG
>>>>>
>>>>>
>>>>> Alex Drobinsky <alex.drobin...@gmail.com> wrote on Mon, Oct 11, 2021 at 4:33 PM:
>>>>>
>>>>> Dear flink community,
>>>>>
>>>>> I have the following state class (irrelevant fields removed):
>>>>> public class MultiStorePacketState implements Serializable {
>>>>>
>>>>>     public transient RandomAccessFile currentFile = null;
>>>>>     public long timerValue;
>>>>>     public String fileName;
>>>>>     public String exportedFileName;
>>>>>     public String sessionKey;
>>>>>     public long fileOffset = 0;
>>>>>
>>>>> }
>>>>>
>>>>> Once in a while, currentFile becomes *nullified*; this happens after I
>>>>> extract the state via
>>>>>
>>>>> MultiStorePacketState so = state.value();
>>>>>
>>>>> The frequency of this behaviour matches the checkpoint interval (the
>>>>> checkpoint interval is defined as 5 seconds, and the first occurrence of
>>>>> this problem is also at 5 seconds); otherwise, I don't have any clues
>>>>> toward a possible explanation.
>>>>>
>>>>> Is this an expected side effect of the checkpoint procedure?
>>>>>
>>>>> Best regards,
>>>>> Alex
>>>>>
>>>>>
