twalthr commented on a change in pull request #17932:
URL: https://github.com/apache/flink/pull/17932#discussion_r759048293



##########
File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/BaseLogicalTypeAssert.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.LogicalTypesTest;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.assertj.core.api.AbstractAssert;
+import org.assertj.core.api.ListAssert;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
+
+/** Base assertions for all classes implementing {@link LogicalType}. */
+public abstract class BaseLogicalTypeAssert<
+                SELF extends BaseLogicalTypeAssert<SELF, ACTUAL>, ACTUAL 
extends LogicalType>
+        extends AbstractAssert<SELF, ACTUAL> {
+
+    protected BaseLogicalTypeAssert(ACTUAL logicalType, Class<?> selfType) {
+        super(logicalType, selfType);
+    }
+
+    public SELF isNullable() {
+        isNotNull();
+        assertThat(this.actual.isNullable()).isTrue();
+        return myself;
+    }
+
+    public SELF isNotNullable() {
+        isNotNull();
+        assertThat(this.actual.isNullable()).isFalse();
+        return myself;
+    }
+
+    public ListAssert<LogicalType> getChildren() {
+        isNotNull();
+        return new ListAssert<>(this.actual.getChildren());
+    }
+
+    public SELF hasExactlyChildren(LogicalType... children) {
+        isNotNull();
+        getChildren().containsExactly(children);
+        return myself;
+    }
+
+    public SELF hasSerializableString(String serializableString) {
+        isNotNull();
+        
assertThat(this.actual.asSerializableString()).isEqualTo(serializableString);
+        return myself;
+    }
+
+    public SELF hasNoSerializableString() {
+        isNotNull();
+        
assertThatThrownBy(this.actual::asSerializableString).isInstanceOf(TableException.class);
+        return myself;
+    }
+
+    public SELF hasSummaryString(String summaryString) {
+        isNotNull();
+        assertThat(this.actual.asSummaryString()).isEqualTo(summaryString);
+        return myself;
+    }
+
+    public SELF supportsInputConversion(Class<?> clazz) {
+        isNotNull();
+        assertThat(this.actual.supportsInputConversion(clazz)).isTrue();
+        return myself;
+    }
+
+    public SELF doesntSupportInputConversion(Class<?> clazz) {
+        isNotNull();
+        assertThat(this.actual.supportsInputConversion(clazz)).isFalse();
+        return myself;
+    }
+
+    public SELF supportsOutputConversion(Class<?> clazz) {
+        isNotNull();
+        assertThat(this.actual.supportsOutputConversion(clazz)).isTrue();
+        return myself;
+    }
+
+    public SELF doesntSupportOutputConversion(Class<?> clazz) {
+        isNotNull();
+        assertThat(this.actual.supportsOutputConversion(clazz)).isFalse();
+        return myself;
+    }
+
+    public SELF isJavaSerializable() {
+        isNotNull();
+        try {
+            assertThat(
+                            InstantiationUtil.<LogicalType>deserializeObject(
+                                    
InstantiationUtil.serializeObject(this.actual),
+                                    LogicalTypesTest.class.getClassLoader()))
+                    .isEqualTo(this.actual);
+        } catch (IOException | ClassNotFoundException e) {
+            fail(
+                    "Error when trying to serialize logical type "
+                            + this.actual.asSummaryString()
+                            + " to string",
+                    e);
+        }
+        return myself;
+    }
+
+    public DecimalTypeAssert asDecimal() {

Review comment:
       can we make this more generic for all types? maybe take a class argument?

##########
File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/BaseLogicalTypeAssert.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.LogicalTypesTest;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.assertj.core.api.AbstractAssert;
+import org.assertj.core.api.ListAssert;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
+
+/** Base assertions for all classes implementing {@link LogicalType}. */
+public abstract class BaseLogicalTypeAssert<
+                SELF extends BaseLogicalTypeAssert<SELF, ACTUAL>, ACTUAL 
extends LogicalType>
+        extends AbstractAssert<SELF, ACTUAL> {
+
+    protected BaseLogicalTypeAssert(ACTUAL logicalType, Class<?> selfType) {
+        super(logicalType, selfType);
+    }
+
+    public SELF isNullable() {
+        isNotNull();
+        assertThat(this.actual.isNullable()).isTrue();
+        return myself;
+    }
+
+    public SELF isNotNullable() {
+        isNotNull();
+        assertThat(this.actual.isNullable()).isFalse();
+        return myself;
+    }
+
+    public ListAssert<LogicalType> getChildren() {
+        isNotNull();
+        return new ListAssert<>(this.actual.getChildren());
+    }
+
+    public SELF hasExactlyChildren(LogicalType... children) {
+        isNotNull();
+        getChildren().containsExactly(children);
+        return myself;
+    }
+
+    public SELF hasSerializableString(String serializableString) {
+        isNotNull();
+        
assertThat(this.actual.asSerializableString()).isEqualTo(serializableString);
+        return myself;
+    }
+
+    public SELF hasNoSerializableString() {
+        isNotNull();
+        
assertThatThrownBy(this.actual::asSerializableString).isInstanceOf(TableException.class);
+        return myself;
+    }
+
+    public SELF hasSummaryString(String summaryString) {
+        isNotNull();
+        assertThat(this.actual.asSummaryString()).isEqualTo(summaryString);
+        return myself;
+    }
+
+    public SELF supportsInputConversion(Class<?> clazz) {
+        isNotNull();
+        assertThat(this.actual.supportsInputConversion(clazz)).isTrue();
+        return myself;
+    }
+
+    public SELF doesntSupportInputConversion(Class<?> clazz) {

Review comment:
       nit: `doesnt` -> `doesNot`

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkAssertions.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import org.assertj.core.api.AbstractThrowableAssert;
+import org.assertj.core.api.AssertFactory;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.InstanceOfAssertFactory;
+import org.assertj.core.api.ListAssert;
+
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Some reusable assertions and utilities for AssertJ. */
+public final class FlinkAssertions {
+
+    private FlinkAssertions() {}
+
+    /**
+     * Shorthand for {@code * 
assertThat(throwable).extracting(FlinkAssertions::chainOfCauses,

Review comment:
       Remove `*`. Maybe a block code example, instead?

##########
File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowDataAssert.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import org.assertj.core.api.AbstractAssert;
+import org.assertj.core.api.LongAssert;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Assertions for {@link RowData}. */
+public class RowDataAssert extends AbstractAssert<RowDataAssert, RowData> {
+
+    public RowDataAssert(RowData rowData) {
+        super(rowData, RowDataAssert.class);
+    }
+
+    public RowDataAssert hasKind(RowKind kind) {
+        isNotNull();
+        assertThat(this.actual.getRowKind()).isEqualTo(kind);
+        return this;
+    }
+
+    public RowDataAssert hasArity(int arity) {
+        isNotNull();
+        assertThat(this.actual.getArity()).isEqualTo(arity);
+        return this;
+    }
+
+    public StringDataAssert getString(int index) {

Review comment:
       return string immediately? no test really wants to work on `StringData`. 
we can still add `getStringData` in the future?

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkAssertions.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import org.assertj.core.api.AbstractThrowableAssert;
+import org.assertj.core.api.AssertFactory;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.InstanceOfAssertFactory;
+import org.assertj.core.api.ListAssert;
+
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Some reusable assertions and utilities for AssertJ. */
+public final class FlinkAssertions {
+
+    private FlinkAssertions() {}
+
+    /**
+     * Shorthand for {@code * 
assertThat(throwable).extracting(FlinkAssertions::chainOfCauses,
+     * FlinkAssertions.STREAM_THROWABLE)}.
+     */
+    public ListAssert<Throwable> assertThatChainOfCauses(Throwable root) {
+        return assertThat(root).extracting(FlinkAssertions::chainOfCauses, 
STREAM_THROWABLE);
+    }
+
+    /**
+     * You can use this method in combination with {@link
+     * AbstractThrowableAssert#extracting(Function, AssertFactory)} to perform 
assertions on a chain
+     * of causes, for example {@code
+     * assertThat(throwable).extracting(FlinkAssertions::chainOfCauses,
+     * FlinkAssertions.STREAM_THROWABLE)}.
+     *
+     * @return the list is ordered from the current {@link Throwable} up to 
the root cause.
+     */
+    public static Stream<Throwable> chainOfCauses(Throwable throwable) {
+        if (throwable == null) {
+            return Stream.empty();
+        }
+        if (throwable.getCause() == null) {
+            return Stream.of(throwable);
+        }
+        return Stream.concat(Stream.of(throwable), 
chainOfCauses(throwable.getCause()));
+    }
+
+    @SuppressWarnings({"rawtypes", "unused"})
+    public static final InstanceOfAssertFactory<Stream, ListAssert<Throwable>> 
STREAM_THROWABLE =

Review comment:
       constants to the top of the class and with JavaDoc.

##########
File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/BaseLogicalTypeAssert.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.LogicalTypesTest;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.assertj.core.api.AbstractAssert;
+import org.assertj.core.api.ListAssert;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
+
+/** Base assertions for all classes implementing {@link LogicalType}. */
+public abstract class BaseLogicalTypeAssert<
+                SELF extends BaseLogicalTypeAssert<SELF, ACTUAL>, ACTUAL 
extends LogicalType>
+        extends AbstractAssert<SELF, ACTUAL> {
+
+    protected BaseLogicalTypeAssert(ACTUAL logicalType, Class<?> selfType) {
+        super(logicalType, selfType);
+    }
+
+    public SELF isNullable() {
+        isNotNull();
+        assertThat(this.actual.isNullable()).isTrue();
+        return myself;
+    }
+
+    public SELF isNotNullable() {
+        isNotNull();
+        assertThat(this.actual.isNullable()).isFalse();
+        return myself;
+    }
+
+    public ListAssert<LogicalType> getChildren() {
+        isNotNull();
+        return new ListAssert<>(this.actual.getChildren());
+    }
+
+    public SELF hasExactlyChildren(LogicalType... children) {
+        isNotNull();
+        getChildren().containsExactly(children);
+        return myself;
+    }
+
+    public SELF hasSerializableString(String serializableString) {
+        isNotNull();
+        
assertThat(this.actual.asSerializableString()).isEqualTo(serializableString);
+        return myself;
+    }
+
+    public SELF hasNoSerializableString() {
+        isNotNull();
+        
assertThatThrownBy(this.actual::asSerializableString).isInstanceOf(TableException.class);
+        return myself;
+    }
+
+    public SELF hasSummaryString(String summaryString) {
+        isNotNull();
+        assertThat(this.actual.asSummaryString()).isEqualTo(summaryString);
+        return myself;
+    }
+
+    public SELF supportsInputConversion(Class<?> clazz) {
+        isNotNull();
+        assertThat(this.actual.supportsInputConversion(clazz)).isTrue();
+        return myself;
+    }
+
+    public SELF doesntSupportInputConversion(Class<?> clazz) {
+        isNotNull();
+        assertThat(this.actual.supportsInputConversion(clazz)).isFalse();
+        return myself;
+    }
+
+    public SELF supportsOutputConversion(Class<?> clazz) {
+        isNotNull();
+        assertThat(this.actual.supportsOutputConversion(clazz)).isTrue();
+        return myself;
+    }
+
+    public SELF doesntSupportOutputConversion(Class<?> clazz) {
+        isNotNull();
+        assertThat(this.actual.supportsOutputConversion(clazz)).isFalse();
+        return myself;
+    }
+
+    public SELF isJavaSerializable() {
+        isNotNull();
+        try {
+            assertThat(
+                            InstantiationUtil.<LogicalType>deserializeObject(
+                                    
InstantiationUtil.serializeObject(this.actual),
+                                    LogicalTypesTest.class.getClassLoader()))
+                    .isEqualTo(this.actual);
+        } catch (IOException | ClassNotFoundException e) {
+            fail(
+                    "Error when trying to serialize logical type "
+                            + this.actual.asSummaryString()
+                            + " to string",
+                    e);
+        }
+        return myself;
+    }
+
+    public DecimalTypeAssert asDecimal() {

Review comment:
       Or better: we just offer the 
`LogicalTypeUtils.getLength/getPrecision/getScale` here and an assertion for 
the type root.

##########
File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DecimalTypeAssert.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test;
+
+import org.apache.flink.table.types.logical.DecimalType;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Assertions for {@link DecimalType}. */
+public class DecimalTypeAssert extends 
BaseLogicalTypeAssert<DecimalTypeAssert, DecimalType> {

Review comment:
       as mentioned above: let's drop this?

##########
File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java
##########
@@ -68,92 +66,79 @@ public void testProjectRow() {
         final DataType topLevelRow =
                 ROW(FIELD("a0", INT()), FIELD("a1", secondLevelRow), 
FIELD("a1_b1_c0", INT()));
 
-        assertThat(
-                DataTypeUtils.projectRow(topLevelRow, new int[][] {{0}, {1, 1, 
0}}),
-                equalTo(ROW(FIELD("a0", INT()), FIELD("a1_b1_c0", 
BOOLEAN()))));
+        assertThat(DataTypeUtils.projectRow(topLevelRow, new int[][] {{0}, {1, 
1, 0}}))
+                .isEqualTo(ROW(FIELD("a0", INT()), FIELD("a1_b1_c0", 
BOOLEAN())));
 
-        assertThat(
-                DataTypeUtils.projectRow(topLevelRow, new int[][] {{1, 1}, 
{0}}),
-                equalTo(ROW(FIELD("a1_b1", thirdLevelRow), FIELD("a0", 
INT()))));
+        assertThat(DataTypeUtils.projectRow(topLevelRow, new int[][] {{1, 1}, 
{0}}))
+                .isEqualTo(ROW(FIELD("a1_b1", thirdLevelRow), FIELD("a0", 
INT())));
 
         assertThat(
-                DataTypeUtils.projectRow(
-                        topLevelRow, new int[][] {{1, 1, 2}, {1, 1, 1}, {1, 1, 
0}}),
-                equalTo(
+                        DataTypeUtils.projectRow(
+                                topLevelRow, new int[][] {{1, 1, 2}, {1, 1, 
1}, {1, 1, 0}}))
+                .isEqualTo(
                         ROW(
                                 FIELD("a1_b1_c2", INT()),
                                 FIELD("a1_b1_c1", DOUBLE()),
-                                FIELD("a1_b1_c0", BOOLEAN()))));
+                                FIELD("a1_b1_c0", BOOLEAN())));
 
-        assertThat(
-                DataTypeUtils.projectRow(topLevelRow, new int[][] {{1, 1, 0}, 
{2}}),
-                equalTo(ROW(FIELD("a1_b1_c0", BOOLEAN()), FIELD("a1_b1_c0_$0", 
INT()))));
+        assertThat(DataTypeUtils.projectRow(topLevelRow, new int[][] {{1, 1, 
0}, {2}}))
+                .isEqualTo(ROW(FIELD("a1_b1_c0", BOOLEAN()), 
FIELD("a1_b1_c0_$0", INT())));
     }
 
     @Test
     public void testAppendRowFields() {
-        {
-            final DataType row =
-                    ROW(FIELD("a0", BOOLEAN()), FIELD("a1", DOUBLE()), 
FIELD("a2", INT()));
-
-            final DataType expectedRow =
-                    ROW(
-                            FIELD("a0", BOOLEAN()),
-                            FIELD("a1", DOUBLE()),
-                            FIELD("a2", INT()),
-                            FIELD("a3", BIGINT()),
-                            FIELD("a4", TIMESTAMP(3)));
-
-            assertThat(
-                    DataTypeUtils.appendRowFields(
-                            row, Arrays.asList(FIELD("a3", BIGINT()), 
FIELD("a4", TIMESTAMP(3)))),
-                    equalTo(expectedRow));
-        }
-
-        {
-            final DataType row = ROW();
-
-            final DataType expectedRow = ROW(FIELD("a", BOOLEAN()), FIELD("b", 
INT()));
-
-            assertThat(
-                    DataTypeUtils.appendRowFields(
-                            row, Arrays.asList(FIELD("a", BOOLEAN()), 
FIELD("b", INT()))),
-                    equalTo(expectedRow));
-        }
+        assertThat(
+                        DataTypeUtils.appendRowFields(
+                                ROW(
+                                        FIELD("a0", BOOLEAN()),
+                                        FIELD("a1", DOUBLE()),
+                                        FIELD("a2", INT())),
+                                Arrays.asList(FIELD("a3", BIGINT()), 
FIELD("a4", TIMESTAMP(3)))))
+                .isEqualTo(
+                        ROW(
+                                FIELD("a0", BOOLEAN()),
+                                FIELD("a1", DOUBLE()),
+                                FIELD("a2", INT()),
+                                FIELD("a3", BIGINT()),
+                                FIELD("a4", TIMESTAMP(3))));
+
+        assertThat(
+                        DataTypeUtils.appendRowFields(
+                                ROW(), Arrays.asList(FIELD("a", BOOLEAN()), 
FIELD("b", INT()))))
+                .isEqualTo(ROW(FIELD("a", BOOLEAN()), FIELD("b", INT())));
     }
 
+    private static final Condition<DataType> INTERNAL =
+            new Condition<>(DataTypeUtils::isInternal, "internal");
+
     @Test
     public void testIsInternalClass() {
-        assertTrue(DataTypeUtils.isInternal(DataTypes.INT()));
-        
assertTrue(DataTypeUtils.isInternal(DataTypes.INT().notNull().bridgedTo(int.class)));
-        
assertTrue(DataTypeUtils.isInternal(DataTypes.ROW().bridgedTo(RowData.class)));
-        assertFalse(DataTypeUtils.isInternal(DataTypes.ROW()));
+        assertThat(DataTypes.INT()).is(INTERNAL);
+        
assertThat(DataTypes.INT().notNull().bridgedTo(int.class)).is(INTERNAL);

Review comment:
       btw it could be useful to add this to `DataTypeAssert`, also the fields 
names

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
##########
@@ -35,7 +35,14 @@
 import java.util.function.Function;
 import java.util.function.Predicate;
 
-/** Some reusable hamcrest matchers for Flink. */
+/**
+ * Some reusable hamcrest matchers for Flink.
+ *
+ * @deprecated You should assertj assertions, which have built-in assertions 
for {@link
+ *     CompletableFuture}. To check chains of {@link Throwable} causes, use 
{@link
+ *     FlinkAssertions#assertThatChainOfCauses(Throwable)}
+ */
+@Deprecated

Review comment:
       Changes for the core Flink testing infrastructure should not happen in a 
`table-common` commit. Please create a separate commit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to