--- based on Dominik's patch, but with the following changes:
- factor out code into separate function accept_connections() - no select with shutdown future (not needed) - remove sender2.send_timeout() - not sure why this was there? - restrict number of spawned tasks Seems to work, but I get many handshake errors when connecting with the GUI: > https handshake failed - the handshake failed: unexpected EOF This is because of pve status ping (Thomas will fix that in pve) But I am not sure why I get the following? > https handshakeX failed - the handshake failed: error:14094416:SSL > routines:ssl3_read_bytes:sslv3 alert certificate > unknown:../ssl/record/rec_layer_s3.c:1544:SSL alert number 46 src/bin/proxmox-backup-proxy.rs | 81 ++++++++++++++++++++++++++------- 1 file changed, 64 insertions(+), 17 deletions(-) diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 78ea4d53..1f0c16b4 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc}; +use std::sync::{Arc, atomic::{AtomicUsize, Ordering}}; use std::path::{Path, PathBuf}; use std::os::unix::io::AsRawFd; @@ -116,25 +116,12 @@ async fn run() -> Result<(), Error> { let server = daemon::create_daemon( ([0,0,0,0,0,0,0,0], 8007).into(), |listener, ready| { - let connections = proxmox_backup::tools::async_io::StaticIncoming::from(listener) - .map_err(Error::from) - .try_filter_map(move |(sock, _addr)| { - let acceptor = Arc::clone(&acceptor); - async move { - sock.set_nodelay(true).unwrap(); - - let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); - Ok(tokio_openssl::accept(&acceptor, sock) - .await - .ok() // handshake errors aren't be fatal, so return None to filter - ) - } - }); - let connections = proxmox_backup::tools::async_io::HyperAccept(connections); + let connections = accept_connections(listener, acceptor); + let connections = hyper::server::accept::from_stream(connections); Ok(ready - .and_then(|_| hyper::Server::builder(connections) + .and_then(|_| 
hyper::Server::builder(connections) .serve(rest_server) .with_graceful_shutdown(server::shutdown_future()) .map_err(Error::from) @@ -170,6 +157,66 @@ async fn run() -> Result<(), Error> { Ok(()) } +fn accept_connections( + mut listener: tokio::net::TcpListener, + acceptor: Arc<openssl::ssl::SslAcceptor>, +) -> tokio::sync::mpsc::Receiver<Result<tokio_openssl::SslStream<tokio::net::TcpStream>, Error>> { + + let (sender, receiver) = tokio::sync::mpsc::channel(100); + + let accept_counter = Arc::new(AtomicUsize::new(0)); + + const MAX_PENDING_ACCEPTS: usize = 100; + + tokio::spawn(async move { + loop { + match listener.accept().await { + Err(err) => { + eprintln!("error accepting tcp connection: {}", err); + } + Ok((sock, _addr)) => { + sock.set_nodelay(true).unwrap(); + let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); + let acceptor = Arc::clone(&acceptor); + let mut sender = sender.clone(); + + if accept_counter.load(Ordering::SeqCst) > MAX_PENDING_ACCEPTS { + eprintln!("connection rejected - to many open connections"); + continue; + } + accept_counter.fetch_add(1, Ordering::SeqCst); + + let accept_counter = accept_counter.clone(); + tokio::spawn(async move { + let accept_future = tokio::time::timeout( + Duration::new(10, 0), tokio_openssl::accept(&acceptor, sock)); + + let result = accept_future.await; + + match result { + Ok(Ok(connection)) => { + if let Err(_) = sender.send(Ok(connection)).await { + eprintln!("detect closed connection channel"); + } + } + Ok(Err(err)) => { + eprintln!("https handshakeX failed - {}", err); + } + Err(_) => { + eprintln!("https handshake timeout"); + } + } + + accept_counter.fetch_sub(1, Ordering::SeqCst); + }); + } + } + } + }); + + receiver +} + fn start_stat_generator() { let abort_future = server::shutdown_future(); let future = Box::pin(run_stat_generator()); -- 2.20.1 _______________________________________________ pve-devel mailing list pve-devel@lists.proxmox.com 
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel