mxm commented on code in PR #655: URL: https://github.com/apache/flink-kubernetes-operator/pull/655#discussion_r1300143301
########## examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java: ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package autoscaling; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.util.Collector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; + +/** + * Example pipeline which simulates load fluctuating from zero to a defined max, and + * vice-versa. The goal is to simulate a fluctuating traffic pattern which traverses all possible + * stages between peak and zero load. The load curve is computed using a sine function. + * + * <p>The pipeline has defaults but can be parameterized as follows: + * + * <pre> + * repeatsAfterMinutes => The period length in minutes after which the initial load will be reached again. 
+ * maxLoadPerTask => Each task's max load is represented by a double which is similar to the Unix CPU load + * in the sense that at least maxLoad amount of subtasks are needed to sustain the load. + * For example, a max load of 1 represents 100% load on a single subtask, 50% load on two subtasks. + * Similarly, a max load of 2 represents 100% load on two tasks, 50% load on 4 subtasks. + * + * Multiple tasks and branches can be defined to test Flink Autoscaling. The format is as follows: + * maxLoadTask1Branch1[;maxLoadTask2Branch1...[\n maxLoadTask1Branch2[;maxLoadTask2Branch2...]...] + * + * A concrete example: "1;2;4\n4;2;1" + * Two branches are created with three tasks each. On the first branch, the tasks have + * a load of 1, 2, and 4 respectively. On the second branch, the tasks have the load reversed. + * This means that, at peak load with a Flink Autoscaling target utilization of 0.5, the parallelisms of + * the tasks will be 2, 4, 8 for branch one and vice-versa for branch two. + * </pre> + */ +public class LoadSimulationPipeline { + + private static final Logger LOG = LoggerFactory.getLogger(LoadSimulationPipeline.class); + + public static void main(String[] args) throws Exception { + var env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.disableOperatorChaining(); + + var arguments = ParameterTool.fromArgs(args); + String maxLoadPerTask = + arguments.get("maxLoadPerTask", "1;2;4;8;16;\n16;8;4;2;1\n8;4;16;1;2"); + long repeatsAfterMs = + Duration.ofMinutes(arguments.getLong("repeatsAfterMinutes", 60)).toMillis(); + int samplingIntervalMs = arguments.getInt("samplingIntervalMs", 1_000); + + for (String branch : maxLoadPerTask.split("\n")) { + String[] taskLoads = branch.split(";"); + + DataStream<Long> stream = + env.addSource(new ImpulseSource(samplingIntervalMs)).name("ImpulseSource"); + + for (String load : taskLoads) { + double maxLoad = Double.parseDouble(load); + stream = + stream.shuffle() + .flatMap( + new LoadSimulationFn( + maxLoad, 
repeatsAfterMs, samplingIntervalMs)) + .name("MaxLoad: " + maxLoad) + .broadcast(); + } + + stream.addSink(new DiscardingSink<>()); + } + + env.execute( + "Load Simulation (repeats after " + + Duration.of(repeatsAfterMs, ChronoUnit.MILLIS) + + ")"); + } + + private static class ImpulseSource implements SourceFunction<Long> { + private final int maxSleepTimeMs; + boolean canceled; + + public ImpulseSource(int samplingInterval) { + this.maxSleepTimeMs = samplingInterval / 10; + } + + @Override + public void run(SourceContext<Long> sourceContext) throws Exception { + while (!canceled) { + sourceContext.collect(42L); Review Comment: The lock is there to ensure correctness while checkpointing. What is being checkpointed? This is not a stateful function, and neither are the other operators. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org