Hi Community,

I want to understand the following logs from the flink-k8s-operator
autoscaler. My Flink pipeline, running on Flink 1.18.0 with
flink-k8s-operator 1.8.0, is not scaling up even though the source vertex
is back-pressured.


2024-06-06 21:33:35,270 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Updating source cbc357ccb763df2852fee8c4fc7d55f2 max parallelism based on
available partitions to 1

2024-06-06 21:33:35,276 o.a.f.a.RestApiMetricsCollector
[DEBUG][flink/pipeline-pipelinelocal]
Querying metrics {busyTimeMsPerSecond=BUSY_TIME_PER_SEC,
Source__dd-log-source.numRecordsOut=SOURCE_TASK_NUM_RECORDS_OUT,
Source__dd-log-source.numRecordsIn=SOURCE_TASK_NUM_RECORDS_IN,
Source__dd-log-source.pendingRecords=PENDING_RECORDS,
Source__dd-log-source.numRecordsInPerSecond=SOURCE_TASK_NUM_RECORDS_IN_PER_SEC,
backPressuredTimeMsPerSecond=BACKPRESSURE_TIME_PER_SEC} for
cbc357ccb763df2852fee8c4fc7d55f2

2024-06-06 21:33:35,282 o.a.f.a.RestApiMetricsCollector
[DEBUG][flink/pipeline-pipelinelocal]
Querying metrics {busyTimeMsPerSecond=BUSY_TIME_PER_SEC} for
61214243927da46230dfd349fba7b8e6

2024-06-06 21:33:35,286 o.a.f.a.RestApiMetricsCollector
[DEBUG][flink/pipeline-pipelinelocal]
Querying metrics {busyTimeMsPerSecond=BUSY_TIME_PER_SEC} for
7758b2b5ada48872db09a5c48176e34e

2024-06-06 21:33:35,291 o.a.f.a.RestApiMetricsCollector
[DEBUG][flink/pipeline-pipelinelocal]
Querying metrics {busyTimeMsPerSecond=BUSY_TIME_PER_SEC} for
eab9c0013081b8479e60463931f3a593

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Calculating vertex scaling metrics for cbc357ccb763df2852fee8c4fc7d55f2
from
{BACKPRESSURE_TIME_PER_SEC=AggregatedMetric{id='backPressuredTimeMsPerSecond',
mim='0.0', max='0.0', avg='0.0', sum='0.0'},
BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond', mim='192.0',
max='192.0', avg='192.0', sum='192.0'},
SOURCE_TASK_NUM_RECORDS_OUT=AggregatedMetric{id='Source__dd-log-source.numRecordsOut',
mim='613279.0', max='613279.0', avg='613279.0', sum='613279.0'},
PENDING_RECORDS=AggregatedMetric{id='Source__dd-log-source.pendingRecords',
mim='0.0', max='0.0', avg='0.0', sum='0.0'},
SOURCE_TASK_NUM_RECORDS_IN=AggregatedMetric{id='Source__dd-log-source.numRecordsIn',
mim='613279.0', max='613279.0', avg='613279.0', sum='613279.0'},
SOURCE_TASK_NUM_RECORDS_IN_PER_SEC=AggregatedMetric{id='Source__dd-log-source.numRecordsInPerSecond',
mim='1682.7333333333333', max='1682.7333333333333',
avg='1682.7333333333333', sum='1682.7333333333333'}}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Vertex scaling metrics for cbc357ccb763df2852fee8c4fc7d55f2:
{ACCUMULATED_BUSY_TIME=32301.0, NUM_RECORDS_OUT=125.0, LOAD=0.192,
NUM_RECORDS_IN=613279.0, OBSERVED_TPR=Infinity, LAG=0.0}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Calculating vertex scaling metrics for eab9c0013081b8479e60463931f3a593
from {BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond',
mim='0.0', max='0.0', avg='0.0', sum='0.0'}}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Vertex scaling metrics for eab9c0013081b8479e60463931f3a593:
{ACCUMULATED_BUSY_TIME=0.0, NUM_RECORDS_OUT=0.0, LOAD=0.0,
NUM_RECORDS_IN=8.0}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Calculating vertex scaling metrics for 61214243927da46230dfd349fba7b8e6
from {BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond',
mim='0.0', max='0.0', avg='0.0', sum='0.0'}}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Vertex scaling metrics for 61214243927da46230dfd349fba7b8e6:
{ACCUMULATED_BUSY_TIME=0.0, NUM_RECORDS_OUT=0.0, LOAD=0.0,
NUM_RECORDS_IN=8.0}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Calculating vertex scaling metrics for 7758b2b5ada48872db09a5c48176e34e
from {BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond',
mim='0.0', max='0.0', avg='0.0', sum='0.0'}}

2024-06-06 21:33:35,305 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Vertex scaling metrics for 7758b2b5ada48872db09a5c48176e34e:
{ACCUMULATED_BUSY_TIME=0.0, NUM_RECORDS_OUT=8.0, LOAD=0.0,
NUM_RECORDS_IN=117.0}

2024-06-06 21:33:35,305 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Global metrics: {NUM_TASK_SLOTS_USED=1.0,
HEAP_MAX_USAGE_RATIO=0.6800108099126959, HEAP_MEMORY_USED=4.74886648E8,
METASPACE_MEMORY_USED=1.40677456E8, MANAGED_MEMORY_USED=0.0}

2024-06-06 21:33:35,306 o.a.f.a.ScalingTracking
[DEBUG][flink/pipeline-pipelinelocal]
Cannot record restart duration because already set in the latest record:
PT0.114185S

2024-06-06 21:33:35,307 o.a.f.a.JobAutoScalerImpl
[DEBUG][flink/pipeline-pipelinelocal]
Collected metrics:
CollectedMetricHistory(jobTopology=JobTopology(vertexInfos={cbc357ccb763df2852fee8c4fc7d55f2=VertexInfo(id=cbc357ccb763df2852fee8c4fc7d55f2,
inputs={}, outputs={61214243927da46230dfd349fba7b8e6=REBALANCE,
7758b2b5ada48872db09a5c48176e34e=HASH}, parallelism=1, maxParallelism=1,
originalMaxParallelism=20, finished=false,
ioMetrics=IOMetrics(numRecordsIn=0, numRecordsOut=125,
accumulatedBusyTime=32301.0)),
eab9c0013081b8479e60463931f3a593=VertexInfo(id=eab9c0013081b8479e60463931f3a593,
inputs={7758b2b5ada48872db09a5c48176e34e=REBALANCE}, outputs={},
parallelism=1, maxParallelism=1, originalMaxParallelism=1, finished=false,
ioMetrics=IOMetrics(numRecordsIn=8, numRecordsOut=0,
accumulatedBusyTime=0.0)),
61214243927da46230dfd349fba7b8e6=VertexInfo(id=61214243927da46230dfd349fba7b8e6,
inputs={cbc357ccb763df2852fee8c4fc7d55f2=REBALANCE}, outputs={},
parallelism=1, maxParallelism=1, originalMaxParallelism=1, finished=false,
ioMetrics=IOMetrics(numRecordsIn=8, numRecordsOut=0,
accumulatedBusyTime=0.0)),
7758b2b5ada48872db09a5c48176e34e=VertexInfo(id=7758b2b5ada48872db09a5c48176e34e,
inputs={cbc357ccb763df2852fee8c4fc7d55f2=HASH},
outputs={eab9c0013081b8479e60463931f3a593=REBALANCE}, parallelism=1,
maxParallelism=20, originalMaxParallelism=20, finished=false,
ioMetrics=IOMetrics(numRecordsIn=117, numRecordsOut=8,
accumulatedBusyTime=0.0))}, finishedVertices=[],
verticesInTopologicalOrder=[cbc357ccb763df2852fee8c4fc7d55f2,
61214243927da46230dfd349fba7b8e6, 7758b2b5ada48872db09a5c48176e34e,
eab9c0013081b8479e60463931f3a593]),
metricHistory={2024-06-06T21:32:35.170678Z=CollectedMetrics(vertexMetrics={cbc357ccb763df2852fee8c4fc7d55f2={ACCUMULATED_BUSY_TIME=20821.0,
NUM_RECORDS_OUT=109.0, LOAD=0.0, NUM_RECORDS_IN=512339.0,
OBSERVED_TPR=Infinity, LAG=0.0},
eab9c0013081b8479e60463931f3a593={ACCUMULATED_BUSY_TIME=0.0,
NUM_RECORDS_OUT=0.0, LOAD=0.0, NUM_RECORDS_IN=7.0},
61214243927da46230dfd349fba7b8e6={ACCUMULATED_BUSY_TIME=0.0,
NUM_RECORDS_OUT=0.0, LOAD=0.0, NUM_RECORDS_IN=7.0},
7758b2b5ada48872db09a5c48176e34e={ACCUMULATED_BUSY_TIME=0.0,
NUM_RECORDS_OUT=7.0, LOAD=0.0, NUM_RECORDS_IN=102.0}},
globalMetrics={NUM_TASK_SLOTS_USED=1.0,
HEAP_MAX_USAGE_RATIO=0.5849425971687019, HEAP_MEMORY_USED=4.08495608E8,
METASPACE_MEMORY_USED=1.43093792E8, MANAGED_MEMORY_USED=0.0}),
2024-06-06T21:33:35.258489Z=CollectedMetrics(vertexMetrics={cbc357ccb763df2852fee8c4fc7d55f2={ACCUMULATED_BUSY_TIME=32301.0,
NUM_RECORDS_OUT=125.0, LOAD=0.192, NUM_RECORDS_IN=613279.0,
OBSERVED_TPR=Infinity, LAG=0.0},
eab9c0013081b8479e60463931f3a593={ACCUMULATED_BUSY_TIME=0.0,
NUM_RECORDS_OUT=0.0, LOAD=0.0, NUM_RECORDS_IN=8.0},
61214243927da46230dfd349fba7b8e6={ACCUMULATED_BUSY_TIME=0.0,
NUM_RECORDS_OUT=0.0, LOAD=0.0, NUM_RECORDS_IN=8.0},
7758b2b5ada48872db09a5c48176e34e={ACCUMULATED_BUSY_TIME=0.0,
NUM_RECORDS_OUT=8.0, LOAD=0.0, NUM_RECORDS_IN=117.0}},
globalMetrics={NUM_TASK_SLOTS_USED=1.0,
HEAP_MAX_USAGE_RATIO=0.6800108099126959, HEAP_MEMORY_USED=4.74886648E8,
METASPACE_MEMORY_USED=1.40677456E8, MANAGED_MEMORY_USED=0.0})},
jobRunningTs=2024-06-06T21:17:35.712Z, fullyCollected=true)

2024-06-06 21:33:35,307 o.a.f.a.ScalingMetricEvaluator
[DEBUG][flink/pipeline-pipelinelocal]
Restart time used in metrics evaluation: PT5M

2024-06-06 21:33:35,307 o.a.f.a.ScalingMetricEvaluator
[DEBUG][flink/pipeline-pipelinelocal]
Using busy time based tpr 17498.932104004747 for
cbc357ccb763df2852fee8c4fc7d55f2.
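As a rough sanity check, the busy-time-based TPR above matches the source's average TARGET_DATA_RATE divided by its average LOAD, both of which appear in the Evaluated metrics entry further down. That this is how the autoscaler derives it is my assumption, inferred from the numbers rather than taken from the autoscaler source; `busy_time_tpr` is a hypothetical helper:

```python
# Assumption: busy-time TPR = average input rate / average load (load = busy fraction).
# Both inputs are copied from the "Evaluated metrics" entry for the source vertex
# cbc357ccb763df2852fee8c4fc7d55f2; the log rounds them to 3 decimals.
AVG_TARGET_DATA_RATE = 1679.897  # records/s
AVG_LOAD = 0.096                 # fraction of each second spent busy


def busy_time_tpr(avg_rate: float, avg_load: float) -> float:
    """Records/s the vertex could handle at 100% busy time (hypothetical helper)."""
    if avg_load == 0:
        # Java double division yields Infinity here; Python would raise instead.
        return float("inf")
    return avg_rate / avg_load


tpr = busy_time_tpr(AVG_TARGET_DATA_RATE, AVG_LOAD)
print(f"estimated TPR: {tpr:.1f} records/s")
```

The zero-load branch is consistent with the downstream vertices, whose LOAD average is 0.0 and whose TRUE_PROCESSING_RATE is logged as Infinity.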

2024-06-06 21:33:35,307 o.a.f.a.ScalingMetricEvaluator
[DEBUG][flink/pipeline-pipelinelocal]
Computing edge (cbc357ccb763df2852fee8c4fc7d55f2,
61214243927da46230dfd349fba7b8e6) data rate for single input downstream task

2024-06-06 21:33:35,307 o.a.f.a.ScalingMetricEvaluator
[DEBUG][flink/pipeline-pipelinelocal]
Computed output ratio for edge (cbc357ccb763df2852fee8c4fc7d55f2 ->
61214243927da46230dfd349fba7b8e6) : 9.906875371507826E-6

2024-06-06 21:33:35,308 o.a.f.a.ScalingMetricEvaluator
[DEBUG][flink/pipeline-pipelinelocal]
Computing edge (cbc357ccb763df2852fee8c4fc7d55f2,
7758b2b5ada48872db09a5c48176e34e) data rate for single input downstream task

2024-06-06 21:33:35,308 o.a.f.a.ScalingMetricEvaluator
[DEBUG][flink/pipeline-pipelinelocal]
Computed output ratio for edge (cbc357ccb763df2852fee8c4fc7d55f2 ->
7758b2b5ada48872db09a5c48176e34e) : 1.486031305726174E-4

2024-06-06 21:33:35,308 o.a.f.a.ScalingMetricEvaluator
[DEBUG][flink/pipeline-pipelinelocal]
Computing edge (7758b2b5ada48872db09a5c48176e34e,
eab9c0013081b8479e60463931f3a593) data rate for single input downstream task

2024-06-06 21:33:35,308 o.a.f.a.ScalingMetricEvaluator
[DEBUG][flink/pipeline-pipelinelocal]
Computed output ratio for edge (7758b2b5ada48872db09a5c48176e34e ->
eab9c0013081b8479e60463931f3a593) : 0.06666666666666667
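The three output ratios above can be reproduced from the two metricHistory samples in the Collected metrics entry: each one equals the growth of the downstream vertex's NUM_RECORDS_IN divided by the growth of the upstream vertex's NUM_RECORDS_IN between 21:32:35 and 21:33:35. This is an observation from the logged numbers, not a claim about how the autoscaler implements it; `growth` and `output_ratio` are hypothetical helpers:

```python
# Cumulative NUM_RECORDS_IN per vertex at the two metricHistory timestamps
# (21:32:35 and 21:33:35), copied from the "Collected metrics" entry.
NUM_RECORDS_IN = {
    "cbc357ccb763df2852fee8c4fc7d55f2": (512339.0, 613279.0),  # source
    "61214243927da46230dfd349fba7b8e6": (7.0, 8.0),
    "7758b2b5ada48872db09a5c48176e34e": (102.0, 117.0),
    "eab9c0013081b8479e60463931f3a593": (7.0, 8.0),
}


def growth(vertex: str) -> float:
    """Records that entered the vertex between the two samples."""
    first, last = NUM_RECORDS_IN[vertex]
    return last - first


def output_ratio(upstream: str, downstream: str) -> float:
    """Downstream input records per upstream input record over the window."""
    return growth(downstream) / growth(upstream)


EDGES = [
    ("cbc357ccb763df2852fee8c4fc7d55f2", "61214243927da46230dfd349fba7b8e6"),
    ("cbc357ccb763df2852fee8c4fc7d55f2", "7758b2b5ada48872db09a5c48176e34e"),
    ("7758b2b5ada48872db09a5c48176e34e", "eab9c0013081b8479e60463931f3a593"),
]

for up, down in EDGES:
    print(f"{up} -> {down}: {output_ratio(up, down):.6e}")
```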

2024-06-06 21:33:35,308 o.a.f.a.JobAutoScalerImpl
[DEBUG][flink/pipeline-pipelinelocal]
Evaluated metrics:
EvaluatedMetrics(vertexMetrics={cbc357ccb763df2852fee8c4fc7d55f2={TARGET_DATA_RATE=EvaluatedScalingMetric(current=NaN,
average=1679.897), PARALLELISM=EvaluatedScalingMetric(current=1.0,
average=NaN),
SCALE_UP_RATE_THRESHOLD=EvaluatedScalingMetric(current=83995.0,
average=NaN), MAX_PARALLELISM=EvaluatedScalingMetric(current=1.0,
average=NaN), TRUE_PROCESSING_RATE=EvaluatedScalingMetric(current=NaN,
average=17498.932), LOAD=EvaluatedScalingMetric(current=NaN,
average=0.096),
SCALE_DOWN_RATE_THRESHOLD=EvaluatedScalingMetric(current=Infinity,
average=NaN), CATCH_UP_DATA_RATE=EvaluatedScalingMetric(current=0.0,
average=NaN), LAG=EvaluatedScalingMetric(current=0.0, average=NaN)},
eab9c0013081b8479e60463931f3a593={TARGET_DATA_RATE=EvaluatedScalingMetric(current=NaN,
average=0.017), PARALLELISM=EvaluatedScalingMetric(current=1.0,
average=NaN), SCALE_UP_RATE_THRESHOLD=EvaluatedScalingMetric(current=1.0,
average=NaN), MAX_PARALLELISM=EvaluatedScalingMetric(current=1.0,
average=NaN), TRUE_PROCESSING_RATE=EvaluatedScalingMetric(current=NaN,
average=Infinity), LOAD=EvaluatedScalingMetric(current=NaN, average=0.0),
SCALE_DOWN_RATE_THRESHOLD=EvaluatedScalingMetric(current=Infinity,
average=NaN), CATCH_UP_DATA_RATE=EvaluatedScalingMetric(current=0.0,
average=NaN)},
61214243927da46230dfd349fba7b8e6={TARGET_DATA_RATE=EvaluatedScalingMetric(current=NaN,
average=0.017), PARALLELISM=EvaluatedScalingMetric(current=1.0,
average=NaN), SCALE_UP_RATE_THRESHOLD=EvaluatedScalingMetric(current=1.0,
average=NaN), MAX_PARALLELISM=EvaluatedScalingMetric(current=1.0,
average=NaN), TRUE_PROCESSING_RATE=EvaluatedScalingMetric(current=NaN,
average=Infinity), LOAD=EvaluatedScalingMetric(current=NaN, average=0.0),
SCALE_DOWN_RATE_THRESHOLD=EvaluatedScalingMetric(current=Infinity,
average=NaN), CATCH_UP_DATA_RATE=EvaluatedScalingMetric(current=0.0,
average=NaN)},
7758b2b5ada48872db09a5c48176e34e={TARGET_DATA_RATE=EvaluatedScalingMetric(current=NaN,
average=0.25), PARALLELISM=EvaluatedScalingMetric(current=1.0,
average=NaN), SCALE_UP_RATE_THRESHOLD=EvaluatedScalingMetric(current=13.0,
average=NaN), MAX_PARALLELISM=EvaluatedScalingMetric(current=20.0,
average=NaN), TRUE_PROCESSING_RATE=EvaluatedScalingMetric(current=NaN,
average=Infinity), LOAD=EvaluatedScalingMetric(current=NaN, average=0.0),
SCALE_DOWN_RATE_THRESHOLD=EvaluatedScalingMetric(current=Infinity,
average=NaN), CATCH_UP_DATA_RATE=EvaluatedScalingMetric(current=0.0,
average=NaN)}},
globalMetrics={HEAP_MAX_USAGE_RATIO=EvaluatedScalingMetric(current=0.68,
average=0.632), NUM_TASK_SLOTS_USED=EvaluatedScalingMetric(current=1.0,
average=NaN), GC_PRESSURE=EvaluatedScalingMetric(current=NaN, average=NaN),
HEAP_MEMORY_USED=EvaluatedScalingMetric(current=4.74886648E8,
average=4.41691128E8),
METASPACE_MEMORY_USED=EvaluatedScalingMetric(current=1.40677456E8,
average=1.41885624E8),
MANAGED_MEMORY_USED=EvaluatedScalingMetric(current=0.0, average=0.0)})
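The average values in the Evaluated metrics entry look like plain arithmetic means of the two samples in metricHistory (again inferred from the numbers, not from the source). For example, the source's LOAD average of 0.096 is the mean of 0.0 at 21:32:35 and 0.192 at 21:33:35, where 0.192 is busyTimeMsPerSecond=192 divided by 1000:

```python
from statistics import mean

# Per-sample values from the two metricHistory entries (21:32:35, 21:33:35).
SAMPLES = {
    # Source vertex LOAD; the second sample is busyTimeMsPerSecond 192 / 1000.
    "LOAD (cbc357ccb763df2852fee8c4fc7d55f2)": [0.0, 192.0 / 1000],
    "HEAP_MAX_USAGE_RATIO": [0.5849425971687019, 0.6800108099126959],
    "HEAP_MEMORY_USED": [4.08495608e8, 4.74886648e8],
    "METASPACE_MEMORY_USED": [1.43093792e8, 1.40677456e8],
}

for name, values in SAMPLES.items():
    print(f"{name}: average = {mean(values):.6g}")
```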

2024-06-06 21:33:35,308 o.a.f.a.ScalingExecutor
[DEBUG][flink/pipeline-pipelinelocal]
Restart time used in scaling summary computation: PT5M

2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler
[DEBUG][flink/pipeline-pipelinelocal]
Target processing capacity for cbc357ccb763df2852fee8c4fc7d55f2 is 168270.0

2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler
[DEBUG][flink/pipeline-pipelinelocal]
Capped target processing capacity for cbc357ccb763df2852fee8c4fc7d55f2 is
168270.0

2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler
[DEBUG][flink/pipeline-pipelinelocal]
Specified autoscaler maximum parallelism 200 is greater than the operator
max parallelism 1. This means the operator max parallelism can never be
reached.
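The source's target processing capacity (168270.0) and its SCALE_UP_RATE_THRESHOLD (83995.0) can be reproduced with the sketch below. It assumes the target capacity is the lag catch-up rate plus a restart catch-up term plus the average rate divided by the target utilization, and that the scale-up threshold uses utilization plus boundary without the restart term. The settings marked hypothetical (target utilization 0.01, utilization boundary 0.01, catch-up duration 30 minutes) are not in the log; I picked them only because they reproduce the logged values. Only the PT5M restart time, the 0.0 CATCH_UP_DATA_RATE, the 1679.897 average rate and the TPR come from the log:

```python
import math

# From the log.
RESTART_SECONDS = 5 * 60                  # "Restart time used in ...: PT5M"
CATCH_UP_DATA_RATE = 0.0                  # CATCH_UP_DATA_RATE current (no lag)
AVG_TARGET_DATA_RATE = 1679.897           # source TARGET_DATA_RATE average
TRUE_PROCESSING_RATE = 17498.932104004747 # "Using busy time based tpr ..."

# Hypothetical settings, NOT shown in the log; chosen because they reproduce it.
TARGET_UTILIZATION = 0.01
UTILIZATION_BOUNDARY = 0.01
CATCH_UP_SECONDS = 30 * 60


def java_round(x: float) -> int:
    """Round half up, like Java's Math.round for positive values."""
    return math.floor(x + 0.5)


def target_capacity(utilization: float, with_restart: bool = True) -> float:
    """Assumed formula: lag catch-up + restart catch-up + rate at utilization."""
    if utilization <= 0:
        return math.inf  # matches the logged SCALE_DOWN_RATE_THRESHOLD=Infinity
    restart_catch_up = (
        AVG_TARGET_DATA_RATE * RESTART_SECONDS / CATCH_UP_SECONDS if with_restart else 0.0
    )
    return java_round(
        CATCH_UP_DATA_RATE + restart_catch_up + AVG_TARGET_DATA_RATE / utilization
    )


capacity = target_capacity(TARGET_UTILIZATION)
scale_up_threshold = target_capacity(
    TARGET_UTILIZATION + UTILIZATION_BOUNDARY, with_restart=False
)
scale_down_threshold = target_capacity(TARGET_UTILIZATION - UTILIZATION_BOUNDARY)
scale_factor = capacity / TRUE_PROCESSING_RATE

print(capacity, scale_up_threshold, scale_down_threshold, round(scale_factor, 2))
```

Since no "capped by maximum scale up factor" line is logged for the source, the capped capacity equals TRUE_PROCESSING_RATE * scale_factor, which is the same 168270.0.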

2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler
[DEBUG][flink/pipeline-pipelinelocal]
Target processing capacity for eab9c0013081b8479e60463931f3a593 is 2.0

2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler
[DEBUG][flink/pipeline-pipelinelocal]
Computed scale factor of 0.0 for eab9c0013081b8479e60463931f3a593 is capped
by maximum scale down factor to 0.4

2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler
[DEBUG][flink/pipeline-pipelinelocal]
Capped target processing capacity for eab9c0013081b8479e60463931f3a593 is
Infinity

2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler
[DEBUG][flink/pipeline-pipelinelocal]
Specified autoscaler maximum parallelism 200 is greater than the operator
max parallelism 1. This means the operator max parallelism can never be
reached.
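For the downstream vertices the numbers follow from their LOAD average of 0.0: a TRUE_PROCESSING_RATE of Infinity makes the raw scale factor 2.0 / Infinity = 0.0, which is capped to the logged minimum of 0.4, and Infinity * 0.4 is still Infinity. A minimal sketch, assuming the capped capacity is the TPR multiplied by the capped scale factor (consistent with the source, where 17498.93 * 9.616 is about 168270); `capped_scale_factor` is a hypothetical helper and ignores any scale-up cap:

```python
import math

MIN_SCALE_FACTOR = 0.4  # from "... is capped by maximum scale down factor to 0.4"


def capped_scale_factor(target_capacity: float, tpr: float) -> float:
    """Raw factor = target / TPR, but never scale down below MIN_SCALE_FACTOR."""
    return max(target_capacity / tpr, MIN_SCALE_FACTOR)


# TRUE_PROCESSING_RATE average for eab9c0013081b8479e60463931f3a593.
tpr = math.inf
target = 2.0  # "Target processing capacity for eab9c001... is 2.0"

factor = capped_scale_factor(target, tpr)
capped_capacity = tpr * factor
print(factor, capped_capacity)
```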

2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler
[DEBUG][flink/pipeline-pipelinelocal]
Target processing capacity for 61214243927da46230dfd349fba7b8e6 is 2.0

2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler
[DEBUG][flink/pipeline-pipelinelocal]
Computed scale factor of 0.0 for 61214243927da46230dfd349fba7b8e6 is capped
by maximum scale down factor to 0.4

2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler
[DEBUG][flink/pipeline-pipelinelocal]
Capped target processing capacity for 61214243927da46230dfd349fba7b8e6 is
Infinity

2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler
[DEBUG][flink/pipeline-pipelinelocal]
Specified autoscaler maximum parallelism 200 is greater than the operator
max parallelism 1. This means the operator max parallelism can never be
reached.

2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler
[DEBUG][flink/pipeline-pipelinelocal]
Target processing capacity for 7758b2b5ada48872db09a5c48176e34e is 25.0

2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler
[DEBUG][flink/pipeline-pipelinelocal]
Computed scale factor of 0.0 for 7758b2b5ada48872db09a5c48176e34e is capped
by maximum scale down factor to 0.4

2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler
[DEBUG][flink/pipeline-pipelinelocal]
Capped target processing capacity for 7758b2b5ada48872db09a5c48176e34e is
Infinity

2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler
[DEBUG][flink/pipeline-pipelinelocal]
Specified autoscaler maximum parallelism 200 is greater than the operator
max parallelism 20. This means the operator max parallelism can never be
reached.

2024-06-06 21:33:35,309 o.a.f.a.ScalingExecutor
[INFO ][flink/pipeline-pipelinelocal]
All job vertices are currently running at their
target parallelism.
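Putting it together, a simplified sketch of turning a scale factor into a new parallelism gives 1 for every vertex, which matches the INFO line above. It assumes the new parallelism is ceil(current parallelism * scale factor), clamped between 1 and the smaller of the vertex max parallelism and the autoscaler maximum of 200, and it ignores any key-group alignment the real autoscaler may apply; `new_parallelism` is a hypothetical helper. The max parallelism values come from the VertexInfo entries, and the source's value of 1 matches the "max parallelism based on available partitions to 1" message at the top:

```python
import math

AUTOSCALER_MAX_PARALLELISM = 200  # "Specified autoscaler maximum parallelism 200 ..."


def new_parallelism(current: int, scale_factor: float, vertex_max: int, lower: int = 1) -> int:
    """Simplified: ceil(current * factor), clamped to [lower, min(vertex_max, 200)]."""
    desired = math.ceil(current * scale_factor)
    upper = min(vertex_max, AUTOSCALER_MAX_PARALLELISM)
    return min(max(lower, desired), upper)


# vertex id -> (current parallelism, scale factor, maxParallelism from VertexInfo).
# Source factor = logged target capacity / logged TPR; the others are the capped 0.4.
VERTICES = {
    "cbc357ccb763df2852fee8c4fc7d55f2": (1, 168270.0 / 17498.932104004747, 1),
    "eab9c0013081b8479e60463931f3a593": (1, 0.4, 1),
    "61214243927da46230dfd349fba7b8e6": (1, 0.4, 1),
    "7758b2b5ada48872db09a5c48176e34e": (1, 0.4, 20),
}

for vertex, (current, factor, vmax) in VERTICES.items():
    target = new_parallelism(current, factor, vmax)
    print(f"{vertex}: desired {math.ceil(current * factor)}, target {target}, current {current}")
```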


Some Questions

1. Does the autoscaler decide to scale up a job vertex when the target
processing capacity is higher than the current processing capacity? If so,
how can I check the current processing capacity?

2. Which metrics in the logs above indicate that the target utilization
threshold has not been reached, and hence that all vertices are running at
their target parallelism?


Thank you
Chetas
