Hey

Another approach without a dedicated connector, is to write the connector 
yourself.


  1.  Create a class that handles TCP/IP messages
  2.  Create Async Stream [1] and use that class to get the TCP/IP Messages
  3.  Optional, you can also combine Mono[2] for better performance when 
retrieving the messages.

Example

public class AsyncTcpIPStream<IN> extends RichAsyncFunction<IN , TcpIpMessage> {
  private TcpIpHandler tcpIpHandler;

    public AsyncTcpIPStream(final TcpIpHandler tcpIpHandler) {
        this.tcpIpHandler = tcpIpHandler;
    }

    @Override
    public void asyncInvoke(IN input, ResultFuture<Tuple2<IN, TcpIpMessage>> 
resultFuture) throws Exception {
        Executors.directExecutor().execute(() -> 
getTcpIpMessage()).subscribe(tcpIpMessage -> {
            Tuple2<T, TcpIpMessage> output = Tuple2.of(input, tcpIpMessage);
            resultFuture.complete(Collections.singletonList(output));
        });

    }

    private Mono<TcpIpMessage> getTcpIpMessages() {
       return tcpIpHandler.getMessages();
    }
}

getMessages() would be

public Mono<TcpIpMessage> getMessages() {
 .
 .
 .
}



[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/
[2] 
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html


Tamir.



________________________________
From: Caizhi Weng <tsreape...@gmail.com>
Sent: Friday, November 19, 2021 3:27 AM
To: Robert Cullen <cinquate...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Connector for TCP/IP messages


EXTERNAL EMAIL


Hi!

We have a SocketSourceFunction in the flink-examples-table module. Note that 
this is only an example and cannot be used in production. See its java doc for 
more information. If you want a production-ready socket source you can modify 
that and implement your own.

Robert Cullen <cinquate...@gmail.com<mailto:cinquate...@gmail.com>> 
于2021年11月19日周五 上午12:02写道:
Does flink have a connector for TCP/IP messages?

--
Robert Cullen
240-475-4490

Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; please 
delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and 
attachments are free from any virus, we advise that in keeping with good 
computing practice the recipient should ensure they are actually virus free.

Reply via email to