Hi Leon,

Abstractly, what you describe should be easy with Cap'n Proto. One client
can send a capability (interface reference) to the server, and the server
can freely send that capability on to some other client. Cap'n Proto will
automatically arrange to proxy messages from one client to the other,
through the server. (Someday, three-party handoff will allow the clients to
form a direct connection to each other... when we get around to
implementing it.)

I am not very familiar with the Python implementation so I'm not sure I can
help debug what specifically is wrong here. Maybe Jacob (cc'd) can help.

-Kenton

On Thu, Oct 14, 2021 at 2:39 AM Leon Wessels <lawess...@gmail.com> wrote:

> Hi,
>
> I have a use case where multiple clients connect to a server. A call from
> a client to the server might require the server to call another client. For
> some functionality a direct call from one client to another would be best.
> For performance reasons some clients will be written in C++. For simplicity
> the server and other clients will be written in Python. Is this possible?
> In my attempt below the client call never returns. In the example client
> calls manager which calls provider.
>
> poc.capnp
> @0xdb7253e3ef44cbf9;
>
> interface Manager {
>     registerService @0 (service :Service) -> ();
>     getService @1 () -> (service :Service);
>     get @2 () -> (val :UInt16);
>
>     interface Service {
>         get @0 () -> (val :UInt16);
>     }
> }
>
> client.py
> #!/usr/bin/python3
>
> import sys
> import capnp
> import poc_capnp
>
>
> def start_client(host):
>     client = capnp.TwoPartyClient(host)
>     ms = client.bootstrap().cast_as(poc_capnp.Manager)
>     print(ms.get().wait())
>     service = ms.getService().wait()
>     print('got service')
>     print(service.get().wait())
>
>
> start_client(sys.argv[1])
>
> provider.py
> #!/usr/bin/python3
>
> import sys
> import capnp
> import poc_capnp
> import asyncio
>
>
> class Provider(poc_capnp.Manager.Service.Server):
>     def __init__(self):
>         self.counter = 0
>
>     def get(self, **kwargs):
>         print(self.counter)
>         self.counter = (self.counter + 1) % 10000
>         return self.counter
>
>
> async def socket_reader(client, reader):
>     while True:
>         try:
>             data = await asyncio.wait_for(reader.read(4096), timeout=1.0)
>             client.write(data)
>         except asyncio.TimeoutError:
>             pass
>
>
> async def socket_writer(client, writer):
>     while True:
>         try:
>             data = await asyncio.wait_for(client.read(4096), timeout=1.0)
>             await writer.write(data.tobytes())
>         except asyncio.TimeoutError:
>             pass
>
>
> async def start_ipc(host):
>     client = capnp.TwoPartyClient()
>     ms = client.bootstrap().cast_as(poc_capnp.Manager)
>
>     reader, writer = await
> asyncio.wait_for(asyncio.open_connection(*host.split(':')), timeout=1.0, )
>     coroutines = [socket_reader(client, reader), socket_writer(client,
> writer)]
>     asyncio.gather(*coroutines, return_exceptions=True)
>
>     await ms.registerService(Provider()).a_wait()
>     while True:
>         await asyncio.sleep(1)
>
>
> asyncio.run(start_ipc(sys.argv[1]))
>
> manager.py
> #!/usr/bin/python3
>
> import sys
> import capnp
> import poc_capnp
> import asyncio
>
>
> g_service = None
>
>
> class ManagerImpl(poc_capnp.Manager.Server):
>     def __init__(self):
>         pass
>
>     def registerService(self, service, **kwargs):
>         global g_service
>         g_service = service
>         print('service registered')
>
>     def getService(self, **kwargs):
>         global g_service
>         print('service retrieved')
>         return g_service
>
>     def get(self, _context, **kwargs):
>         global g_service
>         print('service called')
>         return g_service.get().then(
>             lambda value: setattr(_context.results, "val", value)
>         )
>
>
> # -------------------------------------------------------
>
>
> class Server:
>     async def myreader(self):
>         while self.retry:
>             try:
>                 # Must be a wait_for so we don't block on read()
>                 data = await asyncio.wait_for(
>                     self.reader.read(4096),
>                     timeout=0.1
>                 )
>             except asyncio.TimeoutError:
>                 # print("myreader timeout.")
>                 continue
>             except Exception as err:
>                 print("Unknown myreader err: %s", err)
>                 return False
>             await self.server.write(data)
>         print("myreader done.")
>         return True
>
>     async def mywriter(self):
>         while self.retry:
>             try:
>                 # Must be a wait_for so we don't block on read()
>                 data = await asyncio.wait_for(
>                     self.server.read(4096),
>                     timeout=0.1
>                 )
>                 self.writer.write(data.tobytes())
>             except asyncio.TimeoutError:
>                 # print("mywriter timeout.")
>                 continue
>             except Exception as err:
>                 print("Unknown mywriter err: %s", err)
>                 return False
>         print("mywriter done.")
>         return True
>
>     async def myserver(self, reader, writer):
>         # Start TwoPartyServer using TwoWayPipe (only requires bootstrap)
>         self.server = capnp.TwoPartyServer(bootstrap=ManagerImpl())
>         self.reader = reader
>         self.writer = writer
>         self.retry = True
>
>         # Assemble reader and writer tasks, run in the background
>         coroutines = [self.myreader(), self.mywriter()]
>         tasks = asyncio.gather(*coroutines, return_exceptions=True)
>
>         while True:
>             self.server.poll_once()
>             # Check to see if reader has been sent an eof (disconnect)
>             if self.reader.at_eof():
>                 self.retry = False
>                 print('close connection')
>                 break
>             await asyncio.sleep(0.01)
>
>         # Make wait for reader/writer to finish (prevent possible resource
> leaks)
>         await tasks
>
>
> async def new_connection(reader, writer):
>     print('new connection')
>     server = Server()
>     await server.myserver(reader, writer)
>
>
> async def run_server(host):
>     server = await asyncio.start_server(new_connection, *host.split(':'))
>     print('server started')
>     async with server:
>         await server.serve_forever()
>
>
> asyncio.run(run_server(sys.argv[1]))
>
> --
> You received this message because you are subscribed to the Google Groups
> "Cap'n Proto" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to capnproto+unsubscr...@googlegroups.com.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/capnproto/CALao4Qunk9UfFyPjqncxxvp4_gStev95%2Bjzw8KGL5TLpG0CeFg%40mail.gmail.com
> <https://groups.google.com/d/msgid/capnproto/CALao4Qunk9UfFyPjqncxxvp4_gStev95%2Bjzw8KGL5TLpG0CeFg%40mail.gmail.com?utm_medium=email&utm_source=footer>
> .
>

-- 
You received this message because you are subscribed to the Google Groups 
"Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to capnproto+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/capnproto/CAJouXQmw8Qsy1tudEzBR6aokkB4LZaZgA0AoQeDkE9bGS_v7zA%40mail.gmail.com.

Reply via email to