This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new 7752a44 add update logic for the DBSinker 、 upgrade the concat_ws function (#57)
7752a44 is described below
commit 7752a44aa45a2a174d71cfe62db053630043354c
Author: 零号程序 <[email protected]>
AuthorDate: Wed Sep 15 11:36:58 2021 +0800
add update logic for the DBSinker 、 upgrade the concat_ws function (#57)
* add update logic for the DBSinker 、 upgrade the concat_ws function
* Add the field level cache to reduce duplicate data entry #60
Co-authored-by: junjie.cheng <[email protected]>
---
.../apache/rocketmq/streams/db/sink/DBSink.java | 96 ++++++++--------
.../rocketmq/streams/db/sink/DBSinkBuilder.java | 41 +++----
.../rocketmq/streams/client/DataStreamAction.java | 6 +-
.../client/transform/window/HoppingWindow.java | 5 +-
.../common/cache/compress/AdditionStore.java | 7 +-
.../streams/common/cache/compress/BitSetCache.java | 88 +++++++-------
.../streams/common/cache/compress/ByteArray.java | 13 ++-
.../common/cache/compress/ByteArrayValueKV.java | 9 +-
.../streams/common/cache/compress/CacheKV.java | 18 +--
.../streams/common/cache/compress/ICacheKV.java | 1 -
.../streams/common/cache/compress/KVElement.java | 2 +-
.../cache/compress/impl/FixedLenRowCacheKV.java | 4 +-
.../common/cache/compress/impl/IntValueKV.java | 8 +-
.../common/cache/compress/impl/LongValueKV.java | 67 +++++++++++
.../common/cache/compress/impl/MutilValueKV.java | 46 ++++----
.../AbstractSupportShuffleChannelBuilder.java | 2 +-
.../common/channel/impl/OutputPrintChannel.java | 7 +-
.../streams/common/channel/sink/AbstractSink.java | 35 +++---
.../streams/common/context/AbstractContext.java | 19 ++--
.../streams/common/topology/ChainPipeline.java | 95 +++++++++++++---
.../common/topology/stages/OutputChainStage.java | 52 ++++-----
.../rocketmq/streams/common/utils/SQLUtil.java | 54 +++++----
.../service/AbstractConfigurableService.java | 6 +-
.../streams/filter/context/RuleContext.java | 2 +-
.../lease/service/storages/DBLeaseStorage.java | 1 +
.../streams/script/context/FunctionContext.java | 2 +-
.../script/function/impl/field/FieldFunction.java | 48 ++++----
.../function/impl/flatmap/SplitArrayFunction.java | 4 +-
.../performance/AbstractScriptProxy.java | 11 +-
.../performance/CaseScriptExpressionProxy.java | 31 ++---
.../optimization/performance/EqualsProxy.java | 5 +-
.../optimization/performance/RegexProxy.java | 8 +-
.../performance/ScriptExpressionGroupsProxy.java | 26 +++--
.../performance/ScriptOptimization.java | 62 ++++------
.../performance/ScriptProxyFactory.java | 26 ++---
.../performance/SimpleScriptExpressionProxy.java | 26 ++---
.../streams/window/operator/AbstractWindow.java | 126 +++++++++++----------
.../rocketmq/streams/window/sqlcache/SQLCache.java | 9 +-
38 files changed, 594 insertions(+), 474 deletions(-)
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
index 4a2b50e..1aab2ba 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
@@ -17,13 +17,7 @@
package org.apache.rocketmq.streams.db.sink;
import com.alibaba.fastjson.JSONObject;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Set;
+import com.google.common.collect.Lists;
import org.apache.rocketmq.streams.common.channel.IChannel;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
@@ -38,6 +32,14 @@ import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.db.driver.DriverBuilder;
import org.apache.rocketmq.streams.db.driver.JDBCDriver;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Set;
+
/**
* 主要用于写db,输入可以是一个insert/replace 模版,也可以是metadata对象,二者选一即可。都支持批量插入,提高吞吐 sql
模版:insert into
table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')
MetaData:主要是描述每个字段的类型,是否必须
二者选一个即可。sql模式,系统会把一批(batchSize)数据拼成一个大sql。metadata模式,基于字段描述,最终也是拼成一个大sql
*/
@@ -45,6 +47,8 @@ public class DBSink extends AbstractSink {
protected String insertSQLTemplate;//完成插入部分的工作,和metadata二选一。insert into
table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')
+ protected String duplicateSQLTemplate; //通过on duplicate key update
来对已经存在的信息进行更新
+
protected MetaData metaData;//可以指定meta data,和insertSQL二选一
protected String tableName; //指定要插入的数据表
@@ -58,7 +62,7 @@ public class DBSink extends AbstractSink {
@ENVDependence
protected String password;
- protected boolean openSqlCache=false;
+ protected boolean openSqlCache = true;
protected transient IMessageCache<String> sqlCache;//cache sql, batch
submit sql
@@ -111,26 +115,27 @@ public class DBSink extends AbstractSink {
ResultSet metaResult =
metaData.getColumns(connection.getCatalog(), "%", this.tableName, null);
this.metaData = MetaData.createMetaData(metaResult);
this.metaData.setTableName(this.tableName);
- sqlCache=new MessageCache<>(new
IMessageFlushCallBack<String>() {
- @Override public boolean flushMessage(List<String> sqls) {
- JDBCDriver dataSource =
DriverBuilder.createDriver(jdbcDriver, url, userName, password);
- try {
- dataSource.executSqls(sqls);
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- } finally {
- if (dataSource != null) {
- dataSource.destroy();
- }
+ }
+ sqlCache = new MessageCache<>(new IMessageFlushCallBack<String>() {
+ @Override
+ public boolean flushMessage(List<String> sqls) {
+ JDBCDriver dataSource =
DriverBuilder.createDriver(jdbcDriver, url, userName, password);
+ try {
+ dataSource.executSqls(sqls);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ } finally {
+ if (dataSource != null) {
+ dataSource.destroy();
}
- return true;
}
- });
- ((MessageCache<String>) sqlCache).setAutoFlushTimeGap(100000);
- ((MessageCache<String>) sqlCache).setAutoFlushSize(50);
- sqlCache.openAutoFlush();
- }
+ return true;
+ }
+ });
+ ((MessageCache<String>) sqlCache).setAutoFlushTimeGap(100000);
+ ((MessageCache<String>) sqlCache).setAutoFlushSize(50);
+ sqlCache.openAutoFlush();
return super.initConfigurable();
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
@@ -148,7 +153,8 @@ public class DBSink extends AbstractSink {
List<JSONObject> messages =
convertJsonObjectFromMessage(messageList);
if (StringUtil.isEmpty(insertSQLTemplate) && metaData != null) {
String sql = SQLUtil.createInsertSql(metaData,
messages.get(0));
- sql = sql + SQLUtil.createInsertValuesSQL(metaData,
messages.subList(1, messages.size()));
+ sql += SQLUtil.createInsertValuesSQL(metaData,
messages.subList(1, messages.size()));
+ sql += this.duplicateSQLTemplate;
executeSQL(dbDataSource, sql);
return true;
}
@@ -156,26 +162,17 @@ public class DBSink extends AbstractSink {
if (StringUtil.isEmpty(insertValueSQL) ||
insertSQLTemplate.replace(insertValueSQL, "").contains("#{")) {
for (JSONObject message : messages) {
String sql = parseSQL(message, insertSQLTemplate);
+ sql += this.duplicateSQLTemplate;
executeSQL(dbDataSource, sql);
}
return true;
} else {
- StringBuilder sb = new StringBuilder();
- String insertSQL;
- boolean isFirst = true;
- int i = 0;
+ List<String> subInsert = Lists.newArrayList();
for (JSONObject message : messages) {
- insertSQL = parseSQL(message, insertValueSQL);
- if (isFirst) {
- isFirst = false;
- } else {
- sb.append(",");
- }
- i++;
-
- sb.append(insertSQL);
+ subInsert.add(parseSQL(message, insertValueSQL));
}
- insertSQL = this.insertSQLTemplate.replace(insertValueSQL,
sb.toString());
+ String insertSQL =
this.insertSQLTemplate.replace(insertValueSQL, String.join(",", subInsert));
+ insertSQL += this.duplicateSQLTemplate;
executeSQL(dbDataSource, insertSQL);
return true;
}
@@ -184,17 +181,18 @@ public class DBSink extends AbstractSink {
}
}
- @Override public boolean checkpoint(Set<String> splitIds) {
- if(sqlCache!=null){
+ @Override
+ public boolean checkpoint(Set<String> splitIds) {
+ if (sqlCache != null) {
sqlCache.flush(splitIds);
}
return true;
}
protected void executeSQL(JDBCDriver dbDataSource, String sql) {
- if(isOpenSqlCache()){
+ if (isOpenSqlCache()) {
this.sqlCache.addCache(sql);
- }else {
+ } else {
dbDataSource.execute(sql);
}
@@ -285,4 +283,12 @@ public class DBSink extends AbstractSink {
public void setOpenSqlCache(boolean openSqlCache) {
this.openSqlCache = openSqlCache;
}
+
+ public String getDuplicateSQLTemplate() {
+ return duplicateSQLTemplate;
+ }
+
+ public void setDuplicateSQLTemplate(String duplicateSQLTemplate) {
+ this.duplicateSQLTemplate = duplicateSQLTemplate;
+ }
}
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
index c960bae..87f5fb7 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
@@ -17,15 +17,16 @@
package org.apache.rocketmq.streams.db.sink;
import com.google.auto.service.AutoService;
-import java.util.List;
-import java.util.Properties;
+import com.google.common.collect.Lists;
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.metadata.MetaDataField;
import org.apache.rocketmq.streams.common.model.ServiceName;
-import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
+
+import java.util.List;
+import java.util.Properties;
@AutoService(IChannelBuilder.class)
@ServiceName(DBSinkBuilder.TYPE)
@@ -36,29 +37,23 @@ public class DBSinkBuilder implements IChannelBuilder {
public ISink createSink(String namespace, String name, Properties
properties, MetaData metaData) {
DBSink sink = new DBSink();
sink.setUrl(properties.getProperty("url"));
- sink.setUserName("userName");
- sink.setPassword("password");
+ sink.setUserName(properties.getProperty("userName"));
+ sink.setPassword(properties.getProperty("password"));
List<MetaDataField> fieldList = metaData.getMetaDataFields();
- StringBuilder insertSQL = new StringBuilder();
- StringBuilder insertValueSQL = new StringBuilder();
- boolean isFirst = true;
- for (MetaDataField field : fieldList) {
+
+ List<String> insertFields = Lists.newArrayList();
+ List<String> insertValues = Lists.newArrayList();
+ List<String> duplicateKeys = Lists.newArrayList();
+ fieldList.forEach(field -> {
String fieldName = field.getFieldName();
- if (isFirst) {
- isFirst = false;
- } else {
- insertSQL.append(",");
- insertValueSQL.append(",");
- }
- insertSQL.append(fieldName);
- if (DataTypeUtil.isNumber(field.getDataType())) {
- insertValueSQL.append(fieldName);
- } else {
- insertValueSQL.append("'#{" + fieldName + "}'");
- }
- }
- String sql = "insert into " + properties.getProperty("tableName") +
"(" + insertSQL.toString() + ")values(" + insertValueSQL.toString() + ")";
+ insertFields.add(fieldName);
+ insertValues.add("'#{" + fieldName + "}'");
+ duplicateKeys.add(fieldName + " = VALUES(" + fieldName + ")");
+ });
+
+ String sql = "insert into " + properties.getProperty("tableName") +
"(" + String.join(",", insertFields) + ") values (" + String.join(",",
insertValues) + ") ";
sink.setInsertSQLTemplate(sql);
+ sink.setDuplicateSQLTemplate(" on duplicate key update " +
String.join(",", duplicateKeys));
return sink;
}
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
index 91f16fa..0a052fa 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
@@ -18,9 +18,11 @@
package org.apache.rocketmq.streams.client;
import com.google.common.collect.Maps;
+
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+
import org.apache.rocketmq.streams.client.strategy.Strategy;
import org.apache.rocketmq.streams.client.transform.DataStream;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
@@ -79,11 +81,11 @@ public class DataStreamAction extends DataStream {
}
ConfigurableComponent configurableComponent =
ComponentCreator.getComponent(mainPipelineBuilder.getPipelineNameSpace(),
ConfigurableComponent.class, kvs);
- ChainPipeline pipeline =
this.mainPipelineBuilder.build(configurableComponent.getService());
+ ChainPipeline<?> pipeline =
this.mainPipelineBuilder.build(configurableComponent.getService());
pipeline.startChannel();
if (this.otherPipelineBuilders != null) {
for (PipelineBuilder builder : otherPipelineBuilders) {
- ChainPipeline otherPipeline =
builder.build(configurableComponent.getService());
+ ChainPipeline<?> otherPipeline =
builder.build(configurableComponent.getService());
otherPipeline.startChannel();
}
}
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/HoppingWindow.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/HoppingWindow.java
index 0c945f0..9078b16 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/HoppingWindow.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/HoppingWindow.java
@@ -20,10 +20,11 @@ package org.apache.rocketmq.streams.client.transform.window;
public class HoppingWindow {
/**
* 滑动窗口信息
+ *
* @return
*/
- public static WindowInfo of(Time windowSize,Time windowSlide){
- WindowInfo windowInfo=new WindowInfo();
+ public static WindowInfo of(Time windowSize, Time windowSlide) {
+ WindowInfo windowInfo = new WindowInfo();
windowInfo.setType(WindowInfo.HOPPING_WINDOW);
windowInfo.setWindowSize(windowSize);
windowInfo.setWindowSlide(windowSlide);
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/AdditionStore.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/AdditionStore.java
index 0b441bf..e8485b7 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/AdditionStore.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/AdditionStore.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.streams.common.cache.compress;
import java.util.ArrayList;
import java.util.List;
-
public class AdditionStore {
/**
@@ -104,8 +103,8 @@ public class AdditionStore {
CacheKV.MapAddress address = new CacheKV.MapAddress(conflictIndex,
conflictOffset);
if (isVarLen) {
int size = value.length;
- bytes[conflictOffset] = (byte)(size & 0xff);
- bytes[conflictOffset + 1] = (byte)(size >> 8 & 0xff);
+ bytes[conflictOffset] = (byte) (size & 0xff);
+ bytes[conflictOffset + 1] = (byte) (size >> 8 & 0xff);
conflictOffset = conflictOffset + 2;
}
for (int i = 0; i < value.length; i++) {
@@ -131,7 +130,7 @@ public class AdditionStore {
if (bytes == null) {
return null;
}
- if (isVarLen == false) {
+ if (!isVarLen) {
return new ByteArray(bytes, mapAddress.offset, elementSize);
} else {
int len = new ByteArray(bytes, mapAddress.offset, 2).castInt(0, 2);
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/BitSetCache.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/BitSetCache.java
index 7a4c5fb..bc4a6f1 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/BitSetCache.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/BitSetCache.java
@@ -26,81 +26,81 @@ public class BitSetCache {
protected int capacity;
protected int bitSetSize;
- public class BitSet{
+ public class BitSet {
private byte[] bytes;
- public BitSet(){
- bytes=new byte[byteSetSize];
+ public BitSet() {
+ bytes = new byte[byteSetSize];
}
- public BitSet(byte[] bytes){
- this.bytes=bytes;
+
+ public BitSet(byte[] bytes) {
+ this.bytes = bytes;
}
- public void set(int index){
- if(index>bitSetSize){
- throw new RuntimeException("the index exceed max index, max
index is "+byteSetSize+", real is "+index);
+
+ public void set(int index) {
+ if (index > bitSetSize) {
+ throw new RuntimeException("the index exceed max index, max
index is " + byteSetSize + ", real is " + index);
}
- int byteIndex=index/8;
- int bitIndex=index%8;
- byte byteElement=bytes[byteIndex];
- byteElement = (byte) (byteElement|(1 << bitIndex));
- bytes[byteIndex]=byteElement;
+ int byteIndex = index / 8;
+ int bitIndex = index % 8;
+ byte byteElement = bytes[byteIndex];
+ byteElement = (byte) (byteElement | (1 << bitIndex));
+ bytes[byteIndex] = byteElement;
}
- public boolean get(int index){
- if(index>bitSetSize){
- throw new RuntimeException("the index exceed max index, max
index is "+byteSetSize+", real is "+index);
+
+ public boolean get(int index) {
+ if (index > bitSetSize) {
+ throw new RuntimeException("the index exceed max index, max
index is " + byteSetSize + ", real is " + index);
}
- int byteIndex=index/8;
- int bitIndex=index%8;
- byte byteElement=bytes[byteIndex];
- boolean isTrue = ((byteElement & (1 << bitIndex)) != 0);
- return isTrue;
+ int byteIndex = index / 8;
+ int bitIndex = index % 8;
+ byte byteElement = bytes[byteIndex];
+ return ((byteElement & (1 << bitIndex)) != 0);
}
- public byte[] getBytes(){
+ public byte[] getBytes() {
return bytes;
}
}
- public BitSet createBitSet(){
+ public BitSet createBitSet() {
return new BitSet();
}
-
- public BitSetCache(int bitSetSize, int capacity){
- cache=new ByteArrayValueKV(capacity,true);
- this.byteSetSize=bitSetSize/8+bitSetSize%8;
- this.capacity=capacity;
- this.bitSetSize=bitSetSize;
+ public BitSetCache(int bitSetSize, int capacity) {
+ cache = new ByteArrayValueKV(capacity, true);
+ this.byteSetSize = bitSetSize / 8 + bitSetSize % 8;
+ this.capacity = capacity;
+ this.bitSetSize = bitSetSize;
}
-
- public void put(String key,BitSet bitSet){
- if(cache.size>cache.capacity){
- synchronized (this){
- if(cache.size>cache.capacity){
- cache=new ByteArrayValueKV(capacity,true);
+ public void put(String key, BitSet bitSet) {
+ if (cache.size > cache.capacity) {
+ synchronized (this) {
+ if (cache.size > cache.capacity) {
+ cache = new ByteArrayValueKV(capacity, true);
}
}
}
- cache.put(key,bitSet.getBytes());
+ cache.put(key, bitSet.getBytes());
}
public static void main(String[] args) {
- BitSetCache bitSetCache=new BitSetCache(150,30000);
- BitSet bitSet=bitSetCache.createBitSet();
+ BitSetCache bitSetCache = new BitSetCache(150, 30000);
+ BitSet bitSet = bitSetCache.createBitSet();
bitSet.set(13);
- bitSetCache.put("fdsdf",bitSet);
- BitSet bitSet1=bitSetCache.get("fdsdf");
+ bitSetCache.put("fdsdf", bitSet);
+ BitSet bitSet1 = bitSetCache.get("fdsdf");
System.out.println(bitSet1.get(13));
}
- public BitSet get(String key){
- byte[] bytes=cache.get(key);
- if(bytes==null){
+ public BitSet get(String key) {
+ byte[] bytes = cache.get(key);
+ if (bytes == null) {
return null;
}
- return new BitSet(bytes);
+ return new BitSet(bytes);
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ByteArray.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ByteArray.java
index 969d299..cc48286 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ByteArray.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ByteArray.java
@@ -59,6 +59,15 @@ public class ByteArray {
return res;
}
+ public long castLong(int offset, int size) {
+ int index = startIndex + offset;
+ long res = 0L;
+ for (int i = 0; i < size; i++) {
+ res += (long) (bytes[i + index] & 0xff) << (i * 8);
+ }
+ return res;
+ }
+
public byte getByte(int offset) {
int index = startIndex + offset;
return bytes[index];
@@ -94,9 +103,9 @@ public class ByteArray {
protected void flush(int value) {
for (int i = 0; i < 4; i++) {
if (i == 0) {
- this.bytes[i + this.startIndex] = (byte)(value & 0xff);
+ this.bytes[i + this.startIndex] = (byte) (value & 0xff);
} else {
- this.bytes[i + this.startIndex] = (byte)(value >> (i * 8) &
0xff);
+ this.bytes[i + this.startIndex] = (byte) (value >> (i * 8) &
0xff);
}
}
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ByteArrayValueKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ByteArrayValueKV.java
index 0e3d789..6d3bd38 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ByteArrayValueKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ByteArrayValueKV.java
@@ -89,18 +89,13 @@ public class ByteArrayValueKV extends CacheKV<byte[]> {
@Override
public int calMemory() {
- int value = super.calMemory() + (this.conflicts.getConflictIndex() +
1) * this.conflicts
- .getBlockSize();
- return value;
+ return super.calMemory() + (this.conflicts.getConflictIndex() + 1) *
this.conflicts.getBlockSize();
}
@Override
public boolean contains(String key) {
byte[] bytes = get(key);
- if (bytes == null) {
- return false;
- }
- return true;
+ return bytes != null;
}
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/CacheKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/CacheKV.java
index 08bb056..4af9ba1 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/CacheKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/CacheKV.java
@@ -75,7 +75,9 @@ public abstract class CacheKV<T> implements ICacheKV<T> {
public abstract void put(String key, T value);
public T remove(String key) {
- if (StringUtil.isEmpty(key)) { return null; }
+ if (StringUtil.isEmpty(key)) {
+ return null;
+ }
MapElementContext context = queryMapElementByHashCode(key);
/**
* TODO:
@@ -113,11 +115,11 @@ public abstract class CacheKV<T> implements ICacheKV<T> {
KVElement mapElement = context.mapElement;
//如果没有发生冲突,说明当前节点无被占用,直接写入
- if (context.isOccurConflict == false) {
+ if (!context.isOccurConflict) {
size++;
mapElement.keyHashCode.flush(mapElement.getKeyHashCode());
- if (mapElement.isNoValue() == false) {
+ if (!mapElement.isNoValue()) {
mapElement.value.flush(value);
}
@@ -125,7 +127,7 @@ public abstract class CacheKV<T> implements ICacheKV<T> {
} else {
//如果key已经存在,覆盖value
if (context.isMatchKey) {
- if (mapElement.isNoValue() == false) {
+ if (!mapElement.isNoValue()) {
if (!supportUpdate) {
return false;
}
@@ -280,7 +282,7 @@ public abstract class CacheKV<T> implements ICacheKV<T> {
this.mapAddress = mapAddress;
this.mapElement = mapElement;
this.isMatchKey = isMatchKey;
- if (mapElement.isEmpty() == false) {
+ if (!mapElement.isEmpty()) {
isOccurConflict = true;
}
}
@@ -327,7 +329,7 @@ public abstract class CacheKV<T> implements ICacheKV<T> {
}
public boolean isEmpty() {
- return isConflict == false && conflictIndex == 0 && offset == 0;
+ return !isConflict && conflictIndex == 0 && offset == 0;
}
/**
@@ -356,14 +358,14 @@ public abstract class CacheKV<T> implements ICacheKV<T> {
public byte[] createBytes() {
byte[] bytes = NumberUtils.toByte(offset);
int value = 0;
- byte fisrtByte = (byte)(conflictIndex & 0xff);
+ byte fisrtByte = (byte) (conflictIndex & 0xff);
if (isConflict) {
value = (fisrtByte | (1 << 7));//把第一位变成1
} else {
return bytes;
}
- bytes[bytes.length - 1] = (byte)(value & 0xff);
+ bytes[bytes.length - 1] = (byte) (value & 0xff);
return bytes;
}
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ICacheKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ICacheKV.java
index 9a766e0..885affc 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ICacheKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ICacheKV.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.streams.common.cache.compress;
/**
* kv提供的对外接口,通过二进制实现存储,减少java对象带来的头部开销。 需要指定初始容量,会在创建对象时分配内存。
- *
*/
public interface ICacheKV<T> {
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/KVElement.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/KVElement.java
index 60f2f0b..d494882 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/KVElement.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/KVElement.java
@@ -51,7 +51,7 @@ public class KVElement {
}
public static byte[] createByteArray(CacheKV.MapAddress nextAddress,
byte[] keyHashCode, int value,
- int elementSize) {
+ int elementSize) {
KVElement element = new KVElement(nextAddress, keyHashCode, value);
element.setElementSize(elementSize);
return element.getBytes();
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/FixedLenRowCacheKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/FixedLenRowCacheKV.java
index 56a300f..d595da3 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/FixedLenRowCacheKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/FixedLenRowCacheKV.java
@@ -133,8 +133,8 @@ public class FixedLenRowCacheKV {
Object o = values[i];
byte[] byteValue = null;
if (dataType instanceof SetDataType) {
- byteValue = new byte[] {(byte)i};
- add2Set(i, (Set<String>)o);
+ byteValue = new byte[] {(byte) i};
+ add2Set(i, (Set<String>) o);
} else {
byteValue = dataType.toBytes(o, false);
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntValueKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntValueKV.java
index a7577d3..23b354e 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntValueKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntValueKV.java
@@ -35,7 +35,6 @@ import org.junit.Assert;
/**
* 支持key是string,value是int的场景,支持size不大于10000000.只支持int,long,boolean,string类型
- *
*/
public class IntValueKV extends CacheKV<Integer> {
@@ -79,9 +78,7 @@ public class IntValueKV extends CacheKV<Integer> {
@Override
public int calMemory() {
- int value = super.calMemory() + (this.conflicts.getConflictIndex() +
1) * this.conflicts
- .getBlockSize();
- return value;
+ return super.calMemory() + (this.conflicts.getConflictIndex() + 1) *
this.conflicts.getBlockSize();
}
/**
@@ -89,9 +86,6 @@ public class IntValueKV extends CacheKV<Integer> {
*
* @return
*/
- //public Integer remove(String key) {
- // return null;
- //}
public IntValueKV(int capacity) {
super(capacity);
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/LongValueKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/LongValueKV.java
new file mode 100644
index 0000000..78bd032
--- /dev/null
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/LongValueKV.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.common.cache.compress.impl;
+
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.streams.common.cache.compress.ByteArrayValueKV;
+import org.apache.rocketmq.streams.common.cache.compress.CacheKV;
+
+public class LongValueKV extends CacheKV<Long> {
+
+ private final ByteArrayValueKV byteArrayValueKV;
+
+ public LongValueKV(int capacity) {
+ super(capacity, 8);
+ byteArrayValueKV = new ByteArrayValueKV(capacity, true);
+ }
+
+ @Override
+ public void put(String key, Long value) {
+ ByteBuffer buffer = ByteBuffer.allocate(8);
+ buffer.putLong(0, value);
+ byte[] bytes = buffer.array();
+ byteArrayValueKV.put(key, bytes);
+ }
+
+ @Override
+ public boolean contains(String key) {
+ return byteArrayValueKV.contains(key);
+ }
+
+ @Override
+ public int getSize() {
+ return byteArrayValueKV.getSize();
+ }
+
+ @Override
+ public int calMemory() {
+ return byteArrayValueKV.calMemory();
+ }
+
+ @Override
+ public Long get(String key) {
+ ByteBuffer buffer = ByteBuffer.allocate(8);
+ byte[] bytes = byteArrayValueKV.get(key);
+ if (bytes == null) {
+ return null;
+ }
+ buffer.put(bytes, 0, 8);
+ buffer.flip();
+ return buffer.getLong();
+ }
+
+}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/MutilValueKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/MutilValueKV.java
index 3065a46..0a6009a 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/MutilValueKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/MutilValueKV.java
@@ -23,26 +23,25 @@ import org.apache.rocketmq.streams.common.cache.compress.ICacheKV;
public abstract class MutilValueKV<T> implements ICacheKV<T> {
//按固定大小分割存储
- protected List<ICacheKV<T>> valueKVS=new ArrayList<>();
+ protected List<ICacheKV<T>> valueKVS = new ArrayList<>();
//当前存储的索引
- protected int currentIndex=0;
+ protected int currentIndex = 0;
//每个分片的大小
protected int capacity;
- public MutilValueKV(int capacity){
- this.capacity=capacity;
+ public MutilValueKV(int capacity) {
+ this.capacity = capacity;
}
-
@Override
public T get(String key) {
- if(valueKVS==null){
+ if (valueKVS == null) {
return null;
}
- for(ICacheKV<T> cacheKV:valueKVS){
- if(cacheKV!=null){
- T value=cacheKV.get(key);
- if(value!=null){
+ for (ICacheKV<T> cacheKV : valueKVS) {
+ if (cacheKV != null) {
+ T value = cacheKV.get(key);
+ if (value != null) {
return value;
}
}
@@ -52,32 +51,32 @@ public abstract class MutilValueKV<T> implements ICacheKV<T> {
@Override
public void put(String key, T value) {
- if(valueKVS==null){
+ if (valueKVS == null) {
return;
}
- ICacheKV<T> cacheKV= valueKVS.get(currentIndex);
- if(cacheKV.getSize()>=capacity){
- synchronized (this){
- cacheKV= valueKVS.get(currentIndex);
- if(cacheKV.getSize()>=capacity){
- cacheKV=create();
+ ICacheKV<T> cacheKV = valueKVS.get(currentIndex);
+ if (cacheKV.getSize() >= capacity) {
+ synchronized (this) {
+ cacheKV = valueKVS.get(currentIndex);
+ if (cacheKV.getSize() >= capacity) {
+ cacheKV = create();
valueKVS.add(cacheKV);
currentIndex++;
}
}
}
- cacheKV.put(key,value);
+ cacheKV.put(key, value);
}
@Override
public boolean contains(String key) {
- if(valueKVS==null){
+ if (valueKVS == null) {
return false;
}
- for(ICacheKV<T> cacheKV:valueKVS){
- if(cacheKV!=null){
- boolean isMatch=cacheKV.contains(key);
- if(isMatch){
+ for (ICacheKV<T> cacheKV : valueKVS) {
+ if (cacheKV != null) {
+ boolean isMatch = cacheKV.contains(key);
+ if (isMatch) {
return true;
}
}
@@ -85,7 +84,6 @@ public abstract class MutilValueKV<T> implements ICacheKV<T> {
return false;
}
-
protected abstract ICacheKV<T> create();
@Override
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java
index 2119a6b..211fc17 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java
@@ -25,6 +25,6 @@ public abstract class AbstractSupportShuffleChannelBuilder implements IChannelBu
@Override
public ISource copy(ISource pipelineSource) {
JSONObject jsonObject =
JSONObject.parseObject(pipelineSource.toJson());
- return (ISource)ConfigurableUtil.create(pipelineSource.getNameSpace(),
pipelineSource.getConfigureName(), jsonObject,
pipelineSource.getClass().getName());
+ return ConfigurableUtil.create(pipelineSource.getNameSpace(),
pipelineSource.getConfigureName(), jsonObject,
pipelineSource.getClass().getName());
}
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
index bd7029f..8c4f63c 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.streams.common.channel.impl;
import java.util.List;
+
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.utils.PrintUtil;
@@ -26,16 +27,12 @@ import org.apache.rocketmq.streams.common.utils.PrintUtil;
*/
public class OutputPrintChannel extends AbstractSink {
-
@Override
protected boolean batchInsert(List<IMessage> messages) {
for (IMessage msg : messages) {
- System.out.println(msg.getMessageValue());
+ //System.out.println(msg.getMessageValue());
}
return false;
}
-
-
-
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
index e56c30a..82218c2 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
@@ -17,12 +17,14 @@
package org.apache.rocketmq.streams.common.channel.sink;
import com.alibaba.fastjson.JSONObject;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
@@ -72,13 +74,12 @@ public abstract class AbstractSink extends BasedConfigurable implements ISink<Ab
}
public ISplit getSplit(IMessage message) {
- return (ISplit)message.getMessageBody().get(TARGET_QUEUE);
+ return (ISplit) message.getMessageBody().get(TARGET_QUEUE);
}
@Override
public boolean batchAdd(IMessage fieldName2Value) {
messageCache.addCache(fieldName2Value);
-
return true;
}
@@ -127,7 +128,7 @@ public abstract class AbstractSink extends BasedConfigurable implements ISink<Ab
public boolean flush(Set<String> splitIds) {
int size = messageCache.flush(splitIds);
if (size > 0) {
- System.out.println(this.getClass().getSimpleName()+ " finish flush
data " + size);
+ System.out.println(this.getClass().getSimpleName() + " finish
flush data " + size);
}
return size > 0;
@@ -135,11 +136,11 @@ public abstract class AbstractSink extends BasedConfigurable implements ISink<Ab
@Override
public boolean flush(String... splitIds) {
- if(splitIds==null){
+ if (splitIds == null) {
return true;
}
- Set<String> splitIdSet =new HashSet<>();
- for(String splitId:splitIds){
+ Set<String> splitIdSet = new HashSet<>();
+ for (String splitId : splitIds) {
splitIdSet.add(splitId);
}
return flush(splitIdSet);
@@ -172,16 +173,18 @@ public abstract class AbstractSink extends BasedConfigurable implements ISink<Ab
return success;
}
- @Override public boolean checkpoint(Set<String> splitIds) {
+ @Override
+ public boolean checkpoint(Set<String> splitIds) {
return flush(splitIds);
}
- @Override public boolean checkpoint(String... splitIds) {
- if(splitIds==null){
+ @Override
+ public boolean checkpoint(String... splitIds) {
+ if (splitIds == null) {
return false;
}
- Set<String> splitSet=new HashSet<>();
- for(String splitId: splitIds){
+ Set<String> splitSet = new HashSet<>();
+ for (String splitId : splitIds) {
splitSet.add(splitId);
}
@@ -242,13 +245,13 @@ public abstract class AbstractSink extends BasedConfigurable implements ISink<Ab
@Override
public Map<String, MessageOffset>
getFinishedQueueIdAndOffsets(CheckPointMessage checkPointMessage) {
- String piplineName = null;
- if
(IConfigurableIdentification.class.isInstance(checkPointMessage.getStreamOperator()))
{
- IConfigurableIdentification configurable =
(IConfigurableIdentification)checkPointMessage.getStreamOperator();
- piplineName = configurable.getConfigureName();
+ String pipelineName = null;
+ if (checkPointMessage.getStreamOperator() instanceof
IConfigurableIdentification) {
+ IConfigurableIdentification configurable =
(IConfigurableIdentification) checkPointMessage.getStreamOperator();
+ pipelineName = configurable.getConfigureName();
}
SourceState sourceState = this.sourceName2State.get(
- CheckPointManager.createSourceName(checkPointMessage.getSource(),
piplineName));
+ CheckPointManager.createSourceName(checkPointMessage.getSource(),
pipelineName));
if (sourceState != null) {
return sourceState.getQueueId2Offsets();
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/AbstractContext.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/AbstractContext.java
index 955e950..4e49b63 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/AbstractContext.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/AbstractContext.java
@@ -150,23 +150,24 @@ public abstract class AbstractContext<T extends IMessage> extends HashMap {
/**
* cache filter(regex,like,equals)result
*/
- private static String FILTER_CACHE_PREPIX="__filter_cache_prefix";
- public void setFilterCache(String expressionStr,String varValue, boolean
result){
-
this.put(MapKeyUtil.createKey(FILTER_CACHE_PREPIX,expressionStr,varValue),result);
+ private static String FILTER_CACHE_PREPIX = "__filter_cache_prefix";
+
+ public void setFilterCache(String expressionStr, String varValue, boolean
result) {
+ this.put(MapKeyUtil.createKey(FILTER_CACHE_PREPIX, expressionStr,
varValue), result);
}
/**
* get cache result
+ *
* @param expressionStr
* @param varValue
* @return
*/
- public Boolean getFilterCache(String expressionStr,String varValue){
- String
key=MapKeyUtil.createKey(FILTER_CACHE_PREPIX,expressionStr,varValue);
+ public Boolean getFilterCache(String expressionStr, String varValue) {
+ String key = MapKeyUtil.createKey(FILTER_CACHE_PREPIX, expressionStr,
varValue);
return (Boolean) this.get(key);
}
-
/**
* 获取基于字段缓存的某些值
*
@@ -176,7 +177,7 @@ public abstract class AbstractContext<T extends IMessage> extends HashMap {
*/
@Deprecated
public <T> T getValue(String fieldName) {
- return (T)values.get(fieldName);
+ return (T) values.get(fieldName);
}
/**
@@ -222,7 +223,7 @@ public abstract class AbstractContext<T extends IMessage> extends HashMap {
}
public static <R, C extends AbstractContext> List<IMessage>
executeScript(IMessage channelMessage, C context,
-
List<? extends IBaseStreamOperator<IMessage, R, C>> scriptExpressions) {
+ List<? extends IBaseStreamOperator<IMessage, R, C>> scriptExpressions)
{
List<IMessage> messages = new ArrayList<>();
if (scriptExpressions == null) {
return messages;
@@ -356,7 +357,7 @@ public abstract class AbstractContext<T extends IMessage> extends HashMap {
context.setSplitModel(this.isSplitModel());
List<T> messages = new ArrayList<>();
for (T tmp : this.getSplitMessages()) {
- messages.add(tmp.copy());
+ messages.add(tmp.deepCopy());
}
context.setSplitMessages(messages);
context.monitor = this.monitor;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
index 13f1312..244ed3b 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
@@ -17,13 +17,18 @@
package org.apache.rocketmq.streams.common.topology;
import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Lists;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.streams.common.cache.compress.impl.LongValueKV;
import org.apache.rocketmq.streams.common.channel.IChannel;
import org.apache.rocketmq.streams.common.channel.source.ISource;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
import
org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
@@ -48,6 +53,12 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
private static final long serialVersionUID = -5189371682717444347L;
+ private final transient int duplicateCacheSize = 1000000;
+ private transient LongValueKV duplicateCache;
+ // private transient Map<String, Long> duplicateCache;
+ private transient List<String> duplicateFields;
+ private transient int duplicateCacheExpirationTime;
+
/**
* 是否自动启动channel
*/
@@ -91,6 +102,7 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
/**
* 启动一个channel,并给channel应用pipeline
*/
+
public void startChannel() {
final String monitorName = createPipelineMonitorName();
if (isInitSuccess()) {
@@ -104,8 +116,9 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
pipelineMonitorForChannel = IMonitor.createMonitor(this);
}
try {
- source.start((IStreamOperator<T, T>)(message, context) -> {
+ source.start((IStreamOperator<T, T>) (message, context) -> {
//每条消息一个,监控整个链路
+
IMonitor pipelineMonitorForStage =
context.startMonitor(monitorName);
pipelineMonitorForStage.setType(IMonitor.TYPE_DATAPROCESS);
message.getHeader().setPiplineName(this.getConfigureName());
@@ -127,10 +140,21 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
}
+ private String createDuplicateKey(IMessage message) {
+ List<String> duplicateValues = Lists.newArrayList();
+ for (String field : duplicateFields) {
+ duplicateValues.add(message.getMessageBody().getString(field));
+ }
+ return StringUtil.createMD5Str(String.join("", duplicateValues));
+ }
+
private String createPipelineMonitorName() {
return MapKeyUtil.createKeyBySign(".", getType(), getNameSpace(),
getConfigureName());
}
+ private static AtomicInteger total = new AtomicInteger(0);
+ private static AtomicInteger hitCache = new AtomicInteger(0);
+
/**
* 可以替换某个阶段的阶段,而不用配置的阶段
*
@@ -141,6 +165,26 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
*/
@Override
protected T doMessageInner(T t, AbstractContext context, AbstractStage...
replaceStage) {
+ if (this.duplicateCache != null && this.duplicateFields != null &&
!this.duplicateFields.isEmpty() && !t.getHeader().isSystemMessage()) {
+ total.incrementAndGet();
+ String duplicateKey = createDuplicateKey(t);
+ Long cacheTime = this.duplicateCache.get(duplicateKey);
+ Long currentTime = System.currentTimeMillis();
+ if (cacheTime != null && currentTime - cacheTime <
this.duplicateCacheExpirationTime) {
+ hitCache.incrementAndGet();
+ context.breakExecute();
+ return t;
+ } else {
+ this.duplicateCache.put(duplicateKey, currentTime);
+ if (this.duplicateCache.getSize() > duplicateCacheSize) {
+ this.duplicateCache = new
LongValueKV(this.duplicateCacheSize);
+ }
+ }
+ if (total.get() % 5000 == 0) {
+ System.out.printf("total: %s, hit: %s%n", total.get(),
hitCache.get());
+ }
+ }
+
if (!t.getHeader().isSystemMessage()) {
MessageGloableTrace.joinMessage(t);//关联全局监控器
}
@@ -164,7 +208,8 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
return isTopology(this.channelNextStageLabel);
}
- public void doNextStages(AbstractContext context, String
msgPrewSourceName, List<String> nextStageLabel, String prewSQLNodeName,
AbstractStage... replaceStage) {
+ public void doNextStages(AbstractContext context, String
msgPrewSourceName, List<String> nextStageLabel,
+ String prewSQLNodeName, AbstractStage... replaceStage) {
if (!isTopology(nextStageLabel)) {
return;
@@ -178,7 +223,7 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
if (size > 1) {
copyContext = context.copy();
}
- T msg = (T)copyContext.getMessage();
+ T msg = (T) copyContext.getMessage();
AbstractStage oriStage = stageMap.get(lable);
if (oriStage == null) {
if (stages != null && stages.size() > 0) {
@@ -224,7 +269,7 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
continue;
} else {
if (ChainStage.class.isInstance(stage)) {
- ChainStage chainStage = (ChainStage)stage;
+ ChainStage chainStage = (ChainStage) stage;
String msgSourceName = chainStage.getMsgSourceName();
if (StringUtil.isNotEmpty(msgSourceName)) {
msgPrewSourceName = msgSourceName;
@@ -286,7 +331,7 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
JSONObject jsonObject = null;
if (stage instanceof ChainStage) {
jsonObject = new JSONObject();
- ChainStage chainStage = (ChainStage)stage;
+ ChainStage chainStage = (ChainStage) stage;
return chainStage.toJsonObject();
//String entityName = chainStage.getEntityName();
////todo 需要改写
@@ -324,18 +369,15 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
public void doProcessAfterRefreshConfigurable(IConfigurableService
configurableService) {
for (AbstractStage stage : getStages()) {
stage.setPipeline(this);
- if (IAfterConfigurableRefreshListener.class.isInstance(stage)) {
- if (AbstractConfigurable.class.isInstance(stage)) {
- AbstractConfigurable abstractConfigurable =
(AbstractConfigurable)stage;
- if (abstractConfigurable.isInitSuccess() == false &&
this.isInitSuccess() == false) {
- this.setInitSuccess(false);
- return;
- }
+ if (stage instanceof IAfterConfigurableRefreshListener) {
+ if (!stage.isInitSuccess() && !this.isInitSuccess()) {
+ this.setInitSuccess(false);
+ return;
}
- IAfterConfigurableRefreshListener
afterConfiguableRefreshListerner =
- (IAfterConfigurableRefreshListener)stage;
+ IAfterConfigurableRefreshListener
afterConfigurableRefreshListener =
+ (IAfterConfigurableRefreshListener) stage;
-
afterConfiguableRefreshListerner.doProcessAfterRefreshConfigurable(configurableService);
+
afterConfigurableRefreshListener.doProcessAfterRefreshConfigurable(configurableService);
}
}
@@ -349,9 +391,9 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
startChannel();
}
this.source = source;
- if (AbstractConfigurable.class.isInstance(source)) {
- AbstractConfigurable abstractConfigurable =
(AbstractConfigurable)source;
- if (abstractConfigurable.isInitSuccess() == false &&
this.isInitSuccess()) {
+ if (source instanceof AbstractConfigurable) {
+ AbstractConfigurable abstractConfigurable = (AbstractConfigurable)
source;
+ if (!abstractConfigurable.isInitSuccess() && this.isInitSuccess())
{
this.setInitSuccess(false);
return;
}
@@ -361,6 +403,23 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
if ((isAutoStart || isPublish()) && isInitSuccess()) {
startChannel();
}
+
+ //增加去重的逻辑
+ String duplicateFieldNameStr =
ComponentCreator.getProperties().getProperty(getConfigureName() +
".duplicate.fields.names");
+ if (duplicateFieldNameStr != null && !duplicateFieldNameStr.isEmpty())
{
+ this.duplicateFields = Lists.newArrayList();
+
this.duplicateFields.addAll(Arrays.asList(duplicateFieldNameStr.split(";")));
+ }
+ if (this.duplicateCache == null && this.duplicateFields != null) {
+ this.duplicateCache = new LongValueKV(this.duplicateCacheSize);
+ }
+ String duplicateCacheExpirationStr =
ComponentCreator.getProperties().getProperty(getConfigureName() +
".duplicate.expiration.time");
+ if (duplicateCacheExpirationStr != null &&
!duplicateCacheExpirationStr.isEmpty()) {
+ this.duplicateCacheExpirationTime =
Integer.parseInt(duplicateCacheExpirationStr);
+ } else {
+ this.duplicateCacheExpirationTime = 86400000;
+ }
+
}
public Map<String, AbstractStage> createStageMap() {
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
index 4dff54f..5cbac0f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
@@ -80,14 +80,14 @@ public class OutputChainStage<T extends IMessage> extends ChainStage<T> implemen
/**
* 主要是输出可能影响线上数据,可以通过配置文件的开关,把所有的输出,都指定到一个其他输出中
*/
- if(openMockChannel()){
- if(mockSink!=null){
+ if (openMockChannel()) {
+ if (mockSink != null) {
mockSink.batchAdd(message.deepCopy());
return message;
}
return message;
}
- sink.batchAdd(message);
+ sink.batchAdd(message.deepCopy());
return message;
}
@@ -100,23 +100,23 @@ public class OutputChainStage<T extends IMessage> extends ChainStage<T> implemen
@Override
public void checkpoint(IMessage message, AbstractContext context,
CheckPointMessage checkPointMessage) {
- ISink realSink=null;
- if(openMockChannel()&&mockSink!=null){
- realSink=mockSink;
- }else {
- realSink=sink;
+ ISink realSink = null;
+ if (openMockChannel() && mockSink != null) {
+ realSink = mockSink;
+ } else {
+ realSink = sink;
}
- if(message.getHeader().isNeedFlush()){
- Set<String> queueIds=new HashSet<>();
- if(message.getHeader().getCheckpointQueueIds()!=null){
+ if (message.getHeader().isNeedFlush()) {
+ Set<String> queueIds = new HashSet<>();
+ if (message.getHeader().getCheckpointQueueIds() != null) {
queueIds.addAll(message.getHeader().getCheckpointQueueIds());
}
- if(StringUtil.isNotEmpty(message.getHeader().getQueueId())){
+ if (StringUtil.isNotEmpty(message.getHeader().getQueueId())) {
queueIds.add(message.getHeader().getQueueId());
}
realSink.checkpoint(queueIds);
}
- CheckPointState checkPointState= new CheckPointState();
+ CheckPointState checkPointState = new CheckPointState();
checkPointState.setQueueIdAndOffset(realSink.getFinishedQueueIdAndOffsets(checkPointMessage));
checkPointMessage.reply(checkPointState);
@@ -182,33 +182,33 @@ public class OutputChainStage<T extends IMessage> extends ChainStage<T> implemen
@Override
public void doProcessAfterRefreshConfigurable(IConfigurableService
configurableService) {
- sink=configurableService.queryConfigurable(ISink.TYPE, sinkName);
- if(sink==null){
+ sink = configurableService.queryConfigurable(ISink.TYPE, sinkName);
+ if (sink == null) {
sink = configurableService.queryConfigurable(IChannel.TYPE,
sinkName);
}
metaData = configurableService.queryConfigurable(MetaData.TYPE,
metaDataName);
- mockSink=getMockChannel(configurableService,sink.getNameSpace());
+ mockSink = getMockChannel(configurableService, sink.getNameSpace());
}
- private ISink getMockChannel(IConfigurableService
configurableService,String nameSpace) {
- String
type=ComponentCreator.getProperties().getProperty("out.mock.type");
- if(type==null){
+ private ISink getMockChannel(IConfigurableService configurableService,
String nameSpace) {
+ String type =
ComponentCreator.getProperties().getProperty("out.mock.type");
+ if (type == null) {
return null;
}
- ISink mockSink=
configurableService.queryConfigurable(ISink.TYPE,OUT_MOCK_SWITCH+"_"+type);
- if(mockSink==null){
- mockSink=
configurableService.queryConfigurable(IChannel.TYPE,OUT_MOCK_SWITCH+"_"+type);
+ ISink mockSink = configurableService.queryConfigurable(ISink.TYPE,
OUT_MOCK_SWITCH + "_" + type);
+ if (mockSink == null) {
+ mockSink = configurableService.queryConfigurable(IChannel.TYPE,
OUT_MOCK_SWITCH + "_" + type);
}
return mockSink;
}
- protected boolean openMockChannel(){
- String
swtich=ComponentCreator.getProperties().getProperty(OUT_MOCK_SWITCH);
- if(swtich==null){
+ protected boolean openMockChannel() {
+ String swtich =
ComponentCreator.getProperties().getProperty(OUT_MOCK_SWITCH);
+ if (swtich == null) {
return false;
}
- if("true".equals(swtich)){
+ if ("true".equals(swtich)) {
return true;
}
return false;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java
index 9f8f46d..fe8a3bb 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java
@@ -34,7 +34,8 @@ public class SQLUtil {
private static final String INSERT_IGNORE = "INSERT IGNORE INTO";
private static final String REPLACE = "REPLACE INTO";
- public static String createReplacesInsertSql(MetaData metaData,
Map<String, Object> fieldName2Value, Boolean containsIdField) {
+ public static String createReplacesInsertSql(MetaData metaData,
Map<String, Object> fieldName2Value,
+ Boolean containsIdField) {
String insertSQL = createInsertSql(metaData, fieldName2Value,
containsIdField);
insertSQL = insertSQL.replaceFirst(INSERT, REPLACE);
return insertSQL;
@@ -60,7 +61,8 @@ public class SQLUtil {
return stringBuilder.toString();
}
- public static String createIgnoreInsertSql(MetaData metaData, Map<String,
Object> fieldName2Value, Boolean containsIdField) {
+ public static String createIgnoreInsertSql(MetaData metaData, Map<String,
Object> fieldName2Value,
+ Boolean containsIdField) {
String insertSQL = createInsertSql(metaData, fieldName2Value,
containsIdField);
insertSQL = insertSQL.replaceFirst(INSERT, INSERT_IGNORE);
return insertSQL;
@@ -70,7 +72,8 @@ public class SQLUtil {
return createInsertSql(metaData, fieldName2Value, null);
}
- public static String createInsertSql(MetaData metaData, Map<String,
Object> fieldName2Value, Boolean containsIdField) {
+ public static String createInsertSql(MetaData metaData, Map<String,
Object> fieldName2Value,
+ Boolean containsIdField) {
StringBuilder sql = new StringBuilder(INSERT + " " +
metaData.getTableName() + "(");
StringBuilder fieldSql = new StringBuilder();
@@ -106,7 +109,8 @@ public class SQLUtil {
return stringBuilder.toString();
}
- protected static String createInsertValuesSQL(MetaData metaData,
Map<String, Object> fieldName2Value, StringBuilder fieldSql, StringBuilder
valueSql) {
+ protected static String createInsertValuesSQL(MetaData metaData,
Map<String, Object> fieldName2Value,
+ StringBuilder fieldSql, StringBuilder valueSql) {
boolean isIncrement = true;
if (fieldName2Value.containsKey(metaData.getIdFieldName())) {
isIncrement = false;
@@ -114,7 +118,8 @@ public class SQLUtil {
return createInsertValuesSQL(metaData, fieldName2Value, fieldSql,
valueSql, isIncrement);
}
- protected static String createInsertValuesSQL(MetaData metaData,
Map<String, Object> fieldName2Value, StringBuilder fieldSql, StringBuilder
valueSql, boolean containsIdField) {
+ protected static String createInsertValuesSQL(MetaData metaData,
Map<String, Object> fieldName2Value,
+ StringBuilder fieldSql, StringBuilder valueSql, boolean
containsIdField) {
boolean isFirst = true;
valueSql.append("(");
//if (fieldName2Value.containsKey(metaData.getIdFieldName())) {
@@ -172,16 +177,16 @@ public class SQLUtil {
// }
String result = null;
if (DataTypeUtil.isDate(field.getDataType().getDataClass())) {
- result = DateUtil.format((Date)value);
+ result = DateUtil.format((Date) value);
} else if (JSONObject.class.isInstance(value)) {
- result = ((JSONObject)value).toJSONString();
+ result = ((JSONObject) value).toJSONString();
} else {
result = value.toString();
}
return "'" + handleSpecialCharInSql(result) + "'";
} else {
if (DataTypeUtil.isBoolean(field.getDataType().getDataClass())) {
- boolean boolValue = (Boolean)value;
+ boolean boolValue = (Boolean) value;
return boolValue ? "1" : "0";
}
return value + "";
@@ -261,12 +266,12 @@ public class SQLUtil {
if (object == null) {
return ibatisSQL;
}
- if (object == null || StringUtil.isEmpty(ibatisSQL)) {
+ if (StringUtil.isEmpty(ibatisSQL)) {
return null;
}
List<String> vars = parseIbatisSQLVars(ibatisSQL);
- if (vars == null || vars.size() == 0) {
+ if (vars.size() == 0) {
return ibatisSQL;
}
String sql = ibatisSQL;
@@ -274,19 +279,22 @@ public class SQLUtil {
Object value = getBeanFieldValue(object, varName);
String valueSQL = null;
- if (value != null & !String.class.isInstance(value) &&
!Date.class.isInstance(value)) {
+ if (value != null & !(value instanceof String) &&
!Date.class.isInstance(value)) {
valueSQL = value.toString();
}
- if (value != null && String.class.isInstance(value)) {
+ if (value instanceof String) {
value = value.toString().replace("'", "''");
+ if (value.toString().contains("\\")) {
+ value = value.toString().replaceAll("\\\\", "\\\\\\\\");
+ }
if (containsQuotation && (ibatisSQL.indexOf("'#{" + varName +
"}'") > -1 || ibatisSQL.indexOf("`#{" + varName + "}`") > -1)) {
valueSQL = value + "";
} else {
valueSQL = "'" + value + "'";
}
}
- if (value != null && Date.class.isInstance(value)) {
- String valueDate = DateUtil.format((Date)value);
+ if (value instanceof Date) {
+ String valueDate = DateUtil.format((Date) value);
if (containsQuotation && ibatisSQL.indexOf("'#{" + varName +
"}'") > -1) {
valueSQL = valueDate;
} else {
@@ -314,20 +322,20 @@ public class SQLUtil {
}
protected static Object getBeanFieldValue(Object object, String varName) {
- if (JSONObject.class.isInstance(object)) {
- JSONObject jsonObject = (JSONObject)object;
+ if (object instanceof JSONObject) {
+ JSONObject jsonObject = (JSONObject) object;
return jsonObject.get(varName);
- } else if (Map.class.isInstance(object)) {
- Map<String, Object> paras = (Map)object;
+ } else if (object instanceof Map) {
+ Map<String, Object> paras = (Map) object;
return paras.get(varName);
} else {
- if (IConfigurable.class.isInstance(object) &&
varName.equals(IConfigurable.JSON_PROPERTY)) {
- IConfigurable configurable = (IConfigurable)object;
+ if (object instanceof IConfigurable &&
varName.equals(IConfigurable.JSON_PROPERTY)) {
+ IConfigurable configurable = (IConfigurable) object;
return configurable.toJson();
}
- if (BasedConfigurable.class.isInstance(object) &&
varName.equals(IConfigurable.STATUS_PROPERTY)) {
- BasedConfigurable basedConfigurable =
(BasedConfigurable)object;
+ if (object instanceof BasedConfigurable &&
varName.equals(IConfigurable.STATUS_PROPERTY)) {
+ BasedConfigurable basedConfigurable = (BasedConfigurable)
object;
return basedConfigurable.getStatus();
}
return ReflectUtil.getBeanFieldValue(object, varName);
@@ -406,7 +414,7 @@ public class SQLUtil {
if (!isString) {
stringBuilder.append(value);
} else {
- stringBuilder.append("'" + value + "'");
+ StringBuilder append = stringBuilder.append("'" + value +
"'");
}
}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
index 896e621..25c5bee 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.streams.configurable.service;
import com.alibaba.fastjson.JSONObject;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -27,6 +28,7 @@ import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.component.AbstractComponent;
@@ -133,9 +135,7 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
if (configures != null && configures.isQuerySuccess() &&
configures.getConfigurables() != null) {
// List<Configure> configureList =
filterConfigure(configures.getConfigure());
List<IConfigurable> configurables = configures.getConfigurables();
- List<IConfigurable> configurableList =
checkAndUpdateConfigurables(namespace, configurables,
- tempType2ConfigurableMap, tempName2ConfigurableMap,
- configures.getConfigurables());
+ List<IConfigurable> configurableList =
checkAndUpdateConfigurables(namespace, configurables, tempType2ConfigurableMap,
tempName2ConfigurableMap, configures.getConfigurables());
// this.namespace2ConfigurableMap = namespace2ConfigurableMap;
for (IConfigurable configurable : configurableList) {
if (configurable instanceof IAfterConfigurableRefreshListener)
{
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
index 9080479..e51e94f 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
@@ -317,7 +317,7 @@ public class RuleContext extends AbstractContext<Message> implements Serializabl
@Override
public AbstractContext copy() {
- IMessage message = this.message.copy();
+ IMessage message = this.message.deepCopy();
RuleContext context = new RuleContext(nameSpace,
message.getMessageBody(), rule, contextConfigure);
super.copyProperty(context);
context.actionExecutor = actionExecutor;
diff --git
a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java
b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java
index bd7ef82..c5a0c27 100644
---
a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java
+++
b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.utils.DateUtil;
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/context/FunctionContext.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/context/FunctionContext.java
index 2f2ccf1..3255a69 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/context/FunctionContext.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/context/FunctionContext.java
@@ -48,7 +48,7 @@ public class FunctionContext<T extends IMessage>
@Override
public AbstractContext copy() {
- IMessage message = this.message.copy();
+ IMessage message = this.message.deepCopy();
FunctionContext context = new FunctionContext(message);
super.copyProperty(context);
context.setFunctionService(this.functionService);
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/field/FieldFunction.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/field/FieldFunction.java
index 2541a82..6d375be 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/field/FieldFunction.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/field/FieldFunction.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.streams.script.function.impl.field;
+import com.google.common.collect.Lists;
+import java.util.List;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.script.annotation.Function;
@@ -29,17 +31,17 @@ public class FieldFunction {
@FunctionMethod(value = "field", alias = "get", comment = "获取字段值")
public <T> T getFieldValue(IMessage message, FunctionContext context,
- @FunctionParamter(value = "string", comment =
"字段的名称,不需要引号") String fieldName) {
+ @FunctionParamter(value = "string", comment = "字段的名称,不需要引号") String
fieldName) {
String name = FunctionUtils.getValueString(message, context,
fieldName);
if (StringUtil.isEmpty(name)) {
name = fieldName;
}
- return (T)message.getMessageBody().get(name);
+ return (T) message.getMessageBody().get(name);
}
@FunctionMethod(value = "char_length", alias = "len", comment =
"求字段代码字符串或常量的长度")
public int len(IMessage message, FunctionContext context,
- @FunctionParamter(value = "string", comment =
"代表字符串的字段名或常量") String fieldName) {
+ @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String
fieldName) {
String value = FunctionUtils.getValueString(message, context,
fieldName);
if (StringUtil.isEmpty(value)) {
return 0;
@@ -49,7 +51,7 @@ public class FieldFunction {
@FunctionMethod(value = "lower", alias = "low", comment = "把字符串转换称小写")
public String lower(IMessage message, FunctionContext context,
- @FunctionParamter(value = "string", comment =
"代表字符串的字段名或常量") String fieldName) {
+ @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String
fieldName) {
String value = FunctionUtils.getValueString(message, context,
fieldName);
if (StringUtil.isEmpty(value)) {
return null;
@@ -59,7 +61,7 @@ public class FieldFunction {
@FunctionMethod(value = "concat", comment = "连接字符串")
public String concat(IMessage message, FunctionContext context,
- @FunctionParamter(value = "string", comment =
"代表字符串的字段名或常量") String... fieldNames) {
+ @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量")
String... fieldNames) {
StringBuilder sb = new StringBuilder();
for (String fieldName : fieldNames) {
String value = FunctionUtils.getValueString(message, context,
fieldName);
@@ -70,28 +72,30 @@ public class FieldFunction {
@FunctionMethod(value = "concat_ws", alias = "concat_sign", comment =
"通过分隔符把字符串拼接在一起")
public String concat_ws(IMessage message, FunctionContext context,
- @FunctionParamter(value = "string", comment =
"代表分隔符的字段名或常量") String sign,
- @FunctionParamter(value = "string", comment =
"代表字符串的字段名或常量") String... fieldNames) {
- StringBuilder sb = new StringBuilder();
- boolean isFirst = true;
+ @FunctionParamter(value = "string", comment = "代表分隔符的字段名或常量") String
sign,
+ @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量")
String... fieldNames) {
sign = FunctionUtils.getValueString(message, context, sign);
+ if (sign == null) {
+ sign = ",";
+ }
+ List<String> values = Lists.newArrayList();
for (String fieldName : fieldNames) {
- if (isFirst) {
- isFirst = false;
- } else {
- sb.append(sign);
- }
String value = FunctionUtils.getValueString(message, context,
fieldName);
- sb.append(value);
+ if (value != null) {
+ values.add(value);
+ }
}
- return sb.toString();
+ if (values.isEmpty()) {
+ return null;
+ }
+ return String.join(sign, values);
}
@FunctionMethod(value = "lpad", comment =
"在原串左边补n个pad字符串,如果原串长度小于len,则截断,使得整个字符串长度为len")
public String lpad(IMessage message, FunctionContext context,
- @FunctionParamter(value = "string", comment =
"代表字符串的字段名或常量") String ori,
- @FunctionParamter(value = "string", comment =
"代表字符串长度字段名,数字或常量") String lenStr,
- @FunctionParamter(value = "string", comment =
"代表补齐字符串的字段名或常量") String pad) {
+ @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String
ori,
+ @FunctionParamter(value = "string", comment = "代表字符串长度字段名,数字或常量")
String lenStr,
+ @FunctionParamter(value = "string", comment = "代表补齐字符串的字段名或常量") String
pad) {
if (StringUtil.isEmpty(ori) || pad == null) {
return null;
}
@@ -121,9 +125,9 @@ public class FieldFunction {
@FunctionMethod(value = "rpad", comment =
"在原串左边补n个pad字符串,如果原串长度小于len,则截断,使得整个字符串长度为len")
public String rpad(IMessage message, FunctionContext context,
- @FunctionParamter(value = "string", comment =
"代表字符串的字段名或常量") String ori,
- @FunctionParamter(value = "string", comment =
"代表字符串长度字段名,数字或常量") String lenStr,
- @FunctionParamter(value = "string", comment =
"代表补齐字符串的字段名或常量") String pad) {
+ @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String
ori,
+ @FunctionParamter(value = "string", comment = "代表字符串长度字段名,数字或常量")
String lenStr,
+ @FunctionParamter(value = "string", comment = "代表补齐字符串的字段名或常量") String
pad) {
if (StringUtil.isEmpty(ori) || pad == null) {
return null;
}
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/flatmap/SplitArrayFunction.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/flatmap/SplitArrayFunction.java
index f076696..85f5727 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/flatmap/SplitArrayFunction.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/flatmap/SplitArrayFunction.java
@@ -18,8 +18,10 @@ package
org.apache.rocketmq.streams.script.function.impl.flatmap;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
+
import java.util.List;
import java.util.Map;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.context.IMessage;
@@ -132,7 +134,7 @@ public class SplitArrayFunction {
context.openSplitModel();
for (int i = 0; i < values.length; i++) {
String value = values[i];
- if("null".equals(value.toLowerCase())){
+ if ("null".equalsIgnoreCase(value)) {
continue;
}
IMessage newMessage = channelMessage.deepCopy();
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/AbstractScriptProxy.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/AbstractScriptProxy.java
index 6bdfc5a..5071a02 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/AbstractScriptProxy.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/AbstractScriptProxy.java
@@ -29,27 +29,26 @@ import
org.apache.rocketmq.streams.script.utils.FunctionUtils;
public abstract class AbstractScriptProxy implements IScriptExpression {
protected IScriptExpression origExpression;
+
public AbstractScriptProxy(IScriptExpression origExpression) {
- this.origExpression=origExpression;
+ this.origExpression = origExpression;
}
public abstract List<ICacheFilter> getCacheFilters();
+ public abstract boolean supportOptimization(IScriptExpression
scriptExpression);
-
- public abstract boolean supportOptimization(IScriptExpression
scriptExpression) ;
protected String getParameterValue(IScriptParamter scriptParamter) {
if (!ScriptParameter.class.isInstance(scriptParamter)) {
return null;
}
- ScriptParameter parameter = (ScriptParameter)scriptParamter;
+ ScriptParameter parameter = (ScriptParameter) scriptParamter;
if (parameter.getRigthVarName() != null) {
return null;
}
return FunctionUtils.getConstant(parameter.getLeftVarName());
}
-
@Override public List<IScriptParamter> getScriptParamters() {
return this.origExpression.getScriptParamters();
}
@@ -63,7 +62,7 @@ public abstract class AbstractScriptProxy implements
IScriptExpression {
}
@Override public Object getScriptParamter(IMessage message,
FunctionContext context) {
- return this.origExpression.getScriptParamter(message,context);
+ return this.origExpression.getScriptParamter(message, context);
}
public void setOrigExpression(IScriptExpression origExpression) {
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/CaseScriptExpressionProxy.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/CaseScriptExpressionProxy.java
index 6f6140c..4b507c6 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/CaseScriptExpressionProxy.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/CaseScriptExpressionProxy.java
@@ -31,39 +31,40 @@ public class CaseScriptExpressionProxy extends
AbstractScriptProxy {
}
@Override public List<ICacheFilter> getCacheFilters() {
- List<ICacheFilter> result=new ArrayList<>();
- GroupScriptExpression
groupScriptExpression=(GroupScriptExpression)this.origExpression;
- recursion(groupScriptExpression,result);
+ List<ICacheFilter> result = new ArrayList<>();
+ GroupScriptExpression groupScriptExpression = (GroupScriptExpression)
this.origExpression;
+ recursion(groupScriptExpression, result);
return result;
}
/**
* recursion else if GroupScriptExpression list
+ *
* @param groupScriptExpression
* @param cacheFilters
*/
- protected void recursion(GroupScriptExpression
groupScriptExpression,List<ICacheFilter> cacheFilters){
- IScriptExpression scriptExpression=
groupScriptExpression.getIfExpresssion();
- AbstractScriptProxy abstractExpressionProxy=
ScriptProxyFactory.getInstance().create(scriptExpression);
- if(abstractExpressionProxy!=null){
+ protected void recursion(GroupScriptExpression groupScriptExpression,
List<ICacheFilter> cacheFilters) {
+ IScriptExpression scriptExpression =
groupScriptExpression.getIfExpresssion();
+ AbstractScriptProxy abstractExpressionProxy =
ScriptProxyFactory.getInstance().create(scriptExpression);
+ if (abstractExpressionProxy != null) {
groupScriptExpression.setIfExpresssion(abstractExpressionProxy);
cacheFilters.addAll(abstractExpressionProxy.getCacheFilters());
}
- if(groupScriptExpression.getElseIfExpressions()!=null){
- for(GroupScriptExpression
expression:groupScriptExpression.getElseIfExpressions()){
- recursion(expression,cacheFilters);
+ if (groupScriptExpression.getElseIfExpressions() != null) {
+ for (GroupScriptExpression expression :
groupScriptExpression.getElseIfExpressions()) {
+ recursion(expression, cacheFilters);
}
}
}
@Override public boolean supportOptimization(IScriptExpression
scriptExpression) {
- if(scriptExpression instanceof GroupScriptExpression){
- return true;
- }
- return false;
+ if (scriptExpression instanceof GroupScriptExpression) {
+ return true;
+ }
+ return false;
}
@Override public Object executeExpression(IMessage message,
FunctionContext context) {
- return this.origExpression.executeExpression(message,context);
+ return this.origExpression.executeExpression(message, context);
}
}
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/EqualsProxy.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/EqualsProxy.java
index ba1d06d..5f9aca9 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/EqualsProxy.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/EqualsProxy.java
@@ -17,10 +17,7 @@
package org.apache.rocketmq.streams.script.optimization.performance;
-import
org.apache.rocketmq.streams.common.optimization.cachefilter.ICacheFilter;
-import
org.apache.rocketmq.streams.common.optimization.cachefilter.ICacheFilterBulider;
import
org.apache.rocketmq.streams.script.function.impl.condition.EqualsFunction;
-import org.apache.rocketmq.streams.script.function.impl.string.RegexFunction;
import org.apache.rocketmq.streams.script.service.IScriptExpression;
import org.apache.rocketmq.streams.script.service.IScriptParamter;
@@ -40,7 +37,7 @@ public class EqualsProxy extends SimpleScriptExpressionProxy {
}
@Override protected String getVarName() {
- return
getParameterValue((IScriptParamter)this.origExpression.getScriptParamters().get(0));
+ return getParameterValue((IScriptParamter)
this.origExpression.getScriptParamters().get(0));
}
}
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/RegexProxy.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/RegexProxy.java
index d3cc1b2..30c214a 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/RegexProxy.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/RegexProxy.java
@@ -17,14 +17,11 @@
package org.apache.rocketmq.streams.script.optimization.performance;
-import
org.apache.rocketmq.streams.common.optimization.cachefilter.ICacheFilter;
-import
org.apache.rocketmq.streams.common.optimization.cachefilter.ICacheFilterBulider;
-import
org.apache.rocketmq.streams.script.function.impl.condition.EqualsFunction;
import org.apache.rocketmq.streams.script.function.impl.string.RegexFunction;
import org.apache.rocketmq.streams.script.service.IScriptExpression;
import org.apache.rocketmq.streams.script.service.IScriptParamter;
-public class RegexProxy extends SimpleScriptExpressionProxy {
+public class RegexProxy extends SimpleScriptExpressionProxy {
public RegexProxy(IScriptExpression origExpression) {
super(origExpression);
}
@@ -40,8 +37,7 @@ public class RegexProxy extends SimpleScriptExpressionProxy {
}
@Override protected String getVarName() {
- return
getParameterValue((IScriptParamter)this.origExpression.getScriptParamters().get(0));
+ return getParameterValue((IScriptParamter)
this.origExpression.getScriptParamters().get(0));
}
-
}
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptExpressionGroupsProxy.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptExpressionGroupsProxy.java
index 220350c..5433e60 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptExpressionGroupsProxy.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptExpressionGroupsProxy.java
@@ -30,28 +30,31 @@ import
org.apache.rocketmq.streams.script.service.IScriptExpression;
import org.apache.rocketmq.streams.script.service.IScriptParamter;
public class ScriptExpressionGroupsProxy extends CacheFilterManager implements
IScriptExpression {
- protected List<IScriptExpression> scriptExpressions=new ArrayList<>();
+ protected List<IScriptExpression> scriptExpressions = new ArrayList<>();
public ScriptExpressionGroupsProxy(int elementCount, int capacity) {
super(elementCount, capacity);
}
+
public void removeLessCount() {
- Map<String, CacheFilterGroup> newFilterOptimizationMap=new HashMap<>();
- for(String varName:this.filterOptimizationMap.keySet()){
- CacheFilterGroup cacheFilterGroup
=this.filterOptimizationMap.get(varName);
- if(cacheFilterGroup.getSize()>5){
- newFilterOptimizationMap.put(varName,cacheFilterGroup);
+ Map<String, CacheFilterGroup> newFilterOptimizationMap = new
HashMap<>();
+ for (String varName : this.filterOptimizationMap.keySet()) {
+ CacheFilterGroup cacheFilterGroup =
this.filterOptimizationMap.get(varName);
+ if (cacheFilterGroup.getSize() > 5) {
+ newFilterOptimizationMap.put(varName, cacheFilterGroup);
}
}
- this.filterOptimizationMap=newFilterOptimizationMap;
+ this.filterOptimizationMap = newFilterOptimizationMap;
}
- public void addScriptExpression(IScriptExpression scriptExpression){
+
+ public void addScriptExpression(IScriptExpression scriptExpression) {
this.scriptExpressions.add(scriptExpression);
}
+
@Override public Object executeExpression(IMessage message,
FunctionContext context) {
- this.execute(message,context);
- for(IScriptExpression scriptExpression:scriptExpressions){
- scriptExpression.executeExpression(message,context);
+ this.execute(message, context);
+ for (IScriptExpression scriptExpression : scriptExpressions) {
+ scriptExpression.executeExpression(message, context);
}
return null;
}
@@ -84,5 +87,4 @@ public class ScriptExpressionGroupsProxy extends
CacheFilterManager implements I
return null;
}
-
}
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptOptimization.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptOptimization.java
index ae147ee..9d43e2b 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptOptimization.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptOptimization.java
@@ -16,41 +16,33 @@
*/
package org.apache.rocketmq.streams.script.optimization.performance;
+import
org.apache.rocketmq.streams.common.optimization.cachefilter.ICacheFilter;
+import
org.apache.rocketmq.streams.script.operator.expression.GroupScriptExpression;
+import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
+import org.apache.rocketmq.streams.script.service.IScriptExpression;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.optimization.HyperscanRegex;
-import
org.apache.rocketmq.streams.common.optimization.cachefilter.ICacheFilter;
-import org.apache.rocketmq.streams.script.context.FunctionContext;
-import
org.apache.rocketmq.streams.script.operator.expression.GroupScriptExpression;
-import org.apache.rocketmq.streams.script.operator.expression.ScriptExpression;
-import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
-import org.apache.rocketmq.streams.script.service.IScriptExpression;
public class ScriptOptimization {
protected String name;//function script namespace,name
-
/**
* optimizate expression
*/
- protected ScriptExpressionGroupsProxy scriptExpressionGroupsProxy =new
ScriptExpressionGroupsProxy(160,1000000);
+ protected ScriptExpressionGroupsProxy scriptExpressionGroupsProxy = new
ScriptExpressionGroupsProxy(160, 1000000);
//the optimizated script
protected List<IScriptExpression> scriptExpressions;
-
//newFieldName created in the script
protected Map<String, IScriptExpression> newFieldName2Expressions = new
HashMap<>();
-
/**
* Optimization once
*/
@@ -61,8 +53,8 @@ public class ScriptOptimization {
*
* @param scriptExpressions
*/
- public ScriptOptimization(String name,List<IScriptExpression>
scriptExpressions) {
- this.name=name;
+ public ScriptOptimization(String name, List<IScriptExpression>
scriptExpressions) {
+ this.name = name;
this.scriptExpressions = scriptExpressions;
/**
@@ -91,8 +83,6 @@ public class ScriptOptimization {
return false;
}
-
-
/**
* 把表达式拆成3段,创建变量的,正则类,其他。正则类用HyperscanRegex做优化
*/
@@ -100,7 +90,7 @@ public class ScriptOptimization {
if (!startOptimization.compareAndSet(false, true)) {
return this.scriptExpressions;
}
- Set<String> newVarNames=new HashSet<>();
+ Set<String> newVarNames = new HashSet<>();
List<IScriptExpression> allScriptExpressions = new
ArrayList<>();//最终输出的表达式列表
List<IScriptExpression> proxyExpressions = new
ArrayList<>();//最后执行的脚本,在执行完正则后执行的部分
List<IScriptExpression> lastExpressions = new
ArrayList<>();//最后执行的脚本,在执行完正则后执行的部分
@@ -108,46 +98,39 @@ public class ScriptOptimization {
for (IScriptExpression scriptExpression : scriptExpressions) {
Set<String> newFieldNames = scriptExpression.getNewFieldNames();
- if (newFieldNames != null&&newFieldNames.size() > 0) {
+ if (newFieldNames != null && newFieldNames.size() > 0) {
String newFieldName = newFieldNames.iterator().next();
newVarNames.add(newFieldName);
}
-
-
-
-
- IScriptExpression scriptExpressionProxy =
createProxy(scriptExpression,newVarNames);
+ IScriptExpression scriptExpressionProxy =
createProxy(scriptExpression, newVarNames);
String functionName = scriptExpressionProxy.getFunctionName();
- if(scriptExpressionProxy instanceof AbstractScriptProxy){
+ if (scriptExpressionProxy instanceof AbstractScriptProxy) {
proxyExpressions.add(scriptExpressionProxy);
- }else if("trim".equals(functionName) ||
"lower".equals(functionName) || "concat".equals(functionName)){
+ } else if ("trim".equals(functionName) ||
"lower".equals(functionName) || "concat".equals(functionName)) {
mapExpressions.add(scriptExpressionProxy);
- }else {
+ } else {
lastExpressions.add(scriptExpressionProxy);
}
}
allScriptExpressions.addAll(mapExpressions);//把优先执行的表达式添加上
- if(this.scriptExpressionGroupsProxy.scriptExpressions.size()>0){
+ if (this.scriptExpressionGroupsProxy.scriptExpressions.size() > 0) {
allScriptExpressions.add(this.scriptExpressionGroupsProxy);
}
allScriptExpressions.addAll(lastExpressions);//把剩余的表达式增加到list中
- this.scriptExpressions=allScriptExpressions;
+ this.scriptExpressions = allScriptExpressions;
this.scriptExpressionGroupsProxy.removeLessCount();
return this.scriptExpressions;
}
-
-
-
/**
* 如果脚本中有较多的正则表达式,则统一注册到正则库,并行执行。
*
* @param scriptExpression
* @return
*/
- protected IScriptExpression createProxy(IScriptExpression
scriptExpression,Set<String> newVarNames) {
+ protected IScriptExpression createProxy(IScriptExpression
scriptExpression, Set<String> newVarNames) {
AbstractScriptProxy scriptProxy =
ScriptProxyFactory.getInstance().create(scriptExpression);
if (scriptProxy == null) {
return scriptExpression;
@@ -158,22 +141,21 @@ public class ScriptOptimization {
* 如果依赖的字段是其他脚本产生的,则不做优化
*/
for (String fieldName : dependentFields) {
- if
(newFieldName2Expressions.containsKey(fieldName)&&!newVarNames.contains(fieldName))
{
+ if (newFieldName2Expressions.containsKey(fieldName) &&
!newVarNames.contains(fieldName)) {
return scriptExpression;
}
}
this.scriptExpressionGroupsProxy.addScriptExpression(scriptProxy);
- List<ICacheFilter> cacheFilters=scriptProxy.getCacheFilters();
- if(cacheFilters!=null){
- for(ICacheFilter cacheFilter:cacheFilters){
-
this.scriptExpressionGroupsProxy.addOptimizationExpression(this.name,cacheFilter);
+ List<ICacheFilter> cacheFilters = scriptProxy.getCacheFilters();
+ if (cacheFilters != null) {
+ for (ICacheFilter cacheFilter : cacheFilters) {
+
this.scriptExpressionGroupsProxy.addOptimizationExpression(this.name,
cacheFilter);
}
}
return scriptProxy;
}
-
public static void main(String[] args) {
String scriptValue = "source='netstat_ob';\n"
+
"____regex_10001=regex(std_cmdline,'^(((/?([a-zA-Z0-9_\\.\\-]+/){1,20})bin/)|/bin/|/|-)?"
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptProxyFactory.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptProxyFactory.java
index 514d6dd..5118c1d 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptProxyFactory.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptProxyFactory.java
@@ -26,33 +26,31 @@ import org.apache.rocketmq.streams.common.utils.ReflectUtil;
import org.apache.rocketmq.streams.script.service.IScriptExpression;
public class ScriptProxyFactory {
- protected List<AbstractScriptProxy> expressionProxies=new ArrayList<>();
- protected static ScriptProxyFactory expressionProxyFactory=new
ScriptProxyFactory();
- protected static AtomicBoolean isFinishScan=new AtomicBoolean(false);
- protected AbstractScan scan=new AbstractScan() {
+ protected List<AbstractScriptProxy> expressionProxies = new ArrayList<>();
+ protected static ScriptProxyFactory expressionProxyFactory = new
ScriptProxyFactory();
+ protected static AtomicBoolean isFinishScan = new AtomicBoolean(false);
+ protected AbstractScan scan = new AbstractScan() {
@Override protected void doProcessor(Class clazz) {
-
if(AbstractScriptProxy.class.isAssignableFrom(clazz)&&!Modifier.isAbstract(clazz.getModifiers())){
- AbstractScriptProxy
abstractExpressionProxy=(AbstractScriptProxy)ReflectUtil.forInstance(clazz,new
Class[]{IScriptExpression.class},new Object[]{null});
+ if (AbstractScriptProxy.class.isAssignableFrom(clazz) &&
!Modifier.isAbstract(clazz.getModifiers())) {
+ AbstractScriptProxy abstractExpressionProxy =
(AbstractScriptProxy) ReflectUtil.forInstance(clazz, new Class[]
{IScriptExpression.class}, new Object[] {null});
expressionProxies.add(abstractExpressionProxy);
}
}
};
-
-
- public static ScriptProxyFactory getInstance(){
- if(isFinishScan.compareAndSet(false,true)){
+ public static ScriptProxyFactory getInstance() {
+ if (isFinishScan.compareAndSet(false, true)) {
expressionProxyFactory.scan.scanPackages("org.apache.rocketmq.streams.script.optimization.performance");
expressionProxyFactory.scan.scanPackages("org.apache.rocketmq.streams.filter.optimization");
}
return expressionProxyFactory;
}
- public AbstractScriptProxy create(IScriptExpression oriScriptExpression){
- for(AbstractScriptProxy abstractExpressionProxy: expressionProxies){
+ public AbstractScriptProxy create(IScriptExpression oriScriptExpression) {
+ for (AbstractScriptProxy abstractExpressionProxy : expressionProxies) {
abstractExpressionProxy.setOrigExpression(oriScriptExpression);
-
if(abstractExpressionProxy.supportOptimization(oriScriptExpression)){
- return
(AbstractScriptProxy)ReflectUtil.forInstance(abstractExpressionProxy.getClass(),new
Class[]{IScriptExpression.class},new Object[]{oriScriptExpression});
+ if
(abstractExpressionProxy.supportOptimization(oriScriptExpression)) {
+ return (AbstractScriptProxy)
ReflectUtil.forInstance(abstractExpressionProxy.getClass(), new Class[]
{IScriptExpression.class}, new Object[] {oriScriptExpression});
}
}
return null;
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/SimpleScriptExpressionProxy.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/SimpleScriptExpressionProxy.java
index 249b10a..84bf529 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/SimpleScriptExpressionProxy.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/SimpleScriptExpressionProxy.java
@@ -31,21 +31,23 @@ public abstract class SimpleScriptExpressionProxy extends
AbstractScriptProxy {
public SimpleScriptExpressionProxy(IScriptExpression origExpression) {
super(origExpression);
}
- protected List<ICacheFilter> optimizationExpressions=null;
+
+ protected List<ICacheFilter> optimizationExpressions = null;
+
@Override
public List<ICacheFilter> getCacheFilters() {
- IScriptExpression scriptExpression=this.origExpression;
- if(this.optimizationExpressions==null){
- synchronized (this){
- if(this.optimizationExpressions==null){
- List<ICacheFilter> optimizationExpressions=new
ArrayList<>();
- optimizationExpressions.add(new
AbstractCacheFilter(getVarName(),this.origExpression) {
+ IScriptExpression scriptExpression = this.origExpression;
+ if (this.optimizationExpressions == null) {
+ synchronized (this) {
+ if (this.optimizationExpressions == null) {
+ List<ICacheFilter> optimizationExpressions = new
ArrayList<>();
+ optimizationExpressions.add(new
AbstractCacheFilter(getVarName(), this.origExpression) {
@Override public boolean
executeOrigExpression(IMessage message, AbstractContext context) {
FunctionContext functionContext = new
FunctionContext(message);
if (context != null) {
context.syncSubContext(functionContext);
}
- Boolean
isMatch=(Boolean)scriptExpression.executeExpression(message,functionContext);
+ Boolean isMatch = (Boolean)
scriptExpression.executeExpression(message, functionContext);
if (context != null) {
context.syncContext(functionContext);
@@ -53,7 +55,7 @@ public abstract class SimpleScriptExpressionProxy extends
AbstractScriptProxy {
return isMatch;
}
});
- this.optimizationExpressions=optimizationExpressions;
+ this.optimizationExpressions = optimizationExpressions;
}
}
}
@@ -61,15 +63,13 @@ public abstract class SimpleScriptExpressionProxy extends
AbstractScriptProxy {
}
-
@Override public Object executeExpression(IMessage message,
FunctionContext context) {
- Boolean value=
this.optimizationExpressions.get(0).execute(message,context);
-
if(this.origExpression.getNewFieldNames()!=null&&this.origExpression.getNewFieldNames().size()>0){
+ Boolean value = this.optimizationExpressions.get(0).execute(message,
context);
+ if (this.origExpression.getNewFieldNames() != null &&
this.origExpression.getNewFieldNames().size() > 0) {
message.getMessageBody().put(this.origExpression.getNewFieldNames().iterator().next(),
value);
}
return value;
}
-
protected abstract String getVarName();
}
diff --git
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
index 6b05827..a75d300 100644
---
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
+++
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
@@ -87,7 +88,7 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
/**
* 用消息中的哪个字段做时间字段
*/
- protected String timeFieldName ;
+ protected String timeFieldName;
/**
* having column in having clause eg: key:'having_sum_0001'
value:'having_sum_0001=SUM(OrderPrice)<2000' note: here ignore the logical
relation value may be multi expression which split by ${SCRIPT_SPLIT_CHAR}
update: change sql(move the function into select clause) to escape function in
having clause
@@ -102,7 +103,7 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
/**
* SQL中group by的字段,使用;拼接,如"name;age"
*/
- protected String groupByFieldName ;
+ protected String groupByFieldName;
/**
* 意义同blink中,允许最晚的消息到达时间,单位是分钟
@@ -120,7 +121,7 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
/**
* 主要是做兼容,以前设计的窗口时间是分钟为单位,如果有秒作为窗口时间的,通过设置timeUntiAdjust=1来实现。
后续需要调整成直接秒级窗口
*/
- protected int timeUnitAdjust=60;
+ protected int timeUnitAdjust = 60;
/**
* the variable name of window size which can be got from message
*/
@@ -144,14 +145,14 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
/**
*
默认为空,窗口的触发类似flink,在测试模式下,因为消息有界,期望当消息发送完成后能触发,可以设置两条消息的最大间隔,超过这个间隔,将直接触发消息
*/
- protected Long msgMaxGapSecond=10L;
+ protected Long msgMaxGapSecond = 10L;
/**
* 是否支持过期数据的计算 过期:当前时间大于数据所在窗口的触发时间
*/
- protected int
fireMode=0;//0:普通触发,firetime后收到数据丢弃;1:多实例多次独立触发,在watermark时间内,同starttime,endtime创建多个实例,多次触发;2.单实例,多次独立触发,每次触发是最新值
+ protected int fireMode =
0;//0:普通触发,firetime后收到数据丢弃;1:多实例多次独立触发,在watermark时间内,同starttime,endtime创建多个实例,多次触发;2.单实例,多次独立触发,每次触发是最新值
- protected boolean isLocalStorageOnly=true;//是否只用本地存储,可以提高性能,但不保证可靠性
+ protected boolean isLocalStorageOnly = true;//是否只用本地存储,可以提高性能,但不保证可靠性
protected String reduceSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码
protected transient IReducer reducer;
/**
@@ -205,12 +206,12 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
/**
* 如果没有db配置,不开启远程存储服务
*/
- if(!ORMUtil.hasConfigueDB()){
- isLocalStorageOnly=true;
+ if (!ORMUtil.hasConfigueDB()) {
+ isLocalStorageOnly = true;
}
- sqlCache=new SQLCache(isLocalStorageOnly);
- AbstractWindow window=this;
- windowCache=new WindowCache(){
+ sqlCache = new SQLCache(isLocalStorageOnly);
+ AbstractWindow window = this;
+ windowCache = new WindowCache() {
@Override
protected String generateShuffleKey(IMessage message) {
@@ -226,12 +227,12 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
initFunctionExecutor();
//启动shuffle channel 实现消息shuffle以及接收shuffle消息并处理
// FireManager.getInstance().startFireCheck();
- if(StringUtil.isNotEmpty(this.reduceSerializeValue)){
- byte[] bytes= Base64Utils.decode( this.reduceSerializeValue);
+ if (StringUtil.isNotEmpty(this.reduceSerializeValue)) {
+ byte[] bytes = Base64Utils.decode(this.reduceSerializeValue);
reducer = InstantiationUtil.deserializeObject(bytes);
}
- eventTimeManager=new EventTimeManager();
- windowMaxValueManager = new WindowMaxValueManager(this,sqlCache);
+ eventTimeManager = new EventTimeManager();
+ windowMaxValueManager = new WindowMaxValueManager(this, sqlCache);
return success;
}
@@ -282,22 +283,22 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
}
*/
- public WindowInstance createWindowInstance(String startTime, String
endTime, String fireTime,String splitId) {
- WindowInstance windowInstance =new WindowInstance();
+ public WindowInstance createWindowInstance(String startTime, String
endTime, String fireTime, String splitId) {
+ WindowInstance windowInstance = new WindowInstance();
windowInstance.setFireTime(fireTime);
windowInstance.setStartTime(startTime);
windowInstance.setEndTime(endTime);
windowInstance.setSplitId(splitId);
windowInstance.setGmtCreate(new Date());
windowInstance.setGmtModified(new Date());
-
windowInstance.setWindowInstanceName(createWindowInstanceName(startTime,endTime,fireTime));
+
windowInstance.setWindowInstanceName(createWindowInstanceName(startTime,
endTime, fireTime));
windowInstance.setWindowName(getConfigureName());
windowInstance.setWindowNameSpace(getNameSpace());
- String windowInstanceId =windowInstance.createWindowInstanceId();
+ String windowInstanceId = windowInstance.createWindowInstanceId();
String dbWindowInstanceId = StringUtil.createMD5Str(windowInstanceId);
windowInstance.setWindowInstanceKey(dbWindowInstanceId);
-
windowInstance.setWindowInstanceSplitName(StringUtil.createMD5Str(MapKeyUtil.createKey(getNameSpace(),
getConfigureName(),splitId)));
+
windowInstance.setWindowInstanceSplitName(StringUtil.createMD5Str(MapKeyUtil.createKey(getNameSpace(),
getConfigureName(), splitId)));
windowInstance.setNewWindowInstance(true);
return windowInstance;
}
@@ -310,8 +311,8 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
* @param fireTime
* @return
*/
- public String createWindowInstanceName(String startTime, String endTime,
String fireTime){
- return fireMode==0?getConfigureName():fireTime;
+ public String createWindowInstanceName(String startTime, String endTime,
String fireTime) {
+ return fireMode == 0 ? getConfigureName() : fireTime;
}
/**
@@ -322,14 +323,14 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
* @return
*/
- public long incrementAndGetSplitNumber(WindowInstance instance,String
shuffleId){
- long maxValue=
windowMaxValueManager.incrementAndGetSplitNumber(instance,shuffleId);
+ public long incrementAndGetSplitNumber(WindowInstance instance, String
shuffleId) {
+ long maxValue =
windowMaxValueManager.incrementAndGetSplitNumber(instance, shuffleId);
return maxValue;
}
public abstract Class getWindowBaseValueClass();
- public abstract int fireWindowInstance(WindowInstance
windowInstance,Map<String,String>queueId2Offset) ;
+ public abstract int fireWindowInstance(WindowInstance windowInstance,
Map<String, String> queueId2Offset);
/**
* 计算每条记录的group by值,对于groupby分组,里面任何字段不能为null值,如果为null值,这条记录会被忽略
@@ -337,21 +338,21 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
* @param message
* @return
*/
- protected String generateShuffleKey(IMessage message){
+ protected String generateShuffleKey(IMessage message) {
if (StringUtil.isEmpty(groupByFieldName)) {
return null;
}
- JSONObject msg=message.getMessageBody();
+ JSONObject msg = message.getMessageBody();
String[] fieldNames = groupByFieldName.split(";");
- String[] values=new String[fieldNames.length];
+ String[] values = new String[fieldNames.length];
boolean isFirst = true;
- int i=0;
+ int i = 0;
for (String filedName : fieldNames) {
if (isFirst) {
isFirst = false;
}
String value = msg.getString(filedName);
- values[i]=value;
+ values[i] = value;
i++;
}
return MapKeyUtil.createKey(values);
@@ -359,8 +360,8 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
public abstract void clearFireWindowInstance(WindowInstance
windowInstance);
- public void clearFire(WindowInstance windowInstance){
- if(windowInstance==null){
+ public void clearFire(WindowInstance windowInstance) {
+ if (windowInstance == null) {
return;
}
clearFireWindowInstance(windowInstance);
@@ -412,7 +413,7 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
scriptBuilder = new StringBuilder();
}
String[] functionParameterNames =
scriptParameterList.stream().map(
- scriptParameter ->
scriptParameter.getScriptParameterStr()).collect(Collectors.toList())
+ scriptParameter ->
scriptParameter.getScriptParameterStr()).collect(Collectors.toList())
.toArray(new String[0]);
AggregationScript accEngine = new AggregationScript(
((ScriptExpression)expression).getNewFieldName(),
functionName,
@@ -460,22 +461,23 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
* @param message
* @return
*/
- public List<WindowInstance> queryOrCreateWindowInstance(IMessage
message,String queueId) {
- return WindowInstance.getOrCreateWindowInstance(this,
WindowInstance.getOccurTime(this, message), timeUnitAdjust,
+ public List<WindowInstance> queryOrCreateWindowInstance(IMessage message,
String queueId) {
+ return WindowInstance.getOrCreateWindowInstance(this,
WindowInstance.getOccurTime(this, message), timeUnitAdjust,
queueId);
}
/**
* 获取window处理的消息中最大的时间
+ *
* @param msg
* @return
*/
- public void updateMaxEventTime(IMessage msg){
- eventTimeManager.updateEventTime(msg,this);
+ public void updateMaxEventTime(IMessage msg) {
+ eventTimeManager.updateEventTime(msg, this);
}
public Long getMaxEventTime(String queueId) {
- return this.eventTimeManager.getMaxEventTime(queueId);
+ return this.eventTimeManager.getMaxEventTime(queueId);
}
/**
@@ -483,15 +485,15 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
*
* @param windowValueList
*/
- public void sendFireMessage(List<WindowValue> windowValueList,String
queueId) {
+ public void sendFireMessage(List<WindowValue> windowValueList, String
queueId) {
int count = 0;
- List<IMessage> msgs=new ArrayList<>();
+ List<IMessage> msgs = new ArrayList<>();
for (WindowValue windowValue : windowValueList) {
JSONObject message = new JSONObject();
- if(JSONObject.class.isInstance(windowValue.getcomputedResult())){
- message=(JSONObject)windowValue.getcomputedResult();
- }else {
+ if (JSONObject.class.isInstance(windowValue.getcomputedResult())) {
+ message = (JSONObject)windowValue.getcomputedResult();
+ } else {
Iterator<Entry<String, Object>> it =
windowValue.iteratorComputedColumnResult();
while (it.hasNext()) {
Entry<String, Object> entry = it.next();
@@ -499,23 +501,23 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
}
}
- Long
fireTime=DateUtil.parseTime(windowValue.getFireTime()).getTime();
- long baseTime= 1577808000000L ;//set base time from 2021-01-01
00:00:00
- int sameFireCount=0;
- if(fireMode!=0){
- Long
endTime=DateUtil.parseTime(windowValue.getEndTime()).getTime();
-
sameFireCount=(int)((fireTime-endTime)/1000)/sizeInterval*timeUnitAdjust;
- if(sameFireCount>=1){
- sameFireCount=1;
+ Long fireTime =
DateUtil.parseTime(windowValue.getFireTime()).getTime();
+ long baseTime = 1577808000000L;//set base time from 2021-01-01
00:00:00
+ int sameFireCount = 0;
+ if (fireMode != 0) {
+ Long endTime =
DateUtil.parseTime(windowValue.getEndTime()).getTime();
+ sameFireCount = (int)((fireTime - endTime) / 1000) /
sizeInterval * timeUnitAdjust;
+ if (sameFireCount >= 1) {
+ sameFireCount = 1;
}
}
//can keep offset in order
- Long
offset=((fireTime-baseTime)/1000*10+sameFireCount)*100000000+windowValue.getPartitionNum();
-
message.put("windowInstanceId",windowValue.getWindowInstancePartitionId());
- message.put("start_time",windowValue.getStartTime());
- message.put("end_time",windowValue.getEndTime());
- message.put("offset",offset);
- Message
newMessage=windowFireSource.createMessage(message,queueId,offset+"",false);
+ Long offset = ((fireTime - baseTime) / 1000 * 10 + sameFireCount)
* 100000000 + windowValue.getPartitionNum();
+ message.put("windowInstanceId",
windowValue.getWindowInstancePartitionId());
+ message.put("start_time", windowValue.getStartTime());
+ message.put("end_time", windowValue.getEndTime());
+ message.put("offset", offset);
+ Message newMessage = windowFireSource.createMessage(message,
queueId, offset + "", false);
newMessage.getHeader().setOffsetIsLong(true);
if (count == windowValueList.size() - 1) {
newMessage.getHeader().setNeedFlush(true);
@@ -526,8 +528,8 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
count++;
}
- if(DebugWriter.getDebugWriter(this.getConfigureName()).isOpenDebug()){
-
DebugWriter.getDebugWriter(this.getConfigureName()).writeWindowFire(this,msgs,queueId);
+ if (DebugWriter.getDebugWriter(this.getConfigureName()).isOpenDebug())
{
+
DebugWriter.getDebugWriter(this.getConfigureName()).writeWindowFire(this, msgs,
queueId);
}
}
@@ -687,7 +689,7 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
public void setReducer(IReducer reducer) {
this.reducer = reducer;
byte[] bytes = InstantiationUtil.serializeObject(reducer);
- this.reduceSerializeValue=Base64Utils.encode(bytes);
+ this.reduceSerializeValue = Base64Utils.encode(bytes);
}
public int getTimeUnitAdjust() {
@@ -752,8 +754,8 @@ public abstract class AbstractWindow extends
BasedConfigurable implements IWindo
return sqlCache;
}
- public void initWindowInstanceMaxSplitNum(WindowInstance instance){
-
getWindowMaxValueManager().initMaxSplitNum(instance,queryWindowInstanceMaxSplitNum(instance));
+ public void initWindowInstanceMaxSplitNum(WindowInstance instance) {
+ getWindowMaxValueManager().initMaxSplitNum(instance,
queryWindowInstanceMaxSplitNum(instance));
}
protected abstract Long queryWindowInstanceMaxSplitNum(WindowInstance
instance);
diff --git
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
index 03e4224..4bcfe3a 100644
---
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
+++
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+
import
org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
import
org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache;
import org.apache.rocketmq.streams.db.driver.DriverBuilder;
@@ -34,9 +35,9 @@ import org.apache.rocketmq.streams.db.driver.JDBCDriver;
*/
public class SQLCache extends AbstractMultiSplitMessageCache<ISQLElement> {
- protected Boolean isOpenCache=true;//if false,then execute sql when
receive sql
- protected Set<String> firedWindowInstances=new HashSet<>();//fired window
instance ,if the owned sqls have not commit, can cancel the sqls
- protected Map<String,Integer> windowInstance2Index=new HashMap<>();//set
index to ISQLElement group by window instance
+ protected Boolean isOpenCache = true;//if false,then execute sql when
receive sql
+ protected Set<String> firedWindowInstances = new HashSet<>();//fired
window instance ,if the owned sqls have not commit, can cancel the sqls
+ protected Map<String, Integer> windowInstance2Index = new
HashMap<>();//set index to ISQLElement group by window instance
protected boolean isLocalOnly;
@@ -45,7 +46,7 @@ public class SQLCache extends
AbstractMultiSplitMessageCache<ISQLElement> {
this.isLocalOnly = isLocalOnly;
this.flushCallBack = new MessageFlushCallBack(new SQLCacheCallback());
this.setBatchSize(1000);
- this.setAutoFlushTimeGap(30 * 1000);
+ this.setAutoFlushTimeGap(10 * 1000);
this.setAutoFlushSize(100);
this.openAutoFlush();
}