This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch rcmd
in repository https://gitbox.apache.org/repos/asf/camel.git

commit eacd476fc5e0eb30288e3a6db54aec0fe3ba085a
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Oct 9 17:10:26 2024 +0200

    CAMEL-21193: camel-jbang - Add listen command
---
 .../camel/impl/console/ReceiveDevConsole.java      |  11 +-
 .../dsl/jbang/core/commands/CamelJBangMain.java    |   1 +
 .../core/commands/action/CamelListenAction.java    | 732 +++++++++++++++++++++
 .../core/commands/action/CamelReceiveAction.java   | 296 ---------
 4 files changed, 742 insertions(+), 298 deletions(-)

diff --git 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
index d1cd9b09c85..51452deeb6c 100644
--- 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
+++ 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
@@ -178,7 +178,10 @@ public class ReceiveDevConsole extends AbstractDevConsole {
     private void addMessage(Exchange exchange) {
         JsonObject json
                 = MessageHelper.dumpAsJSonObject(exchange.getMessage(), false, 
false, true, true, true, true, bodyMaxChars);
-        json.put("uuid", uuid.incrementAndGet());
+        json.put("uid", uuid.incrementAndGet());
+        json.put("endpointUri", exchange.getFromEndpoint().toString());
+        json.put("remoteEndpoint", exchange.getFromEndpoint().isRemote());
+        json.put("timestamp", exchange.getMessage().getMessageTimestamp());
 
         // ensure there is space on the queue by polling until at least single 
slot is free
         int drain = queue.size() - capacity + 1;
@@ -193,7 +196,6 @@ public class ReceiveDevConsole extends AbstractDevConsole {
     protected Endpoint findMatchingEndpoint(CamelContext camelContext, String 
endpoint) {
         // TODO: find all processors that are endpoint aware and match pattern
 
-
         Endpoint target = null;
         // is the endpoint a pattern or route id
         boolean scheme = endpoint.contains(":");
@@ -279,4 +281,9 @@ public class ReceiveDevConsole extends AbstractDevConsole {
         return root;
     }
 
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        stopConsumers();
+    }
 }
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
index 58d7b13e44f..ad13f76379d 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
@@ -123,6 +123,7 @@ public class CamelJBangMain implements Callable<Integer> {
                         .addSubcommand("reload", new CommandLine(new 
CamelReloadAction(main)))
                         .addSubcommand("send", new CommandLine(new 
CamelSendAction(main)))
                         .addSubcommand("browse", new CommandLine(new 
CamelBrowseAction(main)))
+                        .addSubcommand("listen", new CommandLine(new 
CamelListenAction(main)))
                         .addSubcommand("stub", new CommandLine(new 
CamelStubAction(main)))
                         .addSubcommand("thread-dump", new CommandLine(new 
CamelThreadDump(main)))
                         .addSubcommand("logger", new CommandLine(new 
LoggerAction(main)))
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelListenAction.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelListenAction.java
new file mode 100644
index 00000000000..3539f450e4d
--- /dev/null
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelListenAction.java
@@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dsl.jbang.core.commands.action;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.LineNumberReader;
+import java.text.SimpleDateFormat;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.regex.Pattern;
+
+import com.github.freva.asciitable.AsciiTable;
+import com.github.freva.asciitable.Column;
+import com.github.freva.asciitable.HorizontalAlign;
+import com.github.freva.asciitable.OverflowBehaviour;
+import org.apache.camel.catalog.impl.TimePatternConverter;
+import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain;
+import org.apache.camel.dsl.jbang.core.common.PidNameAgeCompletionCandidates;
+import org.apache.camel.dsl.jbang.core.common.ProcessHelper;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.StringHelper;
+import org.apache.camel.util.TimeUtils;
+import org.apache.camel.util.URISupport;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import org.apache.camel.util.json.Jsoner;
+import org.fusesource.jansi.Ansi;
+import org.fusesource.jansi.AnsiConsole;
+import picocli.CommandLine;
+
[email protected](name = "listen",
+                     description = "Listen (receive) messages from endpoints", 
sortOptions = false)
+public class CamelListenAction extends ActionBaseCommand {
+
+    private static final int NAME_MAX_WIDTH = 25;
+    private static final int NAME_MIN_WIDTH = 10;
+
+    public static class PrefixCompletionCandidates implements Iterable<String> 
{
+
+        public PrefixCompletionCandidates() {
+        }
+
+        @Override
+        public Iterator<String> iterator() {
+            return List.of("auto", "true", "false").iterator();
+        }
+    }
+
+    public static class ActionCompletionCandidates implements Iterable<String> 
{
+
+        public ActionCompletionCandidates() {
+        }
+
+        @Override
+        public Iterator<String> iterator() {
+            return List.of("dump", "start", "stop", "status", 
"clear").iterator();
+        }
+    }
+
+    @CommandLine.Parameters(description = "Name or pid of running Camel 
integration. (default selects all)", arity = "0..1")
+    String name = "*";
+
+    @CommandLine.Option(names = { "--action" }, completionCandidates = 
CamelTraceAction.ActionCompletionCandidates.class,
+                        defaultValue = "status",
+                        description = "Action to start, stop, clear, list 
status, or dump messages")
+    String action;
+
+    @CommandLine.Option(names = { "--endpoint" },
+            description = "Endpoint to browse messages (can be uri, pattern, 
or refer to a route id)")
+    String endpoint;
+
+    @CommandLine.Option(names = { "--sort" }, completionCandidates = 
PidNameAgeCompletionCandidates.class,
+                        description = "Sort by pid, name or age for showing 
status of messages", defaultValue = "pid")
+    String sort;
+
+    @CommandLine.Option(names = { "--timestamp" }, defaultValue = "true",
+                        description = "Print timestamp.")
+    boolean timestamp = true;
+
+    @CommandLine.Option(names = { "--ago" },
+                        description = "Use ago instead of yyyy-MM-dd HH:mm:ss 
in timestamp.")
+    boolean ago;
+
+    @CommandLine.Option(names = { "--follow" }, defaultValue = "true",
+                        description = "Keep following and outputting new 
messages (use ctrl + c to exit).")
+    boolean follow = true;
+
+    @CommandLine.Option(names = { "--prefix" }, defaultValue = "auto",
+                        completionCandidates = 
CamelTraceAction.PrefixCompletionCandidates.class,
+                        description = "Print prefix with running Camel 
integration name. auto=only prefix when running multiple integrations. 
true=always prefix. false=prefix off.")
+    String prefix = "auto";
+
+    @CommandLine.Option(names = { "--tail" }, defaultValue = "-1",
+                        description = "The number of messages from the end to 
show. Use -1 to read from the beginning. Use 0 to read only new lines. Defaults 
to showing all messages from beginning.")
+    int tail = -1;
+
+    @CommandLine.Option(names = { "--since" },
+                        description = "Return messages newer than a relative 
duration like 5s, 2m, or 1h. The value is in seconds if no unit specified.")
+    String since;
+
+    @CommandLine.Option(names = { "--find" },
+                        description = "Find and highlight matching text 
(ignore case).", arity = "0..*")
+    String[] find;
+
+    @CommandLine.Option(names = { "--grep" },
+                        description = "Filter messages to only output trace 
matching text (ignore case).", arity = "0..*")
+    String[] grep;
+
+    @CommandLine.Option(names = { "--show-headers" }, defaultValue = "true",
+                        description = "Show message headers in traced 
messages")
+    boolean showHeaders = true;
+
+    @CommandLine.Option(names = { "--show-body" }, defaultValue = "true",
+                        description = "Show message body in traced messages")
+    boolean showBody = true;
+
+    @CommandLine.Option(names = { "--logging-color" }, defaultValue = "true", 
description = "Use colored logging")
+    boolean loggingColor = true;
+
+    @CommandLine.Option(names = { "--compact" }, defaultValue = "true",
+                        description = "Compact output (no empty line 
separating messages)")
+    boolean compact = true;
+
+    @CommandLine.Option(names = { "--mask" },
+                        description = "Whether to mask endpoint URIs to avoid 
printing sensitive information such as password or access keys")
+    boolean mask;
+
+    @CommandLine.Option(names = { "--pretty" },
+                        description = "Pretty print message body when using 
JSon or XML format")
+    boolean pretty;
+
+    String findAnsi;
+
+    private int nameMaxWidth;
+    private boolean prefixShown;
+
+    private MessageTableHelper tableHelper;
+
+    private final Map<String, Ansi.Color> nameColors = new HashMap<>();
+
+    public CamelListenAction(CamelJBangMain main) {
+        super(main);
+    }
+
+    @Override
+    public Integer doCall() throws Exception {
+        if ("dump".equals(action)) {
+            return doDumpCall();
+        } else if ("status".equals(action)) {
+            return doStatusCall();
+        }
+
+        List<Long> pids = findPids(name);
+        for (long pid : pids) {
+            if ("clear".equals(action)) {
+                File f = getReceiveFile("" + pid);
+                if (f.exists()) {
+                    IOHelper.writeText("{}", f);
+                }
+            } else {
+                JsonObject root = new JsonObject();
+                root.put("action", "receive");
+                if (endpoint != null) {
+                    root.put("endpoint", endpoint);
+                }
+                if ("start".equals(action)) {
+                    root.put("enabled", "true");
+                } else if ("stop".equals(action)) {
+                    root.put("enabled", "false");
+                }
+                File f = getActionFile(Long.toString(pid));
+                IOHelper.writeText(root.toJson(), f);
+            }
+        }
+
+        return 0;
+    }
+
+    protected Integer doStatusCall() {
+        List<StatusRow> rows = new ArrayList<>();
+
+        List<Long> pids = findPids(name);
+        ProcessHandle.allProcesses()
+                .filter(ph -> pids.contains(ph.pid()))
+                .forEach(ph -> {
+                    JsonObject root = loadStatus(ph.pid());
+                    if (root != null) {
+                        StatusRow row = new StatusRow();
+                        JsonObject context = (JsonObject) root.get("context");
+                        if (context == null) {
+                            return;
+                        }
+                        row.name = context.getString("name");
+                        if ("CamelJBang".equals(row.name)) {
+                            row.name = ProcessHelper.extractName(root, ph);
+                        }
+                        row.pid = Long.toString(ph.pid());
+                        row.uptime = extractSince(ph);
+                        row.age = TimeUtils.printSince(row.uptime);
+                        // TODO: receive
+                        JsonObject jo = root.getMap("trace");
+                        if (jo != null) {
+                            row.enabled = jo.getBoolean("enabled");
+                            row.counter = jo.getLong("counter");
+                            row.pattern = jo.getString("tracePattern");
+                        }
+                        rows.add(row);
+                    }
+                });
+
+        // sort rows
+        rows.sort(this::sortStatusRow);
+
+        if (!rows.isEmpty()) {
+            printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, 
Arrays.asList(
+                    new 
Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid),
+                    new 
Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, 
OverflowBehaviour.ELLIPSIS_RIGHT)
+                            .with(r -> r.name),
+                    new 
Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age),
+                    new Column().header("STATUS").with(this::getStatus),
+                    new Column().header("TOTAL").with(r -> "" + r.counter),
+                    new Column().header("PATTERN").with(r -> r.pattern))));
+        }
+
+        return 0;
+    }
+
+    private String getStatus(StatusRow r) {
+        if (r.enabled) {
+            return "Enabled";
+        }
+        return "Disabled";
+    }
+
+    protected int sortStatusRow(StatusRow o1, StatusRow o2) {
+        String s = sort;
+        int negate = 1;
+        if (s.startsWith("-")) {
+            s = s.substring(1);
+            negate = -1;
+        }
+        switch (s) {
+            case "pid":
+                return Long.compare(Long.parseLong(o1.pid), 
Long.parseLong(o2.pid)) * negate;
+            case "name":
+                return o1.name.compareToIgnoreCase(o2.name) * negate;
+            case "age":
+                return Long.compare(o1.uptime, o2.uptime) * negate;
+            default:
+                return 0;
+        }
+    }
+
+    protected Integer doDumpCall() throws Exception {
+        // setup table helper
+        tableHelper = new MessageTableHelper();
+        tableHelper.setPretty(pretty);
+        tableHelper.setLoggingColor(loggingColor);
+
+        Map<Long, Pid> pids = new LinkedHashMap<>();
+
+        // find new pids
+        updatePids(pids);
+        if (!pids.isEmpty()) {
+            // read existing trace files (skip by tail/since)
+            if (find != null) {
+                findAnsi = 
Ansi.ansi().fg(Ansi.Color.BLACK).bg(Ansi.Color.YELLOW).a("$0").reset().toString();
+                for (int i = 0; i < find.length; i++) {
+                    String f = find[i];
+                    f = Pattern.quote(f);
+                    find[i] = f;
+                }
+            }
+            if (grep != null) {
+                findAnsi = 
Ansi.ansi().fg(Ansi.Color.BLACK).bg(Ansi.Color.YELLOW).a("$0").reset().toString();
+                for (int i = 0; i < grep.length; i++) {
+                    String f = grep[i];
+                    f = Pattern.quote(f);
+                    grep[i] = f;
+                }
+            }
+            Date limit = null;
+            if (since != null) {
+                long millis;
+                if (StringHelper.isDigit(since)) {
+                    // is in seconds by default
+                    millis = TimePatternConverter.toMilliSeconds(since) * 1000;
+                } else {
+                    millis = TimePatternConverter.toMilliSeconds(since);
+                }
+                limit = new Date(System.currentTimeMillis() - millis);
+            }
+            // dump existing traces
+            if (tail != 0) {
+                tailTraceFiles(pids, tail);
+                dumpTraceFiles(pids, tail, limit);
+            }
+        }
+
+        if (follow) {
+            boolean waitMessage = true;
+            StopWatch watch = new StopWatch();
+            boolean more = true;
+            do {
+                if (pids.isEmpty()) {
+                    if (waitMessage) {
+                        printer().println("Waiting for messages ...");
+                        waitMessage = false;
+                    }
+                    Thread.sleep(500);
+                    updatePids(pids);
+                } else {
+                    waitMessage = true;
+                    if (watch.taken() > 500) {
+                        // check for new messages
+                        updatePids(pids);
+                        watch.restart();
+                    }
+                    int lines = readReceiveFiles(pids);
+                    if (lines > 0) {
+                        more = dumpTraceFiles(pids, 0, null);
+                    } else if (lines == 0) {
+                        Thread.sleep(100);
+                    } else {
+                        break;
+                    }
+                }
+            } while (more);
+        }
+
+        return 0;
+    }
+
+    private void tailTraceFiles(Map<Long, Pid> pids, int tail) throws 
Exception {
+        for (Pid pid : pids.values()) {
+            File file = getReceiveFile(pid.pid);
+            if (file.exists()) {
+                pid.reader = new LineNumberReader(new FileReader(file));
+                String line;
+                if (tail <= 0) {
+                    pid.fifo = new ArrayDeque<>();
+                } else {
+                    pid.fifo = new ArrayBlockingQueue<>(tail);
+                }
+                do {
+                    line = pid.reader.readLine();
+                    if (line != null) {
+                        while (!pid.fifo.offer(line)) {
+                            pid.fifo.poll();
+                        }
+                    }
+                } while (line != null);
+            }
+        }
+    }
+
+    private void updatePids(Map<Long, Pid> rows) {
+        List<Long> pids = findPids(name);
+        ProcessHandle.allProcesses()
+                .filter(ph -> pids.contains(ph.pid()))
+                .forEach(ph -> {
+                    JsonObject root = loadStatus(ph.pid());
+                    if (root != null) {
+                        Pid row = new Pid();
+                        row.pid = Long.toString(ph.pid());
+                        JsonObject context = (JsonObject) root.get("context");
+                        if (context == null) {
+                            return;
+                        }
+                        row.name = context.getString("name");
+                        if ("CamelJBang".equals(row.name)) {
+                            row.name = ProcessHelper.extractName(root, ph);
+                        }
+                        int len = row.name.length();
+                        if (len < NAME_MIN_WIDTH) {
+                            len = NAME_MIN_WIDTH;
+                        }
+                        if (len > NAME_MAX_WIDTH) {
+                            len = NAME_MAX_WIDTH;
+                        }
+                        if (len > nameMaxWidth) {
+                            nameMaxWidth = len;
+                        }
+                        if (!rows.containsKey(ph.pid())) {
+                            rows.put(ph.pid(), row);
+                        }
+                    }
+                });
+
+        // remove pids that are no long active from the rows
+        Set<Long> remove = new HashSet<>();
+        for (long pid : rows.keySet()) {
+            if (!pids.contains(pid)) {
+                remove.add(pid);
+            }
+        }
+        for (long pid : remove) {
+            rows.remove(pid);
+        }
+    }
+
+    private int readReceiveFiles(Map<Long, Pid> pids) throws Exception {
+        int lines = 0;
+
+        for (Pid pid : pids.values()) {
+            if (pid.reader == null) {
+                File file = getReceiveFile(pid.pid);
+                if (file.exists()) {
+                    pid.reader = new LineNumberReader(new FileReader(file));
+                    if (tail == 0) {
+                        // only read new lines so forward to end of reader
+                        long size = file.length();
+                        pid.reader.skip(size);
+                    }
+                }
+            }
+            if (pid.reader != null) {
+                String line;
+                do {
+                    try {
+                        line = pid.reader.readLine();
+                        if (line != null) {
+                            lines++;
+                            // switch fifo to be unlimited as we use it for 
new traces
+                            if (pid.fifo == null || pid.fifo instanceof 
ArrayBlockingQueue) {
+                                pid.fifo = new ArrayDeque<>();
+                            }
+                            pid.fifo.offer(line);
+                        }
+                    } catch (IOException e) {
+                        // ignore
+                        line = null;
+                    }
+                } while (line != null);
+            }
+        }
+
+        return lines;
+    }
+
+    private List<Row> parseReceiveLine(Pid pid, String line) {
+        JsonObject root = null;
+        try {
+            root = (JsonObject) Jsoner.deserialize(line);
+        } catch (Exception e) {
+            // ignore
+        }
+        if (root != null) {
+            List<Row> rows = new ArrayList<>();
+            JsonArray arr = root.getCollection("messages");
+            if (arr != null) {
+                for (Object o : arr) {
+                    Row row = new Row(pid);
+                    row.pid = pid.pid;
+                    row.name = pid.name;
+                    JsonObject jo = (JsonObject) o;
+                    row.uid = jo.getLong("uid");
+                    String uri = jo.getString("endpointUri");
+                    if (uri != null) {
+                        row.endpoint = new JsonObject();
+                        if (mask) {
+                            uri = URISupport.sanitizeUri(uri);
+                        }
+                        row.endpoint.put("endpoint", uri);
+                        row.endpoint.put("remote", 
jo.getBooleanOrDefault("remoteEndpoint", true));
+                    }
+                    JsonObject es = jo.getMap("endpointService");
+                    if (es != null) {
+                        row.endpointService = es;
+                    }
+                    Long ts = jo.getLong("timestamp");
+                    if (ts != null) {
+                        row.timestamp = ts;
+                    }
+                    row.message = jo.getMap("message");
+                    row.message.remove("exchangeId");
+                    row.message.remove("exchangePattern");
+                    row.message.remove("exchangeProperties");
+                    if (!showHeaders) {
+                        row.message.remove("headers");
+                    }
+                    if (!showBody) {
+                        row.message.remove("body");
+                    }
+                    rows.add(row);
+                }
+            }
+            return rows;
+        }
+        return null;
+    }
+
+    private boolean dumpTraceFiles(Map<Long, Pid> pids, int tail, Date limit) {
+        Set<String> names = new HashSet<>();
+        List<Row> rows = new ArrayList<>();
+        for (Pid pid : pids.values()) {
+            Queue<String> queue = pid.fifo;
+            if (queue != null) {
+                for (String l : queue) {
+                    names.add(pid.name);
+                    List<Row> parsed = parseReceiveLine(pid, l);
+                    if (parsed != null && !parsed.isEmpty()) {
+                        rows.addAll(parsed);
+                    }
+                }
+                pid.fifo.clear();
+            }
+        }
+
+        // only sort if there are multiple Camels running
+        if (names.size() > 1) {
+            // sort lines
+            final Map<String, Long> lastTimestamp = new HashMap<>();
+            rows.sort((r1, r2) -> {
+                long t1 = r1.timestamp;
+                long t2 = r2.timestamp;
+                if (t1 == 0) {
+                    t1 = lastTimestamp.get(r1.name);
+                }
+                if (t1 == 0) {
+                    t1 = lastTimestamp.get(r2.name);
+                }
+                if (t1 == 0 && t2 == 0) {
+                    return 0;
+                } else if (t1 == 0) {
+                    return -1;
+                } else if (t2 == 0) {
+                    return 1;
+                }
+                lastTimestamp.put(r1.name, t1);
+                lastTimestamp.put(r2.name, t2);
+                return Long.compare(t1, t2);
+            });
+        }
+        if (tail > 0) {
+            // cut according to tail
+            int pos = rows.size() - tail;
+            if (pos > 0) {
+                rows = rows.subList(pos, rows.size());
+            }
+        }
+
+        for (Row r : rows) {
+            printTrace(r.name, pids.size(), r, limit);
+        }
+        return true;
+    }
+
+    private boolean isValidGrep(String line) {
+        if (grep == null) {
+            return true;
+        }
+        for (String g : grep) {
+            boolean m = Pattern.compile("(?i)" + g).matcher(line).find();
+            if (m) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isValidSince(Date limit, long timestamp) {
+        if (limit == null || timestamp == 0) {
+            return true;
+        }
+        Date row = new Date(timestamp);
+        return row.compareTo(limit) >= 0;
+    }
+
+    protected void printTrace(String name, int pids, Row row, Date limit) {
+        if (!prefixShown) {
+            // compute whether to show prefix or not
+            if ("false".equals(prefix) || "auto".equals(prefix) && pids <= 1) {
+                name = null;
+            }
+        }
+        prefixShown = name != null;
+
+        String data = getDataAsTable(row);
+        boolean valid = isValidSince(limit, row.timestamp) && 
isValidGrep(data);
+        if (!valid) {
+            return;
+        }
+
+        String nameWithPrefix = null;
+        if (name != null) {
+            if (loggingColor) {
+                Ansi.Color color = nameColors.get(name);
+                if (color == null) {
+                    // grab a new color
+                    int idx = (nameColors.size() % 6) + 1;
+                    color = Ansi.Color.values()[idx];
+                    nameColors.put(name, color);
+                }
+                String n = String.format("%-" + nameMaxWidth + "s", name);
+                nameWithPrefix = Ansi.ansi().fg(color).a(n).a("| 
").reset().toString();
+            } else {
+                nameWithPrefix = String.format("%-" + nameMaxWidth + "s", 
name) + "| ";
+            }
+            printer().print(nameWithPrefix);
+        }
+        if (timestamp) {
+            String ts;
+            if (ago) {
+                ts = String.format("%12s", TimeUtils.printSince(row.timestamp) 
+ " ago");
+            } else {
+                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.SSS");
+                ts = sdf.format(new Date(row.timestamp));
+            }
+            if (loggingColor) {
+                
AnsiConsole.out().print(Ansi.ansi().fgBrightDefault().a(Ansi.Attribute.INTENSITY_FAINT).a(ts).reset());
+            } else {
+                printer().print(ts);
+            }
+            printer().print("  ");
+        }
+        // pid
+        String p = String.format("%5.5s", row.pid);
+        if (loggingColor) {
+            AnsiConsole.out().print(Ansi.ansi().fgMagenta().a(p).reset());
+            
AnsiConsole.out().print(Ansi.ansi().fgBrightDefault().a(Ansi.Attribute.INTENSITY_FAINT).a("
 --- ").reset());
+        } else {
+            printer().print(p);
+            printer().print(" --- ");
+        }
+        printer().print(" ");
+        // uuid
+        String u = String.format("%5.5s", row.uid);
+        if (loggingColor) {
+            AnsiConsole.out().print(Ansi.ansi().fgMagenta().a(u).reset());
+        } else {
+            printer().print(u);
+        }
+        printer().print(" - ");
+        // trace message
+        String[] lines = data.split(System.lineSeparator());
+        if (lines.length > 0) {
+            printer().println();
+            for (String line : lines) {
+                if (find != null) {
+                    for (String f : find) {
+                        line = line.replaceAll("(?i)" + f, findAnsi);
+                    }
+                }
+                if (grep != null) {
+                    for (String g : grep) {
+                        line = line.replaceAll("(?i)" + g, findAnsi);
+                    }
+                }
+                if (nameWithPrefix != null) {
+                    printer().print(nameWithPrefix);
+                }
+                printer().print(" ");
+                printer().println(line);
+            }
+            if (!compact) {
+                if (nameWithPrefix != null) {
+                    printer().println(nameWithPrefix);
+                } else {
+                    // empty line
+                    printer().println();
+                }
+            }
+        }
+    }
+
+    private String getDataAsTable(Row r) {
+        return tableHelper.getDataAsTable(null, null, r.endpoint, 
r.endpointService, r.message, null);
+    }
+
+    private static class Pid {
+        String pid;
+        String name;
+        Queue<String> fifo;
+        LineNumberReader reader;
+    }
+
+    private static class Row {
+        Pid parent;
+        String pid;
+        String name;
+        long uid;
+        long timestamp;
+        JsonObject endpoint;
+        JsonObject endpointService;
+        JsonObject message;
+
+        Row(Pid parent) {
+            this.parent = parent;
+        }
+
+    }
+
+    private static class StatusRow {
+        String pid;
+        String name;
+        String age;
+        long uptime;
+        boolean enabled;
+        long counter;
+        String pattern;
+    }
+
+}
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java
deleted file mode 100644
index b9f859fb4f2..00000000000
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.dsl.jbang.core.commands.action;
-
-import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain;
-import org.apache.camel.util.FileUtil;
-import org.apache.camel.util.IOHelper;
-import org.apache.camel.util.StopWatch;
-import org.apache.camel.util.TimeUtils;
-import org.apache.camel.util.json.JsonObject;
-import org.apache.camel.util.json.Jsoner;
-import org.fusesource.jansi.Ansi;
-import org.fusesource.jansi.AnsiConsole;
-import picocli.CommandLine;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.LineNumberReader;
-import java.text.SimpleDateFormat;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-
[email protected](name = "receive",
-                     description = "Receive messages from endpoints", 
sortOptions = false)
-public class CamelReceiveAction extends ActionBaseCommand {
-
-    public static class ActionCompletionCandidates implements Iterable<String> 
{
-
-        public ActionCompletionCandidates() {
-        }
-
-        @Override
-        public Iterator<String> iterator() {
-            return List.of("dump", "start", "stop").iterator();
-        }
-    }
-
-    @CommandLine.Parameters(description = "Name or pid of running Camel 
integration", arity = "0..1")
-    String name = "*";
-
-    @CommandLine.Option(names = { "--action" }, completionCandidates = 
ActionCompletionCandidates.class,
-            defaultValue = "dump",
-            description = "Action to start, stop, or dump messages")
-    String action;
-
-    @CommandLine.Option(names = { "--endpoint" },
-                        description = "Endpoint where to receive the messages 
from (can be uri, pattern, or refer to a route id)")
-    String endpoint;
-
-    @CommandLine.Option(names = { "--output-file" },
-                        description = "Saves messages received to the file 
with the given name (override if exists)")
-    String outputFile;
-
-    @CommandLine.Option(names = { "--tail" }, defaultValue = "-1",
-            description = "The number of messages from the end of the receive 
to show. Use -1 to read from the beginning. Use 0 to read only new lines. 
Defaults to showing all messages from beginning.")
-    int tail = -1;
-
-    @CommandLine.Option(names = { "--show-exchange-properties" }, defaultValue 
= "false",
-                        description = "Show exchange properties in traced 
messages")
-    boolean showExchangeProperties;
-
-    @CommandLine.Option(names = { "--show-headers" }, defaultValue = "true",
-                        description = "Show message headers in traced 
messages")
-    boolean showHeaders = true;
-
-    @CommandLine.Option(names = { "--show-body" }, defaultValue = "true",
-                        description = "Show message body in traced messages")
-    boolean showBody = true;
-
-    @CommandLine.Option(names = { "--show-exception" }, defaultValue = "true",
-                        description = "Show exception and stacktrace for 
failed messages")
-    boolean showException = true;
-
-    @CommandLine.Option(names = { "--logging-color" }, defaultValue = "true", 
description = "Use colored logging")
-    boolean loggingColor = true;
-
-    @CommandLine.Option(names = { "--pretty" },
-                        description = "Pretty print message body when using 
JSon or XML format")
-    boolean pretty;
-
-    @CommandLine.Option(names = { "--follow" }, defaultValue = "true",
-            description = "Keep following and outputting new received messages 
(use ctrl + c to exit).")
-    boolean follow = true;
-
-    private volatile long pid;
-
-    private MessageTableHelper tableHelper;
-
-    private static class Pid {
-        String pid;
-        String name;
-        Queue<String> fifo;
-        int depth;
-        LineNumberReader reader;
-    }
-
-    public CamelReceiveAction(CamelJBangMain main) {
-        super(main);
-    }
-
-    @Override
-    public Integer doCall() throws Exception {
-        if ("dump".equals(action)) {
-            return doDumpCall();
-        } else {
-            return doStartStopCall(action);
-        }
-    }
-
-    protected Integer doStartStopCall(String action) throws Exception {
-        // ensure output file is deleted before executing action
-        File outputFile = getOutputFile(Long.toString(pid));
-        FileUtil.deleteFile(outputFile);
-
-        JsonObject root = new JsonObject();
-        root.put("action", "receive");
-        root.put("endpoint", endpoint);
-        root.put("enabled", "start".equals(action) ? "true" : "false");
-
-        File f = getActionFile(Long.toString(pid));
-        try {
-            IOHelper.writeText(root.toJson(), f);
-        } catch (Exception e) {
-            // ignore
-        }
-        waitForOutputFile(outputFile);
-
-        // delete output file after use
-        FileUtil.deleteFile(outputFile);
-
-        return 0;
-    }
-
-    protected Integer doDumpCall() throws Exception {
-        List<Long> pids = findPids(name);
-        if (pids.isEmpty()) {
-            return 0;
-        } else if (pids.size() > 1) {
-            printer().println("Name or pid " + name + " matches " + pids.size()
-                              + " running Camel integrations. Specify a name 
or PID that matches exactly one.");
-            return 0;
-        }
-
-        this.pid = pids.get(0);
-
-        boolean waitMessage = true;
-        StopWatch watch = new StopWatch();
-        boolean more = true;
-        Pid pid = new Pid();
-        pid.pid = "" + this.pid;
-
-
-        if (tail != 0) {
-            tailTraceFiles(pids, tail);
-        } else {
-
-        }
-
-        do {
-            waitMessage = true;
-            int lines = readReceiveFiles(pid);
-            if (lines > 0) {
-                more = dumpReceivedFiles(pids, 0, null);
-            } else if (lines == 0) {
-                Thread.sleep(100);
-            } else {
-                break;
-            }
-        } while (follow || more);
-
-        return 0;
-    }
-
-    private int readReceiveFiles(Pid pid) throws Exception {
-        int lines = 0;
-        if (pid.reader == null) {
-            File file = getReceiveFile(pid.pid);
-            if (file.exists()) {
-                pid.reader = new LineNumberReader(new FileReader(file));
-                if (tail == 0) {
-                    // only read new lines so forward to end of reader
-                    long size = file.length();
-                    pid.reader.skip(size);
-                }
-            }
-        }
-        if (pid.reader != null) {
-            String line;
-            do {
-                try {
-                    line = pid.reader.readLine();
-                    if (line != null) {
-                        lines++;
-                        // switch fifo to be unlimited as we use it for new 
traces
-                        if (pid.fifo == null || pid.fifo instanceof 
ArrayBlockingQueue) {
-                            pid.fifo = new ArrayDeque<>();
-                        }
-                        pid.fifo.offer(line);
-                    }
-                } catch (IOException e) {
-                    // ignore
-                    line = null;
-                }
-            } while (line != null);
-        }
-
-        return lines;
-    }
-
-    private boolean dumpReceivedFiles(Pid pid) {
-        List<Row> rows = new ArrayList<>();
-
-        Queue<String> queue = pid.fifo;
-
-
-
-
-    }
-
-    private void tailTraceFiles(Map<Long, Pid> pids, int tail) throws 
Exception {
-        for (Pid pid : pids.values()) {
-            File file = getReceiveFile(pid.pid);
-            if (file.exists()) {
-                pid.reader = new LineNumberReader(new FileReader(file));
-                String line;
-                if (tail <= 0) {
-                    pid.fifo = new ArrayDeque<>();
-                } else {
-                    pid.fifo = new ArrayBlockingQueue<>(tail);
-                }
-                do {
-                    line = pid.reader.readLine();
-                    if (line != null) {
-                        while (!pid.fifo.offer(line)) {
-                            pid.fifo.poll();
-                        }
-                    }
-                } while (line != null);
-            }
-        }
-    }
-
-    protected JsonObject waitForOutputFile(File outputFile) {
-        StopWatch watch = new StopWatch();
-        while (watch.taken() < 5000) {
-            try {
-                // give time for response to be ready
-                Thread.sleep(20);
-
-                if (outputFile.exists()) {
-                    FileInputStream fis = new FileInputStream(outputFile);
-                    String text = IOHelper.loadText(fis);
-                    IOHelper.close(fis);
-                    return (JsonObject) Jsoner.deserialize(text);
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            } catch (Exception e) {
-                // ignore
-            }
-        }
-        return null;
-    }
-
-    private static class Row {
-        String pid;
-        long uid;
-        JsonObject message;
-    }
-
-}


Reply via email to