This is an automated email from the ASF dual-hosted git repository.
knarendran pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluo-muchos.git
The following commit(s) were added to refs/heads/main by this push:
new d55351e Validation checks & additional refactoring on Azure deployments (#395)
d55351e is described below
commit d55351e86f970229ff6f9d0dd313d7161a6d005b
Author: Karthick Narendran <[email protected]>
AuthorDate: Wed Jun 9 19:48:41 2021 +0100
Validation checks & additional refactoring on Azure deployments (#395)
Co-authored-by: Karthick Narendran <[email protected]>
---
ansible/group_vars/.gitignore | 5 +-
ansible/roles/azure/tasks/create_multiple_vmss.yml | 4 +-
.../roles/azure/tasks/create_optional_proxy.yml | 2 +-
ansible/roles/azure/tasks/create_vmss.yml | 4 +-
lib/muchos/azure.py | 35 +--
lib/muchos/config/azure.py | 175 +++++++++++++-
lib/muchos/config/azurevalidationhelpers.py | 153 +++++++++++++
lib/muchos/config/azurevalidations.py | 252 +++++++++++++++++++++
lib/muchos/config/base.py | 47 +++-
lib/muchos/existing.py | 4 +
lib/tests/azure/test_config.py | 3 +
11 files changed, 653 insertions(+), 31 deletions(-)
diff --git a/ansible/group_vars/.gitignore b/ansible/group_vars/.gitignore
index 244b0c6..5e7d273 100644
--- a/ansible/group_vars/.gitignore
+++ b/ansible/group_vars/.gitignore
@@ -1 +1,4 @@
-/all
+# Ignore everything in this directory
+*
+# Except this file
+!.gitignore
diff --git a/ansible/roles/azure/tasks/create_multiple_vmss.yml b/ansible/roles/azure/tasks/create_multiple_vmss.yml
index 3bac89b..02537d5 100644
--- a/ansible/roles/azure/tasks/create_multiple_vmss.yml
+++ b/ansible/roles/azure/tasks/create_multiple_vmss.yml
@@ -36,10 +36,10 @@
location: "{{ location }}"
name: "{{ vmss_name }}-{{ item.name_suffix }}"
vm_size: "{{ item.sku }}"
- admin_username: "{{ admin_username }}"
+ admin_username: "{{ cluster_user }}"
ssh_password_enabled: false
ssh_public_keys:
- - path: /home/{{ admin_username }}/.ssh/authorized_keys
+ - path: /home/{{ cluster_user }}/.ssh/authorized_keys
key_data: "{{ lookup('file', '~/.ssh/id_rsa.pub') }}"
capacity: "{{ item.capacity }}"
single_placement_group: "{{ False if item.capacity > 100 else omit }}"
diff --git a/ansible/roles/azure/tasks/create_optional_proxy.yml b/ansible/roles/azure/tasks/create_optional_proxy.yml
index 41e484b..1de89e8 100644
--- a/ansible/roles/azure/tasks/create_optional_proxy.yml
+++ b/ansible/roles/azure/tasks/create_optional_proxy.yml
@@ -66,7 +66,7 @@
name: "{{ azure_proxy_host }}"
network_interface_names:
- "{{ azure_proxy_host }}-nic"
- vm_size: "{{ azure_proxy_host_vm_sku }}"
+ vm_size: Standard_D8s_v3
admin_username: "{{ cluster_user }}"
ssh_password_enabled: false
ssh_public_keys:
diff --git a/ansible/roles/azure/tasks/create_vmss.yml b/ansible/roles/azure/tasks/create_vmss.yml
index a2a2be8..22e7de4 100644
--- a/ansible/roles/azure/tasks/create_vmss.yml
+++ b/ansible/roles/azure/tasks/create_vmss.yml
@@ -45,10 +45,10 @@
location: "{{ location }}"
name: "{{ vmss_name }}"
vm_size: "{{ vm_sku }}"
- admin_username: "{{ admin_username }}"
+ admin_username: "{{ cluster_user }}"
ssh_password_enabled: false
ssh_public_keys:
- - path: /home/{{ admin_username }}/.ssh/authorized_keys
+ - path: /home/{{ cluster_user }}/.ssh/authorized_keys
key_data: "{{ lookup('file', '~/.ssh/id_rsa.pub') }}"
capacity: "{{ numnodes }}"
virtual_network_name: "{{ vnet }}"
diff --git a/lib/muchos/azure.py b/lib/muchos/azure.py
index a12bede..54cdbb3 100644
--- a/lib/muchos/azure.py
+++ b/lib/muchos/azure.py
@@ -32,16 +32,10 @@ class VmssCluster(ExistingCluster):
def launch(self):
config = self.config
- azure_config = dict(config.items("azure"))
- azure_config["admin_username"] = config.get("general", "cluster_user")
- azure_config["hdfs_ha"] = config.get("general", "hdfs_ha")
+ azure_config = config.ansible_host_vars()
azure_config["vmss_name"] = config.cluster_name
- azure_config["deploy_path"] = config.deploy_path
- azure_config = {
- k: VmssCluster._parse_config_value(v)
- for k, v in azure_config.items()
- }
- subprocess.call(
+
+ retcode = subprocess.call(
[
"ansible-playbook",
path.join(config.deploy_path, "ansible/azure.yml"),
@@ -49,6 +43,12 @@ class VmssCluster(ExistingCluster):
json.dumps(azure_config),
]
)
+ if retcode != 0:
+ exit(
+ "ERROR - Command failed with return code of {0}".format(
+ retcode
+ )
+ )
def status(self):
config = self.config
@@ -84,7 +84,8 @@ class VmssCluster(ExistingCluster):
[
"ansible-playbook",
path.join(
- config.deploy_path, "ansible/azure_terminate.yml",
+ config.deploy_path,
+ "ansible/azure_terminate.yml",
),
"--extra-vars",
json.dumps(azure_config),
@@ -108,7 +109,10 @@ class VmssCluster(ExistingCluster):
retcode = subprocess.call(
[
"ansible-playbook",
- path.join(config.deploy_path, "ansible/azure_wipe.yml",),
+ path.join(
+ config.deploy_path,
+ "ansible/azure_wipe.yml",
+ ),
"--extra-vars",
json.dumps(azure_config),
]
@@ -151,8 +155,7 @@ class VmssCluster(ExistingCluster):
if self.config.use_multiple_vmss():
vmss_hosts = open(
path.join(
- self.config.deploy_path,
- "conf/azure_vmss_to_hosts.conf"
+ self.config.deploy_path, "conf/azure_vmss_to_hosts.conf"
),
"r",
)
@@ -199,7 +202,8 @@ class VmssCluster(ExistingCluster):
print(
"{0}: {1}".format(
- "worker_data_dirs", curr_worker_dirs,
+ "worker_data_dirs",
+ curr_worker_dirs,
),
file=vmss_file,
)
@@ -214,7 +218,8 @@ class VmssCluster(ExistingCluster):
print(
"{0}: {1}".format(
- "default_data_dirs", curr_default_dirs,
+ "default_data_dirs",
+ curr_default_dirs,
),
file=vmss_file,
)
diff --git a/lib/muchos/config/azure.py b/lib/muchos/config/azure.py
index 4a77b02..0e728cc 100644
--- a/lib/muchos/config/azure.py
+++ b/lib/muchos/config/azure.py
@@ -20,6 +20,7 @@ from .base import BaseConfig
from .decorators import ansible_host_var, is_valid, default
from .validators import is_type, is_in
from yaml import load, FullLoader
+from .azurevalidations import validate_azure_configs
class AzureDeployConfig(BaseConfig):
@@ -53,6 +54,10 @@ class AzureDeployConfig(BaseConfig):
def verify_config(self, action):
self._verify_config(action)
+ results = validate_azure_configs(self, action)
+ if len(results) > 0:
+ exit("ERROR - config failed validation {}".format(results))
+
proxy = self.get("general", "proxy_hostname")
cluster_type = self.get("general", "cluster_type")
if cluster_type not in ["azure"]:
@@ -75,14 +80,26 @@ class AzureDeployConfig(BaseConfig):
def mount_root(self):
return self.get("azure", "mount_root")
- def data_dirs_common(self, nodeType):
+ def data_dirs_internal(
+ self,
+ nodeType,
+ num_disks=None,
+ mount_root_actual=None,
+ curr_vm_sku=None,
+ ):
data_dirs = []
- num_disks = self.data_disk_count()
+ num_disks = self.data_disk_count() if num_disks is None else num_disks
+ mount_root_actual = (
+ self.mount_root()
+ if mount_root_actual is None
+ else mount_root_actual
+ )
+ curr_vm_sku = self.vm_sku() if curr_vm_sku is None else curr_vm_sku
# Check if using temp storage (non-NVME) for HDFS
- if num_disks == 0 and self.mount_root() == "/mnt/resource":
- data_dirs.append(self.mount_root())
+ if num_disks == 0 and mount_root_actual == "/mnt/resource":
+ data_dirs.append(mount_root_actual)
return data_dirs
# Check if using Lsv2 NVME temp storage for HDFS
@@ -95,18 +112,21 @@ class AzureDeployConfig(BaseConfig):
"Standard_L80s_v2": 10,
}
- if num_disks == 0 and self.vm_sku() in lsv2_vm_disk_map.keys():
+ if num_disks == 0 and curr_vm_sku in lsv2_vm_disk_map.keys():
# pretend that we have N data disks
# in this case those are NVME temp disks
- num_disks = lsv2_vm_disk_map[self.vm_sku()]
+ num_disks = lsv2_vm_disk_map[curr_vm_sku]
# Persistent data disks attached to VMs
range_var = num_disks + 1
for diskNum in range(1, range_var):
- data_dirs.append(self.mount_root() + str(diskNum))
+ data_dirs.append(mount_root_actual + str(diskNum))
return data_dirs
def data_dirs_common(self, nodeType):
    """Return the data directories for *nodeType*.

    Delegates to data_dirs_internal without overrides, so the disk count,
    mount root and VM SKU all come from this config's own azure settings.
    """
    return self.data_dirs_internal(nodeType)
+
def metrics_drive_ids(self):
drive_ids = []
range_var = self.data_disk_count() + 1
@@ -124,6 +144,11 @@ class AzureDeployConfig(BaseConfig):
return self.getint("azure", "data_disk_count")
@ansible_host_var
@is_valid(is_type(int))
def disk_size_gb(self):
    """Return azure.disk_size_gb, read with getint and validated as an int."""
    return self.getint("azure", "disk_size_gb")
+
+ @ansible_host_var
@default("/dev/disk/azure/scsi1")
def azure_disk_device_path(self):
return self.get("azure", "azure_disk_device_path")
@@ -180,6 +205,26 @@ class AzureDeployConfig(BaseConfig):
def use_adlsg2(self):
return self.getboolean("azure", "use_adlsg2")
@ansible_host_var
@default("Standard_LRS")
@is_valid(
    is_in(
        [
            "Standard_LRS",
            "Standard_GRS",
            "Standard_RAGRS",
            "Standard_ZRS",
            "Premium_LRS",
        ]
    )
)
def adls_storage_type(self):
    """Return azure.adls_storage_type.

    The @default decorator supplies "Standard_LRS" as the fallback and
    @is_valid restricts the value to the five storage SKUs listed above.
    """
    return self.get("azure", "adls_storage_type")
+
@ansible_host_var
@default(None)
def user_assigned_identity(self):
    """Return azure.user_assigned_identity.

    Declared with @default(None), like the other optional identity-related
    settings (azure_tenant_id, principal_id), so that building the Ansible
    host vars does not depend on this option being present in the config.
    """
    return self.get("azure", "user_assigned_identity")
+
@ansible_host_var(name="azure_tenant_id")
@default(None)
def azure_tenant_id(self):
@@ -195,6 +240,11 @@ class AzureDeployConfig(BaseConfig):
def principal_id(self):
return self.get("azure", "principal_id")
@ansible_host_var
@default(None)
def instance_volumes_input(self):
    """Return azure.instance_volumes_input (fallback supplied by
    @default(None))."""
    return self.get("azure", "instance_volumes_input")
+
@ansible_host_var(name="instance_volumes_adls")
@default(None)
def instance_volumes_adls(self):
@@ -205,3 +255,114 @@ class AzureDeployConfig(BaseConfig):
@is_valid(is_in([True, False]))
def use_multiple_vmss(self):
return self.getboolean("azure", "use_multiple_vmss")
+
@ansible_host_var
@is_valid(is_type(int))
def numnodes(self):
    """Return azure.numnodes as an int (the single-VMSS playbook uses it as
    the scale set capacity)."""
    return self.getint("azure", "numnodes")

@ansible_host_var
@default(None)
def resource_group(self):
    """Return azure.resource_group."""
    return self.get("azure", "resource_group")

@ansible_host_var
@default(None)
def vnet(self):
    """Return azure.vnet (the virtual network name)."""
    return self.get("azure", "vnet")

@ansible_host_var
@default(None)
def vnet_cidr(self):
    """Return azure.vnet_cidr."""
    return self.get("azure", "vnet_cidr")

@ansible_host_var
@default(None)
def subnet(self):
    """Return azure.subnet."""
    return self.get("azure", "subnet")

@ansible_host_var
@default(None)
def subnet_cidr(self):
    """Return azure.subnet_cidr."""
    return self.get("azure", "subnet_cidr")

@ansible_host_var
@default(None)
def location(self):
    """Return azure.location (also used to filter the available VM SKUs
    during validation)."""
    return self.get("azure", "location")

@ansible_host_var
@default("")
def azure_proxy_host(self):
    """Return azure.azure_proxy_host; falls back to an empty string."""
    return self.get("azure", "azure_proxy_host")

@ansible_host_var
@default(None)
def azure_proxy_host_vm_sku(self):
    """Return azure.azure_proxy_host_vm_sku.

    NOTE(review): create_optional_proxy.yml in this change hardcodes
    vm_size to Standard_D8s_v3 instead of using this variable, so the
    setting currently has no effect there -- confirm that is intended.
    """
    return self.get("azure", "azure_proxy_host_vm_sku")

@ansible_host_var
@default("Standard_LRS")
@is_valid(is_in(["Standard_LRS", "Premium_LRS", "StandardSSD_LRS"]))
def managed_disk_type(self):
    """Return azure.managed_disk_type (default "Standard_LRS"), restricted
    to Standard_LRS, Premium_LRS or StandardSSD_LRS."""
    return self.get("azure", "managed_disk_type")
+
@ansible_host_var
def accnet_capable_skus(self):
    """Return names of the location's VM SKUs that support accelerated
    networking (capability "AcceleratedNetworkingEnabled" == "True").

    Relies on self.vm_skus_for_location, which validate_azure_configs
    populates before the validations run.
    """
    return self._skus_with_capability("AcceleratedNetworkingEnabled")

@ansible_host_var
def premiumio_capable_skus(self):
    """Return names of the location's VM SKUs that support Premium I/O
    (capability "PremiumIO" == "True")."""
    return self._skus_with_capability("PremiumIO")

def _skus_with_capability(self, capability_name):
    """Return names of SKUs in vm_skus_for_location whose capability
    *capability_name* has the string value "True"."""
    return [
        sku.name
        for sku in self.vm_skus_for_location
        # capabilities may be None on some resource SKUs; treat as empty
        if any(
            cap.name == capability_name and cap.value == "True"
            for cap in (sku.capabilities or [])
        )
    ]

def max_data_disks_for_skus(self):
    """Map each SKU name in vm_skus_for_location to its MaxDataDiskCount.

    A SKU that does not advertise MaxDataDiskCount maps to 0 instead of
    aborting the whole computation (the previous next(filter(...)) raised
    StopIteration in that case). 0 matches the default the validations
    already use for unknown SKUs.
    """
    limits = {}
    for sku in self.vm_skus_for_location:
        raw_limit = next(
            (
                cap.value
                for cap in (sku.capabilities or [])
                if cap.name == "MaxDataDiskCount"
            ),
            0,
        )
        # capability values are strings in the SDK model; convert to int
        limits[sku.name] = int(raw_limit)
    return limits
diff --git a/lib/muchos/config/azurevalidationhelpers.py b/lib/muchos/config/azurevalidationhelpers.py
new file mode 100644
index 0000000..421cd10
--- /dev/null
+++ b/lib/muchos/config/azurevalidationhelpers.py
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
def vmss_status_succeeded_if_exists(config, client):
    """Return True unless an existing VMSS is in a non-"Succeeded" state.

    A scale set whose lookup fails (for example because it has not been
    created yet) is accepted. In the multiple-VMSS case every entry of
    config.azure_multiple_vmss_vars["vars_list"] is checked under the name
    "<cluster_name>-<name_suffix>"; the first one found in any other
    provisioning state fails the check.

    :param config: Azure deploy config (reads azure.use_multiple_vmss,
        azure.resource_group, cluster_name, azure_multiple_vmss_vars)
    :param client: compute management client used to fetch scale sets
    :returns: True if the validation passes, otherwise False
    """
    multi_vmss = config.getboolean("azure", "use_multiple_vmss")
    resource_group = config.get("azure", "resource_group")

    if not multi_vmss:
        vmss_names = [config.cluster_name]
    else:
        vmss_names = [
            "{}-{}".format(
                config.cluster_name, vmss_config.get("name_suffix", "")
            )
            for vmss_config in config.azure_multiple_vmss_vars.get(
                "vars_list", []
            )
        ]

    for vmss_name in vmss_names:
        try:
            vmss = client.virtual_machine_scale_sets.get(
                resource_group_name=resource_group,
                vm_scale_set_name=vmss_name,
            )
        except Exception:
            # Lookup failure (typically: VMSS does not exist yet) is fine.
            # Exception (not a bare except) lets Ctrl-C / SystemExit through.
            continue
        if vmss.provisioning_state != "Succeeded":
            return False
    return True
+
+
def validate_disk_count(
    context,
    specified_disk_count,
    mount_root,
    disk_pattern,
    validation_errors,
):
    """Append data-disk configuration errors for one cluster/VMSS.

    Exclusively ephemeral storage (data_disk_count == 0) is allowed only
    when mount_root is /mnt/resource or disk_pattern is nvme*n1; otherwise
    at least one persistent data disk is required.

    :param context: label used in messages, e.g. "Cluster" or "VMSS x"
    :param specified_disk_count: configured data_disk_count
    :param mount_root: configured mount_root
    :param disk_pattern: configured azure_disk_device_pattern
    :param validation_errors: list that error strings are appended to
    :returns: None (results are reported through validation_errors)
    """
    using_temporary_disks = (
        mount_root == "/mnt/resource" or disk_pattern == "nvme*n1"
    )
    min_data_disk_count = 0 if using_temporary_disks else 1

    # NVME temp disks must not be combined with the /mnt/resource mount
    # root. The context is included so multi-VMSS errors identify the VMSS.
    if mount_root == "/mnt/resource" and disk_pattern == "nvme*n1":
        validation_errors.append(
            "Config error for {}: mount_root cannot be "
            "/mnt/resource when using NVME temp disks!".format(context)
        )

    # data disks must not be specified when using temp storage
    if using_temporary_disks and specified_disk_count > 0:
        validation_errors.append(
            "Config error for {}: data_disk_count must be 0 "
            "when using temporary storage!".format(context)
        )

    # persistent storage needs enough data disks configured
    if specified_disk_count < min_data_disk_count:
        validation_errors.append(
            "Config error for {}: data_disk_count "
            "must be >= {}!".format(context, min_data_disk_count)
        )
+
+
def vmss_cluster_has_appropriate_data_disk_count(config, client):
    """Validate data disk settings for the cluster or for each VMSS.

    In the single-VMSS case the cluster-wide data_disk_count, mount_root
    and azure_disk_device_pattern are checked. In the multiple-VMSS case
    each vars_list entry is checked, with per-VMSS mount_root and
    azure_disk_device_pattern falling back to the cluster-wide values and
    data_disk_count defaulting to 0.

    :returns: all error messages joined with spaces, or None when the
        configuration is acceptable. The client argument is unused.
    """
    problems = []

    if config.use_multiple_vmss():
        for vmss in config.azure_multiple_vmss_vars.get("vars_list", []):
            label = "VMSS {}".format(vmss.get("name_suffix"))
            count = vmss.get("data_disk_count", 0)
            root = vmss.get("mount_root", config.mount_root())
            pattern = vmss.get(
                "azure_disk_device_pattern",
                config.azure_disk_device_pattern(),
            )
            validate_disk_count(label, count, root, pattern, problems)
    else:
        validate_disk_count(
            "Cluster",
            config.data_disk_count(),
            config.mount_root(),
            config.azure_disk_device_pattern(),
            problems,
        )

    if problems:
        return " ".join(problems)
    return None
+
+
def vmss_exists(config, client):
    """Return True if every VMSS this config describes can be fetched.

    Single-VMSS: looks up config.cluster_name. Multiple-VMSS: looks up
    "<cluster_name>-<name_suffix>" for each entry of
    config.azure_multiple_vmss_vars["vars_list"]; the first failed lookup
    returns False. An empty vars_list yields True.

    :param config: Azure deploy config (reads azure.use_multiple_vmss,
        azure.resource_group, cluster_name, azure_multiple_vmss_vars)
    :param client: compute management client used to fetch scale sets
    :returns: True when all lookups succeed, otherwise False
    """
    multi_vmss = config.getboolean("azure", "use_multiple_vmss")
    resource_group = config.get("azure", "resource_group")

    if not multi_vmss:
        vmss_names = [config.cluster_name]
    else:
        vmss_names = [
            "{}-{}".format(
                config.cluster_name, vmss_config.get("name_suffix", "")
            )
            for vmss_config in config.azure_multiple_vmss_vars.get(
                "vars_list", []
            )
        ]

    for vmss_name in vmss_names:
        try:
            client.virtual_machine_scale_sets.get(
                resource_group_name=resource_group,
                vm_scale_set_name=vmss_name,
            )
        except Exception:
            # Any lookup error counts as "does not exist"; Exception (not a
            # bare except) still lets KeyboardInterrupt / SystemExit through.
            return False
    return True
diff --git a/lib/muchos/config/azurevalidations.py b/lib/muchos/config/azurevalidations.py
new file mode 100644
index 0000000..803fd39
--- /dev/null
+++ b/lib/muchos/config/azurevalidations.py
@@ -0,0 +1,252 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from .base import ConfigValidator
+from .azurevalidationhelpers import (
+ vmss_status_succeeded_if_exists,
+ vmss_cluster_has_appropriate_data_disk_count,
+ vmss_exists,
+)
+from azure.mgmt.compute import ComputeManagementClient
+from azure.common.client_factory import get_client_from_cli_profile
+
+
def validate_azure_configs(config, action):
    """Run the Azure validations registered for *action* against *config*.

    Side effects: sets config.client to a ComputeManagementClient and
    config.vm_skus_for_location to the "virtualMachines" resource SKUs
    offered in config.location().

    Validations run only when *action* has an entry in AZURE_VALIDATIONS;
    in that case the "common" list runs first, then the action-specific
    list.

    :returns: list of failure messages (strings); empty when all pass
    """
    # The resource_skus list operation is only allowed on API version
    # 2017-09-01 among the versions available in the Azure SDK that ships
    # with Ansible for Azure, so fetch the VM SKUs with that version.
    config.client = get_client_from_cli_profile(
        ComputeManagementClient, api_version="2017-09-01"
    )
    config.vm_skus_for_location = [
        sku
        for sku in config.client.resource_skus.list()
        if sku.resource_type == "virtualMachines"
        and config.location() in sku.locations
    ]

    # 2018-06-01 supports the remaining operations, including VMSS checks
    config.client = get_client_from_cli_profile(
        ComputeManagementClient, api_version="2018-06-01"
    )

    if action in AZURE_VALIDATIONS:
        validations = AZURE_VALIDATIONS["common"] + AZURE_VALIDATIONS[action]
    else:
        validations = []

    outcomes = (check(config, config.client) for check in validations)
    return [outcome for outcome in outcomes if isinstance(outcome, str)]
+
+
# Validations keyed by muchos action. validate_azure_configs runs the
# "common" list followed by the list for the requested action. Each
# ConfigValidator wraps a predicate taking (config, client).
AZURE_VALIDATIONS = {
    "common": [
        # if VMSS instances are pending upgrade to latest version
        # block the execution of the setup phase.
        ConfigValidator(
            vmss_status_succeeded_if_exists,
            "VMSS must not exist or be in 'Succeeded' state",
        ),
        # Validate that the data disk configuration is appropriate
        # considering temp disk usage etc.
        ConfigValidator(vmss_cluster_has_appropriate_data_disk_count, None),
        # NOTE(review): this validator has no failure message, so when its
        # predicate is False (use_multiple_vmss is True) ConfigValidator
        # returns None and nothing is reported -- it is effectively a
        # no-op. Confirm whether it should be removed or given a message.
        ConfigValidator(lambda config, client: not config.use_multiple_vmss()),
        # the VM SKU specified is not a valid Azure VM SKU
        ConfigValidator(
            lambda config, client: config.vm_sku()
            in {s.name for s in config.vm_skus_for_location},
            "azure.vm_sku must be a valid VM SKU for the selected location",
        ),
        ConfigValidator(
            lambda config, client: not config.use_multiple_vmss()
            or all(
                [
                    vmss.get("sku")
                    in {s.name for s in config.vm_skus_for_location}
                    for vmss in config.azure_multiple_vmss_vars.get(
                        "vars_list", []
                    )
                ]
            ),
            "when use_multiple_vmss == True, any VMSS with sku "
            "must be a valid VM SKU for the selected location",
        ),
        # managed_disk_type in
        # ['Standard_LRS', 'StandardSSD_LRS', Premium_LRS']
        ConfigValidator(
            lambda config, client: config.managed_disk_type()
            in ["Standard_LRS", "StandardSSD_LRS", "Premium_LRS"],
            "managed_disk_type must be "
            "one of Standard_LRS, StandardSSD_LRS, or Premium_LRS",
        ),
        ConfigValidator(
            lambda config, client: not config.use_multiple_vmss()
            or all(
                [
                    vmss.get("disk_sku")
                    in ["Standard_LRS", "StandardSSD_LRS", "Premium_LRS"]
                    for vmss in config.azure_multiple_vmss_vars.get(
                        "vars_list", []
                    )
                ]
            ),
            "when use_multiple_vmss == True, any VMSS with disk_sku must "
            "be one of Standard_LRS, StandardSSD_LRS or Premium_LRS",
        ),
        # Cannot specify Premium managed disks if VMSS SKU is / are not capable
        ConfigValidator(
            lambda config, client: config.use_multiple_vmss()
            or config.managed_disk_type() != "Premium_LRS"
            or config.vm_sku() in config.premiumio_capable_skus(),
            "azure.vm_sku must be Premium I/O capable VM SKU "
            "in order to use Premium Managed Disks",
        ),
        ConfigValidator(
            lambda config, client: not config.use_multiple_vmss()
            or all(
                [
                    vmss.get("sku") in config.premiumio_capable_skus()
                    if vmss.get("disk_sku") == "Premium_LRS"
                    else True
                    for vmss in config.azure_multiple_vmss_vars.get(
                        "vars_list", []
                    )
                ]
            ),
            "when use_multiple_vmss == True, any VMSS set to use Premium "
            "Managed Disks must use a Premium I/O capable VM SKU",
        ),
        # Data disk count specified cannot exceed MaxDataDisks for VM SKU
        ConfigValidator(
            lambda config, client: config.use_multiple_vmss()
            or config.data_disk_count()
            <= config.max_data_disks_for_skus().get(config.vm_sku(), 0),
            "Number of data disks specified exceeds allowed limit for VM SKU",
        ),
        # Each VMSS is checked against the limit of its OWN sku (not the
        # cluster-wide azure.vm_sku). A missing data_disk_count defaults to
        # 0, matching vmss_cluster_has_appropriate_data_disk_count.
        ConfigValidator(
            lambda config, client: not config.use_multiple_vmss()
            or all(
                [
                    vmss.get("data_disk_count", 0)
                    <= config.max_data_disks_for_skus().get(
                        vmss.get("sku"), 0
                    )
                    for vmss in config.azure_multiple_vmss_vars.get(
                        "vars_list", []
                    )
                ]
            ),
            "when use_multiple_vmss == True, no VMSS can specify number of "
            "data disks exceeding the allowed limit for the respective VM SKU",
        ),
        # in the multiple VMSS case, a azure_multiple_vmss_vars.yml file
        # must be provided
        ConfigValidator(
            lambda config, client: not config.use_multiple_vmss()
            or hasattr(config, "azure_multiple_vmss_vars"),
            "in the multiple VMSS case, an azure_multiple_vmss_vars.yml"
            " file must be provided",
        ),
        # in the multiple VMSS case, each name suffix should be unique
        ConfigValidator(
            lambda config, client: not config.use_multiple_vmss()
            or len(config.azure_multiple_vmss_vars.get("vars_list", []))
            == len(
                set(
                    [
                        v.get("name_suffix")
                        for v in config.azure_multiple_vmss_vars.get(
                            "vars_list", []
                        )
                    ]
                )
            ),
            "in the multiple VMSS case, each name suffix of a VMSS"
            " must be unique",
        ),
        # ADLS Gen2 is only supported if Accumulo 2.x is used
        ConfigValidator(
            lambda config, client: not config.use_adlsg2()
            or config.version("accumulo").split(".")[0] == "2",
            "ADLS Gen2 support requires Accumulo 2.x",
        ),
    ],
    "launch": [
        # Fail when HDFS HA is NOT enabled and azure_multiple_vmss_vars.yml
        # specifies assignments for HA service roles. A VMSS without a
        # "roles" entry is treated as having no roles.
        ConfigValidator(
            lambda config, client: not config.use_multiple_vmss()
            or config.hdfs_ha()
            or all(
                (
                    "journalnode" not in current_vmss.get("roles", [])
                    and "zkfc" not in current_vmss.get("roles", [])
                )
                for current_vmss in config.azure_multiple_vmss_vars.get(
                    "vars_list", []
                )
            ),
            "HDFS HA is NOT enabled, but azure_multiple_vmss_vars.yml "
            "specifies assignments for HA service roles",
        ),
        # Fail when HDFS HA is enabled and azure_multiple_vmss_vars.yml
        # does NOT specify nodes with HA service roles
        ConfigValidator(
            lambda config, client: not config.use_multiple_vmss()
            or not config.hdfs_ha()
            # TODO implement a count based check for the below,
            # do not just check existence of ZKFC and Journal Node roles
            or (
                any(
                    ("journalnode" in current_vmss.get("roles", []))
                    for current_vmss in config.azure_multiple_vmss_vars.get(
                        "vars_list", []
                    )
                )
                and any(
                    ("zkfc" in current_vmss.get("roles", []))
                    for current_vmss in config.azure_multiple_vmss_vars.get(
                        "vars_list", []
                    )
                )
            ),
            "HDFS HA is enabled, but azure_multiple_vmss_vars.yml does NOT"
            " specify ZKFC and / or Journal Node service roles",
        ),
    ],
    "setup": [
        ConfigValidator(
            vmss_exists,
            "VMSS must exist, please run launch first before running setup",
        ),
    ],
    "wipe": [
        ConfigValidator(vmss_exists, "VMSS must exist to allow running wipe")
    ],
    "terminate": [
        ConfigValidator(
            vmss_exists, "VMSS must exist to allow running terminate"
        )
    ],
}
diff --git a/lib/muchos/config/base.py b/lib/muchos/config/base.py
index 07d970e..5a50278 100644
--- a/lib/muchos/config/base.py
+++ b/lib/muchos/config/base.py
@@ -21,9 +21,11 @@ from configparser import ConfigParser
from distutils.version import StrictVersion
from os.path import isfile
from sys import exit
+from traceback import format_exc
from .decorators import (
ansible_host_var,
ansible_play_var,
+ default,
get_ansible_vars,
is_valid,
)
@@ -69,7 +71,6 @@ _HOST_VAR_DEFAULTS = {
"accumulo_version": None,
"cluster_type": None,
"cluster_group": None,
- "cluster_user": None,
"default_data_dirs": None,
"download_software": None,
"fluo_home": "'{{ install_dir }}/fluo-{{ fluo_version }}'",
@@ -83,7 +84,6 @@ _HOST_VAR_DEFAULTS = {
"hadoop_version": None,
"hadoop_major_version": "{{ hadoop_version.split('.')[0] }}",
"hdfs_root": "hdfs://{{ nameservice_id }}",
- "hdfs_ha": None,
"nameservice_id": None,
"num_tservers": 1,
"install_dir": None,
@@ -639,4 +639,45 @@ class BaseConfig(ConfigParser, metaclass=ABCMeta):
@ansible_host_var
def master_manager(self):
accumulo_version = self.get("general", "accumulo_version")
- return "manager" if accumulo_version >= '2.1.0' else "master"
+ return "manager" if accumulo_version >= "2.1.0" else "master"
+
@ansible_host_var(name="deploy_path")
def muchos_deploy_path(self):
    """Return self.deploy_path; registered under the host var name
    "deploy_path" rather than the method name."""
    return self.deploy_path

@ansible_host_var
def cluster_user(self):
    """Return general.cluster_user (the Azure VMSS playbooks use it as the
    VM admin user and for the authorized_keys path)."""
    return self.get("general", "cluster_user")

@ansible_host_var
@default(False)
@is_valid(is_in([True, False]))
def hdfs_ha(self):
    """Return general.hdfs_ha as a boolean (fallback supplied by
    @default(False))."""
    return self.getboolean("general", "hdfs_ha")
+
+
class ConfigValidator(object):
    """Wrap a validation function so that failures are reported as strings.

    The wrapped function receives whatever arguments the validator is called
    with. Its result is interpreted as follows:

    * a ``str`` is a failure description; it is returned, prefixed with
      ``failure_message`` when one was configured;
    * any other falsy value is a failure and ``failure_message`` is returned
      (``None`` -- i.e. nothing reported -- when no message was configured);
    * any other truthy value is a pass and ``None`` is returned.

    If the wrapped function raises an ``Exception``, a message containing
    the formatted traceback is returned instead of propagating it.
    """

    def __init__(self, validation_func, failure_message=None):
        self.validation_func = validation_func
        self.failure_message = failure_message

    def __call__(self, *args, **kwargs):
        try:
            result = self.validation_func(*args, **kwargs)
            if isinstance(result, str):
                return (
                    result
                    if self.failure_message is None
                    else "{}: {}".format(self.failure_message, result)
                )

            if not result:
                return self.failure_message
        except Exception:
            # Identify the failing validation even when it has no message,
            # instead of emitting a literal "None" prefix.
            label = (
                self.failure_message
                if self.failure_message is not None
                else getattr(
                    self.validation_func,
                    "__name__",
                    repr(self.validation_func),
                )
            )
            # format_exc() formats the exception currently being handled.
            # Its first parameter is ``limit``, so passing the exception
            # object made this handler itself raise TypeError.
            return "{}: unexpected exception during validation - {}".format(
                label, format_exc()
            )
        return None
diff --git a/lib/muchos/existing.py b/lib/muchos/existing.py
index b519742..346917a 100644
--- a/lib/muchos/existing.py
+++ b/lib/muchos/existing.py
@@ -171,6 +171,10 @@ class ExistingCluster:
file=hosts_file,
)
+ # Call a method which can be used by different cluster types to
+ # write additional specialized configs into the Ansible hosts file
+ self.add_specialized_configs(hosts_file)
+
print("\n[all:vars]", file=hosts_file)
for (name, value) in sorted(host_vars.items()):
print("{0} = {1}".format(name, value), file=hosts_file)
diff --git a/lib/tests/azure/test_config.py b/lib/tests/azure/test_config.py
index da85520..7fa1830 100644
--- a/lib/tests/azure/test_config.py
+++ b/lib/tests/azure/test_config.py
@@ -46,6 +46,7 @@ def test_azure_cluster():
assert c.get("azure", "managed_disk_type") == "Standard_LRS"
assert c.user_home() == "/home/centos"
assert c.mount_root() == "/var/data"
+ assert c.use_multiple_vmss() is False
assert c.worker_data_dirs() == ["/var/data1", "/var/data2", "/var/data3"]
assert c.default_data_dirs() == ["/var/data1", "/var/data2", "/var/data3"]
assert c.metrics_drive_ids() == ["var-data1", "var-data2", "var-data3"]
@@ -122,3 +123,5 @@ def test_azure_cluster():
("worker3", "worker"),
("worker4", "worker"),
]
+
+ # TODO: add test cases for the validations