This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 17c3b2c CAMEL-13118: Components should not depend on camel-core but
camel-support
17c3b2c is described below
commit 17c3b2c8b08a64bacbd40006642989796b72c6a9
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Apr 18 08:09:04 2019 +0200
CAMEL-13118: Components should not depend on camel-core but camel-support
---
components/camel-grpc/pom.xml | 2 +-
.../apache/camel/component/grpc/GrpcProducer.java | 10 ++++++-
.../client/GrpcResponseRouterStreamObserver.java | 31 +++++++++++++---------
3 files changed, 28 insertions(+), 15 deletions(-)
diff --git a/components/camel-grpc/pom.xml b/components/camel-grpc/pom.xml
index 5d3a06d..db4545c 100644
--- a/components/camel-grpc/pom.xml
+++ b/components/camel-grpc/pom.xml
@@ -43,7 +43,7 @@
<!-- requires camel-core -->
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-core</artifactId>
+ <artifactId>camel-support</artifactId>
</dependency>
<dependency>
diff --git
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
index ad542be..3e99b9c 100644
---
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
+++
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
@@ -38,6 +38,7 @@ import
org.apache.camel.component.grpc.client.GrpcResponseRouterStreamObserver;
import org.apache.camel.spi.ClassResolver;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
/**
@@ -117,11 +118,18 @@ public class GrpcProducer extends DefaultAsyncProducer {
if (configuration.getStreamRepliesTo() != null) {
this.globalResponseObserver = new
GrpcResponseRouterStreamObserver(configuration, getEndpoint());
}
+
+ if (this.globalResponseObserver != null) {
+ ServiceHelper.startService(this.globalResponseObserver);
+ }
}
}
@Override
protected void doStop() throws Exception {
+ if (this.globalResponseObserver != null) {
+ ServiceHelper.stopService(this.globalResponseObserver);
+ }
if (channel != null) {
forwarder.shutdown();
forwarder = null;
@@ -136,7 +144,7 @@ public class GrpcProducer extends DefaultAsyncProducer {
}
protected void initializeChannel() throws Exception {
- NettyChannelBuilder channelBuilder = null;
+ NettyChannelBuilder channelBuilder;
if (!ObjectHelper.isEmpty(configuration.getHost()) &&
!ObjectHelper.isEmpty(configuration.getPort())) {
log.info("Creating channel to the remote gRPC server {}:{}",
configuration.getHost(), configuration.getPort());
diff --git
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
index c1edbd0..749b53e 100644
---
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
+++
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
@@ -17,31 +17,28 @@
package org.apache.camel.component.grpc.client;
import io.grpc.stub.StreamObserver;
-import org.apache.camel.AsyncProcessor;
+import org.apache.camel.AsyncProducer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.component.grpc.GrpcConfiguration;
import org.apache.camel.component.grpc.GrpcConstants;
-import org.apache.camel.impl.DefaultProducerCache;
-import org.apache.camel.spi.ProducerCache;
import org.apache.camel.support.CamelContextHelper;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.support.service.ServiceSupport;
/**
* A stream observer that routes all responses to another endpoint.
*/
-public class GrpcResponseRouterStreamObserver implements
StreamObserver<Object> {
+public class GrpcResponseRouterStreamObserver extends ServiceSupport
implements StreamObserver<Object> {
private final Endpoint sourceEndpoint;
private final GrpcConfiguration configuration;
- private final Endpoint endpoint;
- private final ProducerCache producerCache;
+ private Endpoint endpoint;
+ private AsyncProducer producer;
- public GrpcResponseRouterStreamObserver(GrpcConfiguration configuration,
Endpoint sourceEndpoint) {
+ public GrpcResponseRouterStreamObserver(GrpcConfiguration configuration,
Endpoint sourceEndpoint) throws Exception {
this.configuration = configuration;
this.sourceEndpoint = sourceEndpoint;
- this.endpoint =
CamelContextHelper.getMandatoryEndpoint(sourceEndpoint.getCamelContext(),
configuration.getStreamRepliesTo());
- sourceEndpoint.getCamelContext().createProducerTemplate(-1);
- this.producerCache = new DefaultProducerCache(this,
sourceEndpoint.getCamelContext(), -1);
}
@Override
@@ -50,7 +47,6 @@ public class GrpcResponseRouterStreamObserver implements
StreamObserver<Object>
exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER,
GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT);
exchange.getIn().setBody(o);
doSend(exchange);
-
}
@Override
@@ -72,10 +68,19 @@ public class GrpcResponseRouterStreamObserver implements
StreamObserver<Object>
}
}
-
private void doSend(Exchange exchange) {
+ producer.processAsync(exchange);
+ }
- producerCache.doInAsyncProducer(endpoint, exchange, doneSync -> { },
AsyncProcessor::process);
+ @Override
+ protected void doStart() throws Exception {
+ this.endpoint =
CamelContextHelper.getMandatoryEndpoint(sourceEndpoint.getCamelContext(),
configuration.getStreamRepliesTo());
+ this.producer = endpoint.createAsyncProducer();
+ ServiceHelper.startService(producer);
}
+ @Override
+ protected void doStop() throws Exception {
+ ServiceHelper.stopService(producer);
+ }
}