Hi Alex,

Since you use a customized MultiStorePacketState class as the value state type, 
Flink should fall back to the Kryo serializer [1] to serialize your class when 
accessing RocksDB state, or when restoring a checkpoint taken with the 
FileSystemStateBackend, and I am not sure whether Kryo serializes your transient field.
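One quick way to check which serializer Flink actually picks for the class (a small 
sketch; "GenericType<...>" in the output indicates the Kryo fallback):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;

TypeInformation<MultiStorePacketState> info = TypeInformation.of(MultiStorePacketState.class);
// A GenericTypeInfo here means Flink could not analyze the class as a POJO and
// will use its KryoSerializer; note that both Kryo's FieldSerializer and Flink's
// PojoSerializer skip transient fields, so either way the field is not persisted.
System.out.println(info);
System.out.println(info.createSerializer(new ExecutionConfig()).getClass().getName());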
If you're not familiar with Flink's serialization stack, you could also check the 
behaviors below (a configuration sketch follows the list):

  1.  Without restoring from any checkpoint, use the FileSystemStateBackend and check 
whether the transient field can be read as expected; the answer should be yes.
  2.  After restoring from a checkpoint, check whether the transient field can still 
be read back when using the FileSystemStateBackend.
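
A minimal configuration sketch for switching between the two setups (assuming Flink 
1.13 APIs, where HashMapStateBackend plus a file checkpoint storage is the successor 
of the older FsStateBackend):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Heap-based backend: state is kept as objects on the heap, so the transient
// field survives regular accesses and is only lost after a restore.
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoints");

// RocksDB backend: state is de/serialized on every access, so the transient
// field would be reset on every read.
// env.setStateBackend(new EmbeddedRocksDBStateBackend());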


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class

Best
Yun Tang


________________________________
From: Alex Drobinsky <alex.drobin...@gmail.com>
Sent: Monday, October 11, 2021 22:37
To: JING ZHANG <beyond1...@gmail.com>
Cc: User-Flink <user@flink.apache.org>
Subject: Re: Reset of transient variables in state to default values.

It would be difficult to provide even a semblance of the complete product; however, 
I can try to provide enough details to reproduce the problem.
A standard source will do:

DataStream<byte[]> stream = env.addSource(
        new FlinkKafkaConsumer<>(topic, new AbstractDeserializationSchema<byte[]>() {
            @Override
            public byte[] deserialize(byte[] bytes) throws IOException {
                return bytes;
            }
        }, properties)).name(topic);

The operator body is something like this:


public class MultiStorePacketFunction extends KeyedProcessFunction<String, SplitterToMultiStore, ClassifierOutput> implements Serializable {
   private transient ValueState<MultiStorePacketState> state;

   @Override
   public void processElement(SplitterToMultiStore packet, Context ctx, Collector<ClassifierOutput> out) throws Exception {
      if (packet.hasPackets()) {
         storedPackets.inc(packet.getPackets().getPacketsCount());
      }

      MultiStorePacketState so = state.value();
      if (process(packet, out, so, ctx)) {
         state.update(null);
         state.clear();
      } else {
         state.update(so);
      }
   }

    public String generateNextFilename(String sessionKey, int partNumber) {
      String path = DirectoryService.getInstance().bookDirectory();
      return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
   }

   private void storeContent(Collector<ClassifierOutput> collector, MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
      assert (packets != null);
      assert (packets.hasPackets());

      if (state.currentFile == null) {
         openFile(collector, state, packets);
      }

      Utils.continueWriteToPcap(state.currentFile, packets.getPackets().getPacketsList());
      state.fileOffset = state.currentFile.length();

      tryToCloseFile(collector, state);
   }

   public static String extractExportedFileName(String fileName) {
      String[] path = fileName.split("/");
      return path[path.length - 2] + "/" + path[path.length - 1];
   }

   private void openFile(Collector<ClassifierOutput> collector, MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
      state.fileIsOpened = true;
      state.fileName = generateNextFilename(state.sessionKey, state.partNumber);
      state.exportedFileName = extractExportedFileName(state.fileName);

      // -> Here the RandomAccessFile is created
      state.currentFile = Utils.startWriteToPcap(state.fileName, packets.getPackets().getPacketsList());
      state.fileOffset = state.currentFile.length();
      state.partNumber++;
   }

   private void tryToCloseFile(Collector<ClassifierOutput> collector, MultiStorePacketState state) throws IOException {
      if (state.currentFile.length() < StorePacketConfigurationParameters.partSizeLimit) {
         return;
      }
      closeFile(collector, state);
   }

   private void closeFile(Collector<ClassifierOutput> collector, MultiStorePacketState state) throws IOException {
      state.currentFile.close();
      state.currentFile = null;
      state.fileIsOpened = false;
      ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
      outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
      outputBuilder.setSessionType(SessionType.Multi);
      outputBuilder.setSessionKey(state.sessionKey);
      var classifierOutput = outputBuilder.build();
      state.sessionMetadata.add(classifierOutput);
      collector.collect(classifierOutput);
   }

   public boolean process(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so, Context context) throws Exception {

      // First message
      if (packet.hasClassificationResult()) {
         sendClassificationResult(packet, collector, so);
         return false;
      }

      // Last message
      if (packet.hasSessionClosure()) {
         if (so.isCoverageIncorrect) {
            return true;
         }
         handleSessionClosure(packet, collector, so, context);
         return true;
      }

      if (so.isCoverageIncorrect) {
         return false;
      }
      storeContent(collector, so, packet);

      // The file could already be closed, e.g. if it reached the expected size.
      if (so.currentFile != null) {
         setupTimer(so, context.timerService());
      }
      return false;
   }

   private void handleSessionClosure(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so, Context context) throws IOException {
      if (so.currentFile != null) {
         closeFile(collector, so);
      }
      ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
      outputBuilder.setSessionKey(packet.getSessionKey());
      outputBuilder.setSessionType(packet.getSessionType());
      var messageBuilder = outputBuilder.getLastBuilder();
      messageBuilder.addAllAggregatedSession(so.sessionMetadata);
      outputBuilder.setLast(messageBuilder.build());
      var output = outputBuilder.build();
      collector.collect(output);
      context.timerService().deleteProcessingTimeTimer(so.timerValue);
      state.clear();
   }

   private void sendClassificationResult(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so) {
      var coverageResult = CoverageChecker.obtainCoverageInfo(packet.getClassificationResult().getLIID());
      so.isCoverageIncorrect = !coverageResult.getValue0();
      if (so.isCoverageIncorrect) {
         return;
      }

      ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
      outputBuilder.getFirstBuilder().setClassificationResult(packet.getClassificationResult().getClassificationResult());
      outputBuilder.getFirstBuilder().setLIID(packet.getClassificationResult().getLIID());
      outputBuilder.getFirstBuilder().setCIN(packet.getClassificationResult().getCIN());
      outputBuilder.getFirstBuilder().setOperatorId(packet.getClassificationResult().getOperatorId());
      outputBuilder.getFirstBuilder().setCoverage(coverageResult.getValue1());
      outputBuilder.setSessionKey(packet.getSessionKey());
      outputBuilder.setSessionType(packet.getSessionType());
      so.sessionKey = packet.getSessionKey();
      var classifierOutput = outputBuilder.build();
      so.sessionMetadata.add(classifierOutput);
      collector.collect(classifierOutput);
   }

   @Override
   public void open(Configuration config) {
      ValueStateDescriptor<MultiStorePacketState> descriptor =
            new ValueStateDescriptor<MultiStorePacketState>(
                  "MultiStorePacketState", // the state name
                  TypeInformation.of(new TypeHint<MultiStorePacketState>() {}), // type information
                  new MultiStorePacketState()); // default value of the state, if nothing was set
      state = getRuntimeContext().getState(descriptor);
      StorePacketConfigurationParameters.init();
   }

   @Override
   public void onTimer(long timestamp, OnTimerContext ctx, Collector<ClassifierOutput> collector) throws Exception {
      MultiStorePacketState so = state.value();
      if (so.currentFile != null) {
         closeFile(collector, so);
      }

      ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
      state.update(so);
   }

   private void setupTimer(MultiStorePacketState so, TimerService timerService) {
      // Cancel previous timer
      timerService.deleteProcessingTimeTimer(so.timerValue);
      // Register new timer
      so.timerValue = (timerService.currentProcessingTime() + StorePacketConfigurationParameters.partAggregationTimeout) / 1000 * 1000;
      timerService.registerProcessingTimeTimer(so.timerValue);
   }
}


The state class looks like the following:


public class MultiStorePacketState implements Serializable {
    private /*static*/ Logger logger = LoggerFactory.getLogger(MultiStorePacketState.class);
    public transient RandomAccessFile currentFile = null;
    public long timerValue;
    public String fileName;
    public String exportedFileName;
    public String sessionKey;
    public long fileOffset = 0;
    public int partNumber = 0;
    public boolean isCoverageIncorrect = false;
    public boolean fileIsOpened = false;
    public ArrayList<ClassifierOutput> sessionMetadata = new ArrayList<>();

    private void readObject(ObjectInputStream ois)
            throws ClassNotFoundException, IOException {
        ois.defaultReadObject();
        logger.info("Deserialized MultiStorePacketState: " + this.toString());

        // No need to do anything in case of empty file
        if (fileName.isEmpty()) {
            return;
        }
        currentFile = new RandomAccessFile(fileName,"rw");
        currentFile.seek(fileOffset);
    }
}

Input & output are Proto messages; you could replace the input with byte[] and 
remove the output generation and the calls to the collector.
The test should generate & store data for a while, so that at least one checkpoint 
is triggered. I used a checkpoint interval of 5000 ms on a quite slow system.
Every data chunk is about 1 KB.
Utils.startWriteToPcap - creates a new RandomAccessFile()
Utils.continueWriteToPcap - can be replaced with currentFile.write()
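
In case it helps, hypothetical minimal stand-ins for the two Utils calls, based only 
on the description above (Packet stands for the protobuf-generated message type):

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.List;

public class Utils {
    // Opens the file and writes the first batch of packets.
    public static RandomAccessFile startWriteToPcap(String fileName, List<Packet> packets) throws IOException {
        RandomAccessFile file = new RandomAccessFile(fileName, "rw");
        for (Packet packet : packets) {
            file.write(packet.toByteArray());
        }
        return file;
    }

    // Appends further packets to the already opened file.
    public static void continueWriteToPcap(RandomAccessFile file, List<Packet> packets) throws IOException {
        for (Packet packet : packets) {
            file.write(packet.toByteArray());
        }
    }
}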

On Mon, 11 Oct 2021 at 16:50, JING ZHANG <beyond1...@gmail.com> wrote:
Hi Alex,
It is a little weird.
Would you please provide a program which reproduces the problem, including the 
DataStream job code and the related classes? I need to do some debugging to find 
out the reason.

Best,
JING ZHANG


Alex Drobinsky <alex.drobin...@gmail.com> wrote on Mon, 11 Oct 2021 at 17:50:
Hi Jing Zhang,

I'm using the FileSystem backend. I also implemented a readObject function to 
support a proper restart procedure:

private void readObject(ObjectInputStream ois)
        throws ClassNotFoundException, IOException {
    ois.defaultReadObject();
    logger.info("Deserialized MultiStorePacketState: " + this.toString());

    // No need to do anything in case of empty file
    if (fileName.isEmpty()) {
        return;
    }
    currentFile = new RandomAccessFile(fileName,"rw");
    currentFile.seek(fileOffset);
}

However, according to the logs, this function was never called.

Btw, it could be beneficial to add this kind of state object, e.g. a FileState 
that encapsulates the serialization/deserialization of a RandomAccessFile, although 
the concept itself is a bit contradictory to regular state.

Currently, I have implemented and tested a workaround by adding the boolean variable 
isFileOpened; however, it's awkward because I need to check the state of the 
transient variable every time I use state.value(), roughly as sketched below.
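
The guard looks roughly like this (a sketch; isFileOpened is the flag mentioned 
above, set wherever the file is opened):

MultiStorePacketState so = state.value();
// The transient file handle may have been dropped during de/serialization,
// so re-open it from the persisted fileName and fileOffset if needed.
if (so.isFileOpened && so.currentFile == null) {
    so.currentFile = new RandomAccessFile(so.fileName, "rw");
    so.currentFile.seek(so.fileOffset);
}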

So, should it be expected that transient variables in state are reset to their 
default values?

On Mon, 11 Oct 2021 at 12:33, JING ZHANG <beyond1...@gmail.com> wrote:
Hi, Alex
Which state backend do you use?
If you chose MemoryStateBackend or FsStateBackend, the `transient` keyword may have 
no effect, because those backends do not serialize state for regular read/write 
accesses but keep it as objects on the heap.
If you chose RocksDBStateBackend, I think this is expected behavior, because 
RocksDBStateBackend stores all state as byte arrays in embedded RocksDB instances 
and therefore de/serializes the state of a key on every read/write access. 
currentFile is null because a transient field is not serialized by default; the 
sketch below illustrates the round trip.
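
A minimal sketch of that round trip (hypothetical test code, using Kryo directly as 
Flink does for generic types; `so` is an instance of your state class):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

Kryo kryo = new Kryo();
Output output = new Output(1024, -1);  // growable buffer
kryo.writeObject(output, so);          // conceptually what happens on state.update(...)
output.close();
MultiStorePacketState copy =
        kryo.readObject(new Input(output.toBytes()), MultiStorePacketState.class); // on state.value()
// Kryo's FieldSerializer skips transient fields by default and does not call
// readObject, so the restored copy has currentFile == null.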

Best,
JING ZHANG


Alex Drobinsky <alex.drobin...@gmail.com<mailto:alex.drobin...@gmail.com>> 
于2021年10月11日周一 下午4:33写道:
Dear Flink community,

I have the following state class (irrelevant fields removed):
public class MultiStorePacketState implements Serializable {

    public transient RandomAccessFile currentFile = null;
    public long timerValue;
    public String fileName;
    public String exportedFileName;
    public String sessionKey;
    public long fileOffset = 0;

}

Once in a while, currentFile becomes nullified; this happens after I extract the 
state via

MultiStorePacketState so = state.value();

The frequency of this behaviour matches the checkpoint interval (the checkpoint 
interval is defined as 5 seconds, and the first occurrence of this problem is also 
at 5 seconds); otherwise, I don't have any clues to a possible explanation.

Is it an expected side effect of the checkpoint procedure?

Best regards,
Alex
