hequn8128 commented on a change in pull request #12246:
URL: https://github.com/apache/flink/pull/12246#discussion_r428525745



##########
File path: flink-python/pyflink/table/tests/test_sql.py
##########
@@ -58,6 +56,67 @@ def test_sql_query(self):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNotNone(table_result)

Review comment:
       Remove this line. If table_result is None, the following tests would 
fail anyway.

##########
File path: flink-python/pyflink/table/tests/test_sql.py
##########
@@ -58,6 +56,67 @@ def test_sql_query(self):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        table_result = t_env.execute_sql("alter table tbl set ('k1' = 'a', 
'k2' = 'b')")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        field_names = ["k1", "k2", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.INT(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sinks",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+        table_result = t_env.execute_sql("insert into sinks select * from tbl")
+        self.assertIsNotNone(table_result)
+        self.assertIsNotNone(table_result.get_job_client())
+        job_status_feature = table_result.get_job_client().get_job_status()
+        job_execution_result_feature = 
table_result.get_job_client().get_job_execution_result(
+            get_gateway().jvm.Thread.currentThread().getContextClassLoader())
+        job_execution_result = job_execution_result_feature.result()
+        self.assertIsNotNone(job_execution_result)
+        self.assertIsNotNone(job_execution_result.get_job_id())
+        self.assertIsNotNone(job_execution_result.get_job_execution_result())
+        job_status = job_status_feature.result()
+        self.assertIsNotNone(job_status)
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(),
+                           ["default_catalog.default_database.sinks"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), 
ResultKind.SUCCESS_WITH_CONTENT)
+        table_result.print()
+
+        table_result = t_env.execute_sql("drop table tbl")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())

Review comment:
       Remove this line

##########
File path: flink-python/pyflink/table/tests/test_sql.py
##########
@@ -58,6 +56,67 @@ def test_sql_query(self):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        table_result = t_env.execute_sql("alter table tbl set ('k1' = 'a', 
'k2' = 'b')")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())

Review comment:
       Remove this line.

##########
File path: flink-python/pyflink/table/tests/test_sql.py
##########
@@ -58,6 +56,67 @@ def test_sql_query(self):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        table_result = t_env.execute_sql("alter table tbl set ('k1' = 'a', 
'k2' = 'b')")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        field_names = ["k1", "k2", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.INT(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sinks",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+        table_result = t_env.execute_sql("insert into sinks select * from tbl")
+        self.assertIsNotNone(table_result)
+        self.assertIsNotNone(table_result.get_job_client())
+        job_status_feature = table_result.get_job_client().get_job_status()
+        job_execution_result_feature = 
table_result.get_job_client().get_job_execution_result(
+            get_gateway().jvm.Thread.currentThread().getContextClassLoader())
+        job_execution_result = job_execution_result_feature.result()
+        self.assertIsNotNone(job_execution_result)
+        self.assertIsNotNone(job_execution_result.get_job_id())
+        self.assertIsNotNone(job_execution_result.get_job_execution_result())
+        job_status = job_status_feature.result()
+        self.assertIsNotNone(job_status)
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(),
+                           ["default_catalog.default_database.sinks"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), 
ResultKind.SUCCESS_WITH_CONTENT)
+        table_result.print()
+
+        table_result = t_env.execute_sql("drop table tbl")
+        self.assertIsNotNone(table_result)

Review comment:
       Remove this line

##########
File path: flink-python/pyflink/table/tests/test_sql.py
##########
@@ -58,6 +56,67 @@ def test_sql_query(self):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        table_result = t_env.execute_sql("alter table tbl set ('k1' = 'a', 
'k2' = 'b')")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        field_names = ["k1", "k2", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.INT(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sinks",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+        table_result = t_env.execute_sql("insert into sinks select * from tbl")
+        self.assertIsNotNone(table_result)
+        self.assertIsNotNone(table_result.get_job_client())
+        job_status_feature = table_result.get_job_client().get_job_status()
+        job_execution_result_feature = 
table_result.get_job_client().get_job_execution_result(
+            get_gateway().jvm.Thread.currentThread().getContextClassLoader())
+        job_execution_result = job_execution_result_feature.result()
+        self.assertIsNotNone(job_execution_result)

Review comment:
       Remove this line.

##########
File path: flink-python/pyflink/table/tests/test_sql.py
##########
@@ -58,6 +56,67 @@ def test_sql_query(self):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())

Review comment:
       Remove this line. If table_schema is None, the test on the next line 
would fail anyway.

##########
File path: flink-python/pyflink/table/tests/test_sql.py
##########
@@ -58,6 +56,67 @@ def test_sql_query(self):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())

Review comment:
       Remove this line. The test on the next line already checks the value 
of the result kind.

##########
File path: flink-python/pyflink/table/tests/test_sql.py
##########
@@ -58,6 +56,67 @@ def test_sql_query(self):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        table_result = t_env.execute_sql("alter table tbl set ('k1' = 'a', 
'k2' = 'b')")
+        self.assertIsNotNone(table_result)

Review comment:
       Remove this line. If table_result is None, the following tests would 
fail anyway.

##########
File path: flink-python/pyflink/table/tests/test_sql.py
##########
@@ -58,6 +56,67 @@ def test_sql_query(self):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        table_result = t_env.execute_sql("alter table tbl set ('k1' = 'a', 
'k2' = 'b')")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())

Review comment:
       Remove this line.

##########
File path: flink-python/pyflink/table/tests/test_sql.py
##########
@@ -58,6 +56,67 @@ def test_sql_query(self):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        table_result = t_env.execute_sql("alter table tbl set ('k1' = 'a', 
'k2' = 'b')")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        field_names = ["k1", "k2", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.INT(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sinks",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+        table_result = t_env.execute_sql("insert into sinks select * from tbl")
+        self.assertIsNotNone(table_result)
+        self.assertIsNotNone(table_result.get_job_client())
+        job_status_feature = table_result.get_job_client().get_job_status()
+        job_execution_result_feature = 
table_result.get_job_client().get_job_execution_result(
+            get_gateway().jvm.Thread.currentThread().getContextClassLoader())
+        job_execution_result = job_execution_result_feature.result()
+        self.assertIsNotNone(job_execution_result)
+        self.assertIsNotNone(job_execution_result.get_job_id())
+        self.assertIsNotNone(job_execution_result.get_job_execution_result())
+        job_status = job_status_feature.result()
+        self.assertIsNotNone(job_status)
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(),
+                           ["default_catalog.default_database.sinks"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), 
ResultKind.SUCCESS_WITH_CONTENT)
+        table_result.print()
+
+        table_result = t_env.execute_sql("drop table tbl")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())

Review comment:
       Remove this line

##########
File path: flink-python/pyflink/table/tests/test_sql.py
##########
@@ -58,6 +56,67 @@ def test_sql_query(self):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        table_result = t_env.execute_sql("alter table tbl set ('k1' = 'a', 
'k2' = 'b')")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        field_names = ["k1", "k2", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.INT(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sinks",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+        table_result = t_env.execute_sql("insert into sinks select * from tbl")
+        self.assertIsNotNone(table_result)

Review comment:
       Remove this line.

##########
File path: flink-python/pyflink/table/tests/test_sql.py
##########
@@ -58,6 +56,67 @@ def test_sql_query(self):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        table_result = t_env.execute_sql("alter table tbl set ('k1' = 'a', 
'k2' = 'b')")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        field_names = ["k1", "k2", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.INT(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sinks",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+        table_result = t_env.execute_sql("insert into sinks select * from tbl")
+        self.assertIsNotNone(table_result)
+        self.assertIsNotNone(table_result.get_job_client())

Review comment:
       Remove this line

##########
File path: flink-python/pyflink/table/tests/test_sql.py
##########
@@ -58,6 +56,67 @@ def test_sql_query(self):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        table_result = t_env.execute_sql("alter table tbl set ('k1' = 'a', 
'k2' = 'b')")
+        self.assertIsNotNone(table_result)
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assert_equals(table_result.get_table_schema().get_field_names(), 
["result"])
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        field_names = ["k1", "k2", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.INT(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sinks",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+        table_result = t_env.execute_sql("insert into sinks select * from tbl")
+        self.assertIsNotNone(table_result)
+        self.assertIsNotNone(table_result.get_job_client())
+        job_status_feature = table_result.get_job_client().get_job_status()
+        job_execution_result_feature = 
table_result.get_job_client().get_job_execution_result(
+            get_gateway().jvm.Thread.currentThread().getContextClassLoader())
+        job_execution_result = job_execution_result_feature.result()
+        self.assertIsNotNone(job_execution_result)
+        self.assertIsNotNone(job_execution_result.get_job_id())
+        self.assertIsNotNone(job_execution_result.get_job_execution_result())
+        job_status = job_status_feature.result()
+        self.assertIsNotNone(job_status)

Review comment:
       Remove the tests about job status. The cluster may exit before 
`get_job_status` is called, which leads to test failures.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to