Great! Thanks for reporting back :-)

2017-08-09 22:52 GMT+02:00 Chao Wang <chaow...@wustl.edu>:

> It seems that the observed long latencies were due to a one-time
> internal mechanism that only kicked in after Flink had received the first
> message. Based on my measurements, that mechanism took around 100 ms.
>
> I have now set up my application as follows, and the observed end-to-end
> latency is similar to that of raw sockets (off by less than 1 ms): send
> the first message to Flink, then wait 110 ms before sending the second
> message. Subsequent sends do not need the 110 ms wait.
>
> Chao
>
>
> On 08/09/2017 10:57 AM, Chao Wang wrote:
>
> Thank you, Fabian.
>
> Maybe there are also some buffers sitting between the data source and the
> first operator? In my implementation of SourceFunction (using a socket
> server, as listed in the previous email), when receiving two messages, in
> terms of event time, it took 0.2 ms before the SourceFunction received the
> first message, but then 97 ms to receive the second message. The interval
> between the two sends is 0.07 ms at the sending side, which is a Java
> socket client.
>
> Or could it be that there is a timeout setting for scheduling the data
> source in Flink?
>
>
> Thanks,
>
> Chao
>
> On 08/08/2017 02:58 AM, Fabian Hueske wrote:
>
> One pointer is the StreamExecutionEnvironment.setBufferTimeout()
> parameter.
> Flink's network stack collects records in buffers to send them over the
> network. A buffer is sent when it is completely filled or after a
> configurable timeout.
> So if your program does not process many records, these records might "get
> stuck" in the buffers and be emitted after the timeout flushes the buffer.
> The default timeout is 100ms. Try to reduce it.
>
> Best, Fabian
>
> 2017-08-08 1:06 GMT+02:00 Chao Wang <chaow...@wustl.edu>:
>
>> Following up on the original post, I've stripped my Flink app down to
>> only the following, and it still exhibits long latencies: after the
>> second source socket write, it took 90+ milliseconds from the data source
>> to the socket-front in Flink. I would like to ask for pointers on how to
>> investigate a latency issue like this, and in general how to properly
>> benchmark Flink latencies. Thank you very much!
>>
>>
>> The main method:
>>
>>
>>   public static void main(String[] args) throws Exception {
>>     final StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>     DataStream<EventGroup> inEventGroupStream =
>>         env.addSource(new SocketEventGroupStreamFunction(6065, 512));
>>     inEventGroupStream.writeToSocket("DestHost", 6066,
>>         new MySeGroup<EventGroup>());
>>     env.execute("event processing");
>>   }
>>
>>
>> where all the custom classes are as follows (for
>> serialization/deserialization and socket server functionality):
>>
>>
>>   public static class MySeGroup<T> implements
>> SerializationSchema<EventGroup> {
>>
>>     @Override
>>     public byte[] serialize(EventGroup arg0) {
>>       int tLength = EKFFFTAES.getSizeTimepoint();
>>       //Note: report error if tLength != arg0.getT().length
>>       if (tLength != arg0.getT().length) {
>>         System.out.println (
>>           "Serialization error: Timepoint size discrepancy.");
>>         System.out.println ("tLength = " + tLength);
>>         System.out.println ("arg0.getT().length = " + arg0.getT().length);
>>       }
>>       byte[] buffer = new byte[1 + arg0.getT().length +
>> arg0.getP().length];
>>       buffer[0] = arg0.type;
>>       System.arraycopy(arg0.getT(), 0, buffer, 1, tLength);
>>       System.arraycopy(arg0.getP(), 0, buffer, 1 + tLength,
>> arg0.getP().length);
>>       return buffer;
>>     }
>>   }
>>
>>   public static class Event extends SimpleImmutableEntry<byte[],byte[]> {
>>
>>     Event(byte[] timestamp, byte[] payload){
>>       super(timestamp, payload);
>>     }
>>     public byte[] getT() { // get the timestamp
>>       return getKey();
>>     }
>>     public byte[] getP() { // get the payload
>>       return getValue();
>>     }
>>   }
>>
>>   public static class EventGroup extends Event {
>>     public byte type;
>>     EventGroup(byte type, byte[] timestamp, byte[] payload){
>>       super(timestamp, payload);
>>       this.type = type;
>>     }
>>   }
>>
>>
>>   public static class SocketEventGroupStreamFunction implements
>> SourceFunction<EventGroup> {
>>
>>     private transient ServerSocket serverSocket;
>>     private int serverPort;
>>     private int dataLength;
>>     private byte[] inbuf;
>>     private byte[] timestamp;
>>     private byte[] payload;
>>     private int tLength = EKFFFTAES.getSizeTimepoint();
>>     private volatile boolean isRunning = true;
>>
>>     public SocketEventGroupStreamFunction(int port, int length) {
>>       serverPort = port;
>>       dataLength = length;
>>       inbuf = new byte[1 + dataLength + tLength];
>>       timestamp = new byte[tLength];
>>       payload = new byte[dataLength];
>>     }
>>
>>     @Override
>>     public void run(SourceContext<EventGroup> ctx) throws Exception {
>>       while(isRunning) {
>>         serverSocket = new ServerSocket(serverPort, 100,
>> InetAddress.getByName("192.168.1.13"));
>>         serverSocket.setSoTimeout(1000000);
>>         System.out.println("Waiting for incoming connections on port " +
>>           serverSocket.getLocalPort() + "...");
>>         Socket server = serverSocket.accept();
>>
>>         System.out.println("Just connected to " +
>>           server.getRemoteSocketAddress());
>>         DataInputStream in =
>>           new DataInputStream(server.getInputStream());
>>
>>         while(isRunning) {
>>           in.readFully(inbuf, 0, inbuf.length);
>>           System.arraycopy(inbuf, 1, timestamp, 0, tLength);
>>           System.arraycopy(inbuf, 1+tLength, payload, 0, dataLength);
>>
>>           System.out.print("Got an event " + inbuf[0] + ": ");
>>           displayElapsedTime(timestamp);
>>
>>           ctx.collect(new EventGroup(inbuf[0], timestamp, payload));
>>         }
>>       }
>>     }
>>
>>     @Override
>>     public void cancel() {
>>       isRunning = false;
>>       ServerSocket theSocket = this.serverSocket;
>>       if (theSocket != null) {
>>         try {
>>           theSocket.close();
>>         }catch(SocketTimeoutException s) {
>>           System.out.println("Socket timed out!");
>>         }catch(IOException e) {
>>           e.printStackTrace();
>>         }
>>       }
>>     }
>>   }
>>
>>
>> and finally, EKFFFTAES is my cpp library implementing the timestamping
>> facility:
>>
>>
>> int timePointLength = sizeof(std::chrono::system_clock::time_point);
>>
>> JNIEXPORT jint JNICALL Java_eventProcessing_EKFFFTAES_getSizeTimepoint
>>   (JNIEnv *, jclass)
>> {
>>   return ::timePointLength;
>> }
>>
>> JNIEXPORT void JNICALL Java_eventProcessing_EKFFFTAES_displayElapsedTime
>>   (JNIEnv *env, jclass, jbyteArray inArray)
>> {
>>   std::chrono::system_clock::time_point end =
>>     std::chrono::system_clock::now();
>>   jbyte *inCArray = env->GetByteArrayElements(inArray, NULL);
>>   std::chrono::system_clock::time_point start;
>>   std::memcpy (&start, inCArray, ::timePointLength);
>>   // Release the array without copying back, since it was only read.
>>   env->ReleaseByteArrayElements(inArray, inCArray, JNI_ABORT);
>>   std::cout << std::chrono::duration_cast<std::chrono::microseconds>(
>>     end - start).count() << std::endl;
>> }
>>
>>
>> Thank you,
>>
>> Chao
>>
>>
>> On 08/07/2017 03:20 PM, Chao Wang wrote:
>>
>>> Hi,
>>>
>>> I have been trying to benchmark the end-to-end latency of a Flink 1.3.1
>>> application, but I am confused about the amount of time spent in Flink.
>>> In my setup, the data source and data sink reside on separate machines,
>>> as in the following topology:
>>>
>>> Machine 1                             Machine 2    Machine 3
>>> data source (via a socket client) ->  Flink    ->  data sink (via a socket server)
>>>
>>> I observed 200-400 milliseconds of end-to-end latency, while the execution
>>> of my stream transformations took no more than two milliseconds, the
>>> socket-only networking latency between machines is no more than one
>>> millisecond, and I used ptpd so that the clock offset between machines was
>>> also no more than one millisecond.
>>>
>>> Question: What took those hundreds of milliseconds?
>>>
>>> Here are the details of my setting and my observation so far:
>>>
>>> On Machine 2, I implemented a socket server as a data source for Flink
>>> (by implementing SourceFunction), and I split the incoming stream into
>>> several streams (using SplitStream) for some transformations (implementing
>>> MapFunction and CoFlatMapFunction), whose results were fed to a socket
>>> (using writeToSocket). I used C++11's chrono library (through JNI) to
>>> take timestamps and determine the elapsed time, and I have verified that
>>> the overhead of timestamping this way is no more than one millisecond.
>>>
>>> I observed that for the four consecutive writes from Machine 1, with the
>>> time between two writes no more than 0.3 milliseconds, on Machine 2 Flink
>>> got the first write in 0.2 milliseconds, but then it took 90 milliseconds
>>> for Flink to get the next write, and another 4 milliseconds for the third
>>> write, and yet another 4 milliseconds for the fourth write.
>>>
>>> And then it took more than 70 milliseconds before Flink started
>>> processing my plan's first stream transformation. And after my last
>>> transformation, it took more than 70 milliseconds before the result was
>>> received at Machine 3.
>>>
>>>
>>> Thank you,
>>>
>>> Chao
>>>
>>>
>>>
>>
>
>
>
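
The buffering behaviour Fabian describes in the thread (a buffer is sent when it is completely filled or after a configurable timeout) can be illustrated with a small stand-alone model. This is only a simplified sketch and not Flink's implementation: the class BufferFlushModel, the method emitTimes, and the capacity of 1000 records are assumptions made up for illustration. In a real job the timeout itself is set via StreamExecutionEnvironment.setBufferTimeout(), as mentioned in the thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a size-or-timeout buffer (hypothetical, not Flink code). */
final class BufferFlushModel {

    /**
     * For each record, returns the time (in ms) at which it leaves the buffer.
     * The buffer is flushed when it holds `capacity` records, or `timeoutMs`
     * after its first pending record arrived. `arrivalsMs` must be ascending.
     */
    static double[] emitTimes(double[] arrivalsMs, int capacity, double timeoutMs) {
        double[] out = new double[arrivalsMs.length];
        List<Integer> pending = new ArrayList<>();
        double deadline = 0.0;
        for (int i = 0; i < arrivalsMs.length; i++) {
            // The timeout fired before this record arrived: flush at the deadline.
            if (!pending.isEmpty() && arrivalsMs[i] >= deadline) {
                for (int j : pending) out[j] = deadline;
                pending.clear();
            }
            // The first record in an empty buffer starts the timeout clock.
            if (pending.isEmpty()) deadline = arrivalsMs[i] + timeoutMs;
            pending.add(i);
            // A full buffer is sent immediately.
            if (pending.size() == capacity) {
                for (int j : pending) out[j] = arrivalsMs[i];
                pending.clear();
            }
        }
        // Whatever is left waits for the timeout.
        for (int j : pending) out[j] = deadline;
        return out;
    }

    public static void main(String[] args) {
        double[] arrivals = {0.0, 0.07}; // two sends 0.07 ms apart, as in the thread
        for (double timeout : new double[] {100.0, 1.0}) {
            double[] emitted = emitTimes(arrivals, 1000, timeout);
            System.out.println("timeout " + timeout + " ms -> second record leaves at "
                + emitted[1] + " ms");
        }
    }
}
```

In this model, two records arriving 0.07 ms apart into a mostly empty buffer both wait for the timeout: they leave at 100 ms with a 100 ms timeout and at 1 ms with a 1 ms timeout. That is the reason a low-rate stream is more sensitive to the timeout than to the buffer size.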

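
The workaround Chao reports at the top of the thread (send the first message, wait about 110 ms, then send the remaining messages without waiting) could be sketched on the client side roughly as follows. The class WarmupSender and the method sendWithWarmup are hypothetical names, and the 110 ms figure is the one measured in the thread; it may well differ in other setups.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.List;

/** Hypothetical client-side helper for the "first message, then wait" workaround. */
final class WarmupSender {

    /**
     * Writes every message to `out`, flushing after each one. After the first
     * message only, it pauses for `warmupMillis` so that one-time start-up work
     * on the receiving side does not delay the following messages.
     */
    static void sendWithWarmup(OutputStream out, List<byte[]> messages, long warmupMillis) {
        try {
            for (int i = 0; i < messages.size(); i++) {
                out.write(messages.get(i));
                out.flush();
                // Wait only once, and only if more messages follow.
                if (i == 0 && messages.size() > 1) {
                    Thread.sleep(warmupMillis);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during warm-up wait", e);
        }
    }
}
```

With a connected java.net.Socket, a call would look like WarmupSender.sendWithWarmup(socket.getOutputStream(), messages, 110). For benchmarking, the first message is probably best treated as a warm-up and excluded from the latency statistics.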