Hi Dhruv,

The changes look good to me.

Best, Fabian

2018-05-08 5:37 GMT+02:00 Dhruv Kumar <gargdhru...@gmail.com>:

> Thanks a lot, Fabian, for your response.
>
> What I understand is that if I write my own SourceFunction such that it
> handles the "end of stream" record and makes the source exit from the run()
> method, the Flink program will terminate.
>
> I have been using *SocketTextStreamFunction* till now.
> So, I duplicated the *SocketTextStreamFunction* class into another class
> named *CustomSocketTextStreamFunction*, which is exactly the same as
> *SocketTextStreamFunction* except for one change in the *run()* method.
> The change is highlighted in *BOLD* below. Can you take a look and let me
> know whether this will work and whether it will have much of a performance
> impact? I tested it locally on my machine and it seems to work fine, but I
> just want to make sure that it won’t have any side effects, race
> conditions, etc.
>
> ```
> @Override
> public void run(SourceContext<String> ctx) throws Exception {
>     final StringBuilder buffer = new StringBuilder();
>     long attempt = 0;
>
>     while (isRunning) {
>
>         try (Socket socket = new Socket()) {
>             currentSocket = socket;
>
>             LOG.info("Custom: Connecting to server socket " + hostname + ':' + port);
>             socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
>             BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
>
>             char[] cbuf = new char[8192];
>             int bytesRead;
>             while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
>                 buffer.append(cbuf, 0, bytesRead);
>                 int delimPos;
>                 while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
>                     String record = buffer.substring(0, delimPos);
>                     *if(record.equals("END")) {*
>                     *    LOG.info("End of stream encountered");*
>                     *    isRunning = false;*
>                     *    buffer.delete(0, delimPos + delimiter.length());*
>                     *    break;*
>                     *}*
>                     // truncate trailing carriage return
>                     if (delimiter.equals("\n") && record.endsWith("\r")) {
>                         record = record.substring(0, record.length() - 1);
>                     }
>                     ctx.collect(record);
>                     buffer.delete(0, delimPos + delimiter.length());
>                 }
>             }
>         }
>
>         // if we dropped out of this loop due to an EOF, sleep and retry
>         if (isRunning) {
>             attempt++;
>             if (maxNumRetries == -1 || attempt < maxNumRetries) {
>                 LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
>                 Thread.sleep(delayBetweenRetries);
>             }
>             else {
>                 // this should probably be here, but some examples expect simple exists of the stream source
>                 // throw new EOFException("Reached end of stream and reconnects are not enabled.");
>                 break;
>             }
>         }
>     }
>
>     // collect trailing data
>     if (buffer.length() > 0) {
>         ctx.collect(buffer.toString());
>     }
> }
> ```
>
> --------------------------------------------------
> *Dhruv Kumar*
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me
>
> On May 7, 2018, at 11:04, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi,
>
> Flink will automatically stop the execution of a DataStream program once
> all sources have finished providing data, i.e., when all SourceFunctions
> have returned from the run() method.
> The DeserializationSchema.isEndOfStream() method can be used to tell a
> built-in SourceFunction such as a KafkaConsumer that it should leave the
> run() method.
> If you implement your own SourceFunction, you can leave run() after you
> have ingested all data.
>
> Note that Flink won't wait for all processing-time timers but will
> shut down the program immediately after the last in-flight record has
> been processed.
> Event-time timers will be handled because each source emits a
> Long.MAX_VALUE watermark after it has emitted its last record.
>
> Best, Fabian
>
> 2018-05-07 17:18 GMT+02:00 Dhruv Kumar <gargdhru...@gmail.com>:
>
>> I notice that there is a *DeserializationSchema* in
>> *org.apache.flink.api.common.serialization* which has a method
>> *isEndOfStream*, but I am not sure if I can use it in my use case.
>>
>> --------------------------------------------------
>> *Dhruv Kumar*
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me
>>
>> On May 7, 2018, at 06:18, Dhruv Kumar <gargdhru...@gmail.com> wrote:
>>
>> Hi
>>
>> Is there a way I can capture the end-of-stream signal for streams which
>> are replayed from historical data? I need the end-of-stream signal to tell
>> the Flink program to finish its execution.
>>
>> Below is the use case in detail:
>> 1. An independent log replayer program sends the records to a socket
>> (identified by IP address and port).
>> 2. The Flink program reads the incoming records via socketTextStream from
>> the above-mentioned socket, applies a keyBy operator on the incoming
>> records and then does some processing, finally writing them to another
>> socket.
>>
>> How do I tell the Flink program to finish its execution? Is there any
>> information which I can add to the records while they are sent from the
>> replayer program and which can be parsed when the records arrive inside
>> the Flink program?
>>
>> Let me know if anything is not clear.
>>
>> Thanks
>>
>> --------------------------------------------------
>> *Dhruv Kumar*
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me
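
For anyone who wants to experiment with the sentinel-record idea from this thread without a Flink cluster, the following is a minimal, Flink-free sketch of the record-splitting loop. It is not the code from the thread and not part of Flink: the class name `SentinelSplitter` and the method `readUntilSentinel` are hypothetical, and a non-empty delimiter is assumed. It differs from the quoted run() method in two ways that may be worth considering: the trailing carriage return is stripped before the comparison, so a replayer that sends "END\r\n" is also recognized, and anything that arrives after the sentinel is dropped instead of being emitted as trailing data.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

/** Flink-free sketch of the splitting loop; all names here are hypothetical. */
final class SentinelSplitter {

    /**
     * Reads delimiter-separated records until the sentinel record or EOF.
     * Returns the records seen before the sentinel; the sentinel is not returned.
     * Assumes a non-empty delimiter.
     */
    static List<String> readUntilSentinel(Reader in, String delimiter, String sentinel)
            throws IOException {
        List<String> records = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        char[] cbuf = new char[8192];
        int charsRead;
        while ((charsRead = in.read(cbuf)) != -1) {
            buffer.append(cbuf, 0, charsRead);
            int delimPos;
            while ((delimPos = buffer.indexOf(delimiter)) != -1) {
                String record = buffer.substring(0, delimPos);
                buffer.delete(0, delimPos + delimiter.length());
                // Strip a trailing carriage return first, so "END\r\n" also matches.
                if (delimiter.equals("\n") && record.endsWith("\r")) {
                    record = record.substring(0, record.length() - 1);
                }
                if (record.equals(sentinel)) {
                    return records; // stop here; data after the sentinel is ignored
                }
                records.add(record);
            }
        }
        // EOF without a sentinel: keep any trailing partial record.
        if (buffer.length() > 0) {
            records.add(buffer.toString());
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        Reader in = new StringReader("a\nb\r\nEND\r\nignored\n");
        System.out.println(readUntilSentinel(in, "\n", "END")); // prints [a, b]
    }
}
```

In a real source the `records.add(...)` calls would correspond to `ctx.collect(...)`, and returning from the method would correspond to leaving run(), which is what lets the job finish.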