This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 8497add [KYLIN-4964] Receiver consumer thread should be stopped while
encountering unrecoverable error (#1622)
8497add is described below
commit 8497add0b8d04647ea8cf0ff2265af46104a88a0
Author: dixingxing <[email protected]>
AuthorDate: Fri May 14 10:52:59 2021 +0800
[KYLIN-4964] Receiver consumer thread should be stopped while encountering
unrecoverable error (#1622)
* Improve error handling for streaming receiver, stop consumer thread while
encountering unrecoverable error
* KYLIN-4964 Receiver consumer thread should be stopped while encountering
unrecoverable error
Co-authored-by: dixingxing <[email protected]>
---
.../kylin/stream/core/consumer/StreamingConsumerChannel.java | 6 ++++++
.../apache/kylin/stream/core/model/stats/ConsumerStats.java | 11 +++++++++++
.../core/storage/columnar/ColumnarMemoryStorePersister.java | 5 ++++-
.../stream/core/storage/columnar/ColumnarSegmentStore.java | 3 +++
4 files changed, 24 insertions(+), 1 deletion(-)
diff --git
a/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
b/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
index 838793d..1372186 100644
---
a/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
+++
b/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kylin.stream.core.exception.IllegalStorageException;
import org.apache.kylin.stream.core.exception.StreamingException;
import org.apache.kylin.stream.core.metrics.StreamingMetrics;
import org.apache.kylin.stream.core.model.StreamingMessage;
@@ -115,6 +116,10 @@ public class StreamingConsumerChannel implements Runnable {
} catch (InterruptedException ie) {
logger.warn("interrupted!");
stopped = true;
+ } catch (IllegalStorageException ise) {
+ logger.error("Encountering unrecoverable exception,
stopping consumer thread! {}",
+ ise.getMessage(), ise);
+ throw ise;
} catch (Exception e) {
long countValue = addEventErrorCnt.incrementAndGet();
if (countValue % 1000 < 3) {
@@ -301,6 +306,7 @@ public class StreamingConsumerChannel implements Runnable {
stats.setPartitionConsumeStatsMap(partitionConsumeStatsMap);
stats.setConsumeOffsetInfo(getSourceConsumeInfo());
stats.setConsumeLag(totalLag);
+ stats.setConsumerThreadAlive(this.consumerThread.isAlive());
return stats;
}
diff --git
a/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ConsumerStats.java
b/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ConsumerStats.java
index ab640b1..57c9d1e 100644
---
a/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ConsumerStats.java
+++
b/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ConsumerStats.java
@@ -47,6 +47,9 @@ public class ConsumerStats {
@JsonProperty("consume_lag")
private long consumeLag;
+ @JsonProperty("consumer_thread_alive")
+ private boolean consumerThreadAlive;
+
public Map<Integer, PartitionConsumeStats> getPartitionConsumeStatsMap() {
return partitionConsumeStatsMap;
}
@@ -102,4 +105,12 @@ public class ConsumerStats {
public void setConsumeLag(long consumeLag) {
this.consumeLag = consumeLag;
}
+
+ public boolean isConsumerThreadAlive() {
+ return consumerThreadAlive;
+ }
+
+ public void setConsumerThreadAlive(boolean consumerThreadAlive) {
+ this.consumerThreadAlive = consumerThreadAlive;
+ }
}
diff --git
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
index 02a6ad6..5cd1119 100644
---
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
+++
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
@@ -51,6 +51,7 @@ import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.datatype.DataTypeSerializer;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.stream.core.exception.IllegalStorageException;
import
org.apache.kylin.stream.core.storage.columnar.ParsedStreamingCubeInfo.CuboidInfo;
import org.apache.kylin.stream.core.storage.columnar.protocol.CuboidMetaInfo;
import
org.apache.kylin.stream.core.storage.columnar.protocol.DimDictionaryMetaInfo;
@@ -110,7 +111,9 @@ public class ColumnarMemoryStorePersister {
logger.info("Finish persist memory store for cube:{} segment:{},
take: {}ms", cubeInstance.getName(),
segmentName, stopwatch.elapsed(MILLISECONDS));
} catch (Exception e) {
- logger.error("Error persist DataSegment.", e);
+ logger.error("Error persist DataSegment, deleteing fragment
folder:{}", fragment.getFragmentFolder().getPath());
+ fragment.purge();
+ throw new IllegalStorageException("Error persist DataSegment : " +
e.getMessage(), e);
}
}
diff --git
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
index 85fcd2c..6598206 100644
---
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
+++
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
@@ -313,11 +313,14 @@ public class ColumnarSegmentStore implements
IStreamingSegmentStore {
String checkpointFragmentIDString = (String) checkpoint;
FragmentId checkpointFragmentID =
FragmentId.parse(checkpointFragmentIDString);
List<DataSegmentFragment> fragments = getFragmentsFromFileSystem();
+ List<DataSegmentFragment> invalidFragments = Lists.newArrayList();
for (DataSegmentFragment fragment : fragments) {
if (fragment.getFragmentId().compareTo(checkpointFragmentID) > 0) {
fragment.purge();
+ invalidFragments.add(fragment);
}
}
+ this.fragments.removeAll(invalidFragments);
}
@Override