This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 0b19988 CAMEL-14854: camel-irc - Should reconnect if failing to use
existing connection in the producer.
0b19988 is described below
commit 0b199880d71bdd9895dbef96aaa8bbf3e6e93320
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Jun 19 14:51:04 2020 +0200
CAMEL-14854: camel-irc - Should reconnect if failing to use existing
connection in the producer.
---
.../apache/camel/component/irc/IrcComponent.java | 8 +++
.../apache/camel/component/irc/IrcConsumer.java | 5 +-
.../apache/camel/component/irc/IrcEndpoint.java | 2 +-
.../apache/camel/component/irc/IrcProducer.java | 59 ++++++++++++++--------
.../camel/component/irc/IrcProducerTest.java | 11 ++--
5 files changed, 56 insertions(+), 29 deletions(-)
diff --git
a/components/camel-irc/src/main/java/org/apache/camel/component/irc/IrcComponent.java
b/components/camel-irc/src/main/java/org/apache/camel/component/irc/IrcComponent.java
index a644ca5..01ee562 100644
---
a/components/camel-irc/src/main/java/org/apache/camel/component/irc/IrcComponent.java
+++
b/components/camel-irc/src/main/java/org/apache/camel/component/irc/IrcComponent.java
@@ -72,6 +72,14 @@ public class IrcComponent extends DefaultComponent
implements SSLContextParamete
return connection;
}
+ public synchronized void closeIRCConnection(IrcConfiguration
configuration) {
+ IRCConnection connection =
connectionCache.get(configuration.getCacheKey());
+ if (connection != null) {
+ closeConnection(configuration.getCacheKey(),connection);
+ connectionCache.remove(configuration.getCacheKey());
+ }
+ }
+
protected IRCConnection createConnection(IrcConfiguration configuration) {
IRCConnection conn = null;
IRCEventListener ircLogger;
diff --git
a/components/camel-irc/src/main/java/org/apache/camel/component/irc/IrcConsumer.java
b/components/camel-irc/src/main/java/org/apache/camel/component/irc/IrcConsumer.java
index 596da1c..8800ade 100644
---
a/components/camel-irc/src/main/java/org/apache/camel/component/irc/IrcConsumer.java
+++
b/components/camel-irc/src/main/java/org/apache/camel/component/irc/IrcConsumer.java
@@ -34,7 +34,7 @@ public class IrcConsumer extends DefaultConsumer {
private final IrcConfiguration configuration;
private final IrcEndpoint endpoint;
private final IRCConnection connection;
- private IRCEventAdapter listener;
+ private IRCEventAdapter listener = new FilteredIRCEventAdapter();
public IrcConsumer(IrcEndpoint endpoint, Processor processor,
IRCConnection connection) {
super(endpoint, processor);
@@ -83,9 +83,6 @@ public class IrcConsumer extends DefaultConsumer {
}
public IRCEventAdapter getListener() {
- if (listener == null) {
- listener = new FilteredIRCEventAdapter();
- }
return listener;
}
diff --git
a/components/camel-irc/src/main/java/org/apache/camel/component/irc/IrcEndpoint.java
b/components/camel-irc/src/main/java/org/apache/camel/component/irc/IrcEndpoint.java
index ae11734..123933a 100644
---
a/components/camel-irc/src/main/java/org/apache/camel/component/irc/IrcEndpoint.java
+++
b/components/camel-irc/src/main/java/org/apache/camel/component/irc/IrcEndpoint.java
@@ -129,7 +129,7 @@ public class IrcEndpoint extends DefaultEndpoint {
@Override
public IrcProducer createProducer() throws Exception {
- return new IrcProducer(this,
component.getIRCConnection(configuration));
+ return new IrcProducer(this);
}
@Override
diff --git
a/components/camel-irc/src/main/java/org/apache/camel/component/irc/IrcProducer.java
b/components/camel-irc/src/main/java/org/apache/camel/component/irc/IrcProducer.java
index c0858e4..3cf4aa5 100644
---
a/components/camel-irc/src/main/java/org/apache/camel/component/irc/IrcProducer.java
+++
b/components/camel-irc/src/main/java/org/apache/camel/component/irc/IrcProducer.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.irc;
+import java.io.IOException;
+
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.support.DefaultProducer;
@@ -32,16 +35,16 @@ public class IrcProducer extends DefaultProducer {
private static final Logger LOG =
LoggerFactory.getLogger(IrcProducer.class);
- private final IrcConfiguration configuration;
- private IRCConnection connection;
- private IrcEndpoint endpoint;
- private IRCEventAdapter listener;
+ private transient IRCConnection connection;
+ private IRCEventAdapter listener = new FilteredIRCEventAdapter();
- public IrcProducer(IrcEndpoint endpoint, IRCConnection connection) {
+ public IrcProducer(IrcEndpoint endpoint) {
super(endpoint);
- this.endpoint = endpoint;
- this.connection = connection;
- this.configuration = endpoint.getConfiguration();
+ }
+
+ @Override
+ public IrcEndpoint getEndpoint() {
+ return (IrcEndpoint) super.getEndpoint();
}
@Override
@@ -49,7 +52,10 @@ public class IrcProducer extends DefaultProducer {
final String msg = exchange.getIn().getBody(String.class);
final String sendTo =
exchange.getIn().getHeader(IrcConstants.IRC_SEND_TO, String.class);
- if (!connection.isConnected()) {
+ if (connection == null || !connection.isConnected()) {
+ reconnect();
+ }
+ if (connection == null || !connection.isConnected()) {
throw new RuntimeCamelException("Lost connection to " +
connection.getHost());
}
@@ -61,7 +67,7 @@ public class IrcProducer extends DefaultProducer {
LOG.debug("Sending to: {} message: {}", sendTo, msg);
connection.doPrivmsg(sendTo, msg);
} else {
- for (IrcChannel channel :
endpoint.getConfiguration().getChannelList()) {
+ for (IrcChannel channel :
getEndpoint().getConfiguration().getChannelList()) {
LOG.debug("Sending to: {} message: {}", channel, msg);
connection.doPrivmsg(channel.getName(), msg);
}
@@ -72,22 +78,36 @@ public class IrcProducer extends DefaultProducer {
@Override
protected void doStart() throws Exception {
super.doStart();
- listener = getListener();
+ reconnect();
+ }
+
+ protected void reconnect() {
+ // create new connection
+ if (connection == null || connection.isConnected()) {
+ connection =
getEndpoint().getComponent().getIRCConnection(getEndpoint().getConfiguration());
+ } else {
+ // reconnecting
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reconnecting to {}:{}",
getEndpoint().getConfiguration().getHostname(),
getEndpoint().getConfiguration().getNickname());
+ }
+
getEndpoint().getComponent().closeConnection(getEndpoint().getConfiguration().getCacheKey(),
connection);
+ connection =
getEndpoint().getComponent().getIRCConnection(getEndpoint().getConfiguration());
+ }
connection.addIRCEventListener(listener);
- LOG.debug("Sleeping for {} seconds before sending commands.",
configuration.getCommandTimeout() / 1000);
+ LOG.debug("Sleeping for {} seconds before sending commands.",
getEndpoint().getConfiguration().getCommandTimeout() / 1000);
// sleep for a few seconds as the server sometimes takes a moment to
fully connect, print banners, etc after connection established
try {
- Thread.sleep(configuration.getCommandTimeout());
+ Thread.sleep(getEndpoint().getConfiguration().getCommandTimeout());
} catch (InterruptedException ex) {
// ignore
}
- endpoint.joinChannels();
+ getEndpoint().joinChannels();
}
@Override
protected void doStop() throws Exception {
if (connection != null) {
- for (IrcChannel channel :
endpoint.getConfiguration().getChannelList()) {
+ for (IrcChannel channel :
getEndpoint().getConfiguration().getChannelList()) {
LOG.debug("Parting: {}", channel);
connection.doPart(channel.getName());
}
@@ -106,9 +126,6 @@ public class IrcProducer extends DefaultProducer {
}
public IRCEventAdapter getListener() {
- if (listener == null) {
- listener = new FilteredIRCEventAdapter();
- }
return listener;
}
@@ -122,15 +139,15 @@ public class IrcProducer extends DefaultProducer {
public void onKick(String channel, IRCUser user, String passiveNick,
String msg) {
// check to see if I got kick and if so rejoin if autoRejoin is on
- if (passiveNick.equals(connection.getNick()) &&
endpoint.getConfiguration().isAutoRejoin()) {
- endpoint.joinChannel(channel);
+ if (passiveNick.equals(connection.getNick()) &&
getEndpoint().getConfiguration().isAutoRejoin()) {
+ getEndpoint().joinChannel(channel);
}
}
@Override
public void onError(int num, String msg) {
- IrcProducer.this.endpoint.handleIrcError(num, msg);
+ IrcProducer.this.getEndpoint().handleIrcError(num, msg);
}
}
diff --git
a/components/camel-irc/src/test/java/org/apache/camel/component/irc/IrcProducerTest.java
b/components/camel-irc/src/test/java/org/apache/camel/component/irc/IrcProducerTest.java
index dcdf5f0..3ad68cc 100644
---
a/components/camel-irc/src/test/java/org/apache/camel/component/irc/IrcProducerTest.java
+++
b/components/camel-irc/src/test/java/org/apache/camel/component/irc/IrcProducerTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.when;
public class IrcProducerTest {
+ private IrcComponent component;
private IRCConnection connection;
private IrcEndpoint endpoint;
private IrcConfiguration configuration;
@@ -43,6 +44,7 @@ public class IrcProducerTest {
@Before
public void doSetup() {
+ component = mock(IrcComponent.class);
connection = mock(IRCConnection.class);
endpoint = mock(IrcEndpoint.class);
configuration = mock(IrcConfiguration.class);
@@ -56,14 +58,17 @@ public class IrcProducerTest {
when(configuration.getChannelList()).thenReturn(channels);
when(endpoint.getConfiguration()).thenReturn(configuration);
+ when(endpoint.getComponent()).thenReturn(component);
+ when(component.getIRCConnection(configuration)).thenReturn(connection);
- producer = new IrcProducer(endpoint, connection);
+ producer = new IrcProducer(endpoint);
producer.setListener(listener);
+ producer.start();
}
@Test
public void doStopTest() throws Exception {
- producer.doStop();
+ producer.stop();
verify(connection).doPart("#chan1");
verify(connection).doPart("#chan2");
verify(connection).removeIRCEventListener(listener);
@@ -71,7 +76,7 @@ public class IrcProducerTest {
@Test
public void doStartTest() throws Exception {
- producer.doStart();
+ producer.start();
verify(connection).addIRCEventListener(listener);
verify(endpoint).joinChannels();