This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new ff8b0db KYLIN-4581 Add spark and flink engine test case for release test
ff8b0db is described below
commit ff8b0dbc8156c5a63e3895d7f8276f88c46312f8
Author: yaqian.zhang <[email protected]>
AuthorDate: Fri Jun 19 11:48:25 2020 +0800
KYLIN-4581 Add spark and flink engine test case for release test
---
build/smoke-test/smoke-test.sh | 1 +
build/smoke-test/testBuildCube.py | 51 ++++++++++++++------
build/smoke-test/testQuery.py | 56 ++++++++++++++--------
.../kylin/rest/controller/CubeController.java | 25 ++++++++++
4 files changed, 99 insertions(+), 34 deletions(-)
diff --git a/build/smoke-test/smoke-test.sh b/build/smoke-test/smoke-test.sh
index 0a7e362..cc10889 100755
--- a/build/smoke-test/smoke-test.sh
+++ b/build/smoke-test/smoke-test.sh
@@ -69,6 +69,7 @@ sed -i 's/#*\(kylin.query.pushdown.jdbc.url*\)/\1/' conf/kylin.properties
sed -i 's/#*\(kylin.query.pushdown.jdbc.driver*\)/\1/' conf/kylin.properties
sed -i 's/#*\(kylin.query.pushdown.jdbc.username*\)/\1/' conf/kylin.properties
+${KYLIN_HOME}/bin/download-flink.sh
${KYLIN_HOME}/bin/kylin.sh start
echo "Wait 3 minutes for service start."
diff --git a/build/smoke-test/testBuildCube.py b/build/smoke-test/testBuildCube.py
index 9452cf2..733d1eb 100644
--- a/build/smoke-test/testBuildCube.py
+++ b/build/smoke-test/testBuildCube.py
@@ -24,23 +24,39 @@ import time
class testBuildCube(unittest.TestCase):
+
+ _base_url = "http://sandbox:7070/kylin/api"
+
+ _headers = {
+ 'content-type': "application/json",
+ 'authorization': "Basic QURNSU46S1lMSU4=",
+ 'cache-control': "no-cache"
+ }
+
+ _clone_cube_url = _base_url + "/cubes/kylin_sales_cube/clone"
+
def setUp(self):
- pass
+ self.clone_cube("kylin_sales_cube_spark", "SPARK")
+ self.clone_cube("kylin_sales_cube_flink", "FLINK")
def tearDown(self):
pass
- def testBuild(self):
- base_url = "http://sandbox:7070/kylin/api"
- url = base_url + "/cubes/kylin_sales_cube/rebuild"
- headers = {
- 'content-type': "application/json",
- 'authorization': "Basic QURNSU46S1lMSU4=",
- 'cache-control': "no-cache"
- }
+ def clone_cube(self, cube_name, engine_type):
+ payload = {'project': 'learn_kylin',
+ 'cubeName': cube_name}
+ response = requests.request("PUT", self._clone_cube_url, json=payload,
headers=self._headers)
+ self.assertEqual(response.status_code, 200, 'Clone cube : ' +
cube_name + ' failed.')
+ update_engine_url = self._base_url + "/cubes/" + cube_name + "/" +
engine_type
+ response = requests.request("PUT", update_engine_url,
headers=self._headers)
+ self.assertEqual(response.status_code, 200, 'Update engine type of
cube : ' + cube_name + ' failed.')
+
+ def singleBuild(self, cube_name):
+
+ url = self._base_url + "/cubes/" + cube_name + "/rebuild"
# reload metadata before build cubes
- cache_response = requests.request("PUT", base_url +
"/cache/all/all/update", headers=headers)
+ cache_response = requests.request("PUT", self._base_url +
"/cache/all/all/update", headers=self._headers)
self.assertEqual(cache_response.status_code, 200, 'Metadata cache not
refreshed.')
payload = "{\"startTime\": 1325376000000, \"endTime\": 1456790400000,
\"buildType\":\"BUILD\"}"
@@ -49,7 +65,7 @@ class testBuildCube(unittest.TestCase):
while status_code != 200 and try_time <= 3:
print 'Submit build job, try_time = ' + str(try_time)
try:
- response = requests.request("PUT", url, data=payload,
headers=headers)
+ response = requests.request("PUT", url, data=payload,
headers=self._headers)
status_code = response.status_code
except:
status_code = 0
@@ -64,8 +80,8 @@ class testBuildCube(unittest.TestCase):
print 'Build job is submitted...'
job_response = json.loads(response.text)
job_uuid = job_response['uuid']
- job_url = base_url + "/jobs/" + job_uuid
- job_response = requests.request("GET", job_url, headers=headers)
+ job_url = self._base_url + "/jobs/" + job_uuid
+ job_response = requests.request("GET", job_url,
headers=self._headers)
self.assertEqual(job_response.status_code, 200, 'Build job
information fetched failed.')
@@ -75,7 +91,7 @@ class testBuildCube(unittest.TestCase):
while job_status in ('RUNNING', 'PENDING') and try_time <= 30:
print 'Wait for job complete, try_time = ' + str(try_time)
try:
- job_response = requests.request("GET", job_url,
headers=headers)
+ job_response = requests.request("GET", job_url,
headers=self._headers)
job_info = json.loads(job_response.text)
job_status = job_info['job_status']
except:
@@ -88,6 +104,13 @@ class testBuildCube(unittest.TestCase):
self.assertEquals(job_status, 'FINISHED', 'Build cube failed, job
status is ' + job_status)
print 'Job complete.'
+ def testBuild(self):
+ self.singleBuild("kylin_sales_cube_spark")
+ self.singleBuild("kylin_sales_cube_flink")
+ self.singleBuild("kylin_sales_cube")
+ # wait for kylin_sales_cube to READY
+ time.sleep(10)
+
if __name__ == '__main__':
print 'Test Build Cube for Kylin sample.'
diff --git a/build/smoke-test/testQuery.py b/build/smoke-test/testQuery.py
index ec1702e..ddf6cdd 100644
--- a/build/smoke-test/testQuery.py
+++ b/build/smoke-test/testQuery.py
@@ -41,6 +41,7 @@ class testQuery(unittest.TestCase):
def testQuery(self):
sql_files = glob.glob('sql/*.sql')
+ cube_list = ['kylin_sales_cube', 'kylin_sales_cube_spark',
'kylin_sales_cube_flink']
index = 0
query_url = testQuery.base_url + "/query"
for sql_file in sql_files:
@@ -50,26 +51,36 @@ class testQuery(unittest.TestCase):
for sql_statement_line in sql_statement_lines:
if not sql_statement_line.startswith('--'):
sql_statement += sql_statement_line.strip() + ' '
- payload = "{\"sql\": \"" + sql_statement.strip() + "\",
\"offset\": 0, \"limit\": \"50000\", \"acceptPartial\":false,
\"project\":\"learn_kylin\"}"
- print 'Test Query #' + str(index) + ': \n' + sql_statement
- response = requests.request("POST", query_url, data=payload,
headers=testQuery.headers)
-
- self.assertEqual(response.status_code, 200, 'Query failed.')
- actual_result = json.loads(response.text)
- print 'Query duration: ' + str(actual_result['duration']) + 'ms'
- del actual_result['duration']
- del actual_result['hitExceptionCache']
- del actual_result['storageCacheUsed']
- del actual_result['totalScanCount']
- del actual_result['totalScanBytes']
-
- expect_result = json.loads(open(sql_file[:-4] +
'.json').read().strip())
- self.assertEqual(actual_result, expect_result, 'Query result does
not equal.')
-
- def testQueryPushDown(self):
- sql_files = glob.glob('sql/*.sql')
- index = 0
- url = testQuery.base_url + "/cubes/kylin_sales_cube/disable"
+ for cube_name in cube_list:
+ payload = "{\"sql\": \"" + sql_statement.strip() + "\",
\"offset\": 0, \"limit\": \"50000\", " \
+
"\"acceptPartial\":false, " \
+
"\"project\":\"learn_kylin\", " \
+
"\"backdoorToggles\":{" \
+
"\"DEBUG_TOGGLE_HIT_CUBE\":\"" + cube_name + "\"}} "
+ print 'Test Query #' + str(index) + ': \n' + sql_statement
+ response = requests.request("POST", query_url, data=payload,
headers=testQuery.headers)
+
+ self.assertEqual(response.status_code, 200, 'Query failed.')
+ actual_result = json.loads(response.text)
+ print 'Query duration: ' + str(actual_result['duration']) +
'ms'
+ del actual_result['duration']
+ del actual_result['hitExceptionCache']
+ del actual_result['storageCacheUsed']
+ del actual_result['totalScanCount']
+ del actual_result['totalScanBytes']
+
+ expect_result = json.loads(open(sql_file[:-4] +
'.json').read().strip())
+ expect_result["cube"] = u'CUBE[name=' + cube_name + ']'
+
+ self.assertEqual(actual_result, expect_result, 'Query result
does not equal.')
+
+ def disableCube(self):
+ self.disableSingleCube("kylin_sales_cube")
+ self.disableSingleCube("kylin_sales_cube_spark")
+ self.disableSingleCube("kylin_sales_cube_flink")
+
+ def disableSingleCube(self, cube_name):
+ url = testQuery.base_url + "/cubes/" + cube_name + "/disable"
status_code = 0
try_time = 1
while status_code != 200 and try_time <= 3:
@@ -86,9 +97,14 @@ class testQuery(unittest.TestCase):
self.assertEqual(status_code, 200, 'Disable cube failed.')
+ def testQueryPushDown(self):
+ self.disableCube()
# Sleep 3 seconds to ensure cache wiped while do query pushdown
time.sleep(3)
+ sql_files = glob.glob('sql/*.sql')
+ index = 0
+
query_url = testQuery.base_url + "/query"
for sql_file in sql_files:
index += 1
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index a96cdda..c71bacd 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -53,6 +53,7 @@ import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.ISourceAware;
+import org.apache.kylin.metadata.model.IEngineAware;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentRange.TSRange;
@@ -781,6 +782,30 @@ public class CubeController extends BasicController {
return cubeRequest;
}
+ @RequestMapping(value = "/{cubeName}/{engineType}", method =
RequestMethod.PUT)
+ @ResponseBody
+ public void updateCubeEngineType(@PathVariable String cubeName,
@PathVariable String engineType) throws IOException {
+ checkCubeExists(cubeName);
+ CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
+ CubeDesc desc = cube.getDescriptor();
+ int engineTypeID = desc.getEngineType();
+ switch(engineType) {
+ case "MR_V2":
+ engineTypeID = IEngineAware.ID_MR_V2;
+ break;
+ case "SPARK":
+ engineTypeID = IEngineAware.ID_SPARK;
+ break;
+ case "FLINK":
+ engineTypeID = IEngineAware.ID_FLINK;
+ break;
+ default:
+ logger.warn("Engine type {} is not support", engineType);
+ }
+ desc.setEngineType(engineTypeID);
+ cubeService.updateCubeAndDesc(cube, desc, cube.getProject(), true);
+ }
+
/**
* get Hbase Info
*