One pointer is the StreamExecutionEnvironment.setBufferTimeout()
parameter.
Flink's network stack collects records in buffers to send them over
the network. A buffer is sent when it is completely filled or after a
configurable timeout.
So if your program does not process many records, these records might
"get stuck" in the buffers and be emitted after the timeout flushes
the buffer.
The default timeout is 100ms. Try to reduce it.
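For example, in the main() of your job, before calling execute() (just a sketch;
the 5 ms value is only illustrative, tune it for your latency/throughput trade-off):

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

// Flush partially filled network buffers after at most 5 ms.
// The default is 100 ms; a timeout of 0 flushes after every record,
// at some cost in throughput.
env.setBufferTimeout(5);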
Best, Fabian
2017-08-08 1:06 GMT+02:00 Chao Wang <chaow...@wustl.edu>:
Following up on my original post, I have tried stripping my Flink app down
to only the code below, and it still exhibits long latencies: after the
second write to the source socket, it took 90+ milliseconds for the data to
get from the data source to the socket-facing front of Flink. I would like
to ask for pointers on how to investigate a latency issue like this, and in
general how to properly benchmark Flink latencies. Thank you very much!
The main method:
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<EventGroup> inEventGroupStream =
            env.addSource(new SocketEventGroupStreamFunction(6065, 512));

    inEventGroupStream.writeToSocket("DestHost", 6066, new MySeGroup<EventGroup>());

    env.execute("event processing");
}
where all the custom classes are as follows (for
serialization/deserialization and socket server functionality):
public static class MySeGroup<T> implements SerializationSchema<EventGroup> {

    @Override
    public byte[] serialize(EventGroup arg0) {
        int tLength = EKFFFTAES.getSizeTimepoint();

        // Note: report an error if tLength != arg0.getT().length
        if (tLength != arg0.getT().length) {
            System.out.println("Serialization error: Timepoint size discrepancy.");
            System.out.println("tLength = " + tLength);
            System.out.println("arg0.getT().length = " + arg0.getT().length);
        }

        byte[] buffer = new byte[1 + arg0.getT().length + arg0.getP().length];
        buffer[0] = arg0.type;
        System.arraycopy(arg0.getT(), 0, buffer, 1, tLength);
        System.arraycopy(arg0.getP(), 0, buffer, 1 + tLength, arg0.getP().length);
        return buffer;
    }
}
public static class Event extends SimpleImmutableEntry<byte[], byte[]> {

    Event(byte[] timestamp, byte[] payload) {
        super(timestamp, payload);
    }

    public byte[] getT() { // get the timestamp
        return getKey();
    }

    public byte[] getP() { // get the payload
        return getValue();
    }
}
public static class EventGroup extends Event {
    public byte type;

    EventGroup(byte type, byte[] timestamp, byte[] payload) {
        super(timestamp, payload);
        this.type = type;
    }
}
public static class SocketEventGroupStreamFunction implements SourceFunction<EventGroup> {

    private transient ServerSocket serverSocket;
    private int serverPort;
    private int dataLength;
    private byte[] inbuf;
    private byte[] timestamp;
    private byte[] payload;
    private int tLength = EKFFFTAES.getSizeTimepoint();

    private volatile boolean isRunning = true;

    public SocketEventGroupStreamFunction(int port, int length) {
        serverPort = port;
        dataLength = length;
        inbuf = new byte[1 + dataLength + tLength];
        timestamp = new byte[tLength];
        payload = new byte[dataLength];
    }

    @Override
    public void run(SourceContext<EventGroup> ctx) throws Exception {
        while (isRunning) {
            serverSocket = new ServerSocket(serverPort, 100,
                    InetAddress.getByName("192.168.1.13"));
            serverSocket.setSoTimeout(1000000);

            System.out.println("Waiting for incoming connections on port "
                    + serverSocket.getLocalPort() + "...");
            Socket server = serverSocket.accept();
            System.out.println("Just connected to " + server.getRemoteSocketAddress());

            DataInputStream in = new DataInputStream(server.getInputStream());

            while (isRunning) {
                // Read one fixed-size record: 1 type byte + timestamp + payload.
                in.readFully(inbuf, 0, inbuf.length);
                System.arraycopy(inbuf, 1, timestamp, 0, tLength);
                System.arraycopy(inbuf, 1 + tLength, payload, 0, dataLength);

                System.out.print("Got an event " + inbuf[0] + ": ");
                displayElapsedTime(timestamp);

                // Note: the timestamp and payload arrays are reused for the next
                // record; the emitted EventGroup references them directly.
                ctx.collect(new EventGroup(inbuf[0], timestamp, payload));
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        ServerSocket theSocket = this.serverSocket;
        if (theSocket != null) {
            try {
                theSocket.close();
            } catch (SocketTimeoutException s) {
                System.out.println("Socket timed out!");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
and finally, EKFFFTAES is my C++ library implementing the
timestamping facility:
#include <jni.h>
#include <chrono>
#include <cstring>
#include <iostream>

int timePointLength = sizeof(std::chrono::system_clock::time_point);

JNIEXPORT jint JNICALL Java_eventProcessing_EKFFFTAES_getSizeTimepoint
  (JNIEnv *, jclass)
{
    return ::timePointLength;
}

JNIEXPORT void JNICALL Java_eventProcessing_EKFFFTAES_displayElapsedTime
  (JNIEnv *env, jclass, jbyteArray inArray)
{
    // Take "now", reconstruct the sender's time_point from the byte array,
    // and print the elapsed time in microseconds.
    std::chrono::system_clock::time_point end = std::chrono::system_clock::now();

    jbyte *inCArray = env->GetByteArrayElements(inArray, NULL);
    std::chrono::system_clock::time_point start;
    std::memcpy(&start, inCArray, ::timePointLength);
    env->ReleaseByteArrayElements(inArray, inCArray, JNI_ABORT); // read-only access

    std::cout << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count()
              << std::endl;
}
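(The Java side of this binding boils down to the native method declarations
below; a minimal sketch, where the package and class names follow from the
JNI symbol names and the library name passed to loadLibrary is an assumption.
The unqualified displayElapsedTime(...) call in the source function would then
correspond to a static import of this method.)

package eventProcessing;

// Sketch of the Java declaration of the native methods used above.
public class EKFFFTAES {
    static {
        System.loadLibrary("EKFFFTAES"); // assumed native library name
    }

    // Size in bytes of a std::chrono::system_clock::time_point on the native side.
    public static native int getSizeTimepoint();

    // Prints the microseconds elapsed since the time_point serialized in 'timestamp'.
    public static native void displayElapsedTime(byte[] timestamp);
}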
Thank you,
Chao
On 08/07/2017 03:20 PM, Chao Wang wrote:
Hi,
I have been trying to benchmark the end-to-end latency of a
Flink 1.3.1 application, but got confused about the amount of
time spent in Flink. In my setting, the data source and data
sink reside on separate machines, with the following topology:
Machine 1                              Machine 2     Machine 3
data source (via a socket client)  ->  Flink     ->  data sink (via a socket server)
I observed 200-400 milliseconds of end-to-end latency, even though
the execution time of my stream transformations was no more than
two milliseconds, the socket-only networking latency between
machines is no more than one millisecond, and I used ptpd so that
the clock offset between machines was also no more than one
millisecond.
Question: What took those hundreds of milliseconds?
Here are the details of my setting and my observations so far:
On Machine 2, I implemented a socket server as a data source
to Flink (by implementing SourceFunction), and I split the
incoming stream into several streams (via SplitStream) for some
transformations (implementing MapFunction and
CoFlatMapFunction), whose results were fed to a socket
(using writeToSocket). I used C++11's chrono time library
(through JNI) to take timestamps and determine the elapsed
time, and I have verified that the overhead of timestamping
this way is no more than one millisecond.
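To make that topology concrete, here is a minimal sketch of such a job graph,
with a placeholder split criterion, placeholder ports, and identity/pass-through
functions standing in for the real MapFunction and CoFlatMapFunction; only
SocketEventGroupStreamFunction, EventGroup and MySeGroup are the actual classes
from above, and the sketch assumes they are visible to this class:

import java.util.Collections;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class TopologySketch {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<EventGroup> in =
                env.addSource(new SocketEventGroupStreamFunction(6065, 512));

        // Split the incoming stream by event type (SplitStream API, Flink 1.3).
        SplitStream<EventGroup> split = in.split(new OutputSelector<EventGroup>() {
            @Override
            public Iterable<String> select(EventGroup value) {
                return Collections.singletonList(value.type == 0 ? "a" : "b");
            }
        });

        // Placeholder MapFunction on one branch.
        DataStream<EventGroup> mapped = split.select("a")
                .map(new MapFunction<EventGroup, EventGroup>() {
                    @Override
                    public EventGroup map(EventGroup value) {
                        return value; // identity stand-in for the real transformation
                    }
                });

        // Placeholder CoFlatMapFunction combining the other branch with the mapped one.
        DataStream<EventGroup> combined = split.select("b")
                .connect(mapped)
                .flatMap(new CoFlatMapFunction<EventGroup, EventGroup, EventGroup>() {
                    @Override
                    public void flatMap1(EventGroup value, Collector<EventGroup> out) {
                        out.collect(value); // pass-through stand-in
                    }
                    @Override
                    public void flatMap2(EventGroup value, Collector<EventGroup> out) {
                        out.collect(value); // pass-through stand-in
                    }
                });

        combined.writeToSocket("DestHost", 6066, new MySeGroup<EventGroup>());
        env.execute("event processing");
    }
}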
I observed that for four consecutive writes from Machine 1,
with no more than 0.3 milliseconds between writes, Flink on
Machine 2 got the first write in 0.2 milliseconds, but then it
took 90 milliseconds for Flink to get the second write, another
4 milliseconds for the third write, and yet another 4
milliseconds for the fourth write. It then took more than 70
milliseconds before Flink started processing my plan's first
stream transformation. And after my last transformation, it
took more than 70 milliseconds before the result was received
at Machine 3.
Thank you,
Chao