This is an automated email from the ASF dual-hosted git repository. marat pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-karavan.git
commit 3cdf091b1c7547bed63a2dff8866d719309a14cc Author: Marat Gubaidullin <[email protected]> AuthorDate: Wed May 3 15:11:21 2023 -0400 Container creator prototype for #757 --- .../apache/camel/karavan/api/LogWatchResource.java | 3 - .../apache/camel/karavan/api/RunnerResource.java | 46 +++++++ .../karavan/informer/RunnerPodEventHandler.java | 84 ++++++++++++ .../apache/camel/karavan/service/CodeService.java | 2 +- .../camel/karavan/service/KubernetesService.java | 142 +++++++++++++++++---- .../src/main/resources/application.properties | 2 +- 6 files changed, 251 insertions(+), 28 deletions(-) diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/api/LogWatchResource.java b/karavan-app/src/main/java/org/apache/camel/karavan/api/LogWatchResource.java index ae7ef8b6..e4e5efff 100644 --- a/karavan-app/src/main/java/org/apache/camel/karavan/api/LogWatchResource.java +++ b/karavan-app/src/main/java/org/apache/camel/karavan/api/LogWatchResource.java @@ -35,9 +35,7 @@ import javax.ws.rs.sse.SseEventSink; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; -import java.util.Date; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; @Path("/api/logwatch") public class LogWatchResource { @@ -51,7 +49,6 @@ public class LogWatchResource { @ConfigProperty(name = "karavan.environment") String environment; - @Inject ManagedExecutor managedExecutor; diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/api/RunnerResource.java b/karavan-app/src/main/java/org/apache/camel/karavan/api/RunnerResource.java new file mode 100644 index 00000000..da001a54 --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/api/RunnerResource.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.karavan.api; + +import org.apache.camel.karavan.model.Project; +import org.apache.camel.karavan.service.InfinispanService; +import org.apache.camel.karavan.service.KubernetesService; + +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +@Path("/api/runner") +public class RunnerResource { + + @Inject + KubernetesService kubernetesService; + + @Inject + InfinispanService infinispanService; + + @POST + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + public String runProject(Project project) throws Exception { + Project p = infinispanService.getProject(project.getProjectId()); + return kubernetesService.tryCreatePod(p.getProjectId()); + } +} \ No newline at end of file diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/informer/RunnerPodEventHandler.java b/karavan-app/src/main/java/org/apache/camel/karavan/informer/RunnerPodEventHandler.java new file mode 100644 index 00000000..28047871 --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/informer/RunnerPodEventHandler.java @@ -0,0 +1,84 @@ +package org.apache.camel.karavan.informer; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodCondition; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import org.apache.camel.karavan.model.PodStatus; +import org.apache.camel.karavan.service.InfinispanService; +import org.apache.camel.karavan.service.KubernetesService; +import org.jboss.logging.Logger; + +import java.util.Optional; + +public class RunnerPodEventHandler implements ResourceEventHandler<Pod> { + + private static final Logger LOGGER = Logger.getLogger(RunnerPodEventHandler.class.getName()); + private InfinispanService infinispanService; + private KubernetesService kubernetesService; + + public RunnerPodEventHandler(InfinispanService infinispanService, KubernetesService kubernetesService) { + this.infinispanService = infinispanService; + this.kubernetesService = kubernetesService; + } + + @Override + public void onAdd(Pod pod) { + try { + LOGGER.info("onAdd " + pod.getMetadata().getName()); + PodStatus ps = getPodStatus(pod); + infinispanService.savePodStatus(ps); + } catch (Exception e){ + LOGGER.error(e.getMessage()); + } + } + + @Override + public void onUpdate(Pod oldPod, Pod newPod) { + try { + LOGGER.info("onUpdate " + newPod.getMetadata().getName()); + PodStatus ps = getPodStatus(newPod); + infinispanService.savePodStatus(ps); + } catch (Exception e){ + LOGGER.error(e.getMessage()); + } + } + + @Override + public void onDelete(Pod pod, boolean deletedFinalStateUnknown) { + try { + LOGGER.info("onDelete " + pod.getMetadata().getName()); + String deployment = pod.getMetadata().getLabels().get("app"); + PodStatus ps = new PodStatus( + pod.getMetadata().getName(), + deployment, + kubernetesService.environment); + infinispanService.deletePodStatus(ps); + } catch (Exception e){ + LOGGER.error(e.getMessage()); + } + } + + + public PodStatus getPodStatus(Pod pod) { + String deployment = pod.getMetadata().getLabels().get("app"); + try { + Optional<PodCondition> initialized = pod.getStatus().getConditions().stream().filter(c -> c.getType().equals("Initialized")).findFirst(); + Optional<PodCondition> ready = pod.getStatus().getConditions().stream().filter(c -> c.getType().equals("Initialized")).findFirst(); + return new PodStatus( + pod.getMetadata().getName(), + pod.getStatus().getPhase(), + initialized.isEmpty() ? false : initialized.get().getStatus().equals("True"), + ready.isEmpty() ? false : ready.get().getStatus().equals("True"), + pod.getStatus().getReason(), + deployment, + kubernetesService.environment + ); + } catch (Exception ex) { + LOGGER.error(ex.getMessage()); + return new PodStatus( + pod.getMetadata().getName(), + deployment, + kubernetesService.environment); + } + } +} \ No newline at end of file diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/service/CodeService.java b/karavan-app/src/main/java/org/apache/camel/karavan/service/CodeService.java index a7cdb1a2..5493f5d5 100644 --- a/karavan-app/src/main/java/org/apache/camel/karavan/service/CodeService.java +++ b/karavan-app/src/main/java/org/apache/camel/karavan/service/CodeService.java @@ -80,7 +80,7 @@ public class CodeService { try { List<ProjectFile> files = infinispanService.getProjectFiles(Project.NAME_TEMPLATES); return files.stream().filter(f -> f.getName().equalsIgnoreCase(fileName)) - .map(f -> f.getCode()).findFirst().orElse(null); + .map(ProjectFile::getCode).findFirst().orElse(null); } catch (Exception e){ LOGGER.error(e.getMessage()); } diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/service/KubernetesService.java b/karavan-app/src/main/java/org/apache/camel/karavan/service/KubernetesService.java index 9fc8499e..ebdb649e 100644 --- a/karavan-app/src/main/java/org/apache/camel/karavan/service/KubernetesService.java +++ b/karavan-app/src/main/java/org/apache/camel/karavan/service/KubernetesService.java @@ -29,10 +29,7 @@ import io.fabric8.tekton.client.DefaultTektonClient; import io.fabric8.tekton.pipeline.v1beta1.*; import io.quarkus.vertx.ConsumeEvent; import io.vertx.mutiny.core.eventbus.EventBus; -import org.apache.camel.karavan.informer.DeploymentEventHandler; -import org.apache.camel.karavan.informer.PipelineRunEventHandler; -import org.apache.camel.karavan.informer.PodEventHandler; -import org.apache.camel.karavan.informer.ServiceEventHandler; +import org.apache.camel.karavan.informer.*; import org.apache.camel.karavan.model.Project; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.health.HealthCheck; @@ -56,6 +53,12 @@ public class KubernetesService implements HealthCheck{ private static final Logger LOGGER = Logger.getLogger(KubernetesService.class.getName()); public static final String START_INFORMERS = "start-informers"; public static final String STOP_INFORMERS = "stop-informers"; + public static final int INFORMERS = 5; + private static final String CAMEL_PREFIX = "camel"; + private static final String KARAVAN_PREFIX = "karavan"; + private static final String RUNNER_SUFFIX = "runner"; + private static final String JBANG_CACHE_SUFFIX = "jbang-cache"; + private static final String M2_CACHE_SUFFIX = "m2-cache"; @Inject EventBus eventBus; @@ -78,36 +81,45 @@ public class KubernetesService implements HealthCheck{ return kubernetesClient().adapt(OpenShiftClient.class); } - @ConfigProperty(name = "kubernetes.namespace", defaultValue = "karavan") + @ConfigProperty(name = "kubernetes.namespace", defaultValue = KARAVAN_PREFIX) String currentNamespace; @ConfigProperty(name = "karavan.environment") public String environment; - List<SharedIndexInformer> informers = new ArrayList<>(4); + + List<SharedIndexInformer> informers = new ArrayList<>(INFORMERS); @ConsumeEvent(value = START_INFORMERS, blocking = true) void startInformers(String data) { try { stopInformers(null); LOGGER.info("Starting Kubernetes Informers"); - String runtimeLabel = getRuntimeLabel(); - SharedIndexInformer<Deployment> deploymentInformer = kubernetesClient().apps().deployments().inNamespace(getNamespace()).withLabel(runtimeLabel, "camel").inform(); + SharedIndexInformer<Deployment> deploymentInformer = kubernetesClient().apps().deployments().inNamespace(getNamespace()) + .withLabels(getRuntimeLabels()).inform(); deploymentInformer.addEventHandlerWithResyncPeriod(new DeploymentEventHandler(infinispanService, this),30 * 1000L); informers.add(deploymentInformer); - SharedIndexInformer<Service> serviceInformer = kubernetesClient().services().inNamespace(getNamespace()).withLabel(runtimeLabel, "camel").inform(); + SharedIndexInformer<Service> serviceInformer = kubernetesClient().services().inNamespace(getNamespace()) + .withLabels(getRuntimeLabels()).inform(); serviceInformer.addEventHandlerWithResyncPeriod(new ServiceEventHandler(infinispanService, this),30 * 1000L); informers.add(serviceInformer); - SharedIndexInformer<PipelineRun> pipelineRunInformer = tektonClient().v1beta1().pipelineRuns().inNamespace(getNamespace()).withLabel(runtimeLabel, "camel").inform(); + SharedIndexInformer<PipelineRun> pipelineRunInformer = tektonClient().v1beta1().pipelineRuns().inNamespace(getNamespace()) + .withLabels(getRuntimeLabels()).inform(); pipelineRunInformer.addEventHandlerWithResyncPeriod(new PipelineRunEventHandler(infinispanService, this),30 * 1000L); informers.add(pipelineRunInformer); - SharedIndexInformer<Pod> podRunInformer = kubernetesClient().pods().inNamespace(getNamespace()).withLabel(runtimeLabel, "camel").inform(); + SharedIndexInformer<Pod> podRunInformer = kubernetesClient().pods().inNamespace(getNamespace()) + .withLabels(getRuntimeLabels()).inform(); podRunInformer.addEventHandlerWithResyncPeriod(new PodEventHandler(infinispanService, this),30 * 1000L); informers.add(podRunInformer); + + SharedIndexInformer<Pod> runnerInformer = kubernetesClient().pods().inNamespace(getNamespace()).withLabels(getKaravanTypeLabel()).inform(); + runnerInformer.addEventHandlerWithResyncPeriod(new RunnerPodEventHandler(infinispanService, this),30 * 1000L); + informers.add(runnerInformer); + LOGGER.info("Started Kubernetes Informers"); } catch (Exception e) { LOGGER.error("Error starting informers: " + e.getMessage()); @@ -117,7 +129,7 @@ public class KubernetesService implements HealthCheck{ @Override public HealthCheckResponse call() { - if(informers.size() == 4) { + if(informers.size() == INFORMERS) { return HealthCheckResponse.up("All Kubernetes informers are running."); } else { @@ -140,14 +152,13 @@ public class KubernetesService implements HealthCheck{ String pipeline = getPipelineName(project); LOGGER.info("Pipeline " + pipeline + " is creating for " + project.getProjectId()); - Map<String, String> labels = Map.of( - "karavan-project-id", project.getProjectId(), - "tekton.dev/pipeline", pipeline, - getRuntimeLabel(), "camel" + Map<String, String> labels = getRuntimeLabels( + Map.of("karavan-project-id", project.getProjectId(), + "tekton.dev/pipeline", pipeline) ); ObjectMeta meta = new ObjectMetaBuilder() - .withGenerateName("karavan-" + project.getProjectId() + "-") + .withGenerateName(KARAVAN_PREFIX + "-" + project.getProjectId() + "-") .withLabels(labels) .withNamespace(getNamespace()) .build(); @@ -159,8 +170,10 @@ public class KubernetesService implements HealthCheck{ .withServiceAccountName("pipeline") .withParams(new ParamBuilder().withName("PROJECT_ID").withNewValue(project.getProjectId()).build()) .withWorkspaces( - new WorkspaceBindingBuilder().withName("karavan-m2-cache").withNewPersistentVolumeClaim("karavan-m2-cache", false).build(), - new WorkspaceBindingBuilder().withName("karavan-jbang-cache").withNewPersistentVolumeClaim("karavan-jbang-cache", false).build()) + new WorkspaceBindingBuilder().withName(KARAVAN_PREFIX + "-" + M2_CACHE_SUFFIX) + .withNewPersistentVolumeClaim(KARAVAN_PREFIX + "-" + M2_CACHE_SUFFIX, false).build(), + new WorkspaceBindingBuilder().withName(KARAVAN_PREFIX + "-" + JBANG_CACHE_SUFFIX) + .withNewPersistentVolumeClaim(KARAVAN_PREFIX + "-" + JBANG_CACHE_SUFFIX, false).build()) .build(); PipelineRunBuilder pipelineRunBuilder = new PipelineRunBuilder() @@ -292,8 +305,7 @@ public class KubernetesService implements HealthCheck{ public List<String> getCamelDeployments(String namespace) { try { - String labelName = getRuntimeLabel(); - return kubernetesClient().apps().deployments().inNamespace(namespace).withLabel(labelName, "camel").list().getItems() + return kubernetesClient().apps().deployments().inNamespace(namespace).withLabels(getRuntimeLabels()).list().getItems() .stream().map(deployment -> deployment.getMetadata().getName()).collect(Collectors.toList()); } catch (Exception ex) { LOGGER.error(ex.getMessage()); @@ -370,12 +382,96 @@ public class KubernetesService implements HealthCheck{ return result; } + public String tryCreatePod(String projectId) { + String name = projectId + "-" + RUNNER_SUFFIX; + createPVC(name + JBANG_CACHE_SUFFIX); + createPVC(name + M2_CACHE_SUFFIX); + Pod old = kubernetesClient().pods().inNamespace(getNamespace()).withName(name).get(); + if (old == null) { + createPod(name); + } + return name; + } + + private void createPod(String name) { + Pod pod = getPod(name); + Pod result = kubernetesClient().resource(pod).create(); + LOGGER.info("Created pod " + result.getMetadata().getName()); + } + + private Pod getPod(String name) { + ObjectMeta meta = new ObjectMetaBuilder() + .withName(name) + .withLabels(getKaravanTypeLabel()) + .withNamespace("karavan") + .build(); + + ContainerPort port = new ContainerPortBuilder() + .withContainerPort(8080) + .withName("http") + .withProtocol("TCP") + .build(); + + Container container = new ContainerBuilder() + .withName(name) + .withImage("entropy1/camel-karavan-runner") +// .withImage("ghcr.io/apache/camel-karavan-runner:3.20.2-snapshot") + .withPorts(port) + .withVolumeMounts(new VolumeMountBuilder().withName(name + "-" + JBANG_CACHE_SUFFIX).withMountPath("/root/.m2").build()) + .withVolumeMounts(new VolumeMountBuilder().withName(name + "-" + M2_CACHE_SUFFIX).withMountPath("/jbang/.jbang/cache").build()) + .build(); + + PodSpec spec = new PodSpecBuilder() + .withContainers(container) + .withVolumes(new VolumeBuilder().withName(name + "-" + JBANG_CACHE_SUFFIX) + .withNewPersistentVolumeClaim(name + "-" + JBANG_CACHE_SUFFIX, false).build()) + .withVolumes(new VolumeBuilder().withName(name + "-" + M2_CACHE_SUFFIX) + .withNewPersistentVolumeClaim(name + "-" + M2_CACHE_SUFFIX, false).build()) + .build(); + + return new PodBuilder() + .withMetadata(meta) + .withSpec(spec) + .build(); + } + + private void createPVC(String pvcName) { + PersistentVolumeClaim old = kubernetesClient().persistentVolumeClaims().inNamespace(getNamespace()).withName(pvcName).get(); + if (old == null) { + PersistentVolumeClaim pvc = new PersistentVolumeClaimBuilder() + .withNewMetadata() + .withName(pvcName) + .withNamespace(getNamespace()) + .withLabels(getKaravanTypeLabel()) + .endMetadata() + .withNewSpec() + .withResources(new ResourceRequirementsBuilder().withRequests(Map.of("storage", new Quantity("2Gi"))).build()) + .withVolumeMode("Filesystem") + .withAccessModes("ReadWriteOnce") + .endSpec() + .build(); + kubernetesClient().resource(pvc).create(); + } + } + public Secret getKaravanSecret() { return kubernetesClient().secrets().inNamespace(getNamespace()).withName("karavan").get(); } - public String getRuntimeLabel() { - return isOpenshift() ? "app.openshift.io/runtime" : "app.kubernetes.io/runtime"; + public Map<String, String> getRuntimeLabels() { + Map<String, String> result = new HashMap<>(); + result.put(isOpenshift() ? "app.openshift.io/runtime" : "app.kubernetes.io/runtime", CAMEL_PREFIX); + return result; + } + + public Map<String, String> getRuntimeLabels(Map<String, String> add) { + Map<String, String> map = getRuntimeLabels(); + map.putAll(add); + return map; + } + + public static Map<String, String> getKaravanTypeLabel() { + return Map.of("karavan/type" , "runner"); } public boolean isOpenshift() { diff --git a/karavan-app/src/main/resources/application.properties b/karavan-app/src/main/resources/application.properties index 077b399b..27bdae53 100644 --- a/karavan-app/src/main/resources/application.properties +++ b/karavan-app/src/main/resources/application.properties @@ -25,7 +25,6 @@ quarkus.infinispan-client.password=password # Use BASIC as a Docker for Mac workaround quarkus.infinispan-client.client-intelligence=BASIC - # Public Dev %dev.karavan.auth=public %dev.quarkus.oidc.enabled=false @@ -90,6 +89,7 @@ quarkus.container-image.builder=docker quarkus.kubernetes-client.connection-timeout=2000 quarkus.kubernetes-client.request-timeout=10000 +quarkus.kubernetes-client.devservices.enabled=false quarkus.swagger-ui.always-include=true
