hackergin commented on code in PR #25988:
URL: https://github.com/apache/flink/pull/25988#discussion_r1919769640


##########
flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh:
##########
@@ -0,0 +1,242 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source "$(dirname "$0")"/common_kubernetes.sh
+
+CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
+CLUSTER_ROLE_BINDING="flink-role-binding-default"
+APPLICATION_CLUSTER_ID="flink-native-k8s-sql-mt-application-1"
+SESSION_CLUSTER_ID="flink-native-k8s-sql-mt-session-1"
+FLINK_IMAGE_NAME="test_kubernetes_materialized_table-1"
+LOCAL_LOGS_PATH="${TEST_DATA_DIR}/log"
+IMAGE_BUILD_RETRIES=3
+IMAGE_BUILD_BACKOFF=2
+
+# copy test-filesystem jar & hadoop plugin
+TEST_FILE_SYSTEM_JAR=`ls 
${END_TO_END_DIR}/../flink-test-utils-parent/flink-table-filesystem-test-utils/target/flink-table-filesystem-test-utils-*.jar`
+cp $TEST_FILE_SYSTEM_JAR ${FLINK_DIR}/lib/
+add_optional_plugin "s3-fs-hadoop"
+
+# start kubernetes
+start_kubernetes
+kubectl create clusterrolebinding ${CLUSTER_ROLE_BINDING} --clusterrole=edit 
--serviceaccount=default:default --namespace=default
+
+# build image
+if ! retry_times $IMAGE_BUILD_RETRIES $IMAGE_BUILD_BACKOFF "build_image 
${FLINK_IMAGE_NAME} $(get_host_machine_address)"; then
+       echo "ERROR: Could not build image. Aborting..."
+       exit 1
+fi
+
+# setup materialized table data dir
+echo "[INFO] Start S3 env"
+source "$(dirname "$0")"/common_s3_minio.sh
+s3_setup hadoop
+S3_TEST_DATA_WORDS_URI="s3://$IT_CASE_S3_BUCKET/"
+MATERIALIZED_TABLE_DATA_DIR="${S3_TEST_DATA_WORDS_URI}"
+
+
+echo "[INFO] Start SQL Gateway"
+set_config_key "sql-gateway.endpoint.rest.address" "localhost"
+start_sql_gateway
+
+SQL_GATEWAY_REST_PORT=8083
+
+function internal_cleanup {
+    kubectl delete deployment ${APPLICATION_CLUSTER_ID}
+    kubectl delete clusterrolebinding ${CLUSTER_ROLE_BINDING}
+}
+
+function open_session() {
+  local session_options=$1
+
+  if [ -z "$session_options" ]; then
+    session_options="{}"
+  fi
+
+  curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"properties\": $session_options}" \
+    "http://localhost:$SQL_GATEWAY_REST_PORT/sessions"; | jq -r '.sessionHandle'
+}
+
+function configure_session() {
+  local session_handle=$1
+  local statement=$2
+
+  response=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"$statement\"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/configure-session)
+
+  if [ "$response" != "{}" ]; then
+    echo "Configure session $session_handle $statement failed: $response"
+    exit 1
+  fi
+  echo "Configured session $session_handle $statement successfully"
+}
+
+function close_session() {
+  local session_handle=$1
+  curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"session_handle\": \"$session_handle\"}" \
+    http://localhost:$SQL_GATEWAY_REST_PORT/sessions/close
+}
+
+function execute_statement() {
+  local session_handle=$1
+  local statement=$2
+
+  local response=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"$statement\"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/v3/sessions/$session_handle/statements)
+
+  local operation_handle=$(echo $response | jq -r '.operationHandle')
+  if [ -z "$operation_handle" ] || [ "$operation_handle" == "null" ]; then
+    echo "Failed to execute statement: $statement, response: $response"
+    exit 1
+  fi
+  get_operation_result $session_handle $operation_handle
+}
+
+function get_operation_result() {
+  local session_handle=$1
+  local operation_handle=$2
+
+  local fields_array=()
+  local 
next_uri="v3/sessions/$session_handle/operations/$operation_handle/result/0"
+  while [ ! -z "$next_uri" ] && [ "$next_uri" != "null" ];
+  do
+    response=$(curl -s -X GET \
+      -H "Content-Type:  \
+       application/json" \
+      http://localhost:$SQL_GATEWAY_REST_PORT/$next_uri)
+    result_type=$(echo $response | jq -r '.resultType')
+    result_kind=$(echo $response | jq -r '.resultKind')
+    next_uri=$(echo $response | jq -r '.nextResultUri')
+    errors=$(echo $response | jq -r '.errors')
+    if [ "$errors" != "null" ]; then
+      echo "fetch operation $operation_handle failed: $errors"
+      exit 1
+    fi
+    if [ result_kind == "SUCCESS" ]; then
+      fields_array+="SUCCESS"
+      break;
+    fi
+    if [ "$result_type" != "NOT_READY" ] && [ "$result_kind" == 
"SUCCESS_WITH_CONTENT" ]; then
+      new_fields=$(echo $response | jq -r '.results.data[].fields')
+      fields_array+=$new_fields
+    else
+      sleep 1
+    fi
+  done
+  echo $fields_array
+}
+
+function create_materialized_table_in_continous_mode() {
+  local session_handle=$1
+  local table_name=$2
+  local operation_handle=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"CREATE MATERIALIZED TABLE $table_name \
+        PARTITIONED BY (ds) \
+        with (\
+          'format' = 'json',\
+          'sink.rolling-policy.rollover-interval' = '10s',\
+          'sink.rolling-policy.check-interval' = '10s'\
+        )\
+        FRESHNESS = INTERVAL '10' SECOND \
+        AS SELECT \
+          DATE_FORMAT(\`timestamp\`, 'yyyy-MM-dd') AS ds, \
+          \`user\`, \
+          \`type\` \
+        FROM filesystem_source \/*+ options('source.monitor-interval' = '10s') 
*\/ \"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/statements | 
jq -r '.operationHandle')
+  get_operation_result $session_handle $operation_handle
+}
+
+function create_filesystem_source() {
+  local session_handle=$1
+  local table_name=$2
+  local path=$3
+
+  create_source_result=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"CREATE TABLE $table_name (\
+      \`timestamp\` TIMESTAMP_LTZ(3),\
+      \`user\` STRING,\
+      \`type\` STRING\
+    ) WITH (\
+      'format' = 'csv'\
+    )\"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/statements/)
+
+    echo $create_source_result
+}
+
+S3_ENDPOINT=${S3_ENDPOINT//localhost/$(get_host_machine_address)}
+echo "[INFO] Create Materialized Table in Application Mode"
+session_options="{\"table.catalog-store.kind\": \"file\",
+                 \"execution.target\": \"kubernetes-application\",
+                 \"kubernetes.cluster-id\": \"${APPLICATION_CLUSTER_ID}\",
+                 \"kubernetes.container.image.ref\": \"${FLINK_IMAGE_NAME}\",
+                 \"table.catalog-store.file.path\": 
\"$MATERIALIZED_TABLE_DATA_DIR/\",
+                 \"kubernetes.rest-service.exposed.type\": \"NodePort\",
+                 \"s3.endpoint\": \"$S3_ENDPOINT\",
+                 \"workflow-scheduler.type\": \"embedded\"}"
+
+session_handle=$(open_session "$session_options")
+echo "[INFO] Session Handle $session_handle"
+
+mkdir -p $MATERIALIZED_TABLE_DATA_DIR/catalog
+configure_session "$session_handle" "create catalog if not exists test_catalog 
with (\
+                                                'type' = 'test-filesystem',\
+                                                'default-database' = 
'test_db',\
+                                                'path' = 
'$MATERIALIZED_TABLE_DATA_DIR/'\
+                                    )"
+# prepare default db & default catalog
+configure_session $session_handle "use catalog test_catalog"
+configure_session $session_handle "create database if not exists test_db"
+create_filesystem_source $session_handle "filesystem_source"
+
+# 1. create materialized table in continuous mode
+create_materialized_table_in_continous_mode $session_handle 
"my_materialized_table_in_continuous_mode"
+
+configure_session $session_handle "set 'execution.checkpointing.savepoint-dir' 
= '$MATERIALIZED_TABLE_DATA_DIR/savepoint'"
+kubectl wait --for=condition=Available --timeout=30s 
deploy/${APPLICATION_CLUSTER_ID} || exit 1
+jm_pod_name=$(kubectl get pods 
--selector="app=${APPLICATION_CLUSTER_ID},component=jobmanager" -o 
jsonpath='{..metadata.name}')
+wait_num_checkpoints $jm_pod_name 1
+
+# 2. suspend & resume materialized table in continuous mode
+execute_statement $session_handle "alter materialized table 
my_materialized_table_in_continuous_mode suspend"
+
+kubectl delete deployment $APPLICATION_CLUSTER_ID

Review Comment:
   In my test, I found that after "stop with Savepoint", the deployment does 
not end. I need to spend some time to verify it again. In theory, it should 
exit automatically.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to