wuchong commented on a change in pull request #14958:
URL: https://github.com/apache/flink/pull/14958#discussion_r580824395
##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
##########
@@ -118,6 +119,13 @@ private PerJobMiniClusterFactory(
     private MiniClusterConfiguration getMiniClusterConfig(int maximumParallelism) {
         Configuration configuration = new Configuration(this.configuration);
+        // we need to set this since a lot of test expect this because TestBaseUtils.startCluster()
+        // enabled this by default. (see MiniClusterResource#startMiniCluster)
+        // or we can remove this after FLINK-18717 is resolved?
+        if (!configuration.contains(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE)) {

Review comment:
   It seems that this change is not needed anymore. I reverted it locally, and all the sql-client tests passed. Could you revert it and see whether the build still passes?

##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
##########
@@ -654,11 +656,12 @@ private void callSelect(SqlCommandCall cmdCall) {
         if (resultDesc.isTableauMode()) {
             try (CliTableauResultView tableauResultView =
                     new CliTableauResultView(terminal, executor, sessionId, resultDesc)) {
-                if (resultDesc.isMaterialized()) {
-                    tableauResultView.displayBatchResults();
-                } else {
-                    tableauResultView.displayStreamResults();
-                }
+                Map<String, String> properties = executor.getSessionProperties(sessionId);
+                boolean isStreamingMode =
+                        properties
+                                .getOrDefault(EXECUTION_TYPE, EXECUTION_TYPE_VALUE_STREAMING)
+                                .equals(EXECUTION_TYPE_VALUE_STREAMING);

Review comment:
   We should avoid determining the mode this way, because in the future other configuration options may also change the mode, and we should have a single place to get it. What do you think about adding an `isStreamingMode()` method to `ResultDescriptor`?
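   A minimal sketch of what the suggested accessor could look like. The field and constructor wiring here are assumptions for illustration, not the actual `flink-sql-client` API:

   ```java
   // Hypothetical sketch: ResultDescriptor carries the execution mode once,
   // so call sites don't re-derive it from raw session properties.
   class ResultDescriptor {
       private final String resultId;
       private final boolean isStreamingMode;

       ResultDescriptor(String resultId, boolean isStreamingMode) {
           this.resultId = resultId;
           this.isStreamingMode = isStreamingMode;
       }

       // Single place that answers "is this a streaming query?"
       boolean isStreamingMode() {
           return isStreamingMode;
       }
   }
   ```

   Call sites such as `CliClient#callSelect` would then branch on `resultDesc.isStreamingMode()` instead of parsing the execution type out of the session properties.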
##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
##########
@@ -1768,11 +1783,11 @@ private void executeAndVerifySinkResult(
         final List<String> actualResults = new ArrayList<>();
         while (true) {
             Thread.sleep(50); // slow the processing down
-            final TypedResult<List<Tuple2<Boolean, Row>>> result =
+            final TypedResult<List<Row>> result =
                     executor.retrieveResultChanges(sessionId, resultID);
             if (result.getType() == TypedResult.ResultType.PAYLOAD) {
-                for (Tuple2<Boolean, Row> change : result.getPayload()) {
-                    actualResults.add(change.toString());
+                for (Row row : result.getPayload()) {
+                    actualResults.add(row.getKind().shortString() + "(" + row.toString() + ")");

Review comment:
   This can simply be `actualResults.add(row.toString())`, because `row.toString()` already takes the row kind into account.

##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
##########
@@ -280,7 +274,7 @@ protected String getTitle() {
         // add change column
         schemaHeader.append(' ');
         schemaHeader.style(AttributedStyle.DEFAULT.underline());
-        schemaHeader.append("+/-");
+        schemaHeader.append(" op");

Review comment:
   Why is there a space at the beginning?
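   For context on why the extra `shortString()` prefix is redundant: `Row.toString()` already embeds the changelog kind's short code in its output (e.g. an insert row prints as `+I[1, Hello]`). A self-contained sketch of that mapping, mirroring `org.apache.flink.types.RowKind` rather than importing it:

   ```java
   // Sketch mirroring org.apache.flink.types.RowKind: the four changelog
   // kinds and the two-character codes that Row.toString() embeds.
   enum SketchRowKind {
       INSERT("+I"),
       UPDATE_BEFORE("-U"),
       UPDATE_AFTER("+U"),
       DELETE("-D");

       private final String shortString;

       SketchRowKind(String shortString) {
           this.shortString = shortString;
       }

       String shortString() {
           return shortString;
       }
   }
   ```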
##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
##########
@@ -124,18 +125,11 @@ protected void refresh() {
                     stopRetrieval(false);
                     break;
                 default:
-                    List<Tuple2<Boolean, Row>> changes = result.getPayload();
+                    List<Row> changes = result.getPayload();

-                    for (Tuple2<Boolean, Row> change : changes) {
+                    for (Row change : changes) {
                         // convert row
-                        final String[] changeRow = new String[change.f1.getArity() + 1];
-                        final String[] row = PrintUtils.rowToString(change.f1);
-                        System.arraycopy(row, 0, changeRow, 1, row.length);
-                        if (change.f0) {
-                            changeRow[0] = "+";
-                        } else {
-                            changeRow[0] = "-";
-                        }
+                        final String[] row = PrintUtils.rowToString(change, NULL_COLUMN, true);

Review comment:
   It seems that `PrintUtils.rowToString` doesn't correctly stringify values of TIMESTAMP type. I created FLINK-21456 to track this effort.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
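As an illustration of the TIMESTAMP stringification problem mentioned in the FLINK-21456 comment above: a plausible source of such bugs is relying on `LocalDateTime#toString()`, which uses the ISO-8601 `T` separator rather than the SQL-style space. This is an assumption about the root cause for illustration only; see the JIRA issue for the actual analysis and fix.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class TimestampPrinting {
    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2021, 2, 23, 10, 15, 30);
        // Default toString() yields ISO-8601 form: 2021-02-23T10:15:30
        System.out.println(ts);
        // SQL-style TIMESTAMP rendering needs an explicit formatter:
        System.out.println(ts.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        // prints 2021-02-23 10:15:30
    }
}
```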