This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 58e15df09 [INLONG-5613][Sort] Add interval join support for FlinkSqlParser (#5614) 58e15df09 is described below commit 58e15df09cbaf0f8d4b3c7eb33836f8e1a4f1f91 Author: Charles <44659300+yunqingmo...@users.noreply.github.com> AuthorDate: Mon Aug 22 17:08:35 2022 +0800 [INLONG-5613][Sort] Add interval join support for FlinkSqlParser (#5614) --- .../protocol/transformation/FieldRelation.java | 2 +- .../protocol/transformation/FilterFunction.java | 6 +- .../sort/protocol/transformation/Function.java | 17 +- .../protocol/transformation/FunctionParam.java | 10 +- .../transformation/function/AddFunction.java | 67 ++++++++ .../transformation/function/BetweenFunction.java | 84 +++++++++ .../CastFunction.java} | 52 +++--- .../transformation/function/IntervalFunction.java | 70 ++++++++ .../transformation/function/SubtractFunction.java | 67 ++++++++ ...elation.java => InnerTemporalJoinRelation.java} | 6 +- ...tionRelation.java => IntervalJoinRelation.java} | 31 ++-- .../transformation/relation/JoinRelation.java | 5 +- ...ion.java => LeftOuterTemporalJoinRelation.java} | 4 +- .../transformation/relation/NodeRelation.java | 5 +- .../relation/TemporalJoinRelation.java | 4 +- .../transformation/function/AddFunctionTest.java | 44 +++++ .../function/BetweenFunctionTest.java | 49 ++++++ .../function/IntervalFunctionTest.java | 41 +++++ .../function/SubtractFunctionTest.java | 44 +++++ .../relation/InnerTemporalJoinRelationTest.java | 56 ++++++ .../relation/IntervalJoinRelationTest.java | 55 ++++++ .../relation/LeftTemporalJoinRelationTest.java | 56 ++++++ .../inlong/sort/parser/impl/FlinkSqlParser.java | 24 ++- .../parser/IntervalJoinRelationSqlParseTest.java | 187 +++++++++++++++++++++ .../MySqlTemporalJoinRelationSqlParseTest.java | 8 +- .../RedisTemporalJoinRelationSqlParseTest.java | 8 +- 26 files changed, 938 insertions(+), 64 deletions(-) diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java index 1955dcb29..36ca76779 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java @@ -45,7 +45,7 @@ public class FieldRelation { @JsonCreator public FieldRelation(@JsonProperty("inputField") FunctionParam inputField, - @JsonProperty("outputField") FieldInfo outputField) { + @JsonProperty("outputField") FieldInfo outputField) { this.inputField = inputField; this.outputField = outputField; } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java index 69a145cc2..bae643d30 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java @@ -19,6 +19,7 @@ package org.apache.inlong.sort.protocol.transformation; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.inlong.sort.protocol.transformation.function.BetweenFunction; import org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction; import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction; @@ -31,8 +32,9 @@ import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilter property = "type") @JsonSubTypes({ @JsonSubTypes.Type(value = SingleValueFilterFunction.class, name = "singleValueFilter"), - 
@JsonSubTypes.Type(value = MultiValueFilterFunction.class, name = "multiValueFilter")} -) + @JsonSubTypes.Type(value = MultiValueFilterFunction.class, name = "multiValueFilter"), + @JsonSubTypes.Type(value = BetweenFunction.class, name = "betweenFunction") +}) public interface FilterFunction extends Function { } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java index 2ef8be7cb..42747e9b0 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java @@ -20,10 +20,16 @@ package org.apache.inlong.sort.protocol.transformation; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.inlong.sort.protocol.transformation.function.AddFunction; +import org.apache.inlong.sort.protocol.transformation.function.BetweenFunction; import org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper; +import org.apache.inlong.sort.protocol.transformation.function.CustomFunction; +import org.apache.inlong.sort.protocol.transformation.function.EncryptFunction; import org.apache.inlong.sort.protocol.transformation.function.HopEndFunction; import org.apache.inlong.sort.protocol.transformation.function.HopFunction; import org.apache.inlong.sort.protocol.transformation.function.HopStartFunction; +import org.apache.inlong.sort.protocol.transformation.function.IntervalFunction; +import org.apache.inlong.sort.protocol.transformation.function.JsonGetterFunction; import 
org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction; import org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFirstFunction; import org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFunction; @@ -32,6 +38,7 @@ import org.apache.inlong.sort.protocol.transformation.function.SessionFunction; import org.apache.inlong.sort.protocol.transformation.function.SessionStartFunction; import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction; import org.apache.inlong.sort.protocol.transformation.function.SplitIndexFunction; +import org.apache.inlong.sort.protocol.transformation.function.SubtractFunction; import org.apache.inlong.sort.protocol.transformation.function.TumbleEndFunction; import org.apache.inlong.sort.protocol.transformation.function.TumbleFunction; import org.apache.inlong.sort.protocol.transformation.function.TumbleStartFunction; @@ -58,7 +65,15 @@ import java.util.List; @JsonSubTypes.Type(value = SplitIndexFunction.class, name = "splitIndex"), @JsonSubTypes.Type(value = RegexpReplaceFunction.class, name = "regexpReplace"), @JsonSubTypes.Type(value = RegexpReplaceFirstFunction.class, name = "regexpReplaceFirst"), - @JsonSubTypes.Type(value = CascadeFunctionWrapper.class, name = "cascadeFunctionWrapper") + @JsonSubTypes.Type(value = CascadeFunctionWrapper.class, name = "cascadeFunctionWrapper"), + @JsonSubTypes.Type(value = EncryptFunction.class, name = "encrypt"), + @JsonSubTypes.Type(value = JsonGetterFunction.class, name = "jsonGetterFunction"), + @JsonSubTypes.Type(value = CustomFunction.class, name = "customFunction"), + @JsonSubTypes.Type(value = BetweenFunction.class, name = "betweenFunction"), + @JsonSubTypes.Type(value = IntervalFunction.class, name = "intervalFunction"), + @JsonSubTypes.Type(value = AddFunction.class, name = "addFunction"), + @JsonSubTypes.Type(value = SubtractFunction.class, name = "subtractFunction") + }) public interface Function extends 
FunctionParam { diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java index effd06200..ccda5f42a 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java @@ -22,12 +22,15 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSub import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.MetaFieldInfo; +import org.apache.inlong.sort.protocol.transformation.function.AddFunction; +import org.apache.inlong.sort.protocol.transformation.function.BetweenFunction; import org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper; import org.apache.inlong.sort.protocol.transformation.function.CustomFunction; import org.apache.inlong.sort.protocol.transformation.function.EncryptFunction; import org.apache.inlong.sort.protocol.transformation.function.HopEndFunction; import org.apache.inlong.sort.protocol.transformation.function.HopFunction; import org.apache.inlong.sort.protocol.transformation.function.HopStartFunction; +import org.apache.inlong.sort.protocol.transformation.function.IntervalFunction; import org.apache.inlong.sort.protocol.transformation.function.JsonGetterFunction; import org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction; import org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFirstFunction; @@ -37,6 +40,7 @@ import org.apache.inlong.sort.protocol.transformation.function.SessionFunction; import org.apache.inlong.sort.protocol.transformation.function.SessionStartFunction; import 
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction; import org.apache.inlong.sort.protocol.transformation.function.SplitIndexFunction; +import org.apache.inlong.sort.protocol.transformation.function.SubtractFunction; import org.apache.inlong.sort.protocol.transformation.function.TumbleEndFunction; import org.apache.inlong.sort.protocol.transformation.function.TumbleFunction; import org.apache.inlong.sort.protocol.transformation.function.TumbleStartFunction; @@ -100,7 +104,11 @@ import org.apache.inlong.sort.protocol.transformation.operator.OrOperator; @JsonSubTypes.Type(value = CascadeFunctionWrapper.class, name = "cascadeFunctionWrapper"), @JsonSubTypes.Type(value = EncryptFunction.class, name = "encrypt"), @JsonSubTypes.Type(value = JsonGetterFunction.class, name = "jsonGetterFunction"), - @JsonSubTypes.Type(value = CustomFunction.class, name = "customFunction") + @JsonSubTypes.Type(value = CustomFunction.class, name = "customFunction"), + @JsonSubTypes.Type(value = BetweenFunction.class, name = "betweenFunction"), + @JsonSubTypes.Type(value = IntervalFunction.class, name = "intervalFunction"), + @JsonSubTypes.Type(value = AddFunction.class, name = "addFunction"), + @JsonSubTypes.Type(value = SubtractFunction.class, name = "subtractFunction") }) public interface FunctionParam { diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/AddFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/AddFunction.java new file mode 100644 index 000000000..75025b583 --- /dev/null +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/AddFunction.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.protocol.transformation.function; + +import com.google.common.base.Preconditions; +import lombok.Data; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.inlong.sort.protocol.transformation.Function; +import org.apache.inlong.sort.protocol.transformation.FunctionParam; + +import javax.annotation.Nonnull; +import java.util.Arrays; +import java.util.List; + +/** + * The function for add + */ +@JsonTypeName("addFunction") +@Data +public class AddFunction implements Function { + + @Nonnull + @JsonProperty("leftField") + private final FunctionParam leftField; + @Nonnull + @JsonProperty("rightField") + private final FunctionParam rightField; + + @JsonCreator + public AddFunction(@Nonnull @JsonProperty("leftField") FunctionParam leftField, + @Nonnull @JsonProperty("rightField") FunctionParam rightField) { + this.leftField = Preconditions.checkNotNull(leftField, "leftField is null"); + this.rightField = Preconditions.checkNotNull(rightField, "rightField is null"); + } + + @Override + public List<FunctionParam> getParams() { + return Arrays.asList(leftField, rightField); + } + + @Override + public String getName() { + return "+"; + } 
+ + @Override + public String format() { + return String.format("%s %s %s", leftField.format(), getName(), rightField.format()); + } +} diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunction.java new file mode 100644 index 000000000..df7cbbcd4 --- /dev/null +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunction.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.protocol.transformation.function; + +import com.google.common.base.Preconditions; +import lombok.Data; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.inlong.sort.protocol.transformation.FilterFunction; +import org.apache.inlong.sort.protocol.transformation.FunctionParam; +import org.apache.inlong.sort.protocol.transformation.LogicOperator; +import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator; + +import javax.annotation.Nonnull; +import java.util.Arrays; +import java.util.List; + +/** + * The function for between + */ +@JsonTypeName("betweenFunction") +@Data +public class BetweenFunction implements FilterFunction { + + @Nonnull + @JsonProperty("field") + private final FunctionParam field; + @Nonnull + @JsonProperty("start") + private final FunctionParam start; + @Nonnull + @JsonProperty("end") + private final FunctionParam end; + @Nonnull + @JsonProperty("logicOperator") + private final LogicOperator logicOperator; + + @JsonCreator + public BetweenFunction( + @Nonnull @JsonProperty("logicOperator") LogicOperator logicOperator, + @Nonnull @JsonProperty("field") FunctionParam field, + @Nonnull @JsonProperty("start") FunctionParam start, + @Nonnull @JsonProperty("end") FunctionParam end) { + this.field = Preconditions.checkNotNull(field, "field is null"); + this.start = Preconditions.checkNotNull(start, "start is null"); + this.end = Preconditions.checkNotNull(end, "end is null"); + this.logicOperator = Preconditions.checkNotNull(logicOperator, "logicOperator is null"); + } + + @Override + public List<FunctionParam> getParams() { + return Arrays.asList(logicOperator, field, start, end); + } + + @Override + public String getName() { + return "BETWEEN"; + } + + @Override + 
public String format() { + String format = "%s %s %s %s AND %s"; + if (logicOperator == EmptyOperator.getInstance()) { + format = "%s%s %s %s AND %s"; + } + return String.format(format, logicOperator.format(), field.format(), getName(), start.format(), end.format()); + } +} diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/CastFunction.java similarity index 51% copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java copy to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/CastFunction.java index 1955dcb29..6dcd858c1 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/CastFunction.java @@ -15,38 +15,50 @@ * limitations under the License. 
*/ -package org.apache.inlong.sort.protocol.transformation; +package org.apache.inlong.sort.protocol.transformation.function; +import com.google.common.base.Preconditions; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.transformation.ConstantParam; +import org.apache.inlong.sort.protocol.transformation.Function; +import org.apache.inlong.sort.protocol.transformation.FunctionParam; +import java.util.Arrays; +import java.util.List; -/** - * Defines the relation between fields from input to output field - */ -@JsonTypeInfo( - use = JsonTypeInfo.Id.NAME, - include = JsonTypeInfo.As.PROPERTY, - property = "type") -@JsonTypeName("fieldRelation") +@JsonTypeName("cast") @Data @NoArgsConstructor -public class FieldRelation { +public class CastFunction implements Function { + + @JsonProperty("field") + private FunctionParam field; - @JsonProperty("inputField") - private FunctionParam inputField; - @JsonProperty("outputField") - private FieldInfo outputField; + private String type; @JsonCreator - public FieldRelation(@JsonProperty("inputField") FunctionParam inputField, - @JsonProperty("outputField") FieldInfo outputField) { - this.inputField = inputField; - this.outputField = outputField; + public CastFunction(@JsonProperty("field") FunctionParam field, + @JsonProperty("type") String type) { + this.field = Preconditions.checkNotNull(field, "field is null"); + this.type = Preconditions.checkNotNull(type, "type is null"); + } + + @Override + public List<FunctionParam> getParams() { + return Arrays.asList(field, new ConstantParam(type)); + } 
+ + @Override + public String getName() { + return "CAST"; + } + + @Override + public String format() { + return String.format("%s(%s AS %s)", getName(), field.format(), type); } } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunction.java new file mode 100644 index 000000000..2c6897dca --- /dev/null +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunction.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.protocol.transformation.function; + +import com.google.common.base.Preconditions; +import lombok.Data; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.inlong.sort.protocol.transformation.Function; +import org.apache.inlong.sort.protocol.transformation.FunctionParam; +import org.apache.inlong.sort.protocol.transformation.StringConstantParam; +import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam; + +import javax.annotation.Nonnull; +import java.util.Arrays; +import java.util.List; + +/** + * The function for interval + */ +@JsonTypeName("intervalFunction") +@Data +public class IntervalFunction implements Function { + + @Nonnull + @JsonProperty("interval") + private final StringConstantParam interval; + + @Nonnull + @JsonProperty("timeUnit") + private final TimeUnitConstantParam timeUnit; + + @JsonCreator + public IntervalFunction(@Nonnull @JsonProperty("interval") StringConstantParam interval, + @Nonnull @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) { + this.interval = Preconditions.checkNotNull(interval, "interval is null"); + this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null"); + } + + @Override + public List<FunctionParam> getParams() { + return Arrays.asList(interval, timeUnit); + } + + @Override + public String getName() { + return "INTERVAL"; + } + + @Override + public String format() { + return String.format("%s %s %s", getName(), interval.format(), timeUnit.format()); + } +} diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunction.java new file mode 
100644 index 000000000..24133695a --- /dev/null +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunction.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.protocol.transformation.function; + +import com.google.common.base.Preconditions; +import lombok.Data; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.inlong.sort.protocol.transformation.Function; +import org.apache.inlong.sort.protocol.transformation.FunctionParam; + +import javax.annotation.Nonnull; +import java.util.Arrays; +import java.util.List; + +/** + * The function for subtract + */ +@JsonTypeName("subtractFunction") +@Data +public class SubtractFunction implements Function { + + @Nonnull + @JsonProperty("leftField") + private final FunctionParam leftField; + @Nonnull + @JsonProperty("rightField") + private final FunctionParam rightField; + + @JsonCreator + public SubtractFunction(@Nonnull 
@JsonProperty("leftField") FunctionParam leftField, + @Nonnull @JsonProperty("rightField") FunctionParam rightField) { + this.leftField = Preconditions.checkNotNull(leftField, "leftField is null"); + this.rightField = Preconditions.checkNotNull(rightField, "rightField is null"); + } + + @Override + public List<FunctionParam> getParams() { + return Arrays.asList(leftField, rightField); + } + + @Override + public String getName() { + return "-"; + } + + @Override + public String format() { + return String.format("%s %s %s", leftField.format(), getName(), rightField.format()); + } +} diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelation.java similarity index 92% copy from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java copy to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelation.java index 2b9b5782d..e1c283fcf 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelation.java @@ -37,7 +37,7 @@ import java.util.Map; @EqualsAndHashCode(callSuper = true) @Data @NoArgsConstructor -public class InnerTemporalJoinRelationRelation extends TemporalJoinRelation { +public class InnerTemporalJoinRelation extends TemporalJoinRelation { /** * Constructor @@ -50,11 +50,11 @@ public class InnerTemporalJoinRelationRelation extends TemporalJoinRelation { * @param systemTime The system time for temporal join */ @JsonCreator - public InnerTemporalJoinRelationRelation( + public InnerTemporalJoinRelation( 
@JsonProperty("inputs") List<String> inputs, @JsonProperty("outputs") List<String> outputs, @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap, - @Nullable @JsonProperty("systemTimeMap") FieldInfo systemTime) { + @Nullable @JsonProperty("systemTime") FieldInfo systemTime) { super(inputs, outputs, joinConditionMap, systemTime); } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelation.java similarity index 64% rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelation.java index 2b9b5782d..e1a0d1153 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelation.java @@ -17,27 +17,25 @@ package org.apache.inlong.sort.protocol.transformation.relation; +import com.google.common.base.Preconditions; import lombok.Data; import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.transformation.FilterFunction; -import javax.annotation.Nullable; +import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; /** - * Inner temporal join relation + * This class 
defines the interval join relation. In an interval join, the join conditions are the same as filters, + * so filters are forbidden for an interval join. At the same time, + * the joinConditionMap is allowed to have only one entry. */ -@JsonTypeName("innerTemporalJoin") +@JsonTypeName("intervalJoin") @EqualsAndHashCode(callSuper = true) @Data -@NoArgsConstructor -public class InnerTemporalJoinRelationRelation extends TemporalJoinRelation { +public class IntervalJoinRelation extends JoinRelation { /** * Constructor @@ -47,19 +45,18 @@ public class InnerTemporalJoinRelationRelation extends TemporalJoinRelation { * @param joinConditionMap The joinConditionMap is a map of join conditions * the key of joinConditionMap is the node id of join node * the value of joinConditionMap is a list of join contidition - * @param systemTime The system time for temporal join */ - @JsonCreator - public InnerTemporalJoinRelationRelation( - @JsonProperty("inputs") List<String> inputs, + public IntervalJoinRelation(@JsonProperty("inputs") List<String> inputs, @JsonProperty("outputs") List<String> outputs, - @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap, - @Nullable @JsonProperty("systemTimeMap") FieldInfo systemTime) { - super(inputs, outputs, joinConditionMap, systemTime); + @JsonProperty("joinConditionMap") LinkedHashMap<String, List<FilterFunction>> joinConditionMap) { + super(inputs, outputs, joinConditionMap); + Preconditions.checkState(joinConditionMap.size() == 1, + String.format("The size of joinConditionMap must be one for %s", this.getClass().getSimpleName())); } @Override public String format() { - return "INNER JOIN"; + throw new UnsupportedOperationException(String.format("Format is not supported for %s", + this.getClass().getSimpleName())); } } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java index c98e2be30..8ec80c4ee 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java @@ -41,8 +41,9 @@ import java.util.Map; @JsonSubTypes.Type(value = InnerJoinNodeRelation.class, name = "innerJoin"), @JsonSubTypes.Type(value = LeftOuterJoinNodeRelation.class, name = "leftOuterJoin"), @JsonSubTypes.Type(value = RightOuterJoinNodeRelation.class, name = "rightOutJoin"), - @JsonSubTypes.Type(value = InnerTemporalJoinRelationRelation.class, name = "innerTemporalJoin"), - @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelationRelation.class, name = "leftOuterTemporalJoin") + @JsonSubTypes.Type(value = InnerTemporalJoinRelation.class, name = "innerTemporalJoin"), + @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelation.class, name = "leftOuterTemporalJoin"), + @JsonSubTypes.Type(value = IntervalJoinRelation.class, name = "intervalJoin") }) @EqualsAndHashCode(callSuper = true) @Data diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelationRelation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelation.java similarity index 94% rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelationRelation.java rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelation.java index 2a873381e..ad61b93ab 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelationRelation.java +++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelation.java @@ -37,7 +37,7 @@ import java.util.Map; @EqualsAndHashCode(callSuper = true) @Data @NoArgsConstructor -public class LeftOuterTemporalJoinRelationRelation extends TemporalJoinRelation { +public class LeftOuterTemporalJoinRelation extends TemporalJoinRelation { /** * LeftOuterTemporalJoin Constructor @@ -50,7 +50,7 @@ public class LeftOuterTemporalJoinRelationRelation extends TemporalJoinRelation * @param systemTime The system time for temporal join */ @JsonCreator - public LeftOuterTemporalJoinRelationRelation( + public LeftOuterTemporalJoinRelation( @JsonProperty("inputs") List<String> inputs, @JsonProperty("outputs") List<String> outputs, @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap, diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java index 43a3e725c..bcfa7058f 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java @@ -40,8 +40,9 @@ import java.util.List; @JsonSubTypes.Type(value = InnerJoinNodeRelation.class, name = "innerJoin"), @JsonSubTypes.Type(value = LeftOuterJoinNodeRelation.class, name = "leftOuterJoin"), @JsonSubTypes.Type(value = RightOuterJoinNodeRelation.class, name = "rightOutJoin"), - @JsonSubTypes.Type(value = InnerTemporalJoinRelationRelation.class, name = "innerTemporalJoin"), - @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelationRelation.class, name = "leftOuterTemporalJoin"), + @JsonSubTypes.Type(value = InnerTemporalJoinRelation.class, name = "innerTemporalJoin"), + @JsonSubTypes.Type(value = 
LeftOuterTemporalJoinRelation.class, name = "leftOuterTemporalJoin"), + @JsonSubTypes.Type(value = IntervalJoinRelation.class, name = "intervalJoin"), @JsonSubTypes.Type(value = UnionNodeRelation.class, name = "union"), @JsonSubTypes.Type(value = NodeRelation.class, name = "baseRelation") }) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java index 6a4fc1725..f25dc4b06 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java @@ -40,8 +40,8 @@ import java.util.Map; include = JsonTypeInfo.As.PROPERTY, property = "type") @JsonSubTypes({ - @JsonSubTypes.Type(value = InnerTemporalJoinRelationRelation.class, name = "innerTemporalJoin"), - @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelationRelation.class, name = "leftOuterTemporalJoin") + @JsonSubTypes.Type(value = InnerTemporalJoinRelation.class, name = "innerTemporalJoin"), + @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelation.class, name = "leftOuterTemporalJoin") }) @EqualsAndHashCode(callSuper = true) @Data diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/AddFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/AddFunctionTest.java new file mode 100644 index 000000000..344c2e8ac --- /dev/null +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/AddFunctionTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.protocol.transformation.function; + +import org.apache.inlong.sort.formats.common.TimestampFormatInfo; +import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.transformation.Function; +import org.apache.inlong.sort.protocol.transformation.FunctionBaseTest; +import org.apache.inlong.sort.protocol.transformation.StringConstantParam; +import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam; +import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit; + +/** + * Test for {@link AddFunction} + */ +public class AddFunctionTest extends FunctionBaseTest { + + @Override + public Function getTestObject() { + return new AddFunction(new FieldInfo("event_time", new TimestampFormatInfo()), + new IntervalFunction(new StringConstantParam("5"), + new TimeUnitConstantParam(TimeUnit.SECOND))); + } + + @Override + public String getExpectFormat() { + return "`event_time` + INTERVAL '5' SECOND"; + } +} diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunctionTest.java new file mode 100644 index 
000000000..89518c7e8 --- /dev/null +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/BetweenFunctionTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.protocol.transformation.function; + +import org.apache.inlong.sort.formats.common.TimestampFormatInfo; +import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.transformation.Function; +import org.apache.inlong.sort.protocol.transformation.FunctionBaseTest; +import org.apache.inlong.sort.protocol.transformation.StringConstantParam; +import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam; +import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit; +import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator; + +/** + * Test for {@link BetweenFunction} + */ +public class BetweenFunctionTest extends FunctionBaseTest { + + @Override + public Function getTestObject() { + return new BetweenFunction(EmptyOperator.getInstance(), new FieldInfo("order_time", new TimestampFormatInfo()), + new SubtractFunction(new FieldInfo("update_time", new TimestampFormatInfo()), + new 
IntervalFunction(new StringConstantParam("5"), + new TimeUnitConstantParam(TimeUnit.SECOND))), + new AddFunction(new FieldInfo("update_time", new TimestampFormatInfo()), + new IntervalFunction(new StringConstantParam("5"), + new TimeUnitConstantParam(TimeUnit.SECOND)))); + } + + @Override + public String getExpectFormat() { + return "`order_time` BETWEEN `update_time` - INTERVAL '5' SECOND AND `update_time` + INTERVAL '5' SECOND"; + } +} diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunctionTest.java new file mode 100644 index 000000000..640d6d3b0 --- /dev/null +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/IntervalFunctionTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.protocol.transformation.function; + +import org.apache.inlong.sort.protocol.transformation.Function; +import org.apache.inlong.sort.protocol.transformation.FunctionBaseTest; +import org.apache.inlong.sort.protocol.transformation.StringConstantParam; +import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam; +import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit; + +/** + * Test for {@link IntervalFunction} + */ +public class IntervalFunctionTest extends FunctionBaseTest { + + @Override + public Function getTestObject() { + return new IntervalFunction(new StringConstantParam("5"), + new TimeUnitConstantParam(TimeUnit.SECOND)); + } + + @Override + public String getExpectFormat() { + return "INTERVAL '5' SECOND"; + } +} diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunctionTest.java new file mode 100644 index 000000000..57a3eef16 --- /dev/null +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SubtractFunctionTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.protocol.transformation.function; + +import org.apache.inlong.sort.formats.common.TimestampFormatInfo; +import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.transformation.Function; +import org.apache.inlong.sort.protocol.transformation.FunctionBaseTest; +import org.apache.inlong.sort.protocol.transformation.StringConstantParam; +import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam; +import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit; + +/** + * Test for {@link SubtractFunction} + */ +public class SubtractFunctionTest extends FunctionBaseTest { + + @Override + public Function getTestObject() { + return new SubtractFunction(new FieldInfo("event_time", new TimestampFormatInfo()), + new IntervalFunction(new StringConstantParam("5"), + new TimeUnitConstantParam(TimeUnit.SECOND))); + } + + @Override + public String getExpectFormat() { + return "`event_time` - INTERVAL '5' SECOND"; + } +} diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationTest.java new file mode 100644 index 000000000..5109f874a --- /dev/null +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.protocol.transformation.relation; + +import org.apache.inlong.sort.SerializeBaseTest; +import org.apache.inlong.sort.formats.common.StringFormatInfo; +import org.apache.inlong.sort.formats.common.TimestampFormatInfo; +import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.transformation.ConstantParam; +import org.apache.inlong.sort.protocol.transformation.FilterFunction; +import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction; +import org.apache.inlong.sort.protocol.transformation.operator.AndOperator; +import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator; +import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator; +import org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; + +/** + * Tests for {@link InnerTemporalJoinRelation} + */ +public class InnerTemporalJoinRelationTest extends SerializeBaseTest<InnerTemporalJoinRelation> { + + @Override + public InnerTemporalJoinRelation getTestObject() { + LinkedHashMap<String, List<FilterFunction>> joinConditionMap = new LinkedHashMap<>(); + joinConditionMap.put("2", Arrays.asList( + new SingleValueFilterFunction(EmptyOperator.getInstance(), + new 
FieldInfo("name", "1", new StringFormatInfo()), + EqualOperator.getInstance(), new FieldInfo("name", "2", + new StringFormatInfo())), + new SingleValueFilterFunction(AndOperator.getInstance(), + new FieldInfo("name", "1", new StringFormatInfo()), + NotEqualOperator.getInstance(), new ConstantParam("test")))); + return new InnerTemporalJoinRelation(Arrays.asList("1", "2", "3"), + Collections.singletonList("4"), joinConditionMap, new FieldInfo("ts", new TimestampFormatInfo())); + } +} diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelationTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelationTest.java new file mode 100644 index 000000000..9e365ae04 --- /dev/null +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/IntervalJoinRelationTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.protocol.transformation.relation; + +import org.apache.inlong.sort.SerializeBaseTest; +import org.apache.inlong.sort.formats.common.StringFormatInfo; +import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.transformation.ConstantParam; +import org.apache.inlong.sort.protocol.transformation.FilterFunction; +import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction; +import org.apache.inlong.sort.protocol.transformation.operator.AndOperator; +import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator; +import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator; +import org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; + +/** + * Tests for {@link IntervalJoinRelation} + */ +public class IntervalJoinRelationTest extends SerializeBaseTest<IntervalJoinRelation> { + + @Override + public IntervalJoinRelation getTestObject() { + LinkedHashMap<String, List<FilterFunction>> joinConditionMap = new LinkedHashMap<>(); + joinConditionMap.put("2", Arrays.asList( + new SingleValueFilterFunction(EmptyOperator.getInstance(), + new FieldInfo("name", "1", new StringFormatInfo()), + EqualOperator.getInstance(), new FieldInfo("name", "2", + new StringFormatInfo())), + new SingleValueFilterFunction(AndOperator.getInstance(), + new FieldInfo("name", "1", new StringFormatInfo()), + NotEqualOperator.getInstance(), new ConstantParam("test")))); + return new IntervalJoinRelation(Arrays.asList("1", "2", "3"), + Collections.singletonList("4"), joinConditionMap); + } +} diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftTemporalJoinRelationTest.java 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftTemporalJoinRelationTest.java new file mode 100644 index 000000000..1831aa903 --- /dev/null +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftTemporalJoinRelationTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.protocol.transformation.relation; + +import org.apache.inlong.sort.SerializeBaseTest; +import org.apache.inlong.sort.formats.common.StringFormatInfo; +import org.apache.inlong.sort.formats.common.TimestampFormatInfo; +import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.transformation.ConstantParam; +import org.apache.inlong.sort.protocol.transformation.FilterFunction; +import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction; +import org.apache.inlong.sort.protocol.transformation.operator.AndOperator; +import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator; +import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator; +import org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; + +/** + * Tests for {@link LeftOuterTemporalJoinRelation} + */ +public class LeftTemporalJoinRelationTest extends SerializeBaseTest<LeftOuterTemporalJoinRelation> { + + @Override + public LeftOuterTemporalJoinRelation getTestObject() { + LinkedHashMap<String, List<FilterFunction>> joinConditionMap = new LinkedHashMap<>(); + joinConditionMap.put("2", Arrays.asList( + new SingleValueFilterFunction(EmptyOperator.getInstance(), + new FieldInfo("name", "1", new StringFormatInfo()), + EqualOperator.getInstance(), new FieldInfo("name", "2", + new StringFormatInfo())), + new SingleValueFilterFunction(AndOperator.getInstance(), + new FieldInfo("name", "1", new StringFormatInfo()), + NotEqualOperator.getInstance(), new ConstantParam("test")))); + return new LeftOuterTemporalJoinRelation(Arrays.asList("1", "2", "3"), + Collections.singletonList("4"), joinConditionMap, new FieldInfo("ts", new TimestampFormatInfo())); + } +} diff --git 
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java index faa7fdf71..02722a9c5 100644 --- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java +++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java @@ -46,6 +46,7 @@ import org.apache.inlong.sort.protocol.transformation.FieldRelation; import org.apache.inlong.sort.protocol.transformation.FilterFunction; import org.apache.inlong.sort.protocol.transformation.Function; import org.apache.inlong.sort.protocol.transformation.FunctionParam; +import org.apache.inlong.sort.protocol.transformation.relation.IntervalJoinRelation; import org.apache.inlong.sort.protocol.transformation.relation.JoinRelation; import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation; import org.apache.inlong.sort.protocol.transformation.relation.TemporalJoinRelation; @@ -198,7 +199,6 @@ public class FlinkSqlParser implements Parser { /** * parse node relation - * <p/> * Here we only parse the output node in the relation, * and the input node parsing is achieved by parsing the dependent node parsing of the output node. 
* @@ -427,6 +427,15 @@ public class FlinkSqlParser implements Parser { Map<String, List<FilterFunction>> conditionMap = relation.getJoinConditionMap(); if (relation instanceof TemporalJoinRelation) { parseTemporalJoin((TemporalJoinRelation) relation, nodeMap, tableNameAliasMap, conditionMap, sb); + } else if (relation instanceof IntervalJoinRelation) { + Preconditions.checkState(filters == null || filters.isEmpty(), + String.format("filters must be empty for %s", relation.getClass().getSimpleName())); + parseIntervalJoin((IntervalJoinRelation) relation, nodeMap, tableNameAliasMap, sb); + List<FilterFunction> conditions = conditionMap.values().stream().findFirst().orElse(null); + Preconditions.checkState(conditions != null && !conditions.isEmpty(), + String.format("Join conditions must no be empty for %s", relation.getClass().getSimpleName())); + fillOutTableNameAlias(new ArrayList<>(conditions), tableNameAliasMap); + parseFilterFields(FilterStrategy.RETAIN, conditions, sb); } else { parseRegularJoin(relation, nodeMap, tableNameAliasMap, conditionMap, sb); } @@ -443,6 +452,15 @@ public class FlinkSqlParser implements Parser { return sb.toString(); } + private void parseIntervalJoin(IntervalJoinRelation relation, Map<String, Node> nodeMap, + Map<String, String> tableNameAliasMap, StringBuilder sb) { + for (int i = 1; i < relation.getInputs().size(); i++) { + String inputId = relation.getInputs().get(i); + sb.append(", ").append(nodeMap.get(inputId).genTableName()) + .append(" ").append(tableNameAliasMap.get(inputId)); + } + } + private void parseRegularJoin(JoinRelation relation, Map<String, Node> nodeMap, Map<String, String> tableNameAliasMap, Map<String, List<FilterFunction>> conditionMap, StringBuilder sb) { for (int i = 1; i < relation.getInputs().size(); i++) { @@ -590,9 +608,9 @@ public class FlinkSqlParser implements Parser { */ private void parseFilterFields(FilterStrategy filterStrategy, List<FilterFunction> filters, StringBuilder sb) { if (filters != null 
&& !filters.isEmpty()) { - sb.append("\n WHERE "); + sb.append("\nWHERE "); String subSql = StringUtils - .join(filters.stream().map(FunctionParam::format).collect(Collectors.toList()), " "); + .join(filters.stream().map(FunctionParam::format).collect(Collectors.toList()), "\n "); if (filterStrategy == FilterStrategy.REMOVE) { sb.append("not (").append(subSql).append(")"); } else { diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java new file mode 100644 index 000000000..2fc4fa03d --- /dev/null +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.parser; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.inlong.common.enums.MetaField; +import org.apache.inlong.sort.formats.common.DecimalFormatInfo; +import org.apache.inlong.sort.formats.common.LongFormatInfo; +import org.apache.inlong.sort.formats.common.StringFormatInfo; +import org.apache.inlong.sort.formats.common.TimestampFormatInfo; +import org.apache.inlong.sort.parser.impl.FlinkSqlParser; +import org.apache.inlong.sort.parser.result.ParseResult; +import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.GroupInfo; +import org.apache.inlong.sort.protocol.MetaFieldInfo; +import org.apache.inlong.sort.protocol.StreamInfo; +import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode; +import org.apache.inlong.sort.protocol.node.Node; +import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode; +import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat; +import org.apache.inlong.sort.protocol.node.format.JsonFormat; +import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode; +import org.apache.inlong.sort.protocol.transformation.FieldRelation; +import org.apache.inlong.sort.protocol.transformation.FilterFunction; +import org.apache.inlong.sort.protocol.transformation.StringConstantParam; +import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam; +import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit; +import org.apache.inlong.sort.protocol.transformation.WatermarkField; +import org.apache.inlong.sort.protocol.transformation.function.AddFunction; +import org.apache.inlong.sort.protocol.transformation.function.BetweenFunction; +import 
org.apache.inlong.sort.protocol.transformation.function.IntervalFunction; +import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction; +import org.apache.inlong.sort.protocol.transformation.function.SubtractFunction; +import org.apache.inlong.sort.protocol.transformation.operator.AndOperator; +import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator; +import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator; +import org.apache.inlong.sort.protocol.transformation.relation.IntervalJoinRelation; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Test for Interval join for {@link IntervalJoinRelation} {@link FlinkSqlParser} with {@link KafkaExtractNode} + */ +public class IntervalJoinRelationSqlParseTest extends AbstractTestBase { + + private KafkaExtractNode buildIntervalJoinLeftStream() { + List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()), + new FieldInfo("price", new DecimalFormatInfo(32, 2)), + new FieldInfo("currency", new StringFormatInfo()), + new FieldInfo("order_time", new TimestampFormatInfo(3)), + new MetaFieldInfo("proc_time", MetaField.PROCESS_TIME) + ); + return new KafkaExtractNode("1", "kafka_input_1", fields, + new WatermarkField(new FieldInfo("order_time", new TimestampFormatInfo(3))), + null, "orders", "localhost:9092", + new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, + "groupId_1", null); + } + + private KafkaExtractNode buildIntervalJoinRightStream() { + List<FieldInfo> fields = Arrays.asList( + new FieldInfo("conversion_rate", new DecimalFormatInfo(32, 2)), + new FieldInfo("currency", new StringFormatInfo()), + new FieldInfo("update_time", new TimestampFormatInfo(3)), + new MetaFieldInfo("proc_time", MetaField.PROCESS_TIME) + ); + return new KafkaExtractNode("2", 
"kafka_input_2", fields, + new WatermarkField(new FieldInfo("update_time", new TimestampFormatInfo(3))), + null, "currency_rates", "localhost:9092", + new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, + "groupId_2", null); + } + + private KafkaLoadNode buildKafkaLoadNode() { + List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()), + new FieldInfo("price", new DecimalFormatInfo(32, 2)), + new FieldInfo("currency", new StringFormatInfo()), + new FieldInfo("order_time", new TimestampFormatInfo(3)), + new FieldInfo("conversion_rate", new DecimalFormatInfo(32, 2)) + ); + List<FieldRelation> relations = Arrays.asList( + new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()), + new FieldInfo("id", new LongFormatInfo())), + new FieldRelation(new FieldInfo("price", "1", new DecimalFormatInfo(32, 2)), + new FieldInfo("price", new DecimalFormatInfo(32, 2))), + new FieldRelation(new FieldInfo("currency", "1", new StringFormatInfo()), + new FieldInfo("currency", new StringFormatInfo())), + new FieldRelation(new FieldInfo("order_time", "1", new TimestampFormatInfo(3)), + new FieldInfo("order_time", new TimestampFormatInfo(3))), + new FieldRelation(new FieldInfo("conversion_rate", "2", new DecimalFormatInfo(32, 2)), + new FieldInfo("conversion_rate", new DecimalFormatInfo(32, 2))) + ); + return new KafkaLoadNode("3", "kafka_output", fields, relations, null, + null, "orders_output", "localhost:9092", new CanalJsonFormat(), + null, null, null); + } + + /** + * build node relation + * + * @param inputs extract node + * @param outputs load node + * @return node relation + */ + private IntervalJoinRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) { + List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList()); + List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList()); + LinkedHashMap<String, List<FilterFunction>> conditionMap = new LinkedHashMap<>(); + 
conditionMap.put("2", Arrays.asList( + new SingleValueFilterFunction( + EmptyOperator.getInstance(), + new FieldInfo("currency", "1", new StringFormatInfo()), + EqualOperator.getInstance(), + new FieldInfo("currency", "2", new StringFormatInfo()) + ), + new BetweenFunction( + AndOperator.getInstance(), + new FieldInfo("order_time", "1", new TimestampFormatInfo()), + new SubtractFunction(new FieldInfo("update_time", "2", new TimestampFormatInfo()), + new IntervalFunction(new StringConstantParam("10"), new TimeUnitConstantParam( + TimeUnit.SECOND))), + new AddFunction(new FieldInfo("update_time", "2", new TimestampFormatInfo()), + new IntervalFunction(new StringConstantParam("5"), new TimeUnitConstantParam( + TimeUnit.SECOND))) + ) + )); + return new IntervalJoinRelation(inputIds, outputIds, conditionMap); + } + + /** + * Test interval join parsing with event time, where both inputs are {@link KafkaExtractNode} + * and the output is a {@link KafkaLoadNode} + * + * @throws Exception The exception may be thrown when executing + */ + @Test + public void testIntervalJoinParse() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.enableCheckpointing(10000); + env.disableOperatorChaining(); + EnvironmentSettings settings = EnvironmentSettings + .newInstance() + .useBlinkPlanner() + .inStreamingMode() + .build(); + StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); + Node leftStream = buildIntervalJoinLeftStream(); + Node rightStream = buildIntervalJoinRightStream(); + Node kafkaLoadNode = buildKafkaLoadNode(); + StreamInfo streamInfo = new StreamInfo("1", + Arrays.asList(leftStream, rightStream, kafkaLoadNode), + Collections.singletonList( + buildNodeRelation(Arrays.asList(leftStream, rightStream), + Collections.singletonList(kafkaLoadNode))) + ); + GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo)); + FlinkSqlParser parser =
FlinkSqlParser.getInstance(tableEnv, groupInfo); + ParseResult result = parser.parse(); + Assert.assertTrue(result.tryExecute()); + } +} diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java index 69453fd95..8fe23b450 100644 --- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java @@ -46,8 +46,8 @@ import org.apache.inlong.sort.protocol.transformation.WatermarkField; import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction; import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator; import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator; -import org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoinRelationRelation; -import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelationRelation; +import org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoinRelation; +import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelation; import org.apache.inlong.sort.protocol.transformation.relation.TemporalJoinRelation; import org.junit.Assert; import org.junit.Test; @@ -146,9 +146,9 @@ public class MySqlTemporalJoinRelationSqlParseTest extends AbstractTestBase { new FieldInfo("currency", "1", new LongFormatInfo()), EqualOperator.getInstance(), new FieldInfo("currency", "2", new StringFormatInfo())))); if (left) { - return new LeftOuterTemporalJoinRelationRelation(inputIds, outputIds, conditionMap, systemTime); + return new LeftOuterTemporalJoinRelation(inputIds, outputIds, conditionMap, systemTime); } - return new InnerTemporalJoinRelationRelation(inputIds, 
outputIds, conditionMap, systemTime); + return new InnerTemporalJoinRelation(inputIds, outputIds, conditionMap, systemTime); } /** diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java index 36b3977e5..fdb28ccf7 100644 --- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java @@ -44,8 +44,8 @@ import org.apache.inlong.sort.protocol.transformation.FilterFunction; import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction; import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator; import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator; -import org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoinRelationRelation; -import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelationRelation; +import org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoinRelation; +import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelation; import org.apache.inlong.sort.protocol.transformation.relation.TemporalJoinRelation; import org.junit.Assert; import org.junit.Test; @@ -144,10 +144,10 @@ public class RedisTemporalJoinRelationSqlParseTest extends AbstractTestBase { new FieldInfo("id", "1", new LongFormatInfo()), EqualOperator.getInstance(), new FieldInfo("k", "5", new StringFormatInfo())))); if (left) { - return new LeftOuterTemporalJoinRelationRelation(inputIds, outputIds, conditionMap, + return new LeftOuterTemporalJoinRelation(inputIds, outputIds, conditionMap, new FieldInfo("proc_time")); } - return new InnerTemporalJoinRelationRelation(inputIds, outputIds, 
conditionMap, new FieldInfo("proc_time")); + return new InnerTemporalJoinRelation(inputIds, outputIds, conditionMap, new FieldInfo("proc_time")); } /**