This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
new 18c52f3b42 KYLIN-5246 long running job's log staying in mem, may cause
job server oom.
18c52f3b42 is described below
commit 18c52f3b42fcc8e1f92ae32c6e12f4ddf45fdf64
Author: zhaoliu4 <[email protected]>
AuthorDate: Wed Nov 2 21:54:51 2022 +0800
KYLIN-5246 long running job's log staying in mem, may cause job server oom.
Keep only the last 10 KB (10240 bytes) of log output by using a ring buffer.
Add a unit test for RingBuffer.java.
---
.../kylin/common/util/CliCommandExecutor.java | 19 +++---
.../org/apache/kylin/common/util/RingBuffer.java | 71 ++++++++++++++++++++++
.../apache/kylin/common/util/RingBufferTest.java | 60 ++++++++++++++++++
3 files changed, 142 insertions(+), 8 deletions(-)
diff --git
a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
index 83f30ab5d3..2f56d4f0c0 100644
---
a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
+++
b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
@@ -58,10 +58,11 @@ public class CliCommandExecutor {
}
public void copyFile(String localFile, String destDir) throws IOException {
- if (remoteHost == null)
+ if (remoteHost == null) {
copyNative(localFile, destDir);
- else
+ } else {
copyRemote(localFile, destDir);
+ }
}
private void copyNative(String localFile, String destDir) throws
IOException {
@@ -93,11 +94,12 @@ public class CliCommandExecutor {
r = runRemoteCommand(command, logAppender);
}
- if (r.getFirst() != 0)
- throw new IOException("OS command error exit with return code: " +
r.getFirst() //
+ if (r.getFirst() != 0) {
+ throw new IOException("OS command error exit with return code: " +
r.getFirst()
+ ", error message: " + r.getSecond() + "The command is:
\n" + command
- + (remoteHost == null ? "" : " (remoteHost:" + remoteHost
+ ")") //
+ + (remoteHost == null ? "" : " (remoteHost:" + remoteHost
+ ")")
);
+ }
return r;
}
@@ -143,9 +145,10 @@ public class CliCommandExecutor {
BufferedReader reader = new BufferedReader(
new InputStreamReader(proc.getInputStream(),
StandardCharsets.UTF_8));
String line;
- StringBuilder result = new StringBuilder();
+ // keep only the last 10k logs, to avoid cause job server oom.
+ RingBuffer ringBuffer = RingBuffer.allocate(10240);
while ((line = reader.readLine()) != null &&
!Thread.currentThread().isInterrupted()) {
- result.append(line).append('\n');
+ ringBuffer.put((line + '\n').getBytes(StandardCharsets.UTF_8));
if (logAppender != null) {
logAppender.log(line);
}
@@ -164,7 +167,7 @@ public class CliCommandExecutor {
try {
int exitCode = proc.waitFor();
- return Pair.newPair(exitCode, result.toString());
+ return Pair.newPair(exitCode, new String(ringBuffer.get(),
StandardCharsets.UTF_8));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
diff --git
a/core-common/src/main/java/org/apache/kylin/common/util/RingBuffer.java
b/core-common/src/main/java/org/apache/kylin/common/util/RingBuffer.java
new file mode 100644
index 0000000000..8e0113cffb
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/RingBuffer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+/**
+ * Fixed-capacity byte buffer that keeps only the most recently written bytes.
+ *
+ * <p>Once more bytes have been written than the buffer can hold, the oldest
+ * bytes are overwritten. Truncation happens at byte granularity, so when the
+ * content is later decoded with a multi-byte charset the first character of
+ * the retained data may be incomplete.
+ *
+ * <p>This class performs no synchronization.
+ *
+ * @author zhaoliu4
+ * @date 2022/11/3
+ */
+public class RingBuffer {
+    /**
+     * backing storage; its length is the capacity of the buffer
+     */
+    private final byte[] bytes;
+
+    /**
+     * next write position, always in {@code [0, capacity)} for a non-empty capacity
+     */
+    private int writePos;
+
+    /**
+     * number of bytes currently stored, never more than the capacity
+     */
+    private int size;
+
+    private RingBuffer(int capacity) {
+        this.bytes = new byte[capacity];
+    }
+
+    /**
+     * Creates an empty buffer that retains at most {@code capacity} bytes.
+     *
+     * @param capacity maximum number of bytes to retain; {@code 0} yields a
+     *                 buffer that discards everything written to it
+     * @return a new, empty buffer
+     * @throws NegativeArraySizeException if {@code capacity} is negative
+     */
+    public static RingBuffer allocate(int capacity) {
+        return new RingBuffer(capacity);
+    }
+
+    /**
+     * Appends {@code src}, overwriting the oldest stored bytes when the
+     * capacity is exceeded.
+     *
+     * @param src bytes to append; must not be {@code null}
+     * @return this buffer, to allow call chaining
+     * @throws NullPointerException if {@code src} is {@code null}
+     */
+    public RingBuffer put(byte[] src) {
+        final int srcLength = src.length;
+        final int capacity = bytes.length;
+        if (capacity == 0 || srcLength == 0) {
+            // nothing can be stored, or nothing to store
+            return this;
+        }
+
+        // only the trailing 'capacity' bytes of src can survive this call
+        final int length = Math.min(srcLength, capacity);
+        final int offset = srcLength - length;
+
+        // first fill up to the end of the array, then wrap around to the start
+        final int untilEnd = Math.min(length, capacity - writePos);
+        System.arraycopy(src, offset, bytes, writePos, untilEnd);
+        System.arraycopy(src, offset + untilEnd, bytes, 0, length - untilEnd);
+
+        writePos = untilEnd < length ? length - untilEnd : writePos + length;
+        if (writePos == capacity) {
+            // turn back, continue to write from the start
+            writePos = 0;
+        }
+        // written this way to avoid int overflow for very large capacities
+        size = length >= capacity - size ? capacity : size + length;
+        return this;
+    }
+
+    /**
+     * Returns a copy of the stored bytes, ordered from oldest to newest.
+     *
+     * @return a newly allocated array whose length is the number of stored bytes
+     */
+    public byte[] get() {
+        final byte[] res = new byte[size];
+        if (size < bytes.length) {
+            // not full yet: the data occupies [0, size)
+            System.arraycopy(bytes, 0, res, 0, size);
+        } else {
+            // full: the oldest byte is at writePos, so stitch tail and head together
+            final int tail = size - writePos;
+            System.arraycopy(bytes, writePos, res, 0, tail);
+            System.arraycopy(bytes, 0, res, tail, writePos);
+        }
+        return res;
+    }
+}
diff --git
a/core-common/src/test/java/org/apache/kylin/common/util/RingBufferTest.java
b/core-common/src/test/java/org/apache/kylin/common/util/RingBufferTest.java
new file mode 100644
index 0000000000..07a9c1f139
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/util/RingBufferTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Unit test for {@link RingBuffer}: feeds more log bytes than the buffer can
+ * hold and checks that exactly the trailing bytes are retained.
+ *
+ * @author zhaoliu4
+ * @date 2022/11/9
+ */
+public class RingBufferTest {
+    @Test
+    public void test() throws IOException {
+        String log1 = "2022-11-09 18:25:48,678 INFO [task-result-getter-0]
scheduler.TaskSetManager:54 : Starting task 0.0 in stage 384.0 (TID 621,
kylin-hadoop-001, executor 7, partition 0, NODE_LOCAL, 7780 bytes)\n";
+        String log2 = "2022-11-09 18:25:48,678 INFO
[dispatcher-event-loop-26] spark.MapOutputTrackerMasterEndpoint:54 : Asked to
send map output locations for shuffle 138 to x.x.x.x:46334\n";
+        String log3 = "2022-11-09 18:25:48,689 INFO [task-result-getter-0]
scheduler.TaskSetManager:54 : Finished task 0.0 in stage 384.0 (TID 621) in 31
ms on kylin-hadoop-001 (executor 7) (1/1)\n";
+        String log4 = "2022-11-09 18:25:48,689 INFO [task-result-getter-0]
cluster.YarnScheduler:54 : Removed TaskSet 384.0, whose tasks have all
completed, from pool vip_tasks\n";
+
+        // mock yarn application log input stream
+        byte[] logBytes = (log1 + log2 + log3 + log4).getBytes(StandardCharsets.UTF_8);
+        // decode with the same charset the bytes were encoded with, not the platform default
+        try (BufferedReader reader = new BufferedReader(
+                new InputStreamReader(new ByteArrayInputStream(logBytes), StandardCharsets.UTF_8))) {
+            String line;
+            // allocate 300B mem
+            RingBuffer ringBuffer = RingBuffer.allocate(300);
+            while ((line = reader.readLine()) != null) {
+                ringBuffer.put((line + '\n').getBytes(StandardCharsets.UTF_8));
+            }
+
+            byte[] kept = ringBuffer.get();
+
+            // the actual log data is > 600B, but only the last 300B are kept;
+            // asserted separately so a failure shows which expectation broke
+            Assert.assertTrue(logBytes.length > 600);
+            Assert.assertEquals(300, kept.length);
+
+            // the expected text starts in the middle of a line because the buffer truncates by bytes
+            Assert.assertEquals(" [task-result-getter-0]
scheduler.TaskSetManager:54 : Finished task 0.0 in stage 384.0 (TID 621) in 31
ms on kylin-hadoop-001 (executor 7) (1/1)\n" +
+                            "2022-11-09 18:25:48,689 INFO
[task-result-getter-0] cluster.YarnScheduler:54 : Removed TaskSet 384.0, whose
tasks have all completed, from pool vip_tasks\n",
+                    new String(kept, StandardCharsets.UTF_8));
+        }
+    }
+}