Hi George,
I suspect issuing a read operation for every 68 bytes incurs too much
overhead to perform as you would like it to. Instead, create a bigger
buffer (64k?) and extract single events from sub-regions of this buffer
instead.
Please note, however, that then the first buffer will only be processed
when this method returns (the details depend on the underlying channel
[1]). This is a trade-off between latency and throughput at some point.
If you set non-blocking mode for your channels, you will always get what
the channel has available and continue immediately. You can set this up
via this, for example:

==========
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://jenkov.com";, 80));

while(! socketChannel.finishConnect() ){
    //wait, or do something else...
}
==========


Nico

[1]
https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SocketChannel.html#read(java.nio.ByteBuffer)

On 15/01/18 13:19, George Theodorakis wrote:
> Hello,
> 
> I am trying to separate the logic of my application by generating and
> processing data in different physical engines. 
> 
> I have created my custom socket source class:
> 
> class SocketSourceFunction extends SourceFunction[Event2]{
>       @volatile private var isRunning:Boolean = true;
>       @transient private var serverSocket: ServerSocketChannel = null; 
> 
>       override def run(ctx: SourceContext[Event2]) = {
>   val hostname = "localhost"
>   val port = 6667
>           println("listening:" + port)                
>   val server = ServerSocketChannel.open();
>   server.bind(new InetSocketAddress (hostname, port));    
>   var buffer = ByteBuffer.allocate (68);
>   val des = new EventDeSerializer2()
>       
>   while (isRunning) {
>             println("waiting...")            
>             var socketChannel = server.accept();
> 
>      if (socketChannel != null){
>                println("accept:" + socketChannel)
>                while (true) {
>     var bytes = 0;
>     bytes = socketChannel.read(buffer)
>     if( bytes > 0) {
>     if (!buffer.hasRemaining()) {
>     buffer.rewind()
>     var event: Event2 = des.deserialize(buffer.array())
>     ctx.collect(event)
>     buffer.clear()
>     }
>     }
>                      }
> }
>           }          
>       }
> 
>       override def cancel() = {
>         isRunning = false;
>         val socket = this.serverSocket; 
>         if (socket != null) { 
>           try { 
>             socket.close(); 
>            }catch { case e: Throwable => {  
>              System.err.println(String.format("error: %s", e.getMessage()));
>         e.printStackTrace();
>         System.exit(1);
>              }
>            }
>          } 
>       }
> }
> 
> I am sending data with either raw sockets using ByteBuffers or with a
> Flink generator (serializing my Events and using writeToSocket()
> method). However, in both cases, I am experiencing less than 10x
> throughput in comparison to in-memory generation, even when using
> a 10gbit connection (the throughput is much lower).
> 
> Is there any obvious defect in my implementation?
> 
> Thank you in advance,
> George

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to