Great! Thanks for reporting back :-)

2017-08-09 22:52 GMT+02:00 Chao Wang <chaow...@wustl.edu>:
> It seems that the observed long latencies were due to some one-time
> internal mechanism that only occurs after Flink has received the first
> message. Based on my measurements, that mechanism took around 100 ms.
>
> Now I set up my application in the following way, and I observed that the
> end-to-end latency is similar to that of using raw sockets (off by less
> than 1 ms): send the first message to Flink and then wait for 110 ms
> before sending the second message. For the subsequent sends we can remove
> the 110 ms wait.
>
> Chao
>
> On 08/09/2017 10:57 AM, Chao Wang wrote:
>
> Thank you, Fabian.
>
> Maybe there are also some buffers sitting between the data source and the
> first operator? I observed that in my implementation of SourceFunction
> (using a socket server, as listed in the previous email), for receiving
> two messages, in terms of event time, it took 0.2 ms before the
> SourceFunction received the first message, but then it took 97 ms to
> receive the second message. The interval between the two sends is 0.07 ms
> at the sending side, which is a Java socket client.
>
> Or could it be that there is a timeout setting for scheduling the data
> source in Flink?
>
> Thanks,
>
> Chao
>
> On 08/08/2017 02:58 AM, Fabian Hueske wrote:
>
> One pointer is the StreamExecutionEnvironment.setBufferTimeout()
> parameter. Flink's network stack collects records in buffers to send them
> over the network. A buffer is sent when it is completely filled or after a
> configurable timeout. So if your program does not process many records,
> these records might "get stuck" in the buffers and only be emitted after
> the timeout flushes the buffer. The default timeout is 100 ms. Try to
> reduce it.
>
> Best, Fabian
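
For reference, the setting Fabian points to above is configured on the
StreamExecutionEnvironment. A minimal sketch (the class name and the 5 ms
value are only illustrative, not taken from this thread):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LowLatencyEnvExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flush partially filled network buffers after at most 5 ms instead of the default 100 ms.
        // A timeout of 0 flushes after every record (lowest latency, lowest throughput).
        env.setBufferTimeout(5);
        // ... add sources, transformations, and sinks here, then call env.execute(...)
    }
}

Lower timeouts trade throughput for latency; a value of -1 disables the
timeout entirely and flushes only when a buffer is full.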
> 2017-08-08 1:06 GMT+02:00 Chao Wang <chaow...@wustl.edu>:
>
>> Following the original post, I've tried stripping down my Flink app to
>> only the following, and then it still exhibits long latencies: after the
>> second source socket write, it took 90+ milliseconds from the data source
>> to the socket-front in Flink. I would like to ask for pointers about how
>> to investigate latency issues like this, and in general how to properly
>> benchmark Flink latencies. Thank you very much!
>>
>> The main method:
>>
>> public static void main(String[] args) throws Exception {
>>     final StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>     DataStream<EventGroup> inEventGroupStream =
>>         env.addSource(new SocketEventGroupStreamFunction(6065, 512));
>>     inEventGroupStream.writeToSocket("DestHost", 6066, new MySeGroup<EventGroup>());
>>     env.execute("event processing");
>> }
>>
>> where all the custom classes are as follows (for
>> serialization/deserialization and socket server functionality):
>>
>> public static class MySeGroup<T> implements SerializationSchema<EventGroup> {
>>
>>     @Override
>>     public byte[] serialize(EventGroup arg0) {
>>         int tLength = EKFFFTAES.getSizeTimepoint();
>>         // Note: report error if tLength != arg0.getT().length
>>         if (tLength != arg0.getT().length) {
>>             System.out.println("Serialization error: Timepoint size discrepancy.");
>>             System.out.println("tLength = " + tLength);
>>             System.out.println("arg0.getT().length = " + arg0.getT().length);
>>         }
>>         byte[] buffer = new byte[1 + arg0.getT().length + arg0.getP().length];
>>         buffer[0] = arg0.type;
>>         System.arraycopy(arg0.getT(), 0, buffer, 1, tLength);
>>         System.arraycopy(arg0.getP(), 0, buffer, 1 + tLength, arg0.getP().length);
>>         return buffer;
>>     }
>> }
>>
>> public static class Event extends SimpleImmutableEntry<byte[], byte[]> {
>>
>>     Event(byte[] timestamp, byte[] payload) {
>>         super(timestamp, payload);
>>     }
>>     public byte[] getT() { // get the timestamp
>>         return getKey();
>>     }
>>     public byte[] getP() { // get the payload
>>         return getValue();
>>     }
>> }
>>
>> public static class EventGroup extends Event {
>>     public byte type;
>>     EventGroup(byte type, byte[] timestamp, byte[] payload) {
>>         super(timestamp, payload);
>>         this.type = type;
>>     }
>> }
>>
>> public static class SocketEventGroupStreamFunction implements SourceFunction<EventGroup> {
>>
>>     private transient ServerSocket serverSocket;
>>     private int serverPort;
>>     private int dataLength;
>>     private byte[] inbuf;
>>     private byte[] timestamp;
>>     private byte[] payload;
>>     private int tLength = EKFFFTAES.getSizeTimepoint();
>>     private volatile boolean isRunning = true;
>>
>>     public SocketEventGroupStreamFunction(int port, int length) {
>>         serverPort = port;
>>         dataLength = length;
>>         inbuf = new byte[1 + dataLength + tLength];
>>         timestamp = new byte[tLength];
>>         payload = new byte[dataLength];
>>     }
>>
>>     @Override
>>     public void run(SourceContext<EventGroup> ctx) throws Exception {
>>         while (isRunning) {
>>             serverSocket = new ServerSocket(serverPort, 100,
>>                 InetAddress.getByName("192.168.1.13"));
>>             serverSocket.setSoTimeout(1000000);
>>             System.out.println("Waiting for incoming connections on port "
>>                 + serverSocket.getLocalPort() + "...");
>>             Socket server = serverSocket.accept();
>>
>>             System.out.println("Just connected to " + server.getRemoteSocketAddress());
>>             DataInputStream in = new DataInputStream(server.getInputStream());
>>
>>             while (isRunning) {
>>                 in.readFully(inbuf, 0, inbuf.length);
>>                 System.arraycopy(inbuf, 1, timestamp, 0, tLength);
>>                 System.arraycopy(inbuf, 1 + tLength, payload, 0, dataLength);
>>
>>                 System.out.print("Got an event " + inbuf[0] + ": ");
>>                 displayElapsedTime(timestamp);
>>
>>                 ctx.collect(new EventGroup(inbuf[0], timestamp, payload));
>>             }
>>         }
>>     }
>>
>>     @Override
>>     public void cancel() {
>>         isRunning = false;
>>         ServerSocket theSocket = this.serverSocket;
>>         if (theSocket != null) {
>>             try {
>>                 theSocket.close();
>>             } catch (SocketTimeoutException s) {
>>                 System.out.println("Socket timed out!");
>>             } catch (IOException e) {
>>                 e.printStackTrace();
>>             }
>>         }
>>     }
>> }
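
The classes above call EKFFFTAES.getSizeTimepoint() and displayElapsedTime()
(the latter presumably via a static import). The Java-side declarations are
not shown in the thread, but the JNI symbol names in the C++ code quoted
below imply roughly the following sketch; the library name passed to
System.loadLibrary is an assumption:

package eventProcessing;

public class EKFFFTAES {
    static {
        // Placeholder: the actual native library name is not given in the thread.
        System.loadLibrary("EKFFFTAES");
    }

    // Maps to Java_eventProcessing_EKFFFTAES_getSizeTimepoint; returns
    // sizeof(std::chrono::system_clock::time_point) on the native side.
    public static native int getSizeTimepoint();

    // Maps to Java_eventProcessing_EKFFFTAES_displayElapsedTime; prints the
    // microseconds elapsed since the time_point copied into the byte array.
    public static native void displayElapsedTime(byte[] timestamp);
}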
>> and finally, EKFFFTAES is my C++ library implementing the timestamping
>> facility:
>>
>> int timePointLength = sizeof(std::chrono::system_clock::time_point);
>>
>> JNIEXPORT jint JNICALL Java_eventProcessing_EKFFFTAES_getSizeTimepoint
>>   (JNIEnv *, jclass)
>> {
>>     return ::timePointLength;
>> }
>>
>> JNIEXPORT void JNICALL Java_eventProcessing_EKFFFTAES_displayElapsedTime
>>   (JNIEnv *env, jclass, jbyteArray inArray)
>> {
>>     std::chrono::system_clock::time_point end = std::chrono::system_clock::now();
>>     jbyte *inCArray = env->GetByteArrayElements(inArray, NULL);
>>     std::chrono::system_clock::time_point start;
>>     std::memcpy(&start, inCArray, ::timePointLength);
>>     std::cout << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count()
>>               << std::endl;
>> }
>>
>> Thank you,
>>
>> Chao
>>
>> On 08/07/2017 03:20 PM, Chao Wang wrote:
>>
>>> Hi,
>>>
>>> I have been trying to benchmark the end-to-end latency of a Flink 1.3.1
>>> application, but got confused regarding the amount of time spent in
>>> Flink. In my setting, the data source and data sink dwell on separate
>>> machines, in the following topology:
>>>
>>>     Machine 1                              Machine 2     Machine 3
>>>     data source (via a socket client)  ->  Flink     ->  data sink (via a socket server)
>>>
>>> I observed 200-400 milliseconds of end-to-end latency, while the
>>> execution time of my stream transformations took no more than two
>>> milliseconds, the socket-only networking latency between machines is no
>>> more than one millisecond, and I used ptpd so that the clock offset
>>> between machines was also no more than one millisecond.
>>>
>>> Question: What took those hundreds of milliseconds?
>>>
>>> Here are the details of my setting and my observations so far:
>>>
>>> On Machine 2, I implemented a socket server as a data source to Flink
>>> (by implementing SourceFunction), and I split the incoming stream into
>>> several streams (by SplitStream) for some transformations (implementing
>>> MapFunction and CoFlatMapFunction), where the results were fed to a
>>> socket (using writeToSocket). I used C++11's chrono time library
>>> (through JNI) to take timestamps and determine the elapsed time, and I
>>> have verified that the overhead of timestamping this way is no more than
>>> one millisecond.
>>>
>>> I observed that for the four consecutive writes from Machine 1, with the
>>> time between two writes no more than 0.3 milliseconds, on Machine 2
>>> Flink got the first write in 0.2 milliseconds, but then it took 90
>>> milliseconds for Flink to get the next write, another 4 milliseconds for
>>> the third write, and yet another 4 milliseconds for the fourth write.
>>>
>>> And then it took more than 70 milliseconds before Flink started
>>> processing my plan's first stream transformation. And after my last
>>> transformation, it took more than 70 milliseconds before the result was
>>> received at Machine 3.
>>>
>>> Thank you,
>>>
>>> Chao
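
For reference, the warm-up workaround Chao describes in the newest message at
the top of this thread (send one message, wait ~110 ms for Flink's one-time
start-up cost to pass, then stream the rest without extra waits) could look
roughly like the following on the sending side. This is only a sketch: the
class name, host, port, message sizes, and loop count are illustrative; only
the 110 ms figure and the message layout (one type byte, a timestamp, then
the payload) come from the thread.

import java.io.DataOutputStream;
import java.net.Socket;

public class WarmupSender {

    // Placeholder message: 1 type byte + timestamp + payload, mirroring the layout read by
    // SocketEventGroupStreamFunction above. The sizes used here are examples only; the real
    // timestamp length comes from EKFFFTAES.getSizeTimepoint() and the payload length from
    // the source's constructor argument (512 in the quoted main method).
    private static byte[] buildMessage(int timestampLength, int payloadLength) {
        return new byte[1 + timestampLength + payloadLength];
    }

    public static void main(String[] args) throws Exception {
        // Host and port are placeholders for the machine running the Flink source (port 6065 above).
        try (Socket socket = new Socket("192.168.1.13", 6065);
             DataOutputStream out = new DataOutputStream(socket.getOutputStream())) {

            // First send: triggers the one-time start-up cost that Chao measured at ~100 ms.
            out.write(buildMessage(8, 512));
            out.flush();

            // Wait out that one-time cost before starting the actual measurement run.
            Thread.sleep(110);

            // Subsequent sends need no extra wait.
            for (int i = 0; i < 100; i++) {
                out.write(buildMessage(8, 512));
                out.flush();
            }
        }
    }
}

In the real setup the timestamp bytes are produced by the C++ side and copied
into the message, so the zero-filled arrays here stand in for that.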