[ https://issues.apache.org/jira/browse/FLINK-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15899862#comment-15899862 ]
ASF GitHub Bot commented on FLINK-5975: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3481#discussion_r104735256 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java --- @@ -262,9 +263,71 @@ public String toString() { taskInfo.setContainer(containerInfo); } + containerInfo.addAllVolumes(volumes(params.containerVolumes())); + return taskInfo.build(); } + /** + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container + * + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that + * defines mount points for a container volume. If None or empty string, returns + * an empty iterator + */ + public List<Protos.Volume> volumes(Option<String> containerVolumes) { + if (containerVolumes.isEmpty()) { + return new ArrayList<Protos.Volume>(); + } + String[] specs = containerVolumes.get().split(","); + List<Protos.Volume> vols = new ArrayList<Protos.Volume>(); + for (String s : Arrays.asList(specs)) { + if (s.isEmpty()) { + continue; + } + Protos.Volume.Builder vol = Protos.Volume.newBuilder(); + vol.setMode(Protos.Volume.Mode.RW); + + List<String> parts = Arrays.asList(s.split(":")); + switch(parts.size()) { + case 1: + vol.setContainerPath(parts.get(0)); + break; + case 2: + String modeOrPath = parts.get(1).toLowerCase().trim(); + if (modeOrPath.equals("rw")) { + vol.setContainerPath(parts.get(0)); + vol.setMode(Protos.Volume.Mode.RW); + } else if (modeOrPath.equals("ro")) { + vol.setContainerPath(parts.get(0)); + vol.setMode(Protos.Volume.Mode.RO); + } else { + vol.setHostPath(parts.get(0)).setContainerPath(modeOrPath); + } + break; + case 3: + String mode = parts.get(2).toLowerCase().trim(); + if (!mode.equals("rw") && !mode.equals("ro")) { + throw new IllegalArgumentException("invalid mode in volume"); + } + if (mode.equals("rw")) { + vol.setMode(Protos.Volume.Mode.RW); + } else { + vol.setMode(Protos.Volume.Mode.RO); + } + vol.setHostPath(parts.get(0)).setContainerPath(parts.get(1)); + break; + default: + throw new IllegalArgumentException("volume specification is invalid"); + + } + --- End diff -- remove empty line > Mesos should support adding volumes to launched taskManagers > ------------------------------------------------------------ > > Key: FLINK-5975 > URL: https://issues.apache.org/jira/browse/FLINK-5975 > Project: Flink > Issue Type: Improvement > Components: Mesos > Affects Versions: 1.2.0, 1.3.0 > Reporter: Addison Higham > Priority: Minor > > Flink needs access to shared storage. > In many cases, this is HDFS, but it would be nice to also support file URIs > on an mounted NFS for example. > Mesos exposes APIs for adding volumes, so it should be relatively simply to > add this. > As an example, here is the spark code for supporting volumes: > https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala#L35 > -- This message was sent by Atlassian JIRA (v6.3.15#6346)