This is an automated email from the ASF dual-hosted git repository.
fmariani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 21f1d5e2a1ce Camel IEC60870 Producer improvements
21f1d5e2a1ce is described below
commit 21f1d5e2a1ce4fc4e7b6e150061c4116f4e56811
Author: Croway <[email protected]>
AuthorDate: Fri Dec 19 10:51:53 2025 +0100
Camel IEC60870 Producer improvements
---
.../camel/catalog/components/iec60870-client.json | 13 +-
.../component/iec60870/client/iec60870-client.json | 13 +-
.../src/main/docs/iec60870-client-component.adoc | 93 +++++++++
.../apache/camel/component/iec60870/Constants.java | 50 +++++
.../iec60870/client/ClientConnection.java | 185 ++++++++++++++++-
.../component/iec60870/client/ClientConsumer.java | 14 ++
.../component/iec60870/client/ClientProducer.java | 88 +++++++-
.../camel/component/iec60870/ConnectionTest.java | 229 ++++++++++++++++++++-
.../endpoint/dsl/ClientEndpointBuilderFactory.java | 136 ++++++++++++
9 files changed, 808 insertions(+), 13 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/iec60870-client.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/iec60870-client.json
index e38272df854e..02172af82631 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/iec60870-client.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/iec60870-client.json
@@ -33,7 +33,18 @@
"CamelIec60870Value": { "index": 0, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"Object", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The value", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_VALUE" },
"CamelIec60870Timestamp": { "index": 1, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"long", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The timestamp of the value", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_TIMESTAMP" },
"CamelIec60870Quality": { "index": 2, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"org.eclipse.neoscada.protocol.iec60870.asdu.types.QualityInformation",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The quality information of the value", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_QUALITY" },
- "CamelIec60870Overflow": { "index": 3, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"boolean", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Is overflow", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_OVERFLOW" }
+ "CamelIec60870Overflow": { "index": 3, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"boolean", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Is overflow", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_OVERFLOW" },
+ "CamelIec60870ConnectionState": { "index": 4, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType":
"org.eclipse.neoscada.protocol.iec60870.client.AutoConnectClient.State",
"enum": [ "SLEEPING", "DISCONNECTED", "LOOKUP", "CONNECTING", "CONNECTED" ],
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The connection state (CONNECTED, DISCONNECTED, etc.)",
"constantName": "org.apache.ca [...]
+ "CamelIec60870ConnectionError": { "index": 5, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Throwable", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The connection state error
if any", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_CONNECTION_ERROR" },
+ "CamelIec60870ConnectionUptime": { "index": 6, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "long", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Connection uptime in milliseconds since
last connected", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_CONNECTION_UPTIME" },
+ "CamelIec60870CommandType": { "index": 7, "kind": "header", "displayName":
"", "group": "producer", "label": "producer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The command type: 'value' (default),
'interrogation', 'read', or 'status'", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_COMMAND_TYPE" },
+ "CamelIec60870AsduAddress": { "index": 8, "kind": "header", "displayName":
"", "group": "producer", "label": "producer", "required": false, "javaType":
"org.eclipse.neoscada.protocol.iec60870.asdu.types.ASDUAddress", "deprecated":
false, "deprecationNote": "", "autowired": false, "secret": false,
"description": "The ASDU address for interrogation (optional, defaults to
broadcast)", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_ASDU_ADDRESS" },
+ "CamelIec60870Qoi": { "index": 9, "kind": "header", "displayName": "",
"group": "producer", "label": "producer", "required": false, "javaType":
"short", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The qualifier of interrogation: 20 (global) or
21-36 (groups 1-16)", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_QOI" },
+ "CamelIec60870QualityBlocked": { "index": 10, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Quality flag: Blocked (BL)",
"constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_QUALITY_BLOCKED" },
+ "CamelIec60870QualitySubstituted": { "index": 11, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Quality flag: Substituted (SB)",
"constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_QUALITY_SUBSTITUTED" },
+ "CamelIec60870QualityNotTopical": { "index": 12, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Quality flag: Not topical (NT)",
"constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_QUALITY_NOT_TOPICAL" },
+ "CamelIec60870QualityValid": { "index": 13, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Quality flag: Valid (true when the IV invalid bit is not set)",
"constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_QUALITY_VALID" },
+ "CamelIec60870CauseOfTransmission": { "index": 14, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType":
"org.eclipse.neoscada.protocol.iec60870.asdu.types.CauseOfTransmission",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The cause of transmission", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_CAUSE_OF_TRANSMISSION" }
},
"properties": {
"uriPath": { "index": 0, "kind": "path", "displayName": "Uri Path",
"group": "common", "label": "", "required": true, "type": "object", "javaType":
"org.apache.camel.component.iec60870.ObjectAddress", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "description": "The
object information address" },
diff --git
a/components/camel-iec60870/src/generated/resources/META-INF/org/apache/camel/component/iec60870/client/iec60870-client.json
b/components/camel-iec60870/src/generated/resources/META-INF/org/apache/camel/component/iec60870/client/iec60870-client.json
index e38272df854e..02172af82631 100644
---
a/components/camel-iec60870/src/generated/resources/META-INF/org/apache/camel/component/iec60870/client/iec60870-client.json
+++
b/components/camel-iec60870/src/generated/resources/META-INF/org/apache/camel/component/iec60870/client/iec60870-client.json
@@ -33,7 +33,18 @@
"CamelIec60870Value": { "index": 0, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"Object", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The value", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_VALUE" },
"CamelIec60870Timestamp": { "index": 1, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"long", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The timestamp of the value", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_TIMESTAMP" },
"CamelIec60870Quality": { "index": 2, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"org.eclipse.neoscada.protocol.iec60870.asdu.types.QualityInformation",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The quality information of the value", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_QUALITY" },
- "CamelIec60870Overflow": { "index": 3, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"boolean", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Is overflow", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_OVERFLOW" }
+ "CamelIec60870Overflow": { "index": 3, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"boolean", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Is overflow", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_OVERFLOW" },
+ "CamelIec60870ConnectionState": { "index": 4, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType":
"org.eclipse.neoscada.protocol.iec60870.client.AutoConnectClient.State",
"enum": [ "SLEEPING", "DISCONNECTED", "LOOKUP", "CONNECTING", "CONNECTED" ],
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The connection state (CONNECTED, DISCONNECTED, etc.)",
"constantName": "org.apache.ca [...]
+ "CamelIec60870ConnectionError": { "index": 5, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Throwable", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The connection state error
if any", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_CONNECTION_ERROR" },
+ "CamelIec60870ConnectionUptime": { "index": 6, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "long", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Connection uptime in milliseconds since
last connected", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_CONNECTION_UPTIME" },
+ "CamelIec60870CommandType": { "index": 7, "kind": "header", "displayName":
"", "group": "producer", "label": "producer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The command type: 'value' (default),
'interrogation', 'read', or 'status'", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_COMMAND_TYPE" },
+ "CamelIec60870AsduAddress": { "index": 8, "kind": "header", "displayName":
"", "group": "producer", "label": "producer", "required": false, "javaType":
"org.eclipse.neoscada.protocol.iec60870.asdu.types.ASDUAddress", "deprecated":
false, "deprecationNote": "", "autowired": false, "secret": false,
"description": "The ASDU address for interrogation (optional, defaults to
broadcast)", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_ASDU_ADDRESS" },
+ "CamelIec60870Qoi": { "index": 9, "kind": "header", "displayName": "",
"group": "producer", "label": "producer", "required": false, "javaType":
"short", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The qualifier of interrogation: 20 (global) or
21-36 (groups 1-16)", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_QOI" },
+ "CamelIec60870QualityBlocked": { "index": 10, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Quality flag: Blocked (BL)",
"constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_QUALITY_BLOCKED" },
+ "CamelIec60870QualitySubstituted": { "index": 11, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Quality flag: Substituted (SB)",
"constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_QUALITY_SUBSTITUTED" },
+ "CamelIec60870QualityNotTopical": { "index": 12, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Quality flag: Not topical (NT)",
"constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_QUALITY_NOT_TOPICAL" },
+ "CamelIec60870QualityValid": { "index": 13, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Quality flag: Valid (true when the IV invalid bit is not set)",
"constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_QUALITY_VALID" },
+ "CamelIec60870CauseOfTransmission": { "index": 14, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType":
"org.eclipse.neoscada.protocol.iec60870.asdu.types.CauseOfTransmission",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The cause of transmission", "constantName":
"org.apache.camel.component.iec60870.Constants#IEC60870_CAUSE_OF_TRANSMISSION" }
},
"properties": {
"uriPath": { "index": 0, "kind": "path", "displayName": "Uri Path",
"group": "common", "label": "", "required": true, "type": "object", "javaType":
"org.apache.camel.component.iec60870.ObjectAddress", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "description": "The
object information address" },
diff --git
a/components/camel-iec60870/src/main/docs/iec60870-client-component.adoc
b/components/camel-iec60870/src/main/docs/iec60870-client-component.adoc
index 527b39067e22..994784749c8c 100644
--- a/components/camel-iec60870/src/main/docs/iec60870-client-component.adoc
+++ b/components/camel-iec60870/src/main/docs/iec60870-client-component.adoc
@@ -64,4 +64,97 @@ include::partial$component-endpoint-options.adoc[]
include::partial$component-endpoint-headers.adoc[]
// component options: END
+== Producer Command Types
+
+The producer supports different command types via the
`CamelIec60870CommandType` header:
+
+[cols="1,3"]
+|===
+|Command Type |Description
+
+|`value`
+|Send value command (default). Body type determines command: Boolean→Single,
Integer→Scaled, Float→Float setpoint.
+
+|`interrogation`
+|Trigger interrogation (C_IC_NA_1). Use `CamelIec60870Qoi` header for group
interrogation (21-36).
+
+|`read`
+|Read single data point (C_RD_NA_1).
+
+|`status`
+|Get connection state only. No protocol command sent.
+|===
+
+=== Getting Connection Status (Producer)
+
+[source,java]
+----
+from("direct:status")
+ .setHeader("CamelIec60870CommandType", constant("status"))
+ .to("iec60870-client:localhost:2404/00-01-00-00-01");
+
+// Usage
+Exchange result = producerTemplate.send("direct:status", e -> {});
+State state = result.getMessage().getHeader("CamelIec60870ConnectionState",
State.class);
+Long uptime = result.getMessage().getHeader("CamelIec60870ConnectionUptime",
Long.class);
+----
+
+=== Triggering Interrogation
+
+[source,java]
+----
+from("direct:interrogate")
+ .setHeader("CamelIec60870CommandType", constant("interrogation"))
+ .to("iec60870-client:localhost:2404/00-01-00-00-01");
+
+from("direct:interrogateGroup")
+ .setHeader("CamelIec60870CommandType", constant("interrogation"))
+ .setHeader("CamelIec60870Qoi", constant((short) 21))
+ .to("iec60870-client:localhost:2404/00-01-00-00-01");
+----
+
+=== Sending Value Commands
+
+[source,java]
+----
+from("direct:bool").setBody(constant(true))
+ .to("iec60870-client:localhost:2404/00-01-00-00-01");
+
+from("direct:float").setBody(constant(42.5f))
+ .to("iec60870-client:localhost:2404/00-01-00-00-01");
+----
+
+== Consumer Examples
+
+Each message received by the consumer includes connection state and quality
headers.
+
+=== Getting Connection Status (Consumer)
+
+[source,java]
+----
+from("iec60870-client:localhost:2404/00-01-00-00-01")
+ .process(exchange -> {
+ // Connection state is included in every message
+ State state =
exchange.getIn().getHeader("CamelIec60870ConnectionState", State.class);
+ Long uptime =
exchange.getIn().getHeader("CamelIec60870ConnectionUptime", Long.class);
+
+ // Get the value and quality
+ Object value = exchange.getIn().getHeader("CamelIec60870Value");
+ Boolean valid =
exchange.getIn().getHeader("CamelIec60870QualityValid", Boolean.class);
+
+ log.info("State: {}, Uptime: {} ms, Value: {}, Valid: {}", state,
uptime, value, valid);
+ })
+ .to("log:values");
+----
+
+=== Filtering by Quality
+
+[source,java]
+----
+from("iec60870-client:localhost:2404/00-01-00-00-01")
+ .filter(header("CamelIec60870QualityValid").isEqualTo(true))
+ .log("Good value: ${header.CamelIec60870Value}")
+ .to("seda:process");
+----
+
include::spring-boot:partial$starter.adoc[]
diff --git
a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/Constants.java
b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/Constants.java
index 63ce3fa1c9a3..307f4ca20ce0 100644
---
a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/Constants.java
+++
b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/Constants.java
@@ -54,4 +54,54 @@ public interface Constants {
String TYPE = "type";
@Metadata(label = "consumer", description = "Is execute", javaType =
"boolean", applicableFor = SCHEME_SERVER)
String EXECUTE = "execute";
+
+ // Connection state headers
+ @Metadata(label = "consumer", description = "The connection state
(CONNECTED, DISCONNECTED, etc.)",
+ javaType =
"org.eclipse.neoscada.protocol.iec60870.client.AutoConnectClient.State",
applicableFor = SCHEME_CLIENT)
+ String IEC60870_CONNECTION_STATE = "CamelIec60870ConnectionState";
+ @Metadata(label = "consumer", description = "The connection state error if
any",
+ javaType = "Throwable", applicableFor = SCHEME_CLIENT)
+ String IEC60870_CONNECTION_ERROR = "CamelIec60870ConnectionError";
+ @Metadata(label = "consumer", description = "Connection uptime in
milliseconds since last connected",
+ javaType = "long", applicableFor = SCHEME_CLIENT)
+ String IEC60870_CONNECTION_UPTIME = "CamelIec60870ConnectionUptime";
+
+ // Producer command types
+ @Metadata(label = "producer",
+ description = "The command type: 'value' (default),
'interrogation', 'read', or 'status'",
+ javaType = "String", applicableFor = SCHEME_CLIENT)
+ String IEC60870_COMMAND_TYPE = "CamelIec60870CommandType";
+
+ // Command type values
+ String COMMAND_TYPE_VALUE = "value";
+ String COMMAND_TYPE_INTERROGATION = "interrogation";
+ String COMMAND_TYPE_READ = "read";
+ String COMMAND_TYPE_STATUS = "status";
+
+ // Interrogation headers
+ @Metadata(label = "producer", description = "The ASDU address for
interrogation (optional, defaults to broadcast)",
+ javaType =
"org.eclipse.neoscada.protocol.iec60870.asdu.types.ASDUAddress", applicableFor
= SCHEME_CLIENT)
+ String IEC60870_ASDU_ADDRESS = "CamelIec60870AsduAddress";
+ @Metadata(label = "producer", description = "The qualifier of
interrogation: 20 (global) or 21-36 (groups 1-16)",
+ javaType = "short", applicableFor = SCHEME_CLIENT)
+ String IEC60870_QOI = "CamelIec60870Qoi";
+
+ // Individual quality flag headers
+ @Metadata(label = "consumer", description = "Quality flag: Blocked (BL)",
javaType = "boolean",
+ applicableFor = SCHEME_CLIENT)
+ String IEC60870_QUALITY_BLOCKED = "CamelIec60870QualityBlocked";
+ @Metadata(label = "consumer", description = "Quality flag: Substituted
(SB)", javaType = "boolean",
+ applicableFor = SCHEME_CLIENT)
+ String IEC60870_QUALITY_SUBSTITUTED = "CamelIec60870QualitySubstituted";
+ @Metadata(label = "consumer", description = "Quality flag: Not topical
(NT)", javaType = "boolean",
+ applicableFor = SCHEME_CLIENT)
+ String IEC60870_QUALITY_NOT_TOPICAL = "CamelIec60870QualityNotTopical";
+    // Header carrying the "valid" quality flag. The client consumer fills it from
+    // QualityInformation.isValid(), so true means the value is valid (the protocol's
+    // IV "invalid" bit is not set). The description previously read "Invalid (IV)",
+    // which is the opposite of what the header holds.
+    @Metadata(label = "consumer", description = "Quality flag: Valid (true when the IV invalid bit is not set)",
+              javaType = "boolean", applicableFor = SCHEME_CLIENT)
+    String IEC60870_QUALITY_VALID = "CamelIec60870QualityValid";
+
+ // Cause of transmission header
+ @Metadata(label = "consumer", description = "The cause of transmission",
+ javaType =
"org.eclipse.neoscada.protocol.iec60870.asdu.types.CauseOfTransmission",
applicableFor = SCHEME_CLIENT)
+ String IEC60870_CAUSE_OF_TRANSMISSION = "CamelIec60870CauseOfTransmission";
}
diff --git
a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConnection.java
b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConnection.java
index 09bb01c5d601..110f7278f306 100644
---
a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConnection.java
+++
b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConnection.java
@@ -18,18 +18,26 @@ package org.apache.camel.component.iec60870.client;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import io.netty.channel.ChannelHandlerContext;
import org.apache.camel.component.iec60870.DiscardAckModule;
import org.apache.camel.component.iec60870.ObjectAddress;
+import org.eclipse.neoscada.protocol.iec60870.asdu.ASDUHeader;
+import org.eclipse.neoscada.protocol.iec60870.asdu.message.ReadCommand;
import org.eclipse.neoscada.protocol.iec60870.asdu.types.ASDUAddress;
+import org.eclipse.neoscada.protocol.iec60870.asdu.types.CauseOfTransmission;
import
org.eclipse.neoscada.protocol.iec60870.asdu.types.InformationObjectAddress;
import
org.eclipse.neoscada.protocol.iec60870.asdu.types.QualifierOfInterrogation;
+import org.eclipse.neoscada.protocol.iec60870.asdu.types.StandardCause;
import org.eclipse.neoscada.protocol.iec60870.asdu.types.Value;
import org.eclipse.neoscada.protocol.iec60870.client.AutoConnectClient;
import
org.eclipse.neoscada.protocol.iec60870.client.AutoConnectClient.ModulesFactory;
@@ -39,14 +47,41 @@ import
org.eclipse.neoscada.protocol.iec60870.client.data.AbstractDataProcessor;
import org.eclipse.neoscada.protocol.iec60870.client.data.DataHandler;
import org.eclipse.neoscada.protocol.iec60870.client.data.DataModule;
import org.eclipse.neoscada.protocol.iec60870.client.data.DataModuleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ClientConnection {
+ private static final Logger LOG =
LoggerFactory.getLogger(ClientConnection.class);
+
+ /**
+ * Listener for value updates from the IEC 60870 server.
+ */
@FunctionalInterface
public interface ValueListener {
void update(ObjectAddress address, Value<?> value);
}
+ /**
+ * Listener for connection state changes.
+ * <p>
+ * A listener registered through addConnectionStateListener is called once immediately with the
+ * connection state at that moment (and a null error), and again for each later state change
+ * reported by the client.
+ */
+ @FunctionalInterface
+ public interface ConnectionStateListener {
+ /**
+ * Called when the connection state changes.
+ *
+ * @param state the new connection state
+ * @param error an optional error if the state change was due to an error, or null otherwise
+ */
+ void stateChanged(State state, Throwable error);
+ }
+
+ // Connection state tracking
+ private final AtomicReference<State> currentState = new
AtomicReference<>(State.DISCONNECTED);
+ private final AtomicLong connectedTimestamp = new AtomicLong(0);
+ private final AtomicReference<DataModuleContext> dataModuleContextRef =
new AtomicReference<>();
+ private final List<ConnectionStateListener> connectionStateListeners = new
CopyOnWriteArrayList<>();
+
private final DataHandler dataHandler = new AbstractDataProcessor() {
/**
@@ -54,6 +89,8 @@ public class ClientConnection {
*/
@Override
public void activated(final DataModuleContext dataModuleContext, final
ChannelHandlerContext ctx) {
+ // Store the context for later use (interrogation, read commands)
+ dataModuleContextRef.set(dataModuleContext);
dataModuleContext.requestStartData();
dataModuleContext.startInterrogation(ASDUAddress.BROADCAST,
QualifierOfInterrogation.GLOBAL);
}
@@ -70,6 +107,8 @@ public class ClientConnection {
*/
@Override
public void disconnected() {
+ // Clear the context reference as it's no longer valid
+ dataModuleContextRef.set(null);
}
@Override
@@ -98,10 +137,21 @@ public class ClientConnection {
final DataModule dataModule = new DataModule(this.dataHandler,
this.options.getDataModuleOptions());
final ModulesFactory factory = () -> Arrays.asList(dataModule, new
DiscardAckModule());
final CountDownLatch latch = new CountDownLatch(1);
- StateListener stateListener = (final State state, final Throwable e)
-> {
- if (state == State.CONNECTED) {
+
+ StateListener stateListener = (final State state, final Throwable
error) -> {
+ State previousState = currentState.getAndSet(state);
+
+ // Track connection time
+ if (state == State.CONNECTED && previousState != State.CONNECTED) {
+ connectedTimestamp.set(System.currentTimeMillis());
latch.countDown();
+ } else if (state != State.CONNECTED && previousState ==
State.CONNECTED) {
+ // Reset connected timestamp when disconnected
+ connectedTimestamp.set(0);
}
+
+ // Notify all registered connection state listeners
+ notifyConnectionStateListeners(state, error);
};
this.client
@@ -113,6 +163,16 @@ public class ClientConnection {
}
}
+    /**
+     * Delivers a state change to every registered listener. A listener that throws is logged at WARN level
+     * and skipped, so the remaining listeners are still notified.
+     *
+     * @param state the new connection state
+     * @param error the error associated with the state change, may be null
+     */
+    private void notifyConnectionStateListeners(State state, Throwable error) {
+        connectionStateListeners.forEach(registered -> {
+            try {
+                registered.stateChanged(state, error);
+            } catch (Exception failure) {
+                LOG.warn("Error notifying connection state listener", failure);
+            }
+        });
+    }
+
public void stop() {
this.client.close();
}
@@ -150,4 +210,125 @@ public class ClientConnection {
public boolean executeCommand(final Object command) {
return this.client.writeCommand(command);
}
+
+    /**
+     * Adds a connection state listener to receive notifications about connection state changes.
+     * <p>
+     * The listener is called once immediately with the current state and a null error. An exception thrown
+     * by that initial callback is logged instead of propagated, the same way exceptions from later
+     * notifications are handled; the listener stays registered in that case.
+     *
+     * @param listener the listener to add; a null listener is ignored
+     */
+    public void addConnectionStateListener(ConnectionStateListener listener) {
+        if (listener == null) {
+            return;
+        }
+        connectionStateListeners.add(listener);
+        try {
+            // Notify immediately with current state
+            listener.stateChanged(currentState.get(), null);
+        } catch (Exception e) {
+            // Keep registration from failing half-way: the listener is already in the list at this point
+            LOG.warn("Error notifying connection state listener", e);
+        }
+    }
+
+    /**
+     * Unregisters a previously added connection state listener.
+     *
+     * @param listener the listener to remove; a null listener is ignored
+     */
+    public void removeConnectionStateListener(ConnectionStateListener listener) {
+        if (listener == null) {
+            return;
+        }
+        connectionStateListeners.remove(listener);
+    }
+
+    /**
+     * Returns the most recently recorded connection state.
+     *
+     * @return the current connection state
+     */
+    public State getConnectionState() {
+        final State latest = currentState.get();
+        return latest;
+    }
+
+    /**
+     * Tells whether the most recently recorded connection state is CONNECTED.
+     *
+     * @return true if connected, false otherwise
+     */
+    public boolean isConnected() {
+        return State.CONNECTED.equals(currentState.get());
+    }
+
+    /**
+     * Gets the connection uptime in milliseconds since the client was last connected. Returns 0 if not
+     * currently connected.
+     * <p>
+     * The value is derived from {@link System#currentTimeMillis()}, i.e. the wall clock. The result is
+     * clamped so that a backwards adjustment of the system clock never produces a negative uptime.
+     *
+     * @return the uptime in milliseconds (never negative), or 0 if not connected
+     */
+    public long getConnectionUptime() {
+        final long connectedAt = connectedTimestamp.get();
+        if (connectedAt == 0 || currentState.get() != State.CONNECTED) {
+            return 0;
+        }
+        return Math.max(0L, System.currentTimeMillis() - connectedAt);
+    }
+
+    /**
+     * Requests a global interrogation (C_IC_NA_1) addressed to the broadcast ASDU address, asking the
+     * server for all of its data points.
+     *
+     * @return true if the command was sent successfully, false otherwise
+     */
+    public boolean startInterrogation() {
+        final ASDUAddress everyStation = ASDUAddress.BROADCAST;
+        return startInterrogation(everyStation, QualifierOfInterrogation.GLOBAL);
+    }
+
+    /**
+     * Requests a global interrogation for the given ASDU address.
+     *
+     * @param  asduAddress the ASDU address to interrogate
+     * @return             true if the command was sent successfully, false otherwise
+     */
+    public boolean startInterrogation(ASDUAddress asduAddress) {
+        final boolean sent = startInterrogation(asduAddress, QualifierOfInterrogation.GLOBAL);
+        return sent;
+    }
+
+    /**
+     * Requests an interrogation for the given ASDU address using the given qualifier of interrogation.
+     * <p>
+     * The request goes through the data module context captured when the connection was activated. If no
+     * context is available, or the request throws, the problem is logged and false is returned.
+     *
+     * @param  asduAddress the ASDU address to interrogate
+     * @param  qoi         the qualifier of interrogation
+     * @return             true if the interrogation was started, false otherwise
+     */
+    public boolean startInterrogation(ASDUAddress asduAddress, short qoi) {
+        final DataModuleContext activeContext = dataModuleContextRef.get();
+        boolean started = false;
+        if (activeContext != null) {
+            try {
+                activeContext.startInterrogation(asduAddress, qoi);
+                LOG.debug("Started interrogation for ASDU address {} with QOI {}", asduAddress, qoi);
+                started = true;
+            } catch (Exception failure) {
+                LOG.error("Failed to start interrogation", failure);
+            }
+        } else {
+            LOG.warn("Cannot start interrogation: not connected or data module not initialized");
+        }
+        return started;
+    }
+
+    /**
+     * Requests an interrogation of a single group for the given ASDU address.
+     *
+     * @param  asduAddress              the ASDU address to interrogate
+     * @param  group                    the group number, 1 to 16 inclusive
+     * @return                          true if the interrogation was started, false otherwise
+     * @throws IllegalArgumentException if the group is outside 1..16
+     */
+    public boolean startGroupInterrogation(ASDUAddress asduAddress, int group) {
+        final boolean inRange = group >= 1 && group <= 16;
+        if (!inRange) {
+            throw new IllegalArgumentException("Group must be between 1 and 16, was: " + group);
+        }
+        // Groups are numbered from QOI 21 upwards: group 1 -> 21, group 16 -> 36
+        return startInterrogation(asduAddress, (short) (20 + group));
+    }
+
+    /**
+     * Sends a read command (C_RD_NA_1) for a single information object.
+     * <p>
+     * The command is only attempted while a data module context is present (it is set on activation and
+     * cleared on disconnect). The return value is the result reported by the client's
+     * {@code writeCommand}; previously that result was ignored and true was returned unconditionally.
+     *
+     * @param  asduAddress the ASDU address of the target
+     * @param  ioa         the information object address to read
+     * @return             true if the client reported the command as written; false if not connected, if the
+     *                     client did not write the command, or if sending failed with an exception
+     */
+    public boolean readValue(ASDUAddress asduAddress, InformationObjectAddress ioa) {
+        // The context itself is not needed to send the command; its presence is used as the "connected" check
+        if (dataModuleContextRef.get() == null) {
+            LOG.warn("Cannot send read command: not connected or data module not initialized");
+            return false;
+        }
+        try {
+            ASDUHeader header = new ASDUHeader(
+                    new CauseOfTransmission(StandardCause.REQUEST),
+                    asduAddress);
+            ReadCommand readCommand = new ReadCommand(header, ioa);
+
+            final boolean written = client.writeCommand(readCommand);
+            if (written) {
+                LOG.debug("Sent read command for ASDU address {} IOA {}", asduAddress, ioa);
+            } else {
+                LOG.warn("Read command for ASDU address {} IOA {} was not written by the client", asduAddress, ioa);
+            }
+            return written;
+        } catch (Exception e) {
+            LOG.error("Failed to send read command", e);
+            return false;
+        }
+    }
+
+    /**
+     * Sends a read command for the information object identified by the given object address.
+     *
+     * @param  address the combined ASDU / information object address
+     * @return         the result of the two-argument readValue call for the address parts
+     */
+    public boolean readValue(ObjectAddress address) {
+        final ASDUAddress asduAddress = address.getASDUAddress();
+        final InformationObjectAddress ioa = address.getInformationObjectAddress();
+        return readValue(asduAddress, ioa);
+    }
}
diff --git
a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConsumer.java
b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConsumer.java
index c1bf5e32ba2e..46368e91f477 100644
---
a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConsumer.java
+++
b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConsumer.java
@@ -22,6 +22,7 @@ import org.apache.camel.Processor;
import org.apache.camel.component.iec60870.Constants;
import org.apache.camel.component.iec60870.ObjectAddress;
import org.apache.camel.support.DefaultConsumer;
+import org.eclipse.neoscada.protocol.iec60870.asdu.types.QualityInformation;
import org.eclipse.neoscada.protocol.iec60870.asdu.types.Value;
public class ClientConsumer extends DefaultConsumer {
@@ -65,5 +66,18 @@ public class ClientConsumer extends DefaultConsumer {
message.setHeader(Constants.IEC60870_TIMESTAMP, value.getTimestamp());
message.setHeader(Constants.IEC60870_QUALITY,
value.getQualityInformation());
message.setHeader(Constants.IEC60870_OVERFLOW, value.isOverflow());
+
+ // Add connection state and uptime to value messages
+ message.setHeader(Constants.IEC60870_CONNECTION_STATE,
connection.getConnectionState());
+ message.setHeader(Constants.IEC60870_CONNECTION_UPTIME,
connection.getConnectionUptime());
+
+ // Add individual quality flags
+ QualityInformation quality = value.getQualityInformation();
+ if (quality != null) {
+ message.setHeader(Constants.IEC60870_QUALITY_BLOCKED,
quality.isBlocked());
+ message.setHeader(Constants.IEC60870_QUALITY_SUBSTITUTED,
quality.isSubstituted());
+ message.setHeader(Constants.IEC60870_QUALITY_NOT_TOPICAL,
quality.isTopical() == false);
+ message.setHeader(Constants.IEC60870_QUALITY_VALID,
quality.isValid());
+ }
}
}
diff --git
a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientProducer.java
b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientProducer.java
index 5bf02eca82e0..d50fe79c3aae 100644
---
a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientProducer.java
+++
b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientProducer.java
@@ -17,32 +17,74 @@
package org.apache.camel.component.iec60870.client;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.iec60870.Constants;
import org.apache.camel.component.iec60870.ObjectAddress;
import org.apache.camel.support.DefaultProducer;
import org.eclipse.neoscada.protocol.iec60870.asdu.ASDUHeader;
import
org.eclipse.neoscada.protocol.iec60870.asdu.message.SetPointCommandScaledValue;
import
org.eclipse.neoscada.protocol.iec60870.asdu.message.SetPointCommandShortFloatingPoint;
import org.eclipse.neoscada.protocol.iec60870.asdu.message.SingleCommand;
+import org.eclipse.neoscada.protocol.iec60870.asdu.types.ASDUAddress;
import org.eclipse.neoscada.protocol.iec60870.asdu.types.CauseOfTransmission;
import
org.eclipse.neoscada.protocol.iec60870.asdu.types.InformationObjectAddress;
+import
org.eclipse.neoscada.protocol.iec60870.asdu.types.QualifierOfInterrogation;
public class ClientProducer extends DefaultProducer {
private final ClientConnection connection;
private final ASDUHeader header;
private final InformationObjectAddress address;
+ private final ObjectAddress objectAddress;
public ClientProducer(final ClientEndpoint endpoint, final
ClientConnection connection) {
super(endpoint);
this.connection = connection;
- final ObjectAddress address = endpoint.getAddress();
- this.header = new ASDUHeader(CauseOfTransmission.ACTIVATED,
address.getASDUAddress());
- this.address = address.getInformationObjectAddress();
+ this.objectAddress = endpoint.getAddress();
+ this.header = new ASDUHeader(CauseOfTransmission.ACTIVATED,
objectAddress.getASDUAddress());
+ this.address = objectAddress.getInformationObjectAddress();
}
@Override
public void process(final Exchange exchange) throws Exception {
+ final Message message = exchange.getIn();
+ final String commandType
+ = message.getHeader(Constants.IEC60870_COMMAND_TYPE,
Constants.COMMAND_TYPE_VALUE, String.class);
+
+ switch (commandType) {
+ case Constants.COMMAND_TYPE_INTERROGATION:
+ processInterrogation(message);
+ break;
+ case Constants.COMMAND_TYPE_READ:
+ processRead(message);
+ break;
+ case Constants.COMMAND_TYPE_STATUS:
+ // Status command only retrieves connection state, no protocol
command sent
+ break;
+ case Constants.COMMAND_TYPE_VALUE:
+ default:
+ processValueCommand(exchange);
+ break;
+ }
+
+ // Always set connection state headers on the exchange after processing
+ setConnectionStateHeaders(exchange);
+ }
+
+ /**
+ * Sets connection state headers on the exchange message.
+ */
+ private void setConnectionStateHeaders(final Exchange exchange) {
+ Message out = exchange.getMessage();
+ out.setHeader(Constants.IEC60870_CONNECTION_STATE,
connection.getConnectionState());
+ out.setHeader(Constants.IEC60870_CONNECTION_UPTIME,
connection.getConnectionUptime());
+ }
+
+ /**
+ * Process a value command (single, scaled, or float setpoint).
+ */
+ private void processValueCommand(final Exchange exchange) {
final Object command = mapToCommand(exchange);
if (command != null) {
@@ -52,6 +94,46 @@ public class ClientProducer extends DefaultProducer {
}
}
+ /**
+ * Process an interrogation command (C_IC_NA_1).
+ */
+ private void processInterrogation(final Message message) {
+ // Get ASDU address from header, default to the endpoint's ASDU
address
+ ASDUAddress asduAddress =
message.getHeader(Constants.IEC60870_ASDU_ADDRESS, ASDUAddress.class);
+ if (asduAddress == null) {
+ asduAddress = objectAddress.getASDUAddress();
+ }
+
+ // Get QOI from header, default to GLOBAL (20)
+ Short qoiValue = message.getHeader(Constants.IEC60870_QOI,
Short.class);
+ short qoi;
+ if (qoiValue != null) {
+ qoi = qoiValue;
+ } else {
+ qoi = QualifierOfInterrogation.GLOBAL;
+ }
+
+ if (!this.connection.startInterrogation(asduAddress, qoi)) {
+ throw new IllegalStateException("Failed to send interrogation
command. Not connected.");
+ }
+ }
+
+ /**
+ * Process a read command (C_RD_NA_1).
+ */
+ private void processRead(final Message message) {
+ // Get ASDU address from header, default to the endpoint's ASDU address
+ ASDUAddress asduAddress =
message.getHeader(Constants.IEC60870_ASDU_ADDRESS, ASDUAddress.class);
+ if (asduAddress == null) {
+ asduAddress = objectAddress.getASDUAddress();
+ }
+
+ // Use the endpoint's IOA for the read command
+ if (!this.connection.readValue(asduAddress, this.address)) {
+ throw new IllegalStateException("Failed to send read command. Not
connected.");
+ }
+ }
+
private Object mapToCommand(final Exchange exchange) {
final Object body = exchange.getIn().getBody();
diff --git
a/components/camel-iec60870/src/test/java/org/apache/camel/component/iec60870/ConnectionTest.java
b/components/camel-iec60870/src/test/java/org/apache/camel/component/iec60870/ConnectionTest.java
index 9f5315187c15..2f075da43423 100644
---
a/components/camel-iec60870/src/test/java/org/apache/camel/component/iec60870/ConnectionTest.java
+++
b/components/camel-iec60870/src/test/java/org/apache/camel/component/iec60870/ConnectionTest.java
@@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.RoutesBuilder;
@@ -29,6 +30,7 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.eclipse.neoscada.protocol.iec60870.asdu.types.Value;
+import org.eclipse.neoscada.protocol.iec60870.client.AutoConnectClient.State;
import
org.eclipse.neoscada.protocol.iec60870.server.data.model.WriteModel.Request;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@@ -47,6 +49,12 @@ public class ConnectionTest extends CamelTestSupport {
private static final String DIRECT_SEND_C_1 = "direct:sendClient1";
+ private static final String DIRECT_INTERROGATION = "direct:interrogation";
+
+ private static final String DIRECT_READ = "direct:read";
+
+ private static final String DIRECT_STATUS = "direct:status";
+
private static final String MOCK_CLIENT_1 = "mock:testClient1";
private static final String MOCK_CLIENT_2 = "mock:testClient2";
@@ -59,6 +67,15 @@ public class ConnectionTest extends CamelTestSupport {
@Produce(DIRECT_SEND_C_1)
protected ProducerTemplate producerClient1;
+ @Produce(DIRECT_INTERROGATION)
+ protected ProducerTemplate producerInterrogation;
+
+ @Produce(DIRECT_READ)
+ protected ProducerTemplate producerRead;
+
+ @Produce(DIRECT_STATUS)
+ protected ProducerTemplate producerStatus;
+
@EndpointInject(MOCK_CLIENT_1)
protected MockEndpoint testClient1Endpoint;
@@ -68,20 +85,33 @@ public class ConnectionTest extends CamelTestSupport {
@EndpointInject(MOCK_SERVER_1)
protected MockEndpoint testServer1Endpoint;
+ private int testPort;
+
@Override
protected RoutesBuilder createRouteBuilder() {
- final int port = AvailablePortFinder.getNextAvailable();
+ testPort = AvailablePortFinder.getNextAvailable();
return new RouteBuilder() {
@Override
public void configure() {
-
from(DIRECT_SEND_S_1).toF("iec60870-server:localhost:%s/00-00-00-00-01", port);
- fromF("iec60870-client:localhost:%s/00-00-00-00-01",
port).to(MOCK_CLIENT_1);
- fromF("iec60870-client:localhost:%s/00-00-00-00-02",
port).to(MOCK_CLIENT_2);
+
from(DIRECT_SEND_S_1).toF("iec60870-server:localhost:%s/00-00-00-00-01",
testPort);
+ fromF("iec60870-client:localhost:%s/00-00-00-00-01",
testPort).to(MOCK_CLIENT_1);
+ fromF("iec60870-client:localhost:%s/00-00-00-00-02",
testPort).to(MOCK_CLIENT_2);
+
+
from(DIRECT_SEND_C_1).toF("iec60870-client:localhost:%s/00-00-00-01-01",
testPort);
+ fromF("iec60870-server:localhost:%s/00-00-00-01-01",
testPort).to(MOCK_SERVER_1);
+
+ // Route for interrogation command
+
from(DIRECT_INTERROGATION).toF("iec60870-client:localhost:%s/00-00-00-00-01",
testPort);
+
+ // Route for read command
+
from(DIRECT_READ).toF("iec60870-client:localhost:%s/00-00-00-00-01", testPort);
-
from(DIRECT_SEND_C_1).toF("iec60870-client:localhost:%s/00-00-00-01-01", port);
- fromF("iec60870-server:localhost:%s/00-00-00-01-01",
port).to(MOCK_SERVER_1);
+ // Route for status command - gets connection state without
sending protocol commands
+ from(DIRECT_STATUS)
+ .setHeader(Constants.IEC60870_COMMAND_TYPE,
constant(Constants.COMMAND_TYPE_STATUS))
+ .toF("iec60870-client:localhost:%s/00-00-00-00-01",
testPort);
}
};
}
@@ -175,4 +205,191 @@ public class ConnectionTest extends CamelTestSupport {
};
}
+ /**
+ * Test that connection state header is included in consumer messages.
Verifies the CamelIec60870ConnectionState
+ * header is set to CONNECTED when receiving values.
+ */
+ @Test
+ public void testConnectionStateHeader() throws InterruptedException {
+ this.producerServer1.sendBody(42.0f);
+
+ this.testClient1Endpoint.setExpectedCount(1);
+
+ // Verify connection state header is present and shows CONNECTED
+ testClient1Endpoint.message(0).predicate(exchange -> {
+ State connectionState =
exchange.getIn().getHeader(Constants.IEC60870_CONNECTION_STATE, State.class);
+ assertNotNull(connectionState, "Connection state header should be
present");
+ assertEquals(State.CONNECTED, connectionState, "Connection should
be CONNECTED");
+ return true;
+ });
+
+ MockEndpoint.assertIsSatisfied(context, 1_000, TimeUnit.MILLISECONDS);
+ LOG.debug("Connection state: {}",
testClient1Endpoint.getExchanges().get(0).getIn()
+ .getHeader(Constants.IEC60870_CONNECTION_STATE));
+ }
+
+ /**
+ * Test that connection uptime header is included in consumer messages.
Verifies the CamelIec60870ConnectionUptime
+ * header shows a non-negative uptime when connected.
+ */
+ @Test
+ public void testConnectionUptimeHeader() throws InterruptedException {
+ this.producerServer1.sendBody(99.9f);
+
+ this.testClient1Endpoint.setExpectedCount(1);
+
+ // Verify connection uptime header is present and non-negative
+ testClient1Endpoint.message(0).predicate(exchange -> {
+ Long uptime =
exchange.getIn().getHeader(Constants.IEC60870_CONNECTION_UPTIME, Long.class);
+ assertNotNull(uptime, "Connection uptime header should be
present");
+ assertTrue(uptime >= 0, "Connection uptime should be
non-negative");
+ LOG.debug("Connection uptime: {} ms", uptime);
+ return true;
+ });
+
+ MockEndpoint.assertIsSatisfied(context, 1_000, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Test that individual quality flag headers are included in consumer
messages. Verifies the quality flags (Blocked,
+ * Substituted, NotTopical, Valid) are exposed as separate headers.
+ */
+ @Test
+ public void testQualityFlagHeaders() throws InterruptedException {
+ this.producerServer1.sendBody(123.45f);
+
+ this.testClient1Endpoint.setExpectedCount(1);
+
+ // Verify individual quality flag headers are present
+ testClient1Endpoint.message(0).predicate(exchange -> {
+ Boolean blocked =
exchange.getIn().getHeader(Constants.IEC60870_QUALITY_BLOCKED, Boolean.class);
+ Boolean substituted =
exchange.getIn().getHeader(Constants.IEC60870_QUALITY_SUBSTITUTED,
Boolean.class);
+ Boolean notTopical =
exchange.getIn().getHeader(Constants.IEC60870_QUALITY_NOT_TOPICAL,
Boolean.class);
+ Boolean valid =
exchange.getIn().getHeader(Constants.IEC60870_QUALITY_VALID, Boolean.class);
+
+ assertNotNull(blocked, "Quality blocked header should be present");
+ assertNotNull(substituted, "Quality substituted header should be
present");
+ assertNotNull(notTopical, "Quality not topical header should be
present");
+ assertNotNull(valid, "Quality valid header should be present");
+
+ // Good quality means: not blocked, not substituted, topical, valid
+ assertFalse(blocked, "Value should not be blocked");
+ assertFalse(substituted, "Value should not be substituted");
+ assertFalse(notTopical, "Value should be topical");
+ assertTrue(valid, "Value should be valid");
+
+ return true;
+ });
+
+ MockEndpoint.assertIsSatisfied(context, 1_000, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Test sending an interrogation command via the producer. Verifies that
the CamelIec60870CommandType=interrogation
+ * header triggers an interrogation request.
+ */
+ @Test
+ public void testInterrogationCommand() throws InterruptedException {
+ // First, send a value to the server so there's data to interrogate
+ this.producerServer1.sendBody(77.7f);
+
+ // Wait for initial value
+ this.testClient1Endpoint.setExpectedCount(1);
+ MockEndpoint.assertIsSatisfied(context, 1_000, TimeUnit.MILLISECONDS);
+ this.testClient1Endpoint.reset();
+
+ // Now send an interrogation command - this should trigger the server
to resend all values
+ // Using global interrogation (QOI = 20)
+ this.producerInterrogation.sendBodyAndHeader(
+ null,
+ Constants.IEC60870_COMMAND_TYPE,
+ Constants.COMMAND_TYPE_INTERROGATION);
+
+ // The interrogation should cause the server to resend data
+ // We expect to receive the value again
+ this.testClient1Endpoint.setMinimumExpectedMessageCount(1);
+ MockEndpoint.assertIsSatisfied(context, 2_000, TimeUnit.MILLISECONDS);
+
+ LOG.debug("Received {} messages after interrogation",
+ testClient1Endpoint.getReceivedCounter());
+ }
+
+ /**
+ * Test sending a group interrogation command with specific QOI. Verifies
that the CamelIec60870Qoi header can
+ * specify group interrogation (QOI 21-36).
+ */
+ @Test
+ public void testGroupInterrogationCommand() throws InterruptedException {
+ // First, send a value to the server
+ this.producerServer1.sendBody(88.8f);
+
+ // Wait for initial value
+ this.testClient1Endpoint.setExpectedCount(1);
+ MockEndpoint.assertIsSatisfied(context, 1_000, TimeUnit.MILLISECONDS);
+ this.testClient1Endpoint.reset();
+
+ // Send group 1 interrogation (QOI = 21)
+ Exchange exchange = producerInterrogation.send(e -> {
+ e.getIn().setHeader(Constants.IEC60870_COMMAND_TYPE,
Constants.COMMAND_TYPE_INTERROGATION);
+ e.getIn().setHeader(Constants.IEC60870_QOI, (short) 21); // Group 1
+ });
+
+ // Command should execute without error
+ assertFalse(exchange.isFailed(), "Interrogation command should not
fail");
+
+ LOG.debug("Group interrogation command sent successfully");
+ }
+
+ /**
+ * Test sending a read command via the producer. Verifies that the
CamelIec60870CommandType=read header triggers a
+ * read request.
+ */
+ @Test
+ public void testReadCommand() throws InterruptedException {
+ // First, send a value to the server so there's data to read
+ this.producerServer1.sendBody(55.5f);
+
+ // Wait for initial value
+ this.testClient1Endpoint.setExpectedCount(1);
+ MockEndpoint.assertIsSatisfied(context, 1_000, TimeUnit.MILLISECONDS);
+ this.testClient1Endpoint.reset();
+
+ // Send a read command for the specific address
+ Exchange exchange = producerRead.send(e -> {
+ e.getIn().setHeader(Constants.IEC60870_COMMAND_TYPE,
Constants.COMMAND_TYPE_READ);
+ });
+
+ // Command should execute without error
+ assertFalse(exchange.isFailed(), "Read command should not fail");
+
+ LOG.debug("Read command sent successfully");
+ }
+
+ /**
+ * Test getting connection status via producer using the 'status' command
type. This demonstrates how to get
+ * connection state on-demand without sending any IEC 60870 protocol
command.
+ *
+ */
+ @Test
+ public void testStatusCommand() throws InterruptedException {
+ // Send to the status route - the route sets the command type header
+ Exchange exchange = producerStatus.send(e -> {
+ // No body or headers needed - the route sets command type to
'status'
+ });
+
+ // Command should execute without error
+ assertFalse(exchange.isFailed(), "Status command should not fail");
+
+ // Verify connection state headers are returned by the producer
+ State connectionState =
exchange.getMessage().getHeader(Constants.IEC60870_CONNECTION_STATE,
State.class);
+ Long uptime =
exchange.getMessage().getHeader(Constants.IEC60870_CONNECTION_UPTIME,
Long.class);
+
+ assertNotNull(connectionState, "Connection state header should be
set");
+ assertNotNull(uptime, "Connection uptime header should be set");
+ assertEquals(State.CONNECTED, connectionState, "Connection should be
CONNECTED");
+ assertTrue(uptime >= 0, "Uptime should be non-negative");
+
+ LOG.debug("Status command: state={}, uptime={} ms", connectionState,
uptime);
+ }
+
}
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/ClientEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/ClientEndpointBuilderFactory.java
index 2b0dc84327e0..353a0970c8a2 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/ClientEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/ClientEndpointBuilderFactory.java
@@ -1794,6 +1794,142 @@ public interface ClientEndpointBuilderFactory {
public String iec60870Overflow() {
return "CamelIec60870Overflow";
}
+ /**
+ * The connection state (CONNECTED, DISCONNECTED, etc.).
+ *
+ * The option is a: {@code
+ *
org.eclipse.neoscada.protocol.iec60870.client.AutoConnectClient.State} type.
+ *
+ * Group: consumer
+ *
+ * @return the name of the header {@code Iec60870ConnectionState}.
+ */
+ public String iec60870ConnectionState() {
+ return "CamelIec60870ConnectionState";
+ }
+ /**
+ * The connection state error if any.
+ *
+ * The option is a: {@code Throwable} type.
+ *
+ * Group: consumer
+ *
+ * @return the name of the header {@code Iec60870ConnectionError}.
+ */
+ public String iec60870ConnectionError() {
+ return "CamelIec60870ConnectionError";
+ }
+ /**
+ * Connection uptime in milliseconds since last connected.
+ *
+ * The option is a: {@code long} type.
+ *
+ * Group: consumer
+ *
+ * @return the name of the header {@code Iec60870ConnectionUptime}.
+ */
+ public String iec60870ConnectionUptime() {
+ return "CamelIec60870ConnectionUptime";
+ }
+ /**
+ * The command type: 'value' (default), 'interrogation', 'read', or
+ * 'status'.
+ *
+ * The option is a: {@code String} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code Iec60870CommandType}.
+ */
+ public String iec60870CommandType() {
+ return "CamelIec60870CommandType";
+ }
+ /**
+ * The ASDU address for interrogation or read (optional, defaults to
the endpoint's ASDU address).
+ *
+ * The option is a: {@code
+ * org.eclipse.neoscada.protocol.iec60870.asdu.types.ASDUAddress} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code Iec60870AsduAddress}.
+ */
+ public String iec60870AsduAddress() {
+ return "CamelIec60870AsduAddress";
+ }
+ /**
+ * The qualifier of interrogation: 20 (global) or 21-36 (groups 1-16).
+ *
+ * The option is a: {@code short} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code Iec60870Qoi}.
+ */
+ public String iec60870Qoi() {
+ return "CamelIec60870Qoi";
+ }
+ /**
+ * Quality flag: Blocked (BL).
+ *
+ * The option is a: {@code boolean} type.
+ *
+ * Group: consumer
+ *
+ * @return the name of the header {@code Iec60870QualityBlocked}.
+ */
+ public String iec60870QualityBlocked() {
+ return "CamelIec60870QualityBlocked";
+ }
+ /**
+ * Quality flag: Substituted (SB).
+ *
+ * The option is a: {@code boolean} type.
+ *
+ * Group: consumer
+ *
+ * @return the name of the header {@code Iec60870QualitySubstituted}.
+ */
+ public String iec60870QualitySubstituted() {
+ return "CamelIec60870QualitySubstituted";
+ }
+ /**
+ * Quality flag: Not topical (NT).
+ *
+ * The option is a: {@code boolean} type.
+ *
+ * Group: consumer
+ *
+ * @return the name of the header {@code Iec60870QualityNotTopical}.
+ */
+ public String iec60870QualityNotTopical() {
+ return "CamelIec60870QualityNotTopical";
+ }
+ /**
+ * Quality flag: Valid (true when the Invalid (IV) bit is not set).
+ *
+ * The option is a: {@code boolean} type.
+ *
+ * Group: consumer
+ *
+ * @return the name of the header {@code Iec60870QualityValid}.
+ */
+ public String iec60870QualityValid() {
+ return "CamelIec60870QualityValid";
+ }
+ /**
+ * The cause of transmission.
+ *
+ * The option is a: {@code
+ *
org.eclipse.neoscada.protocol.iec60870.asdu.types.CauseOfTransmission} type.
+ *
+ * Group: consumer
+ *
+ * @return the name of the header {@code Iec60870CauseOfTransmission}.
+ */
+ public String iec60870CauseOfTransmission() {
+ return "CamelIec60870CauseOfTransmission";
+ }
}
static ClientEndpointBuilder endpointBuilder(String componentName, String
path) {
class ClientEndpointBuilderImpl extends AbstractEndpointBuilder
implements ClientEndpointBuilder, AdvancedClientEndpointBuilder {