Hello,
I am trying to separate the logic of my application by generating and
processing data in different physical engines.
I have created my custom socket source class:
class SocketSourceFunction extends SourceFunction[Event2]{
@volatile private var isRunning:Boolean = true;
@transient private var serverSocket: ServerSocketChannel = null;
override def run(ctx: SourceContext[Event2]) = {
val hostname = "localhost"
val port = 6667
println("listening:" + port)
val server = ServerSocketChannel.open();
server.bind(new InetSocketAddress (hostname, port));
var buffer = ByteBuffer.allocate (68);
val des = new EventDeSerializer2()
while (isRunning) {
println("waiting...")
var socketChannel = server.accept();
if (socketChannel != null){
println("accept:" + socketChannel)
while (true) {
var bytes = 0;
bytes = socketChannel.read(buffer)
if( bytes > 0) {
if (!buffer.hasRemaining()) {
buffer.rewind()
var event: Event2 = des.deserialize(buffer.array())
ctx.collect(event)
buffer.clear()
}
}
}
}
}
}
override def cancel() = {
isRunning = false;
val socket = this.serverSocket;
if (socket != null) {
try {
socket.close();
}catch { case e: Throwable => {
System.err.println(String.format("error: %s",
e.getMessage()));
e.printStackTrace();
System.exit(1);
}
}
}
}
}
I am sending data with either raw sockets using ByteBuffers or with a Flink
generator (serializing my Events and using writeToSocket() method).
However, in both cases, I am experiencing less than 10x throughput in
comparison to in-memory generation, even when using a 10gbit connection
(the throughput is much lower).
Is there any obvious defect in my implementation?
Thank you in advance,
George