Thanks David for the hints.
I checked the usage of the state API and to me it seems correct, but I am a new Flink user.
Checkpoints happen every minute; I trigger the scaling after 30 minutes.
The source and sink are Kafka topics in EXACTLY_ONCE mode.
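For completeness, the checkpointing setup corresponds roughly to the following (just a sketch, not part of the simplified code below; only the one-minute interval and exactly-once mode described above are shown):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// One checkpoint per minute, exactly-once mode (as described above).
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);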
I tried to simplify the code (and have not tested whether it still reproduces the issue), but maybe it already shows some misuse of the state API.
BR
Martin
public class MyRecord2ProcessFunction extends KeyedProcessFunction<Long, MyRecord2, MyRecord2> {

    private final OutputTag<ErroneousMyRecord2> sideOutputLateData;
    // Random per-task UUID, written into every output record so that task restarts / re-scaling become visible in the output.
    private transient UUID applicationId;
    // Keyed state: increasing sequence number per key.
    private transient ValueState<Long> stateRecordSequenceNumber;

    public MyRecord2ProcessFunction(OutputTag<ErroneousMyRecord2> sideOutputLateData) {
        this.sideOutputLateData = sideOutputLateData;
    }

    public static MyRecord2ProcessFunction create(OutputTag<ErroneousMyRecord2> sideOutputLateData) {
        return new MyRecord2ProcessFunction(sideOutputLateData);
    }

    @Override
    public String toString() {
        return "MyRecord2ProcessFunction()";
    }

    @Override
    public void open(Configuration configuration) throws Exception {
        // Parameters
        ParameterTool parameters =
                (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        long parameterTTL = parameters.getLong("processing.ttl.days", 1L);
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(parameterTTL))
                .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        applicationId = UuidUtil.getTimeBasedUuid();

        ValueStateDescriptor<Long> recordSequenceNumberStateDescriptor =
                new ValueStateDescriptor<>("recordSequenceNumber", Long.class);
        recordSequenceNumberStateDescriptor.enableTimeToLive(ttlConfig);
        stateRecordSequenceNumber = getRuntimeContext().getState(recordSequenceNumberStateDescriptor);
    }

    @Override
    public void processElement(MyRecord2 record,
                               KeyedProcessFunction<Long, MyRecord2, MyRecord2>.Context ctx,
                               Collector<MyRecord2> out) throws Exception {
        record.setRecordingNetworkFunctionID(applicationId);
        if (record.isClosed()) {
            // The construction of errorMyRecord2 (an ErroneousMyRecord2 built from the record) got lost while simplifying.
            ctx.output(sideOutputLateData, errorMyRecord2);
        } else {
            Long i = stateRecordSequenceNumber.value();
            if (i == null) {
                i = 1L; // first record for this key; the initialization is not shown in this simplified version
            }
            // Note: within this else branch record.isClosed() is false, so the first two branches never match here.
            if (i == 1 && record.isClosed()) {
                // nothing
            } else if (record.isClosed()) { // "meta" in the original presumably referred to the input record
                record.setRecordSequenceNumber(i);
                stateRecordSequenceNumber.update(i + 1L);
            } else if (!record.hasRelease()) {
                record.setRecordSequenceNumber(i);
                stateRecordSequenceNumber.update(i + 1L);
            } else {
                if (i > 1) {
                    record.setRecordSequenceNumber(i);
                    stateRecordSequenceNumber.update(i + 1L);
                }
            }
            out.collect(record);
        }
    }
}
public class MyRecord1Processing {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool parameters = ParameterToolEnvironmentUtils.createParameterTool(args);
        env.getConfig().setGlobalJobParameters(parameters);

        String parameterKafkaTransactionTimeout = parameters.get("kafka.transaction.timeout.ms", "3600000"); // sync with the Kafka brokers' transaction.max.timeout.ms setting
        int parameterKafkaSourceIdlenessSec = parameters.getInt("processing.source.idleness.sec", 60);
        int parameterWatermarksMaxOutOfOrderSec = parameters.getInt("processing.watermarks.out-of-order.max.sec", 60);
        int parameterAllowedLatenessSec = parameters.getInt("processing.allowed-lateness.sec", 600);
        int parameterAllowedLatenessMs = parameterAllowedLatenessSec * 1000;
        int parameterParallelism = parameters.getInt("processing.parallel.all", env.getParallelism());
        String parameterKafkaTransactionIdPrefix = parameters.get("kafka.transaction.id.prefix", "charging-processing");
        // Further parameter* variables used below (brokers, topics, group id, per-operator parallelism)
        // are read from the ParameterTool in the same way; their definitions were dropped while simplifying.

        env.setParallelism(parameterParallelism);
        env.getConfig().enableObjectReuse();

        OutputTag<ErroneousMyRecord1> outErrors = new OutputTag<>("errors") {};
        OutputTag<ErroneousMyRecord2> outMyRecord2Errors = new OutputTag<>("MyRecord2-errors") {};

        KafkaSource<MyRecord1> MyRecord1Source = KafkaSource.<MyRecord1>builder()
                .setBootstrapServers(parameterKafkaBrokers)
                .setTopics(parameterKafkaTopicMyRecord1)
                .setGroupId(parameterKafkaGroup)
                .setValueOnlyDeserializer(new JsonSerializationSchema<>(MyRecord1.class))
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                // Note: the new KafkaSource uses "partition.discovery.interval.ms";
                // "flink.partition-discovery.interval-millis" is the legacy FlinkKafkaConsumer property.
                .setProperty("flink.partition-discovery.interval-millis", "10000")
                // Note: transaction.timeout.ms is a producer setting and normally belongs on the sinks.
                .setProperty("transaction.timeout.ms", parameterKafkaTransactionTimeout)
                .build();

        WatermarkStrategy<MyRecord1> watermarkStrategy = WatermarkStrategy
                .<MyRecord1>forBoundedOutOfOrderness(Duration.ofSeconds(parameterWatermarksMaxOutOfOrderSec))
                .withIdleness(Duration.ofSeconds(parameterKafkaSourceIdlenessSec))
                .withTimestampAssigner((event, oldTimestamp) -> event.getMyRecord1Request().getInvocationTimeStamp().toInstant().toEpochMilli());

        DataStream<MyRecord1> sourceStream = env
                .fromSource(MyRecord1Source, watermarkStrategy, "MyRecord1")
                .setParallelism(parameterParallelismMyRecord1);

        KafkaSink<MyRecord2> MyRecord2Sink = KafkaSink.<MyRecord2>builder()
                .setBootstrapServers(parameterKafkaBrokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(parameterKafkaTopicMyRecord2)
                        .setValueSerializationSchema(new JsonSerializationSchema<>(MyRecord2.class))
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix(parameterKafkaTopicMyRecord2 + parameterKafkaTransactionIdPrefix)
                .build();

        KafkaSink<ErroneousMyRecord2> MyRecord2ErrorSink = KafkaSink.<ErroneousMyRecord2>builder()
                .setBootstrapServers(parameterKafkaBrokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(parameterKafkaTopicMyRecord2Errors)
                        .setValueSerializationSchema(new JsonSerializationSchema<>(ErroneousMyRecord2.class))
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix(parameterKafkaTopicMyRecord2Errors + parameterKafkaTransactionIdPrefix)
                .build();

        SingleOutputStreamOperator<MyRecord1> validatedStream = sourceStream
                .process(new ValidationFunction(outErrors))
                .setParallelism(parameterParallelismMyRecord1)
                .name("Validation").uid("validation");

        SingleOutputStreamOperator<MyRecord1> normalizedStream = validatedStream
                .process(new NormalizationFunction(outErrors))
                .setParallelism(parameterParallelismMyRecord1)
                .name("Normalization").uid("normalization");

        // Keyed process function (a)
        SingleOutputStreamOperator<MyRecord1> dcStream = normalizedStream
                .keyBy(value -> value
                        .getMyRecord1Request()
                        .getpDUSessionChargingInformation()
                        .getChargingId()
                )
                .process(new DuplicateFunction(outErrors))
                .setParallelism(parameterParallelismMyRecord2)
                .name("Duplicate Check").uid("dc");

        // Non-keyed process function (b)
        SingleOutputStreamOperator<MyRecord2> MyRecord2Stream = dcStream
                .process(new MapFunction(outErrors))
                .setParallelism(parameterParallelismMyRecord2)
                .name("Map to MyRecord2").uid("map");

        // Reuse the existing partitioning instead of an explicit keyBy before (c)
        KeyedStream<MyRecord2, Long> keyedMyRecord2Stream = DataStreamUtils.reinterpretAsKeyedStream(
                MyRecord2Stream,
                value -> value
                        .getPDUSessionChargingInformation()
                        .getChargingId(),
                TypeInformation.of(Long.class)
        );

        // Keyed process function (c)
        SingleOutputStreamOperator<MyRecord2> aggregateMyRecord2Stream = keyedMyRecord2Stream
                // The trigger/lateness arguments of the full job were dropped together with the simplified create() above.
                .process(MyRecord2ProcessFunction.create(outMyRecord2Errors))
                .setParallelism(parameterParallelismMyRecord2)
                .name("Aggregation").uid("aggregation");

        aggregateMyRecord2Stream
                .sinkTo(MyRecord2Sink)
                .setParallelism(parameterParallelismMyRecord2)
                .name("MyRecord2").uid("MyRecord2-sink");

        DataStream<ErroneousMyRecord2> MyRecord2ErrorStream = aggregateMyRecord2Stream
                .getSideOutput(outMyRecord2Errors);

        MyRecord2ErrorStream
                .sinkTo(MyRecord2ErrorSink)
                .setParallelism(parameterParallelismError)
                .name("MyRecord2 Errors").uid("MyRecord2-errors-sink");

        env.execute("MyRecord1 Processing");
    }
}
David Morávek wrote on 07.01.2022 10:51 (GMT +01:00):
OK, my first intuition would be some kind of misuse of the state API. Another guess: has any checkpoint completed prior to triggering the re-scaling event?
I'll also try to verify the scenario you've described, but these would be the things that I'd check first.
D.
Hello David,
right now I can't share the complete code, but over the next few days I will try to simplify it and reduce it to something that still triggers the issue.
First I will check whether an explicit keyBy instead of reinterpretAsKeyedStream fixes the issue.
If yes, that would suggest (to me) a bug with reinterpretAsKeyedStream and elastic scaling.
If not, it is probably an issue caused by my code rather than by Flink.
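For illustration, the change I will test looks roughly like this (untested sketch, using the names from the simplified job):

// Current wiring: reuse the partitioning left over from the earlier keyBy.
KeyedStream<MyRecord2, Long> keyedMyRecord2Stream = DataStreamUtils.reinterpretAsKeyedStream(
        MyRecord2Stream,
        value -> value.getPDUSessionChargingInformation().getChargingId(),
        TypeInformation.of(Long.class));

// Change under test: an explicit keyBy, which repartitions by key again before function (c).
KeyedStream<MyRecord2, Long> keyedMyRecord2StreamExplicit = MyRecord2Stream
        .keyBy(value -> value.getPDUSessionChargingInformation().getChargingId());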
BR
Martin
David Morávek wrote on 07.01.2022 10:22 (GMT +01:00):
Would you be able to share the code of your test by any chance?
Best,
D.
Hello David,
I have a test setup where the input is always the same.
After processing, I check the complete output to see whether each sequence number is used only once per key.
Another output field is a random UUID generated at task startup (in the open() method of the keyed process function (c)).
In the output I saw that the sequence number started at 1 again at the same time the scaling happened, and the change of the UUID matched that as well.
Some output:
"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":2
"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":3
"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":4
"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":5
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":1
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":3
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":2
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":4
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":5
The percentage is then the number of output records that reuse an already assigned sequence number (per key1), compared to all output records.
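Roughly, the check over the collected output amounts to this (simplified sketch; the record type and field accessors are placeholders, not my real test code):

// Count output records that reuse a sequence number already seen for the same key1.
Map<Long, Set<Long>> seenPerKey = new HashMap<>();
long duplicates = 0;
long total = 0;
for (OutputRecord r : outputRecords) { // OutputRecord / outputRecords are placeholders
    total++;
    boolean firstUse = seenPerKey
            .computeIfAbsent(r.getKey1(), k -> new HashSet<>())
            .add(r.getRecordSequenceNumber());
    if (!firstUse) {
        duplicates++;
    }
}
double percentage = 100.0 * duplicates / total;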
Right now I am changing the Flink job so that it has an explicit keyBy before function (c) again, instead of reinterpretAsKeyedStream.
I will check whether this fixes the issue in my job and test setup.
BR
Martin
David Morávek wrote on 07.01.2022 09:37 (GMT +01:00):
Hi Martin,
_reinterpretAsKeyedStream_ should definitely work with the reactive mode; if it doesn't, it's a bug that needs to be fixed
For test use cases (3) and (4), the state of the keyed process function (c) seems to be available for only around 50% of the events processed after the scale-in/failure.
Can you please provide details on how you verified this?
Best,
D.
Hi,
typo: "I run that job via Native Kubernetes deployment and use elastic scaling in reactive mode."
-> Of course I run it via a standalone Kubernetes deployment, to make elastic scaling possible.
BR
Martin
mar...@sonicdev.de wrote on 06.01.2022 21:38 (GMT +01:00):
Hi,
I have a job with a keyed process function (a), then downstream a normal process function (b), and then I use reinterpretAsKeyedStream to get yet another keyed process function (c).
The last keyed process function uses keyed state for an increasing sequence number.
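For clarity, the structure is roughly this (generic placeholder names, not my real classes):

// (a) keyed process function, (b) non-keyed process function with unchanged parallelism,
// (c) keyed process function that reuses the partitioning from (a) via reinterpretAsKeyedStream.
SingleOutputStreamOperator<Event> afterB = input
        .keyBy(Event::getKey)
        .process(new ProcessA())   // (a) KeyedProcessFunction
        .process(new ProcessB());  // (b) ProcessFunction
KeyedStream<Event, Long> keyedAgain = DataStreamUtils.reinterpretAsKeyedStream(
        afterB, Event::getKey, TypeInformation.of(Long.class));
SingleOutputStreamOperator<Event> afterC = keyedAgain
        .process(new ProcessC()); // (c) KeyedProcessFunction using ValueState<Long> for the sequence number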
I run that job via Native Kubernetes deployment and use elastic scaling in reactive mode.
I use Flink 1.14.2.
I test that job on four use cases: (1) static parallelism, (2) scale-out, (3) scale-in, (4) task manager failure*.
* via kill -SIGTERM on the Flink JVM inside the container
For test use cases (1) and (2) everything is fine.
For test use cases (3) and (4), the state of the keyed process function (c) seems to be available for only around 50% of the events processed after the scale-in/failure.
Is the reinterpretAsKeyedStream feature in general usable with Elastic Scaling in Reactive Mode in Flink 1.14.2?
If yes, are there already any ideas about what the root cause could be?
BR
Martin