This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 4caea26e68f8b0c90dbd8f79b0cc2aef0b577d40
Author: lburgazzoli <[email protected]>
AuthorDate: Fri Oct 12 16:29:59 2018 +0200

    chore(kamel): simplify integration status watcher
---
 cmd/kamel/kamel.go      |  7 +++++-
 pkg/client/cmd/run.go   | 50 +++++++++++++-----------------------
 pkg/util/util.go        | 21 ++++++++++++++++
 pkg/util/watch/watch.go | 67 +++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 112 insertions(+), 33 deletions(-)

diff --git a/cmd/kamel/kamel.go b/cmd/kamel/kamel.go
index a89042f..502a978 100644
--- a/cmd/kamel/kamel.go
+++ b/cmd/kamel/kamel.go
@@ -31,7 +31,11 @@ import (
 func main() {
        rand.Seed(time.Now().UTC().UnixNano())
 
-       ctx := context.Background()
+       ctx, cancel := context.WithCancel(context.Background())
+
+       // Cancel ctx as soon as main returns
+       defer cancel()
+
        rootCmd, err := cmd.NewKamelCommand(ctx)
        exitOnError(err)
 
@@ -42,6 +46,7 @@ func main() {
 func exitOnError(err error) {
        if err != nil {
                fmt.Println("Error:", err)
+
                os.Exit(1)
        }
 }
diff --git a/pkg/client/cmd/run.go b/pkg/client/cmd/run.go
index 5b6664d..a22b58f 100644
--- a/pkg/client/cmd/run.go
+++ b/pkg/client/cmd/run.go
@@ -19,6 +19,7 @@ package cmd
 
 import (
        "fmt"
+       "io"
        "io/ioutil"
        "net/http"
        "os"
@@ -34,8 +35,6 @@ import (
        "github.com/pkg/errors"
        "github.com/sirupsen/logrus"
 
-       "io"
-
        "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
        "github.com/apache/camel-k/pkg/util/kubernetes"
        "github.com/apache/camel-k/pkg/util/log"
@@ -165,41 +164,28 @@ func (o *runCmdOptions) run(cmd *cobra.Command, args 
[]string) error {
 }
 
 func (o *runCmdOptions) waitForIntegrationReady(integration 
*v1alpha1.Integration) error {
-       // Block this goroutine until the integration is in a final status
-       changes, err := watch.StateChanges(o.Context, integration)
-       if err != nil {
-               return err
-       }
-
-       var lastStatusSeen *v1alpha1.IntegrationStatus
-
-watcher:
-       for {
-               select {
-               case <-o.Context.Done():
-                       return nil
-               case i, ok := <-changes:
-                       if !ok {
-                               break watcher
+       handler := func(i *v1alpha1.Integration) bool {
+               //
+               // TODO when we add health checks, we should wait until they 
are passed
+               //
+               if i.Status.Phase != "" {
+                       fmt.Println("integration \""+integration.Name+"\" in 
phase", i.Status.Phase)
+
+                       if i.Status.Phase == v1alpha1.IntegrationPhaseRunning {
+                               // TODO display some error info when available 
in the status
+                               return false
                        }
-                       lastStatusSeen = &i.Status
-                       phase := string(i.Status.Phase)
-                       if phase != "" {
-                               fmt.Println("integration 
\""+integration.Name+"\" in phase", phase)
-                               // TODO when we add health checks, we should 
wait until they are passed
-                               if i.Status.Phase == 
v1alpha1.IntegrationPhaseRunning || i.Status.Phase == 
v1alpha1.IntegrationPhaseError {
-                                       // TODO display some error info when 
available in the status
-                                       break watcher
-                               }
+
+                       if i.Status.Phase == v1alpha1.IntegrationPhaseError {
+                               fmt.Println("integration deployment failed")
+                               return false
                        }
                }
-       }
 
-       // TODO we may not be able to reach this state, since the build will be 
done without sources (until we add health checks)
-       if lastStatusSeen != nil && lastStatusSeen.Phase == 
v1alpha1.IntegrationPhaseError {
-               return errors.New("integration deployment failed")
+               return true
        }
-       return nil
+
+       return watch.HandleStateChanges(o.Context, integration, handler)
 }
 
 func (o *runCmdOptions) printLogs(integration *v1alpha1.Integration) error {
diff --git a/pkg/util/util.go b/pkg/util/util.go
index 7792210..dcfbd36 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -17,6 +17,12 @@ limitations under the License.
 
 package util
 
+import (
+       "os"
+       "os/signal"
+       "syscall"
+)
+
 // StringSliceContains --
 func StringSliceContains(slice []string, items []string) bool {
        for i := 0; i < len(items); i++ {
@@ -51,3 +57,18 @@ func StringSliceUniqueAdd(slice *[]string, item string) bool 
{
 
        return true
 }
+
+// WaitForSignal --
+func WaitForSignal(sig chan os.Signal, exit func(int)) {
+       signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.SIGPIPE)
+       go func() {
+               s := <-sig
+               switch s {
+               case syscall.SIGINT, syscall.SIGTERM:
+                       exit(130) // Ctrl+c
+               case syscall.SIGPIPE:
+                       exit(0)
+               }
+               exit(1)
+       }()
+}
diff --git a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go
index 1ed9671..73d865e 100644
--- a/pkg/util/watch/watch.go
+++ b/pkg/util/watch/watch.go
@@ -47,7 +47,9 @@ func StateChanges(ctx context.Context, integration 
*v1alpha1.Integration) (<-cha
        var lastObservedState *v1alpha1.IntegrationPhase
 
        go func() {
+               defer watcher.Stop()
                defer close(out)
+
                for {
                        select {
                        case <-ctx.Done():
@@ -81,3 +83,68 @@ func StateChanges(ctx context.Context, integration 
*v1alpha1.Integration) (<-cha
 
        return out, nil
 }
+
+//
+// HandleStateChanges watches a integration resource and invoke the given 
handler when its status changes.
+//
+//     err := watch.HandleStateChanges(ctx, integration, func(i 
*v1alpha1.Integration) bool {
+//         if i.Status.Phase == v1alpha1.IntegrationPhaseRunning {
+//                         return false
+//                 }
+//
+//                 return true
+//         })
+//
+// This function blocks until the handler function returns true or either the 
events channel or the context is closed.
+//
+func HandleStateChanges(ctx context.Context, integration 
*v1alpha1.Integration, handler func(integration *v1alpha1.Integration) bool) 
error {
+       resourceClient, _, err := 
k8sclient.GetResourceClient(integration.APIVersion, integration.Kind, 
integration.Namespace)
+       if err != nil {
+               return err
+       }
+       watcher, err := resourceClient.Watch(metav1.ListOptions{
+               FieldSelector: "metadata.name=" + integration.Name,
+       })
+       if err != nil {
+               return err
+       }
+
+       defer watcher.Stop()
+       events := watcher.ResultChan()
+
+       var lastObservedState *v1alpha1.IntegrationPhase
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return nil
+               case e, ok := <-events:
+                       if !ok {
+                               return nil
+                       }
+
+                       if e.Object != nil {
+                               if runtimeUnstructured, ok := 
e.Object.(runtime.Unstructured); ok {
+                                       unstr := unstructured.Unstructured{
+                                               Object: 
runtimeUnstructured.UnstructuredContent(),
+                                       }
+                                       icopy := integration.DeepCopy()
+                                       err := 
k8sutil.UnstructuredIntoRuntimeObject(&unstr, icopy)
+                                       if err != nil {
+                                               logrus.Error("Unexpected error 
detected when watching resource", err)
+                                               return nil
+                                       }
+
+                                       if lastObservedState == nil || 
*lastObservedState != icopy.Status.Phase {
+                                               lastObservedState = 
&icopy.Status.Phase
+                                               if !handler(icopy) {
+                                                       return nil
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }
+
+       return nil
+}

Reply via email to