It would be difficult to provide anything close to the complete product,
but I can try to provide enough details to reproduce the problem.
A standard Kafka source will do:
DataStream<byte[]> stream = env.addSource(
        new FlinkKafkaConsumer<>(topic, new AbstractDeserializationSchema<byte[]>() {
            @Override
            public byte[] deserialize(byte[] bytes) throws IOException {
                return bytes;
            }
        }, properties)).name(topic);
The operator body is something like:
public class MultiStorePacketFunction
        extends KeyedProcessFunction<String, SplitterToMultiStore, ClassifierOutput>
        implements Serializable {

    private transient ValueState<MultiStorePacketState> state;

    @Override
    public void processElement(SplitterToMultiStore packet, Context ctx,
            Collector<ClassifierOutput> out) throws Exception {
        if (packet.hasPackets()) {
            // storedPackets: a metric counter whose declaration is omitted here
            storedPackets.inc(packet.getPackets().getPacketsCount());
        }
        MultiStorePacketState so = state.value();
        if (process(packet, out, so, ctx)) {
            // Session finished: drop the per-key state (update(null) would do the same)
            state.clear();
        } else {
            state.update(so);
        }
    }

    public String generateNextFilename(String sessionKey, int partNumber) {
        String path = DirectoryService.getInstance().bookDirectory();
        return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
    }

    private void storeContent(Collector<ClassifierOutput> collector,
            MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
        assert (packets != null);
        assert (packets.hasPackets());
        if (state.currentFile == null) {
            openFile(collector, state, packets);
        }
        Utils.continueWriteToPcap(state.currentFile,
                packets.getPackets().getPacketsList());
        state.fileOffset = state.currentFile.length();
        tryToCloseFile(collector, state);
    }

    public static String extractExportedFileName(String fileName) {
        // Keep the last two path components: "<directory>/<file name>"
        String[] path = fileName.split("/");
        return path[path.length - 2] + "/" + path[path.length - 1];
    }

    private void openFile(Collector<ClassifierOutput> collector,
            MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
        state.fileIsOpened = true;
        state.fileName = generateNextFilename(state.sessionKey, state.partNumber);
        state.exportedFileName = extractExportedFileName(state.fileName);
        // -> The RandomAccessFile is created here
        state.currentFile = Utils.startWriteToPcap(state.fileName,
                packets.getPackets().getPacketsList());
        state.fileOffset = state.currentFile.length();
        state.partNumber++;
    }

    private void tryToCloseFile(Collector<ClassifierOutput> collector,
            MultiStorePacketState state) throws IOException {
        if (state.currentFile.length() < StorePacketConfigurationParameters.partSizeLimit) {
            return;
        }
        closeFile(collector, state);
    }

    private void closeFile(Collector<ClassifierOutput> collector,
            MultiStorePacketState state) throws IOException {
        state.currentFile.close();
        state.currentFile = null;
        state.fileIsOpened = false;
        ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
        outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
        outputBuilder.setSessionType(SessionType.Multi);
        outputBuilder.setSessionKey(state.sessionKey);
        var classifierOutput = outputBuilder.build();
        state.sessionMetadata.add(classifierOutput);
        collector.collect(classifierOutput);
    }

    public boolean process(SplitterToMultiStore packet,
            Collector<ClassifierOutput> collector, MultiStorePacketState so,
            Context context) throws Exception {
        // First message
        if (packet.hasClassificationResult()) {
            sendClassificationResult(packet, collector, so);
            return false;
        }
        // Last message
        if (packet.hasSessionClosure()) {
            if (so.isCoverageIncorrect) {
                return true;
            }
            handleSessionClosure(packet, collector, so, context);
            return true;
        }
        if (so.isCoverageIncorrect) {
            return false;
        }
        storeContent(collector, so, packet);
        // The file may already be closed, e.g. because it reached the size limit.
        if (so.currentFile != null) {
            setupTimer(so, context.timerService());
        }
        return false;
    }

    private void handleSessionClosure(SplitterToMultiStore packet,
            Collector<ClassifierOutput> collector, MultiStorePacketState so,
            Context context) throws IOException {
        if (so.currentFile != null) {
            closeFile(collector, so);
        }
        ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
        outputBuilder.setSessionKey(packet.getSessionKey());
        outputBuilder.setSessionType(packet.getSessionType());
        var messageBuilder = outputBuilder.getLastBuilder();
        messageBuilder.addAllAggregatedSession(so.sessionMetadata);
        outputBuilder.setLast(messageBuilder.build());
        var output = outputBuilder.build();
        collector.collect(output);
        context.timerService().deleteProcessingTimeTimer(so.timerValue);
        state.clear();
    }

    private void sendClassificationResult(SplitterToMultiStore packet,
            Collector<ClassifierOutput> collector, MultiStorePacketState so) {
        var coverageResult =
                CoverageChecker.obtainCoverageInfo(packet.getClassificationResult().getLIID());
        so.isCoverageIncorrect = !coverageResult.getValue0();
        if (so.isCoverageIncorrect) {
            return;
        }
        ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
        outputBuilder.getFirstBuilder().setClassificationResult(packet.getClassificationResult().getClassificationResult());
        outputBuilder.getFirstBuilder().setLIID(packet.getClassificationResult().getLIID());
        outputBuilder.getFirstBuilder().setCIN(packet.getClassificationResult().getCIN());
        outputBuilder.getFirstBuilder().setOperatorId(packet.getClassificationResult().getOperatorId());
        outputBuilder.getFirstBuilder().setCoverage(coverageResult.getValue1());
        outputBuilder.setSessionKey(packet.getSessionKey());
        outputBuilder.setSessionType(packet.getSessionType());
        so.sessionKey = packet.getSessionKey();
        var classifierOutput = outputBuilder.build();
        so.sessionMetadata.add(classifierOutput);
        collector.collect(classifierOutput);
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<MultiStorePacketState> descriptor =
                new ValueStateDescriptor<MultiStorePacketState>(
                        "MultiStorePacketState", // the state name
                        TypeInformation.of(new TypeHint<MultiStorePacketState>() {}), // type information
                        new MultiStorePacketState()); // default value of the state, if nothing was set
        state = getRuntimeContext().getState(descriptor);
        StorePacketConfigurationParameters.init();
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<ClassifierOutput> collector) throws Exception {
        MultiStorePacketState so = state.value();
        if (so.currentFile != null) {
            closeFile(collector, so);
        }
        ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
        state.update(so);
    }

    private void setupTimer(MultiStorePacketState so, TimerService timerService) {
        // Cancel the previous timer
        timerService.deleteProcessingTimeTimer(so.timerValue);
        // Register a new timer; the deadline is rounded down to a whole second
        so.timerValue = (timerService.currentProcessingTime()
                + StorePacketConfigurationParameters.partAggregationTimeout) / 1000 * 1000;
        timerService.registerProcessingTimeTimer(so.timerValue);
    }
}
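For reference, the deadline expression in setupTimer rounds down to a whole second; since Flink keeps at most one timer per key and timestamp, elements of one key arriving within the same second map to the same timer timestamp. A plain-Java sketch of just that arithmetic (TimerDeadline is a hypothetical helper; the timeout is assumed to be in milliseconds):

```java
/** Hypothetical helper isolating the deadline arithmetic used in setupTimer. */
final class TimerDeadline {
    private TimerDeadline() {}

    /**
     * Same expression as in setupTimer:
     * (currentProcessingTime + partAggregationTimeout) / 1000 * 1000.
     * Integer division truncates, so for non-negative inputs the result is
     * the deadline rounded down to a whole second.
     */
    static long roundedDeadline(long nowMillis, long timeoutMillis) {
        return (nowMillis + timeoutMillis) / 1000 * 1000;
    }

    public static void main(String[] args) {
        // Two elements 400 ms apart get the same timer timestamp...
        System.out.println(roundedDeadline(1_000_100L, 5_000L)); // 1005000
        System.out.println(roundedDeadline(1_000_500L, 5_000L)); // 1005000
        // ...while an element in the next second gets a new one.
        System.out.println(roundedDeadline(1_001_000L, 5_000L)); // 1006000
    }
}
```

A side effect of rounding down is that a timer may fire up to 999 ms before the full timeout has elapsed.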
The state class looks like this:
public class MultiStorePacketState implements Serializable {
    private /*static*/ Logger logger = LoggerFactory.getLogger(MultiStorePacketState.class);
    public transient RandomAccessFile currentFile = null;
    public long timerValue;
    public String fileName;
    public String exportedFileName;
    public String sessionKey;
    public long fileOffset = 0;
    public int partNumber = 0;
    public boolean isCoverageIncorrect = false;
    public boolean fileIsOpened = false;
    public ArrayList<ClassifierOutput> sessionMetadata = new ArrayList<>();

    // Java serialization hook: reopen the file after deserialization
    private void readObject(ObjectInputStream ois)
            throws ClassNotFoundException, IOException {
        ois.defaultReadObject();
        logger.info("Deserialized MultiStorePacketState: " + this.toString());
        // Nothing to reopen if no file name was set (fileName starts out as null)
        if (fileName == null || fileName.isEmpty()) {
            return;
        }
        currentFile = new RandomAccessFile(fileName, "rw");
        currentFile.seek(fileOffset);
    }
}
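The readObject hook is part of Java serialization, so it only runs when the object is read back through java.io.ObjectInputStream. A minimal sketch of that round trip with a null-safe version of the hook (ReopeningState and JavaRoundTrip are hypothetical names). Whether Flink serializes this state type with Java serialization at all is an assumption to check: to my understanding its POJO and Kryo-based serializers do not invoke readObject, which would fit the earlier report that the hook never showed up in the logs.

```java
import java.io.*;

/** Hypothetical, trimmed-down stand-in for MultiStorePacketState. */
class ReopeningState implements Serializable {
    private static final long serialVersionUID = 1L;

    transient RandomAccessFile currentFile; // skipped by ObjectOutputStream
    String fileName;
    long fileOffset;

    // Called only by java.io.ObjectInputStream during deserialization.
    private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
        ois.defaultReadObject(); // restores fileName and fileOffset
        if (fileName == null || fileName.isEmpty()) {
            return; // no file was ever opened
        }
        currentFile = new RandomAccessFile(fileName, "rw");
        currentFile.seek(fileOffset);
    }
}

final class JavaRoundTrip {
    private JavaRoundTrip() {}

    /** Writes obj with ObjectOutputStream and reads it back with ObjectInputStream. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```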
Input and output are protobuf messages; you could replace the input with byte[] and
remove the output generation and the calls to the collector.
The test should generate and store data for a while, so that at least one
checkpoint is triggered. I used a checkpoint interval of 5000 ms on a
fairly slow system.
Each data chunk is about 1 KB.
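With chunks of about 1 KB, the size-based rollover performed by storeContent, openFile and tryToCloseFile can be modeled in isolation, e.g. to choose a partSizeLimit that forces several parts before the first checkpoint. PartRolloverModel is a hypothetical sketch, not part of the original code: it tracks byte counts only and ignores the pcap header, the directory prefix and the timer-driven close in onTimer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the rollover logic; no files are written. */
final class PartRolloverModel {
    private final String sessionKey;
    private final long partSizeLimit; // stands in for StorePacketConfigurationParameters.partSizeLimit
    private int partNumber = 0;
    private String currentPart = null; // null means no part is open
    private long currentLength = 0;
    private final List<String> closedParts = new ArrayList<>();

    PartRolloverModel(String sessionKey, long partSizeLimit) {
        this.sessionKey = sessionKey;
        this.partSizeLimit = partSizeLimit;
    }

    /** Mirrors storeContent: open a part if needed, append, then maybe close. */
    void store(long chunkBytes) {
        if (currentPart == null) {
            // openFile: the name uses the current number, then the number advances
            currentPart = sessionKey + "-" + partNumber + ".pcap";
            partNumber++;
            currentLength = 0;
        }
        currentLength += chunkBytes;
        // tryToCloseFile: the part stays open while it is below the limit
        if (currentLength >= partSizeLimit) {
            closedParts.add(currentPart);
            currentPart = null;
        }
    }

    List<String> closedParts() {
        return closedParts;
    }

    String currentPart() {
        return currentPart;
    }
}
```

For example, with an assumed limit of 3000 bytes, seven 1024-byte chunks close "s-0.pcap" and "s-1.pcap" (each after its third chunk) and leave "s-2.pcap" open.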
Utils.startWriteToPcap can be replaced with new RandomAccessFile(...).
Utils.continueWriteToPcap can be replaced with currentFile.write(...).
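For a standalone reproduction, the two helpers called in openFile and storeContent (Utils.startWriteToPcap and Utils.continueWriteToPcap) could be swapped for something like the following. This is a minimal sketch assuming raw byte[] chunks and no pcap framing; ReproUtils, startWrite and continueWrite are hypothetical names, not the real Utils API.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.List;

/** Hypothetical stand-ins for the pcap helpers, writing raw chunks only. */
final class ReproUtils {
    private ReproUtils() {}

    // Stand-in for Utils.startWriteToPcap: only opens (and truncates) the part file.
    // The chunk list is accepted to keep the call site in openFile unchanged; nothing
    // is written here because storeContent appends the same chunks right afterwards.
    static RandomAccessFile startWrite(String fileName, List<byte[]> chunks) throws IOException {
        RandomAccessFile file = new RandomAccessFile(fileName, "rw");
        file.setLength(0);
        return file;
    }

    // Stand-in for Utils.continueWriteToPcap: appends the chunks at the end of the file.
    static void continueWrite(RandomAccessFile file, List<byte[]> chunks) throws IOException {
        file.seek(file.length());
        for (byte[] chunk : chunks) {
            file.write(chunk);
        }
    }
}
```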
On Mon, 11 Oct 2021 at 16:50, JING ZHANG <[email protected]> wrote:
> Hi Alex,
> It is a little weird.
> Would you please provide a program that reproduces the problem,
> including the DataStream job code and the related classes? I need to do some
> debugging to find out the reason.
>
> Best,
> JING ZHANG
>
>
> On Mon, 11 Oct 2021 at 5:50 PM, Alex Drobinsky <[email protected]> wrote:
>
>> Hi Jing Zhang,
>>
>> I'm using the filesystem state backend. I also implemented a readObject
>> method to support a proper restart procedure:
>>
>> private void readObject(ObjectInputStream ois)
>> throws ClassNotFoundException, IOException {
>> ois.defaultReadObject();
>> logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>
>> // No need to do anything in case of empty file
>> if (fileName == null || fileName.isEmpty()) {
>> return;
>> }
>> currentFile = new RandomAccessFile(fileName,"rw");
>> currentFile.seek(fileOffset);
>> }
>>
>> However, according to the logs, this method was never called.
>>
>> Btw, it could be beneficial to add a state type of this kind, e.g. FileState,
>> which would encapsulate serialization/deserialization of a RandomAccessFile,
>> although the concept itself somewhat contradicts regular state.
>>
>> Currently, I have implemented and tested a workaround by adding the boolean
>> field fileIsOpened; however, it's awkward because I need to check the
>> state of the transient field every time I call state.value().
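Both the FileState idea and the boolean-flag workaround amount to treating the open handle as a cache that can be rebuilt from serializable fields (path and offset). A minimal sketch under that assumption; FileState here is a hypothetical class, not an existing Flink API:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;

/**
 * Hypothetical FileState: only the path and the offset are serializable state;
 * the RandomAccessFile is a transient cache reopened on demand, so a handle
 * that went missing after a copy or restore is not an error.
 */
class FileState implements Serializable {
    private static final long serialVersionUID = 1L;

    private String fileName;
    private long offset;                       // bytes written so far
    private transient RandomAccessFile handle; // never serialized

    FileState(String fileName) {
        this.fileName = fileName;
    }

    /** Returns an open handle positioned at the recorded offset, reopening it if needed. */
    RandomAccessFile file() throws IOException {
        if (handle == null) {
            handle = new RandomAccessFile(fileName, "rw");
            handle.seek(offset);
        }
        return handle;
    }

    /** Appends a chunk and records the new offset. */
    void append(byte[] chunk) throws IOException {
        RandomAccessFile f = file();
        f.write(chunk);
        offset = f.getFilePointer();
    }

    long offset() {
        return offset;
    }

    /** Closes the cached handle, if any; the path and offset are kept. */
    void close() throws IOException {
        if (handle != null) {
            handle.close();
            handle = null;
        }
    }
}
```

The trade-off: a handle that is silently dropped by a copy is never closed by this sketch, so file descriptors can leak unless the owner closes them explicitly.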
>>
>> So is it expected that transient fields in state are reset to their
>> default values?
>>
>>
>> On Mon, 11 Oct 2021 at 12:33, JING ZHANG <[email protected]> wrote:
>>
>>> Hi, Alex
>>> Which state backend are you using?
>>> If you use MemoryStateBackend or FsStateBackend, the `transient` keyword
>>> may have no effect, because these backends do not serialize state for
>>> regular read/write accesses but keep it as objects on the heap.
>>> If you use RocksDBStateBackend, I think this is expected behavior,
>>> because RocksDBStateBackend stores all state as byte arrays in embedded
>>> RocksDB instances. Therefore, it de/serializes the state of a key on every
>>> read/write access. currentFile is null because transient fields are not
>>> serialized by default.
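The per-access de/serialization described above can be simulated without Flink. In this hypothetical sketch, BytesValueState keeps only bytes and SessionState is a trimmed-down state class; Java serialization stands in for Flink's TypeSerializer. A transient field set before update() comes back as null from value(). As a further hedged note, heap-based state may also hand out a serializer copy of an entry on the first access after a snapshot (copy-on-write in Flink's CopyOnWriteStateMap); depending on the serializer, such a copy may not carry transient fields over, which would match the checkpoint-interval timing but should be verified for the Flink version in use.

```java
import java.io.*;

/**
 * Hypothetical value state that stores only serialized bytes: every update()
 * serializes and every value() deserializes, so each read returns a fresh copy.
 */
final class BytesValueState<T extends Serializable> {
    private byte[] bytes; // null means "no value"

    void update(T value) throws IOException {
        if (value == null) {
            bytes = null;
            return;
        }
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value);
        }
        bytes = bos.toByteArray();
    }

    @SuppressWarnings("unchecked")
    T value() throws IOException, ClassNotFoundException {
        if (bytes == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) in.readObject();
        }
    }
}

/** Trimmed-down state with a transient field and no readObject hook. */
class SessionState implements Serializable {
    private static final long serialVersionUID = 1L;

    transient Object handle; // stands in for the transient RandomAccessFile
    long fileOffset;
}
```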
>>>
>>> Best,
>>> JING ZHANG
>>>
>>>
>>> On Mon, 11 Oct 2021 at 4:33 PM, Alex Drobinsky <[email protected]> wrote:
>>>
>>>> Dear Flink community,
>>>>
>>>> I have the following state class (irrelevant fields removed):
>>>> public class MultiStorePacketState implements Serializable {
>>>>
>>>> public transient RandomAccessFile currentFile = null;
>>>> public long timerValue;
>>>> public String fileName;
>>>> public String exportedFileName;
>>>> public String sessionKey;
>>>> public long fileOffset = 0;
>>>>
>>>> }
>>>>
>>>> Once in a while, currentFile becomes null; this happens after I
>>>> extract the state via
>>>>
>>>> MultiStorePacketState so = state.value();
>>>>
>>>> The frequency of this behaviour is similar to the checkpoint interval (the
>>>> checkpoint interval is 5 seconds, and the problem first occurs after 5
>>>> seconds too); other than that, I have no clue what the explanation
>>>> might be.
>>>>
>>>> Is this an expected side effect of the checkpoint procedure?
>>>>
>>>> Best regards,
>>>> Alex
>>>>
>>>>