[ 
https://issues.apache.org/jira/browse/FLINK-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15374902#comment-15374902
 ] 

ASF GitHub Bot commented on FLINK-2125:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2233#discussion_r70614465
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
    @@ -96,19 +96,21 @@ public void run(SourceContext<String> ctx) throws 
Exception {
                                socket.connect(new InetSocketAddress(hostname, 
port), CONNECTION_TIMEOUT_TIME);
                                BufferedReader reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream()));
     
    -                           int data;
    -                           while (isRunning && (data = reader.read()) != 
-1) {
    -                                   // check if the string is complete
    -                                   if (data != delimiter) {
    -                                           buffer.append((char) data);
    -                                   }
    -                                   else {
    +                           char[] cbuf = new char[8192];
    +                           int byteRead;
    +                           while (isRunning && (byteRead = 
reader.read(cbuf)) != -1) {
    +                                   buffer.append(cbuf, 0, byteRead);
    +                                   int delimPos;
    +                                   while (buffer.length() >= 
delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
    +                                           String token = 
buffer.substring(0, delimPos);
                                                // truncate trailing carriage 
return
    -                                           if (delimiter == '\n' && 
buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
    -                                                   
buffer.setLength(buffer.length() - 1);
    +                                           if (delimiter.equals("\n") && 
token.endsWith("\r")) {
    +                                                   token = 
token.substring(0, token.length() - 1);
    +                                           }
    +                                           if (!token.isEmpty()) {
    +                                                   ctx.collect(token);
    --- End diff --
    
    This changes existing behavior. Empty strings are filtered out, which 
wasn't done before.


> String delimiter for SocketTextStream
> -------------------------------------
>
>                 Key: FLINK-2125
>                 URL: https://issues.apache.org/jira/browse/FLINK-2125
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Márton Balassi
>            Priority: Minor
>              Labels: starter
>
> The SocketTextStreamFunction uses a character delimiter, despite other parts 
> of the API using String delimiter.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to