[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709305#comment-14709305 ]
ASF GitHub Bot commented on FLINK-2536: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1030#discussion_r37751723 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java --- @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.util.serialization.SerializationSchema; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}. 
+ */ +public class SocketClientSinkTest{ + + final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); + private final String host = "127.0.0.1"; + private int port; + private String value; + + public Thread t; + + public SocketClientSinkTest() { + } + + @Test + public void testSocketSink() throws Exception{ + value = ""; + ServerSocket server = new ServerSocket(0); + port = server.getLocalPort(); + + new Thread(new Runnable() { + @Override + public void run() { + t = Thread.currentThread(); + SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() { + @Override + public byte[] serialize(String element) { + return element.getBytes(); + } + }; + + try { + SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0); + simpleSink.open(new Configuration()); + simpleSink.invoke("testSocketSinkInvoke"); + simpleSink.close(); + } catch (Exception e){ + error.set(e); + } + } + }).start(); + + Socket sk = server.accept(); + BufferedReader rdr = new BufferedReader(new InputStreamReader(sk + .getInputStream())); + value = rdr.readLine(); + + t.join(); + server.close(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail("Error in spawned thread: " + t.getMessage()); + } + + assertEquals("testSocketSinkInvoke", value); + } + + @Test + public void testSocketSinkNoRetry() throws Exception{ + ServerSocket server = new ServerSocket(0); + port = server.getLocalPort(); + + new Thread(new Runnable() { + + @Override + public void run() { + t = Thread.currentThread(); + SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() { + @Override + public byte[] serialize(String element) { + return element.getBytes(); + } + }; + + try { + SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0); + simpleSink.open(new Configuration()); + + synchronized (t) { + //wating for server to close + 
t.wait(); + } + + //firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT + simpleSink.invoke("testSocketSinkInvoke"); + + //socket is closed then test "retry" + simpleSink.invoke("testSocketSinkInvoke"); + simpleSink.close(); + assertEquals(0, simpleSink.retries); + } catch (Exception e) { + //This Exception is good since the socket server is never reopen. + } + } + }).start(); + + Socket sk = server.accept(); + sk.setKeepAlive(false); + sk.close(); + server.close(); + + synchronized (t) { + t.notifyAll(); + } + + t.join(); + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail("Error in spawned thread: " + t.getMessage()); + } + } + + @Test + public void testSocketSinkRetryTenTimes() throws Exception{ + ServerSocket server = new ServerSocket(0); + port = server.getLocalPort(); + + new Thread(new Runnable() { + + @Override + public void run() { + t = Thread.currentThread(); + SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() { + @Override + public byte[] serialize(String element) { + return element.getBytes(); + } + }; + + try { + SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10); + simpleSink.open(new Configuration()); + + synchronized (t){ + //wating for server to close + t.wait(); + } + + //firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT + simpleSink.invoke("testSocketSinkInvoke"); + + //socket is closed then test "retry" + simpleSink.invoke("testSocketSinkInvoke"); + simpleSink.close(); + assertEquals(10, simpleSink.retries); + } catch (Exception e) { + //This Exception is good since the socket server is never reopen. 
+ } + } + }).start(); + + Socket sk = server.accept(); + sk.setKeepAlive(false); + sk.close(); + server.close(); + + synchronized (t) { + t.notifyAll(); + } + + t.join(); + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail("Error in spawned thread: " + t.getMessage()); + } + } + + @Test + public void testSocketSinkRetryAccess() throws Exception{ + ServerSocket server = new ServerSocket(0); + port = server.getLocalPort(); + + new Thread(new Runnable() { + + Thread t1; + SocketClientSink<String> simpleSink; + + @Override + public void run() { + t = Thread.currentThread(); + SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() { + @Override + public byte[] serialize(String element) { + return element.getBytes(); + } + }; + + try { + simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10); + simpleSink.open(new Configuration()); + + synchronized (t) { + //wating for server to close + t.wait(); + } + + //firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT + simpleSink.invoke("testSocketSinkInvoke"); + + new Thread(new Runnable() { + + @Override + public void run() { + t1 = Thread.currentThread(); + try { + //socket is closed then test "retry" + simpleSink.invoke("testSocketSinkInvoke"); + simpleSink.close(); + } catch (Exception e) { + error.set(e); + } + } + + }).start(); + + //set a new server to let the retry success. + while (simpleSink.retries == 0){ + synchronized (simpleSink) { + simpleSink.notifyAll(); + } + } + + //reopen socket server for sink access + value = ""; + ServerSocket server = new ServerSocket(port); --- End diff -- This might be a problem if this one thread is really slow. Better would be an explicit synchronization mechanism. 
> Add a retry for SocketClientSink > -------------------------------- > > Key: FLINK-2536 > URL: https://issues.apache.org/jira/browse/FLINK-2536 > Project: Flink > Issue Type: Improvement > Components: Streaming > Affects Versions: 0.10 > Reporter: Huang Wei > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > > I found that the SocketClientSink doesn't try to re-connect when it is disconnected from > the socket server or gets an exception. > I'd like to add a re-connect to the socket sink, like the one the socket source already has. -- This message was sent by Atlassian JIRA (v6.3.4#6332)