Hi George,

I suspect that issuing a read operation for every 68 bytes incurs too much overhead to reach the throughput you are after. Instead, create a bigger buffer (64k?) and extract single events from sub-regions of that buffer. Please note, however, that the first buffer will then only be processed once the read() call returns (the details depend on the underlying channel [1]). At some point this becomes a trade-off between latency and throughput.

If you set non-blocking mode for your channels, a read will always return whatever the channel has available and continue immediately. You can set this up as follows, for example:
==========
socketChannel.configureBlocking(false);
// pass a bare host name here, not a URL
socketChannel.connect(new InetSocketAddress("jenkov.com", 80));

while(! socketChannel.finishConnect() ){
    //wait, or do something else...
}
==========

Nico

[1] https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SocketChannel.html#read(java.nio.ByteBuffer)

On 15/01/18 13:19, George Theodorakis wrote:
> Hello,
>
> I am trying to separate the logic of my application by generating and
> processing data in different physical engines.
>
> I have created my custom socket source class:
>
> class SocketSourceFunction extends SourceFunction[Event2]{
>   @volatile private var isRunning:Boolean = true;
>   @transient private var serverSocket: ServerSocketChannel = null;
>
>   override def run(ctx: SourceContext[Event2]) = {
>     val hostname = "localhost"
>     val port = 6667
>     println("listening:" + port)
>     val server = ServerSocketChannel.open();
>     server.bind(new InetSocketAddress (hostname, port));
>     var buffer = ByteBuffer.allocate (68);
>     val des = new EventDeSerializer2()
>
>     while (isRunning) {
>       println("waiting...")
>       var socketChannel = server.accept();
>
>       if (socketChannel != null){
>         println("accept:" + socketChannel)
>         while (true) {
>           var bytes = 0;
>           bytes = socketChannel.read(buffer)
>           if( bytes > 0) {
>             if (!buffer.hasRemaining()) {
>               buffer.rewind()
>               var event: Event2 = des.deserialize(buffer.array())
>               ctx.collect(event)
>               buffer.clear()
>             }
>           }
>         }
>       }
>     }
>   }
>
>   override def cancel() = {
>     isRunning = false;
>     val socket = this.serverSocket;
>     if (socket != null) {
>       try {
>         socket.close();
>       }catch { case e: Throwable => {
>         System.err.println(String.format("error: %s", e.getMessage()));
>         e.printStackTrace();
>         System.exit(1);
>       }
>       }
>     }
>   }
> }
>
> I am sending data with either raw sockets using ByteBuffers or with a
> Flink generator (serializing my Events and using writeToSocket()
> method).
> However, in both cases, I am experiencing less than 10x
> throughput in comparison to in-memory generation, even when using
> a 10gbit connection (the throughput is much lower).
>
> Is there any obvious defect in my implementation?
>
> Thank you in advance,
> George
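
P.S. To make the bigger-buffer suggestion from the top of this reply more concrete, here is a rough sketch, not a drop-in replacement for the source function. The class name BufferedEventReader and the method readEvents are made up for illustration, the channel is assumed to be in blocking mode, and the raw 68-byte arrays stand in for whatever EventDeSerializer2 would produce. In the actual source you would call ctx.collect(...) where the sketch adds to a list.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: name and structure are assumptions, not Flink API.
public class BufferedEventReader {
    static final int EVENT_SIZE = 68;          // record size from the original post
    static final int BUFFER_SIZE = 64 * 1024;  // the "64k?" suggested above

    /** Reads until end-of-stream and returns every complete EVENT_SIZE-byte record. */
    static List<byte[]> readEvents(ReadableByteChannel channel) throws IOException {
        List<byte[]> events = new ArrayList<>();
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        // Assumes a blocking channel: read() returns at least one byte or -1.
        while (channel.read(buffer) != -1) {
            buffer.flip();                     // switch from filling to draining
            while (buffer.remaining() >= EVENT_SIZE) {
                byte[] event = new byte[EVENT_SIZE];
                buffer.get(event);             // copy one record out of the big buffer
                events.add(event);             // a source would deserialize and collect here
            }
            buffer.compact();                  // keep a trailing partial record for the next read
        }
        return events;
    }

    public static void main(String[] args) throws IOException {
        // Three full records followed by 10 stray bytes of an incomplete one.
        byte[] data = new byte[EVENT_SIZE * 3 + 10];
        for (int i = 0; i < data.length; i++) {
            data[i] = (byte) (i / EVENT_SIZE);
        }
        ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(data));
        System.out.println(readEvents(channel).size()); // prints 3
    }
}
```

A single read() can now deliver hundreds of events at once, and compact() carries a partially received event over to the next read instead of dropping it. With a non-blocking channel, read() may also return 0, so the loop would additionally need a selector or some back-off rather than spinning.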