[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725221#comment-14725221 ]
ASF GitHub Bot commented on FLINK-2536: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r38408571 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java --- @@ -73,13 +90,56 @@ public void intializeConnection() { * The incoming data */ @Override - public void invoke(IN value) { + public void invoke(IN value) throws Exception { byte[] msg = schema.serialize(value); try { dataOutputStream.write(msg); } catch (IOException e) { - throw new RuntimeException("Cannot send message " + value.toString() + - " to socket server at " + hostName + ":" + port, e); + LOG.error("Cannot send message " + value.toString() + + " to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() + + ". Trying to reconnect."); + retries = 0; + boolean success = false; + while ((retries < maxRetry || retryForever) && !success && isRunning){ + try { + + if (dataOutputStream != null) { + dataOutputStream.close(); + } + + if (client != null && !client.isClosed()) { + client.close(); + } + + retries++; + + client = new Socket(hostName, port); + dataOutputStream = new DataOutputStream(client.getOutputStream()); + dataOutputStream.write(msg); + success = true; + + } catch(IOException ee) { + LOG.error("Reconnect to socket server and send message failed. Caused by " + + ee.toString() + ". Retry time(s):" + retries); + + if (lock == null) { --- End diff -- The `lock` part can probably be replaced by a simple `Thread.sleep(...)`. > Add a retry for SocketClientSink > -------------------------------- > > Key: FLINK-2536 > URL: https://issues.apache.org/jira/browse/FLINK-2536 > Project: Flink > Issue Type: Improvement > Components: Streaming > Affects Versions: 0.10 > Reporter: Huang Wei > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > > I found the SocketClientSink doesn`t use a re-connect when disconnect from > the socket server or get exception. > I`d like to add a re-connect like socket source for socket sink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)