This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 58e15df09 [INLONG-5613][Sort] Add interval join support for FlinkSqlParser (#5614)
58e15df09 is described below

commit 58e15df09cbaf0f8d4b3c7eb33836f8e1a4f1f91
Author: Charles <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Mon Aug 22 17:08:35 2022 +0800

    [INLONG-5613][Sort] Add interval join support for FlinkSqlParser (#5614)
---
 .../protocol/transformation/FieldRelation.java     |   2 +-
 .../protocol/transformation/FilterFunction.java    |   6 +-
 .../sort/protocol/transformation/Function.java     |  17 +-
 .../protocol/transformation/FunctionParam.java     |  10 +-
 .../transformation/function/AddFunction.java       |  67 ++++++++
 .../transformation/function/BetweenFunction.java   |  84 +++++++++
 .../CastFunction.java}                             |  52 +++---
 .../transformation/function/IntervalFunction.java  |  70 ++++++++
 .../transformation/function/SubtractFunction.java  |  67 ++++++++
 ...elation.java => InnerTemporalJoinRelation.java} |   6 +-
 ...tionRelation.java => IntervalJoinRelation.java} |  31 ++--
 .../transformation/relation/JoinRelation.java      |   5 +-
 ...ion.java => LeftOuterTemporalJoinRelation.java} |   4 +-
 .../transformation/relation/NodeRelation.java      |   5 +-
 .../relation/TemporalJoinRelation.java             |   4 +-
 .../transformation/function/AddFunctionTest.java   |  44 +++++
 .../function/BetweenFunctionTest.java              |  49 ++++++
 .../function/IntervalFunctionTest.java             |  41 +++++
 .../function/SubtractFunctionTest.java             |  44 +++++
 .../relation/InnerTemporalJoinRelationTest.java    |  56 ++++++
 .../relation/IntervalJoinRelationTest.java         |  55 ++++++
 .../relation/LeftTemporalJoinRelationTest.java     |  56 ++++++
 .../inlong/sort/parser/impl/FlinkSqlParser.java    |  24 ++-
 .../parser/IntervalJoinRelationSqlParseTest.java   | 187 +++++++++++++++++++++
 .../MySqlTemporalJoinRelationSqlParseTest.java     |   8 +-
 .../RedisTemporalJoinRelationSqlParseTest.java     |   8 +-
 26 files changed, 938 insertions(+), 64 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
index 1955dcb29..36ca76779 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
@@ -45,7 +45,7 @@ public class FieldRelation {
 
     @JsonCreator
     public FieldRelation(@JsonProperty("inputField") FunctionParam inputField,
-                         @JsonProperty("outputField") FieldInfo outputField) {
+            @JsonProperty("outputField") FieldInfo outputField) {
         this.inputField = inputField;
         this.outputField = outputField;
     }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java
index 69a145cc2..bae643d30 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.protocol.transformation;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.transformation.function.BetweenFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
 
@@ -31,8 +32,9 @@ import 
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilter
         property = "type")
 @JsonSubTypes({
         @JsonSubTypes.Type(value = SingleValueFilterFunction.class, name = 
"singleValueFilter"),
-        @JsonSubTypes.Type(value = MultiValueFilterFunction.class, name = 
"multiValueFilter")}
-)
+        @JsonSubTypes.Type(value = MultiValueFilterFunction.class, name = 
"multiValueFilter"),
+        @JsonSubTypes.Type(value = BetweenFunction.class, name = 
"betweenFunction")
+})
 public interface FilterFunction extends Function {
 
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java
index 2ef8be7cb..42747e9b0 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java
@@ -20,10 +20,16 @@ package org.apache.inlong.sort.protocol.transformation;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.transformation.function.AddFunction;
+import org.apache.inlong.sort.protocol.transformation.function.BetweenFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper;
+import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
+import org.apache.inlong.sort.protocol.transformation.function.EncryptFunction;
 import org.apache.inlong.sort.protocol.transformation.function.HopEndFunction;
 import org.apache.inlong.sort.protocol.transformation.function.HopFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.HopStartFunction;
+import 
org.apache.inlong.sort.protocol.transformation.function.IntervalFunction;
+import 
org.apache.inlong.sort.protocol.transformation.function.JsonGetterFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFirstFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFunction;
@@ -32,6 +38,7 @@ import 
org.apache.inlong.sort.protocol.transformation.function.SessionFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.SessionStartFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.SplitIndexFunction;
+import 
org.apache.inlong.sort.protocol.transformation.function.SubtractFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.TumbleEndFunction;
 import org.apache.inlong.sort.protocol.transformation.function.TumbleFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.TumbleStartFunction;
@@ -58,7 +65,15 @@ import java.util.List;
         @JsonSubTypes.Type(value = SplitIndexFunction.class, name = 
"splitIndex"),
         @JsonSubTypes.Type(value = RegexpReplaceFunction.class, name = 
"regexpReplace"),
         @JsonSubTypes.Type(value = RegexpReplaceFirstFunction.class, name = 
"regexpReplaceFirst"),
-        @JsonSubTypes.Type(value = CascadeFunctionWrapper.class, name = 
"cascadeFunctionWrapper")
+        @JsonSubTypes.Type(value = CascadeFunctionWrapper.class, name = 
"cascadeFunctionWrapper"),
+        @JsonSubTypes.Type(value = EncryptFunction.class, name = "encrypt"),
+        @JsonSubTypes.Type(value = JsonGetterFunction.class, name = 
"jsonGetterFunction"),
+        @JsonSubTypes.Type(value = CustomFunction.class, name = 
"customFunction"),
+        @JsonSubTypes.Type(value = BetweenFunction.class, name = 
"betweenFunction"),
+        @JsonSubTypes.Type(value = IntervalFunction.class, name = 
"intervalFunction"),
+        @JsonSubTypes.Type(value = AddFunction.class, name = "addFunction"),
+        @JsonSubTypes.Type(value = SubtractFunction.class, name = 
"subtractFunction")
+
 })
 public interface Function extends FunctionParam {
 
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
index effd06200..ccda5f42a 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
@@ -22,12 +22,15 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSub
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.MetaFieldInfo;
+import org.apache.inlong.sort.protocol.transformation.function.AddFunction;
+import org.apache.inlong.sort.protocol.transformation.function.BetweenFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper;
 import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
 import org.apache.inlong.sort.protocol.transformation.function.EncryptFunction;
 import org.apache.inlong.sort.protocol.transformation.function.HopEndFunction;
 import org.apache.inlong.sort.protocol.transformation.function.HopFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.HopStartFunction;
+import 
org.apache.inlong.sort.protocol.transformation.function.IntervalFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.JsonGetterFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFirstFunction;
@@ -37,6 +40,7 @@ import 
org.apache.inlong.sort.protocol.transformation.function.SessionFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.SessionStartFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.SplitIndexFunction;
+import 
org.apache.inlong.sort.protocol.transformation.function.SubtractFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.TumbleEndFunction;
 import org.apache.inlong.sort.protocol.transformation.function.TumbleFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.TumbleStartFunction;
@@ -100,7 +104,11 @@ import 
org.apache.inlong.sort.protocol.transformation.operator.OrOperator;
         @JsonSubTypes.Type(value = CascadeFunctionWrapper.class, name = 
"cascadeFunctionWrapper"),
         @JsonSubTypes.Type(value = EncryptFunction.class, name = "encrypt"),
         @JsonSubTypes.Type(value = JsonGetterFunction.class, name = 
"jsonGetterFunction"),
-        @JsonSubTypes.Type(value = CustomFunction.class, name = 
"customFunction")
+        @JsonSubTypes.Type(value = CustomFunction.class, name = 
"customFunction"),
+        @JsonSubTypes.Type(value = BetweenFunction.class, name = 
"betweenFunction"),
+        @JsonSubTypes.Type(value = IntervalFunction.class, name = 
"intervalFunction"),
+        @JsonSubTypes.Type(value = AddFunction.class, name = "addFunction"),
+        @JsonSubTypes.Type(value = SubtractFunction.class, name = 
"subtractFunction")
 })
 public interface FunctionParam {
 
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/AddFunction.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/AddFunction.java
new file mode 100644
index 000000000..75025b583
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/AddFunction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+
+import javax.annotation.Nonnull;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The function for add
+ */
+@JsonTypeName("addFunction")
+@Data
+public class AddFunction implements Function {
+
+    @Nonnull
+    @JsonProperty("leftField")
+    private final FunctionParam leftField;
+    @Nonnull
+    @JsonProperty("rightField")
+    private final FunctionParam rightField;
+
+    @JsonCreator
+    public AddFunction(@Nonnull @JsonProperty("leftField") FunctionParam 
leftField,
+            @Nonnull @JsonProperty("rightField") FunctionParam rightField) {
+        this.leftField = Preconditions.checkNotNull(leftField, "leftField is 
null");
+        this.rightField = Preconditions.checkNotNull(rightField, "rightField 
is null");
+    }
+
+    @Override
+    public List<FunctionParam> getParams() {
+        return Arrays.asList(leftField, rightField);
+    }
+
+    @Override
+    public String getName() {
+        return "+";
+    }
+
+    @Override
+    public String format() {
+        return String.format("%s %s %s", leftField.format(), getName(), 
rightField.format());
+    }
+}
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunction.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunction.java
new file mode 100644
index 000000000..df7cbbcd4
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.LogicOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+
+import javax.annotation.Nonnull;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The function for between
+ */
+@JsonTypeName("betweenFunction")
+@Data
+public class BetweenFunction implements FilterFunction {
+
+    @Nonnull
+    @JsonProperty("field")
+    private final FunctionParam field;
+    @Nonnull
+    @JsonProperty("start")
+    private final FunctionParam start;
+    @Nonnull
+    @JsonProperty("end")
+    private final FunctionParam end;
+    @Nonnull
+    @JsonProperty("logicOperator")
+    private final LogicOperator logicOperator;
+
+    @JsonCreator
+    public BetweenFunction(
+            @Nonnull @JsonProperty("logicOperator") LogicOperator 
logicOperator,
+            @Nonnull @JsonProperty("field") FunctionParam field,
+            @Nonnull @JsonProperty("start") FunctionParam start,
+            @Nonnull @JsonProperty("end") FunctionParam end) {
+        this.field = Preconditions.checkNotNull(field, "field is null");
+        this.start = Preconditions.checkNotNull(start, "start is null");
+        this.end = Preconditions.checkNotNull(end, "end is null");
+        this.logicOperator = Preconditions.checkNotNull(logicOperator, 
"logicOperator is null");
+    }
+
+    @Override
+    public List<FunctionParam> getParams() {
+        return Arrays.asList(logicOperator, field, start, end);
+    }
+
+    @Override
+    public String getName() {
+        return "BETWEEN";
+    }
+
+    @Override
+    public String format() {
+        String format = "%s %s %s %s AND %s";
+        if (logicOperator == EmptyOperator.getInstance()) {
+            format = "%s%s %s %s AND %s";
+        }
+        return String.format(format, logicOperator.format(), field.format(), 
getName(), start.format(), end.format());
+    }
+}
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/CastFunction.java
similarity index 51%
copy from 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
copy to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/CastFunction.java
index 1955dcb29..6dcd858c1 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/CastFunction.java
@@ -15,38 +15,50 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.protocol.transformation;
+package org.apache.inlong.sort.protocol.transformation.function;
 
+import com.google.common.base.Preconditions;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
 
+import java.util.Arrays;
+import java.util.List;
 
-/**
- * Defines the relation between fields from input to output field
- */
-@JsonTypeInfo(
-        use = JsonTypeInfo.Id.NAME,
-        include = JsonTypeInfo.As.PROPERTY,
-        property = "type")
-@JsonTypeName("fieldRelation")
+@JsonTypeName("cast")
 @Data
 @NoArgsConstructor
-public class FieldRelation {
+public class CastFunction implements Function {
+
+    @JsonProperty("field")
+    private FunctionParam field;
 
-    @JsonProperty("inputField")
-    private FunctionParam inputField;
-    @JsonProperty("outputField")
-    private FieldInfo outputField;
+    private String type;
 
     @JsonCreator
-    public FieldRelation(@JsonProperty("inputField") FunctionParam inputField,
-                         @JsonProperty("outputField") FieldInfo outputField) {
-        this.inputField = inputField;
-        this.outputField = outputField;
+    public CastFunction(@JsonProperty("field") FunctionParam field,
+            @JsonProperty("type") String type) {
+        this.field = Preconditions.checkNotNull(field, "field is null");
+        this.type = Preconditions.checkNotNull(type, "type is null");
+    }
+
+    @Override
+    public List<FunctionParam> getParams() {
+        return Arrays.asList(field, new ConstantParam(type));
+    }
+
+    @Override
+    public String getName() {
+        return "CAST";
+    }
+
+    @Override
+    public String format() {
+        return String.format("%s(%s AS %s)", getName(), field.format(), type);
     }
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunction.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunction.java
new file mode 100644
index 000000000..2c6897dca
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunction.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+
+import javax.annotation.Nonnull;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The function for interval
+ */
+@JsonTypeName("intervalFunction")
+@Data
+public class IntervalFunction implements Function {
+
+    @Nonnull
+    @JsonProperty("interval")
+    private final StringConstantParam interval;
+
+    @Nonnull
+    @JsonProperty("timeUnit")
+    private final TimeUnitConstantParam timeUnit;
+
+    @JsonCreator
+    public IntervalFunction(@Nonnull @JsonProperty("interval") 
StringConstantParam interval,
+            @Nonnull @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) 
{
+        this.interval = Preconditions.checkNotNull(interval, "interval is 
null");
+        this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is 
null");
+    }
+
+    @Override
+    public List<FunctionParam> getParams() {
+        return Arrays.asList(interval, timeUnit);
+    }
+
+    @Override
+    public String getName() {
+        return "INTERVAL";
+    }
+
+    @Override
+    public String format() {
+        return String.format("%s %s %s", getName(), interval.format(), 
timeUnit.format());
+    }
+}
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunction.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunction.java
new file mode 100644
index 000000000..24133695a
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+
+import javax.annotation.Nonnull;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The function for subtract
+ */
+@JsonTypeName("subtractFunction")
+@Data
+public class SubtractFunction implements Function {
+
+    @Nonnull
+    @JsonProperty("leftField")
+    private final FunctionParam leftField;
+    @Nonnull
+    @JsonProperty("rightField")
+    private final FunctionParam rightField;
+
+    @JsonCreator
+    public SubtractFunction(@Nonnull @JsonProperty("leftField") FunctionParam 
leftField,
+            @Nonnull @JsonProperty("rightField") FunctionParam rightField) {
+        this.leftField = Preconditions.checkNotNull(leftField, "leftField is 
null");
+        this.rightField = Preconditions.checkNotNull(rightField, "rightField 
is null");
+    }
+
+    @Override
+    public List<FunctionParam> getParams() {
+        return Arrays.asList(leftField, rightField);
+    }
+
+    @Override
+    public String getName() {
+        return "-";
+    }
+
+    @Override
+    public String format() {
+        return String.format("%s %s %s", leftField.format(), getName(), 
rightField.format());
+    }
+}
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelation.java
similarity index 92%
copy from 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
copy to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelation.java
index 2b9b5782d..e1c283fcf 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelation.java
@@ -37,7 +37,7 @@ import java.util.Map;
 @EqualsAndHashCode(callSuper = true)
 @Data
 @NoArgsConstructor
-public class InnerTemporalJoinRelationRelation extends TemporalJoinRelation {
+public class InnerTemporalJoinRelation extends TemporalJoinRelation {
 
     /**
      * Constructor
@@ -50,11 +50,11 @@ public class InnerTemporalJoinRelationRelation extends 
TemporalJoinRelation {
      * @param systemTime The system time for temporal join
      */
     @JsonCreator
-    public InnerTemporalJoinRelationRelation(
+    public InnerTemporalJoinRelation(
             @JsonProperty("inputs") List<String> inputs,
             @JsonProperty("outputs") List<String> outputs,
             @JsonProperty("joinConditionMap") Map<String, 
List<FilterFunction>> joinConditionMap,
-            @Nullable @JsonProperty("systemTimeMap") FieldInfo systemTime) {
+            @Nullable @JsonProperty("systemTime") FieldInfo systemTime) {
         super(inputs, outputs, joinConditionMap, systemTime);
     }
 
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelation.java
similarity index 64%
rename from 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
rename to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelation.java
index 2b9b5782d..e1a0d1153 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelation.java
@@ -17,27 +17,25 @@
 
 package org.apache.inlong.sort.protocol.transformation.relation;
 
+import com.google.common.base.Preconditions;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
-import javax.annotation.Nullable;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
- * Inner temporal join relation
+ * This class defines the interval join relation. In an interval join, the join conditions are the same as filters,
+ * so we forbid filters for an interval join. At the same time,
+ * the joinConditionMap is allowed to have exactly one entry.
  */
-@JsonTypeName("innerTemporalJoin")
+@JsonTypeName("intervalJoin")
 @EqualsAndHashCode(callSuper = true)
 @Data
-@NoArgsConstructor
-public class InnerTemporalJoinRelationRelation extends TemporalJoinRelation {
+public class IntervalJoinRelation extends JoinRelation {
 
     /**
      * Constructor
@@ -47,19 +45,18 @@ public class InnerTemporalJoinRelationRelation extends 
TemporalJoinRelation {
      * @param joinConditionMap The joinConditionMap is a map of join conditions
      *         the key of joinConditionMap is the node id of join node
      *         the value of joinConditionMap is a list of join contidition
-     * @param systemTime The system time for temporal join
      */
-    @JsonCreator
-    public InnerTemporalJoinRelationRelation(
-            @JsonProperty("inputs") List<String> inputs,
+    public IntervalJoinRelation(@JsonProperty("inputs") List<String> inputs,
             @JsonProperty("outputs") List<String> outputs,
-            @JsonProperty("joinConditionMap") Map<String, 
List<FilterFunction>> joinConditionMap,
-            @Nullable @JsonProperty("systemTimeMap") FieldInfo systemTime) {
-        super(inputs, outputs, joinConditionMap, systemTime);
+            @JsonProperty("joinConditionMap") LinkedHashMap<String, 
List<FilterFunction>> joinConditionMap) {
+        super(inputs, outputs, joinConditionMap);
+        Preconditions.checkState(joinConditionMap.size() == 1,
+                String.format("The size of joinConditionMap must be one for 
%s", this.getClass().getSimpleName()));
     }
 
     @Override
     public String format() {
-        return "INNER JOIN";
+        throw new UnsupportedOperationException(String.format("Format is not 
supported for %s",
+                this.getClass().getSimpleName()));
     }
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
index c98e2be30..8ec80c4ee 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
@@ -41,8 +41,9 @@ import java.util.Map;
         @JsonSubTypes.Type(value = InnerJoinNodeRelation.class, name = 
"innerJoin"),
         @JsonSubTypes.Type(value = LeftOuterJoinNodeRelation.class, name = 
"leftOuterJoin"),
         @JsonSubTypes.Type(value = RightOuterJoinNodeRelation.class, name = 
"rightOutJoin"),
-        @JsonSubTypes.Type(value = InnerTemporalJoinRelationRelation.class, 
name = "innerTemporalJoin"),
-        @JsonSubTypes.Type(value = 
LeftOuterTemporalJoinRelationRelation.class, name = "leftOuterTemporalJoin")
+        @JsonSubTypes.Type(value = InnerTemporalJoinRelation.class, name = 
"innerTemporalJoin"),
+        @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelation.class, name = 
"leftOuterTemporalJoin"),
+        @JsonSubTypes.Type(value = IntervalJoinRelation.class, name = 
"intervalJoin")
 })
 @EqualsAndHashCode(callSuper = true)
 @Data
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelationRelation.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelation.java
similarity index 94%
rename from 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelationRelation.java
rename to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelation.java
index 2a873381e..ad61b93ab 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelationRelation.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelation.java
@@ -37,7 +37,7 @@ import java.util.Map;
 @EqualsAndHashCode(callSuper = true)
 @Data
 @NoArgsConstructor
-public class LeftOuterTemporalJoinRelationRelation extends 
TemporalJoinRelation {
+public class LeftOuterTemporalJoinRelation extends TemporalJoinRelation {
 
     /**
      * LeftOuterTemporalJoin Constructor
@@ -50,7 +50,7 @@ public class LeftOuterTemporalJoinRelationRelation extends 
TemporalJoinRelation
      * @param systemTime The system time for temporal join
      */
     @JsonCreator
-    public LeftOuterTemporalJoinRelationRelation(
+    public LeftOuterTemporalJoinRelation(
             @JsonProperty("inputs") List<String> inputs,
             @JsonProperty("outputs") List<String> outputs,
             @JsonProperty("joinConditionMap") Map<String, 
List<FilterFunction>> joinConditionMap,
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
index 43a3e725c..bcfa7058f 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
@@ -40,8 +40,9 @@ import java.util.List;
         @JsonSubTypes.Type(value = InnerJoinNodeRelation.class, name = 
"innerJoin"),
         @JsonSubTypes.Type(value = LeftOuterJoinNodeRelation.class, name = 
"leftOuterJoin"),
         @JsonSubTypes.Type(value = RightOuterJoinNodeRelation.class, name = 
"rightOutJoin"),
-        @JsonSubTypes.Type(value = InnerTemporalJoinRelationRelation.class, 
name = "innerTemporalJoin"),
-        @JsonSubTypes.Type(value = 
LeftOuterTemporalJoinRelationRelation.class, name = "leftOuterTemporalJoin"),
+        @JsonSubTypes.Type(value = InnerTemporalJoinRelation.class, name = 
"innerTemporalJoin"),
+        @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelation.class, name = 
"leftOuterTemporalJoin"),
+        @JsonSubTypes.Type(value = IntervalJoinRelation.class, name = 
"intervalJoin"),
         @JsonSubTypes.Type(value = UnionNodeRelation.class, name = "union"),
         @JsonSubTypes.Type(value = NodeRelation.class, name = "baseRelation")
 })
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java
index 6a4fc1725..f25dc4b06 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java
@@ -40,8 +40,8 @@ import java.util.Map;
         include = JsonTypeInfo.As.PROPERTY,
         property = "type")
 @JsonSubTypes({
-        @JsonSubTypes.Type(value = InnerTemporalJoinRelationRelation.class, 
name = "innerTemporalJoin"),
-        @JsonSubTypes.Type(value = 
LeftOuterTemporalJoinRelationRelation.class, name = "leftOuterTemporalJoin")
+        @JsonSubTypes.Type(value = InnerTemporalJoinRelation.class, name = 
"innerTemporalJoin"),
+        @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelation.class, name = 
"leftOuterTemporalJoin")
 })
 @EqualsAndHashCode(callSuper = true)
 @Data
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/AddFunctionTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/AddFunctionTest.java
new file mode 100644
index 000000000..344c2e8ac
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/AddFunctionTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionBaseTest;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import 
org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+/**
+ * Test for {@link AddFunction}: a timestamp field plus a 5-second {@link IntervalFunction}.
+ */
+public class AddFunctionTest extends FunctionBaseTest {
+
+    @Override
+    public Function getTestObject() {
+        return new AddFunction(new FieldInfo("event_time", new 
TimestampFormatInfo()),
+                new IntervalFunction(new StringConstantParam("5"),
+                        new TimeUnitConstantParam(TimeUnit.SECOND)));
+    }
+
+    @Override
+    public String getExpectFormat() {
+        return "`event_time` + INTERVAL '5' SECOND";
+    }
+}
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunctionTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunctionTest.java
new file mode 100644
index 000000000..89518c7e8
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionBaseTest;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import 
org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+
+/**
+ * Test for {@link BetweenFunction}: a field bounded by another field minus/plus a 5-second interval.
+ */
+public class BetweenFunctionTest extends FunctionBaseTest {
+
+    @Override
+    public Function getTestObject() {
+        return new BetweenFunction(EmptyOperator.getInstance(), new 
FieldInfo("order_time", new TimestampFormatInfo()),
+                new SubtractFunction(new FieldInfo("update_time", new 
TimestampFormatInfo()),
+                        new IntervalFunction(new StringConstantParam("5"),
+                                new TimeUnitConstantParam(TimeUnit.SECOND))),
+                new AddFunction(new FieldInfo("update_time", new 
TimestampFormatInfo()),
+                        new IntervalFunction(new StringConstantParam("5"),
+                                new TimeUnitConstantParam(TimeUnit.SECOND))));
+    }
+
+    @Override
+    public String getExpectFormat() {
+        return "`order_time` BETWEEN `update_time` - INTERVAL '5' SECOND AND 
`update_time` + INTERVAL '5' SECOND";
+    }
+}
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunctionTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunctionTest.java
new file mode 100644
index 000000000..640d6d3b0
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunctionTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionBaseTest;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import 
org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+/**
+ * Test for {@link IntervalFunction}: the string constant "5" combined with the SECOND time unit.
+ */
+public class IntervalFunctionTest extends FunctionBaseTest {
+
+    @Override
+    public Function getTestObject() {
+        return new IntervalFunction(new StringConstantParam("5"),
+                new TimeUnitConstantParam(TimeUnit.SECOND));
+    }
+
+    @Override
+    public String getExpectFormat() {
+        return "INTERVAL '5' SECOND";
+    }
+}
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunctionTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunctionTest.java
new file mode 100644
index 000000000..57a3eef16
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunctionTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.FunctionBaseTest;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import 
org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+/**
+ * Test for {@link SubtractFunction}: a timestamp field minus a 5-second {@link IntervalFunction}.
+ */
+public class SubtractFunctionTest extends FunctionBaseTest {
+
+    @Override
+    public Function getTestObject() {
+        return new SubtractFunction(new FieldInfo("event_time", new 
TimestampFormatInfo()),
+                new IntervalFunction(new StringConstantParam("5"),
+                        new TimeUnitConstantParam(TimeUnit.SECOND)));
+    }
+
+    @Override
+    public String getExpectFormat() {
+        return "`event_time` - INTERVAL '5' SECOND";
+    }
+}
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationTest.java
new file mode 100644
index 000000000..5109f874a
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.relation;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import 
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
+import 
org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/**
+ * Tests for {@link InnerTemporalJoinRelation} built with two join conditions under key "2" and a "ts" field.
+ */
+public class InnerTemporalJoinRelationTest extends 
SerializeBaseTest<InnerTemporalJoinRelation> {
+
+    @Override
+    public InnerTemporalJoinRelation getTestObject() {
+        LinkedHashMap<String, List<FilterFunction>> joinConditionMap = new 
LinkedHashMap<>();
+        joinConditionMap.put("2", Arrays.asList(
+                new SingleValueFilterFunction(EmptyOperator.getInstance(),
+                        new FieldInfo("name", "1", new StringFormatInfo()),
+                        EqualOperator.getInstance(), new FieldInfo("name", "2",
+                        new StringFormatInfo())),
+                new SingleValueFilterFunction(AndOperator.getInstance(),
+                        new FieldInfo("name", "1", new StringFormatInfo()),
+                        NotEqualOperator.getInstance(), new 
ConstantParam("test"))));
+        return new InnerTemporalJoinRelation(Arrays.asList("1", "2", "3"),
+                Collections.singletonList("4"), joinConditionMap, new 
FieldInfo("ts", new TimestampFormatInfo()));
+    }
+}
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelationTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelationTest.java
new file mode 100644
index 000000000..9e365ae04
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelationTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.relation;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import 
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
+import 
org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/**
+ * Tests for {@link IntervalJoinRelation} built with a single join-condition entry (key "2").
+ */
+public class IntervalJoinRelationTest extends 
SerializeBaseTest<IntervalJoinRelation> {
+
+    @Override
+    public IntervalJoinRelation getTestObject() {
+        LinkedHashMap<String, List<FilterFunction>> joinConditionMap = new 
LinkedHashMap<>();
+        joinConditionMap.put("2", Arrays.asList(
+                new SingleValueFilterFunction(EmptyOperator.getInstance(),
+                        new FieldInfo("name", "1", new StringFormatInfo()),
+                        EqualOperator.getInstance(), new FieldInfo("name", "2",
+                        new StringFormatInfo())),
+                new SingleValueFilterFunction(AndOperator.getInstance(),
+                        new FieldInfo("name", "1", new StringFormatInfo()),
+                        NotEqualOperator.getInstance(), new 
ConstantParam("test"))));
+        return new IntervalJoinRelation(Arrays.asList("1", "2", "3"),
+                Collections.singletonList("4"), joinConditionMap);
+    }
+}
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftTemporalJoinRelationTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftTemporalJoinRelationTest.java
new file mode 100644
index 000000000..1831aa903
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftTemporalJoinRelationTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.relation;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import 
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
+import 
org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/**
+ * Tests for {@link LeftOuterTemporalJoinRelation} built with two join conditions under key "2" and a "ts" field.
+ */
+public class LeftTemporalJoinRelationTest extends 
SerializeBaseTest<LeftOuterTemporalJoinRelation> {
+
+    @Override
+    public LeftOuterTemporalJoinRelation getTestObject() {
+        LinkedHashMap<String, List<FilterFunction>> joinConditionMap = new 
LinkedHashMap<>();
+        joinConditionMap.put("2", Arrays.asList(
+                new SingleValueFilterFunction(EmptyOperator.getInstance(),
+                        new FieldInfo("name", "1", new StringFormatInfo()),
+                        EqualOperator.getInstance(), new FieldInfo("name", "2",
+                        new StringFormatInfo())),
+                new SingleValueFilterFunction(AndOperator.getInstance(),
+                        new FieldInfo("name", "1", new StringFormatInfo()),
+                        NotEqualOperator.getInstance(), new 
ConstantParam("test"))));
+        return new LeftOuterTemporalJoinRelation(Arrays.asList("1", "2", "3"),
+                Collections.singletonList("4"), joinConditionMap, new 
FieldInfo("ts", new TimestampFormatInfo()));
+    }
+}
diff --git 
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
 
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index faa7fdf71..02722a9c5 100644
--- 
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ 
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -46,6 +46,7 @@ import 
org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 import org.apache.inlong.sort.protocol.transformation.Function;
 import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import 
org.apache.inlong.sort.protocol.transformation.relation.IntervalJoinRelation;
 import org.apache.inlong.sort.protocol.transformation.relation.JoinRelation;
 import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import 
org.apache.inlong.sort.protocol.transformation.relation.TemporalJoinRelation;
@@ -198,7 +199,6 @@ public class FlinkSqlParser implements Parser {
 
     /**
      * parse node relation
-     * <p/>
      * Here we only parse the output node in the relation,
      * and the input node parsing is achieved by parsing the dependent node 
parsing of the output node.
      *
@@ -427,6 +427,15 @@ public class FlinkSqlParser implements Parser {
         Map<String, List<FilterFunction>> conditionMap = 
relation.getJoinConditionMap();
         if (relation instanceof TemporalJoinRelation) {
             parseTemporalJoin((TemporalJoinRelation) relation, nodeMap, 
tableNameAliasMap, conditionMap, sb);
+        } else if (relation instanceof IntervalJoinRelation) {
+            Preconditions.checkState(filters == null || filters.isEmpty(),
+                    String.format("filters must be empty for %s", 
relation.getClass().getSimpleName()));
+            parseIntervalJoin((IntervalJoinRelation) relation, nodeMap, 
tableNameAliasMap, sb);
+            List<FilterFunction> conditions = 
conditionMap.values().stream().findFirst().orElse(null);
+            Preconditions.checkState(conditions != null && 
!conditions.isEmpty(),
+                    String.format("Join conditions must no be empty for %s", 
relation.getClass().getSimpleName()));
+            fillOutTableNameAlias(new ArrayList<>(conditions), 
tableNameAliasMap);
+            parseFilterFields(FilterStrategy.RETAIN, conditions, sb);
         } else {
             parseRegularJoin(relation, nodeMap, tableNameAliasMap, 
conditionMap, sb);
         }
@@ -443,6 +452,15 @@ public class FlinkSqlParser implements Parser {
         return sb.toString();
     }
 
+    private void parseIntervalJoin(IntervalJoinRelation relation, Map<String, 
Node> nodeMap,
+            Map<String, String> tableNameAliasMap, StringBuilder sb) {
+        for (int i = 1; i < relation.getInputs().size(); i++) {
+            String inputId = relation.getInputs().get(i);
+            sb.append(", ").append(nodeMap.get(inputId).genTableName())
+                    .append(" ").append(tableNameAliasMap.get(inputId));
+        }
+    }
+
     private void parseRegularJoin(JoinRelation relation, Map<String, Node> 
nodeMap,
             Map<String, String> tableNameAliasMap, Map<String, 
List<FilterFunction>> conditionMap, StringBuilder sb) {
         for (int i = 1; i < relation.getInputs().size(); i++) {
@@ -590,9 +608,9 @@ public class FlinkSqlParser implements Parser {
      */
     private void parseFilterFields(FilterStrategy filterStrategy, 
List<FilterFunction> filters, StringBuilder sb) {
         if (filters != null && !filters.isEmpty()) {
-            sb.append("\n    WHERE ");
+            sb.append("\nWHERE ");
             String subSql = StringUtils
-                    
.join(filters.stream().map(FunctionParam::format).collect(Collectors.toList()), 
" ");
+                    
.join(filters.stream().map(FunctionParam::format).collect(Collectors.toList()), 
"\n    ");
             if (filterStrategy == FilterStrategy.REMOVE) {
                 sb.append("not (").append(subSql).append(")");
             } else {
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java
new file mode 100644
index 000000000..2fc4fa03d
--- /dev/null
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java
@@ -0,0 +1,187 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.common.enums.MetaField;
+import org.apache.inlong.sort.formats.common.DecimalFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import 
org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
+import org.apache.inlong.sort.protocol.transformation.function.AddFunction;
+import org.apache.inlong.sort.protocol.transformation.function.BetweenFunction;
+import 
org.apache.inlong.sort.protocol.transformation.function.IntervalFunction;
+import 
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
+import 
org.apache.inlong.sort.protocol.transformation.function.SubtractFunction;
+import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
+import 
org.apache.inlong.sort.protocol.transformation.relation.IntervalJoinRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Test for interval join: {@link IntervalJoinRelation} parsed by {@link 
FlinkSqlParser} with {@link KafkaExtractNode}
+ */
+public class IntervalJoinRelationSqlParseTest extends AbstractTestBase {
+
+    private KafkaExtractNode buildIntervalJoinLeftStream() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new 
LongFormatInfo()),
+                new FieldInfo("price", new DecimalFormatInfo(32, 2)),
+                new FieldInfo("currency", new StringFormatInfo()),
+                new FieldInfo("order_time", new TimestampFormatInfo(3)),
+                new MetaFieldInfo("proc_time", MetaField.PROCESS_TIME)
+        );
+        return new KafkaExtractNode("1", "kafka_input_1", fields,
+                new WatermarkField(new FieldInfo("order_time", new 
TimestampFormatInfo(3))),
+                null, "orders", "localhost:9092",
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null,
+                "groupId_1", null);
+    }
+
+    private KafkaExtractNode buildIntervalJoinRightStream() {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("conversion_rate", new DecimalFormatInfo(32, 2)),
+                new FieldInfo("currency", new StringFormatInfo()),
+                new FieldInfo("update_time", new TimestampFormatInfo(3)),
+                new MetaFieldInfo("proc_time", MetaField.PROCESS_TIME)
+        );
+        return new KafkaExtractNode("2", "kafka_input_2", fields,
+                new WatermarkField(new FieldInfo("update_time", new 
TimestampFormatInfo(3))),
+                null, "currency_rates", "localhost:9092",
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null,
+                "groupId_2", null);
+    }
+
+    private KafkaLoadNode buildKafkaLoadNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new 
LongFormatInfo()),
+                new FieldInfo("price", new DecimalFormatInfo(32, 2)),
+                new FieldInfo("currency", new StringFormatInfo()),
+                new FieldInfo("order_time", new TimestampFormatInfo(3)),
+                new FieldInfo("conversion_rate", new DecimalFormatInfo(32, 2))
+        );
+        List<FieldRelation> relations = Arrays.asList(
+                new FieldRelation(new FieldInfo("id", "1", new 
LongFormatInfo()),
+                        new FieldInfo("id", new LongFormatInfo())),
+                new FieldRelation(new FieldInfo("price", "1", new 
DecimalFormatInfo(32, 2)),
+                        new FieldInfo("price", new DecimalFormatInfo(32, 2))),
+                new FieldRelation(new FieldInfo("currency", "1", new 
StringFormatInfo()),
+                        new FieldInfo("currency", new StringFormatInfo())),
+                new FieldRelation(new FieldInfo("order_time", "1", new 
TimestampFormatInfo(3)),
+                        new FieldInfo("order_time", new 
TimestampFormatInfo(3))),
+                new FieldRelation(new FieldInfo("conversion_rate", "2", new 
DecimalFormatInfo(32, 2)),
+                        new FieldInfo("conversion_rate", new 
DecimalFormatInfo(32, 2)))
+        );
+        return new KafkaLoadNode("3", "kafka_output", fields, relations, null,
+                null, "orders_output", "localhost:9092", new CanalJsonFormat(),
+                null, null, null);
+    }
+
+    /**
+     * Build the interval join relation between the input and output nodes
+     *
+     * @param inputs extract nodes
+     * @param outputs load nodes
+     * @return the interval join relation
+     */
+    private IntervalJoinRelation buildNodeRelation(List<Node> inputs, 
List<Node> outputs) {
+        List<String> inputIds = 
inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = 
outputs.stream().map(Node::getId).collect(Collectors.toList());
+        LinkedHashMap<String, List<FilterFunction>> conditionMap = new 
LinkedHashMap<>();
+        conditionMap.put("2", Arrays.asList(
+                new SingleValueFilterFunction(
+                        EmptyOperator.getInstance(),
+                        new FieldInfo("currency", "1", new StringFormatInfo()),
+                        EqualOperator.getInstance(),
+                        new FieldInfo("currency", "2", new StringFormatInfo())
+                ),
+                new BetweenFunction(
+                        AndOperator.getInstance(),
+                        new FieldInfo("order_time", "1", new 
TimestampFormatInfo()),
+                        new SubtractFunction(new FieldInfo("update_time", "2", 
new TimestampFormatInfo()),
+                                new IntervalFunction(new 
StringConstantParam("10"), new TimeUnitConstantParam(
+                                        TimeUnit.SECOND))),
+                        new AddFunction(new FieldInfo("update_time", "2", new 
TimestampFormatInfo()),
+                                new IntervalFunction(new 
StringConstantParam("5"), new TimeUnitConstantParam(
+                                        TimeUnit.SECOND)))
+                )
+        ));
+        return new IntervalJoinRelation(inputIds, outputIds, conditionMap);
+    }
+
+    /**
+     * Test interval join with event time where the extract nodes are {@link 
KafkaExtractNode}
+     * and the load node is {@link KafkaLoadNode}
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testIntervalJoinParse() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
+        Node leftStream = buildIntervalJoinLeftStream();
+        Node rightStream = buildIntervalJoinRightStream();
+        Node kafkaLoadNode = buildKafkaLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1",
+                Arrays.asList(leftStream, rightStream, kafkaLoadNode),
+                Collections.singletonList(
+                        buildNodeRelation(Arrays.asList(leftStream, 
rightStream),
+                                Collections.singletonList(kafkaLoadNode)))
+        );
+        GroupInfo groupInfo = new GroupInfo("1", 
Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, 
groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+}
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
index 69453fd95..8fe23b450 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
@@ -46,8 +46,8 @@ import 
org.apache.inlong.sort.protocol.transformation.WatermarkField;
 import 
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
 import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
-import 
org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoinRelationRelation;
-import 
org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelationRelation;
+import 
org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoinRelation;
+import 
org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelation;
 import 
org.apache.inlong.sort.protocol.transformation.relation.TemporalJoinRelation;
 import org.junit.Assert;
 import org.junit.Test;
@@ -146,9 +146,9 @@ public class MySqlTemporalJoinRelationSqlParseTest extends 
AbstractTestBase {
                 new FieldInfo("currency", "1", new LongFormatInfo()),
                 EqualOperator.getInstance(), new FieldInfo("currency", "2", 
new StringFormatInfo()))));
         if (left) {
-            return new LeftOuterTemporalJoinRelationRelation(inputIds, 
outputIds, conditionMap, systemTime);
+            return new LeftOuterTemporalJoinRelation(inputIds, outputIds, 
conditionMap, systemTime);
         }
-        return new InnerTemporalJoinRelationRelation(inputIds, outputIds, 
conditionMap, systemTime);
+        return new InnerTemporalJoinRelation(inputIds, outputIds, 
conditionMap, systemTime);
     }
 
     /**
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
index 36b3977e5..fdb28ccf7 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
@@ -44,8 +44,8 @@ import 
org.apache.inlong.sort.protocol.transformation.FilterFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
 import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
-import 
org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoinRelationRelation;
-import 
org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelationRelation;
+import 
org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoinRelation;
+import 
org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelation;
 import 
org.apache.inlong.sort.protocol.transformation.relation.TemporalJoinRelation;
 import org.junit.Assert;
 import org.junit.Test;
@@ -144,10 +144,10 @@ public class RedisTemporalJoinRelationSqlParseTest 
extends AbstractTestBase {
                 new FieldInfo("id", "1", new LongFormatInfo()),
                 EqualOperator.getInstance(), new FieldInfo("k", "5", new 
StringFormatInfo()))));
         if (left) {
-            return new LeftOuterTemporalJoinRelationRelation(inputIds, 
outputIds, conditionMap,
+            return new LeftOuterTemporalJoinRelation(inputIds, outputIds, 
conditionMap,
                     new FieldInfo("proc_time"));
         }
-        return new InnerTemporalJoinRelationRelation(inputIds, outputIds, 
conditionMap, new FieldInfo("proc_time"));
+        return new InnerTemporalJoinRelation(inputIds, outputIds, 
conditionMap, new FieldInfo("proc_time"));
     }
 
     /**

Reply via email to