Hi Dhruv,

The changes look good to me.

Best, Fabian

2018-05-08 5:37 GMT+02:00 Dhruv Kumar <gargdhru...@gmail.com>:

> Thanks a lot, Fabian, for your response.
>
> What I understand is that if I write my own SourceFunction that
> handles the "end of stream" record and makes the source exit from the run()
> method, the Flink program will terminate.
>
> I have been using *SocketTextStreamFunction* so far.
> So I duplicated the *SocketTextStreamFunction* class into another class
> named *CustomSocketTextStreamFunction*, which is exactly the same as
> *SocketTextStreamFunction* except for one change in the *run()* method:
> the check for the "END" record below. Can you take a look and let me
> know whether this will work without much performance impact? I
> tested it locally on my machine and it seems to work fine, but I want to
> make sure that it won't have any side effects, race conditions, etc.
>
> ```
> @Override
> public void run(SourceContext<String> ctx) throws Exception {
>     final StringBuilder buffer = new StringBuilder();
>     long attempt = 0;
>
>     while (isRunning) {
>
>         try (Socket socket = new Socket()) {
>             currentSocket = socket;
>
>             LOG.info("Custom: Connecting to server socket " + hostname + ':' + port);
>             socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
>             BufferedReader reader =
>                     new BufferedReader(new InputStreamReader(socket.getInputStream()));
>
>             char[] cbuf = new char[8192];
>             int bytesRead;
>             while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
>                 buffer.append(cbuf, 0, bytesRead);
>                 int delimPos;
>                 while (buffer.length() >= delimiter.length()
>                         && (delimPos = buffer.indexOf(delimiter)) != -1) {
>                     String record = buffer.substring(0, delimPos);
>                     // truncate trailing carriage return
>                     if (delimiter.equals("\n") && record.endsWith("\r")) {
>                         record = record.substring(0, record.length() - 1);
>                     }
>                     // Added in CustomSocketTextStreamFunction: stop on the "END" record.
>                     // Checked after the "\r" is trimmed so that "END\r\n" also matches.
>                     if (record.equals("END")) {
>                         LOG.info("End of stream encountered");
>                         isRunning = false;
>                         // drop everything after the marker so that it is not
>                         // emitted below as "trailing data"
>                         buffer.setLength(0);
>                         break;
>                     }
>                     ctx.collect(record);
>                     buffer.delete(0, delimPos + delimiter.length());
>                 }
>             }
>         }
>
>         // if we dropped out of this loop due to an EOF, sleep and retry
>         if (isRunning) {
>             attempt++;
>             if (maxNumRetries == -1 || attempt < maxNumRetries) {
>                 LOG.warn("Lost connection to server socket. Retrying in "
>                         + delayBetweenRetries + " msecs...");
>                 Thread.sleep(delayBetweenRetries);
>             } else {
>                 // this should probably be here, but some examples expect simple exits of the stream source
>                 // throw new EOFException("Reached end of stream and reconnects are not enabled.");
>                 break;
>             }
>         }
>     }
>
>     // collect trailing data
>     if (buffer.length() > 0) {
>         ctx.collect(buffer.toString());
>     }
> }
> ```
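The end-marker handling in the inner loop can be exercised without Flink or a socket. This is a minimal sketch under assumptions, not Flink code: `EndMarkerSplitter` and `split` are hypothetical names, a `java.io.Reader` stands in for the socket stream, and a `Consumer<String>` stands in for `ctx.collect`. It trims a trailing `\r` before comparing against `END`, so a replayer that writes `\r\n` line endings still stops the source, and it ignores anything sent after the marker.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper mirroring the record-splitting loop of the custom source,
// with a Reader in place of the socket and a Consumer in place of ctx.collect.
final class EndMarkerSplitter {

    static final String END_MARKER = "END"; // assumed sentinel sent by the replayer

    private EndMarkerSplitter() {}

    // Emits delimiter-separated records until the END marker (returns true) or EOF (returns false).
    // At EOF a trailing partial record is still emitted, like "collect trailing data" above.
    static boolean split(Reader in, String delimiter, Consumer<String> out) {
        StringBuilder buffer = new StringBuilder();
        char[] cbuf = new char[8192];
        try {
            int read;
            while ((read = in.read(cbuf)) != -1) {
                buffer.append(cbuf, 0, read);
                int delimPos;
                while ((delimPos = buffer.indexOf(delimiter)) != -1) {
                    String record = buffer.substring(0, delimPos);
                    buffer.delete(0, delimPos + delimiter.length());
                    // Trim "\r" before the marker check so "END\r\n" is recognized too.
                    if (delimiter.equals("\n") && record.endsWith("\r")) {
                        record = record.substring(0, record.length() - 1);
                    }
                    if (record.equals(END_MARKER)) {
                        return true; // anything after the marker is ignored
                    }
                    out.accept(record);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (buffer.length() > 0) {
            out.accept(buffer.toString());
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        boolean sawEnd = split(new StringReader("a\r\nb\nEND\r\nc\n"), "\n", records::add);
        System.out.println(records + " endMarker=" + sawEnd); // prints "[a, b] endMarker=true"
    }
}
```

Keeping the parsing in a plain method like this makes it easy to unit-test edge cases (CRLF endings, data after the marker, EOF without a marker) before wiring it into `run()`.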
>
>
> --------------------------------------------------
> *Dhruv Kumar*
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me
>
> On May 7, 2018, at 11:04, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi,
>
> Flink will automatically stop the execution of a DataStream program once
> all sources have finished providing data, i.e., when all SourceFunctions
> have returned from their run() methods.
> The DeserializationSchema.isEndOfStream() method can be used to tell a
> built-in SourceFunction such as a KafkaConsumer that it should leave the
> run() method.
> If you implement your own SourceFunction, you can return from run() after
> you have ingested all data.
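How a built-in source uses `isEndOfStream()` can be pictured with a small stand-alone model. This is a sketch under assumptions, not Flink code: `EndAwareSchema` only mirrors the two `DeserializationSchema` methods that matter here (`deserialize` and `isEndOfStream`), `EndMarkerStringSchema` assumes the replayer's sentinel is the string `END`, and `EndOfStreamModel.pollLoop` is a hypothetical stand-in for a connector's fetch loop. In the model, the loop stops as soon as the schema reports end of stream, without emitting the marker, and returns; for a real SourceFunction that corresponds to leaving run().

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for the two DeserializationSchema methods discussed here (not Flink's interface).
interface EndAwareSchema<T> {
    T deserialize(byte[] message);

    boolean isEndOfStream(T nextElement);
}

// Hypothetical schema: UTF-8 strings, with "END" as the assumed end-of-stream sentinel.
final class EndMarkerStringSchema implements EndAwareSchema<String> {
    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return "END".equals(nextElement);
    }
}

final class EndOfStreamModel {
    private EndOfStreamModel() {}

    // Hypothetical fetch loop: emits elements until the schema signals end of stream or input runs out.
    static <T> void pollLoop(Iterator<byte[]> messages, EndAwareSchema<T> schema, Consumer<T> collect) {
        while (messages.hasNext()) {
            T value = schema.deserialize(messages.next());
            if (schema.isEndOfStream(value)) {
                return; // a real source would now leave run(), which finishes this source
            }
            collect.accept(value);
        }
    }

    public static void main(String[] args) {
        List<byte[]> input = new ArrayList<>();
        for (String s : new String[] {"a", "b", "END", "c"}) {
            input.add(s.getBytes(StandardCharsets.UTF_8));
        }
        List<String> out = new ArrayList<>();
        pollLoop(input.iterator(), new EndMarkerStringSchema(), out::add);
        System.out.println(out); // prints "[a, b]"
    }
}
```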
>
> Note that Flink won't wait for pending processing-time timers but will
> shut down the program immediately after the last in-flight record has been
> processed.
> Event-time timers will still fire because each source emits a
> Long.MAX_VALUE watermark after it has emitted its last record.
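The argument about event-time timers can be checked with a toy model: a timer registered for timestamp t fires once the watermark reaches t, and since no timestamp exceeds `Long.MAX_VALUE`, that final watermark fires every pending event-time timer. `ToyTimerService` is hypothetical and deliberately ignores everything except this ordering rule; it is not Flink's timer service and does not model processing-time timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, minimal event-time timer queue used only to illustrate watermark ordering.
final class ToyTimerService {
    private final PriorityQueue<Long> timers = new PriorityQueue<>(); // pending, ordered by timestamp
    private final List<Long> fired = new ArrayList<>();

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Fires every pending timer whose timestamp is <= the new watermark, in timestamp order.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
    }

    List<Long> fired() {
        return fired;
    }

    int pending() {
        return timers.size();
    }

    public static void main(String[] args) {
        ToyTimerService service = new ToyTimerService();
        service.registerEventTimeTimer(1_000L);
        service.registerEventTimeTimer(5_000L);
        service.registerEventTimeTimer(60_000L);

        service.advanceWatermark(4_000L);         // last watermark derived from the data
        System.out.println(service.fired());      // prints "[1000]"

        service.advanceWatermark(Long.MAX_VALUE); // final watermark after the last record
        System.out.println(service.fired());      // prints "[1000, 5000, 60000]"
    }
}
```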
>
> Best, Fabian
>
> 2018-05-07 17:18 GMT+02:00 Dhruv Kumar <gargdhru...@gmail.com>:
>
>> I noticed that there is a *DeserializationSchema* in
>> *org.apache.flink.api.common.serialization* which has a method
>> *isEndOfStream*, but I am not sure if I can use it in my use case.
>>
>> --------------------------------------------------
>> *Dhruv Kumar*
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me
>>
>> On May 7, 2018, at 06:18, Dhruv Kumar <gargdhru...@gmail.com> wrote:
>>
>> Hi
>>
>> Is there a way I can capture an end-of-stream signal for streams that
>> are replayed from historical data? I need the end-of-stream signal to tell
>> the Flink program to finish its execution.
>>
>> Below is the use case in detail:
>> 1. An independent log replayer program sends the records to a socket
>> (identified by IP address and port).
>> 2. The Flink program reads the incoming records from that socket via
>> socketTextStream, applies a keyBy operator to them, does some processing,
>> and finally writes the results to another socket.
>>
>> How do I tell the Flink program to finish its execution? Is there any
>> information I can add to the records when the replayer program sends them,
>> which the Flink program can then parse when the records arrive?
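On the replayer side, one option is an in-band sentinel: write every record followed by the delimiter, then a final `END` line, and flush. The sketch below assumes newline-delimited records and the `END` marker checked by the custom source in this thread; `ReplayWriter` and `replay` are hypothetical names, and a `StringWriter` stands in for the socket's output stream. It rejects records that would break the framing, since a real record equal to the sentinel would end the job early.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Arrays;
import java.util.List;

// Hypothetical replayer helper: newline-delimited records followed by an END sentinel line.
final class ReplayWriter {
    static final String END_MARKER = "END"; // assumed sentinel; must match the Flink source

    private ReplayWriter() {}

    // Writes each record plus "\n", then the END marker line, and flushes.
    static void replay(List<String> records, Writer out) {
        try {
            for (String record : records) {
                if (record.equals(END_MARKER) || record.contains("\n")) {
                    throw new IllegalArgumentException("record would corrupt framing: " + record);
                }
                out.write(record);
                out.write('\n');
            }
            out.write(END_MARKER);
            out.write('\n');
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        StringWriter sink = new StringWriter();
        replay(Arrays.asList("r1", "r2"), sink);
        System.out.print(sink); // prints r1, r2 and END, one per line
    }
}
```

Because `SocketTextStreamFunction` connects as a client (see `socket.connect` in the code above), the replayer is the listening side: with a real connection, `out` could wrap the `Socket` returned by `ServerSocket.accept()`, e.g. `new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8))`.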
>>
>> Let me know if anything is not clear.
>>
>> Thanks
>>
>> --------------------------------------------------
>> *Dhruv Kumar*
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me
>>
>>
>>
>
>
