Hi Jose, I've figured out the issue. The bug is actually in your code.
The problem is that you are using an _external_ event loop, and if you do so, you are responsible for re-triggering MHD_run() if you do anything that might change the state of an MHD connection. This (mostly) applies to MHD_resume_connection() -- but also to closing an upgraded connection. Without this modification, MHD only closes the connection on the _next_ request, so if you connect with another client, MHD will then close the connection. This is the only way to fix this, as when MHD is not 'run', it cannot act. And if you are controlling the external event loop, you are responsible for triggering MHD 'when necessary'. I've attached a modified version of your code that adds the necessary signalling logic to your poll()-loop. (Using eventfd(); non-portable. Use a pipe() to do this in a more portable way.) Happy hacking! Christian On 12/10/20 4:08 PM, José Bollo wrote: > Hello, > > My code uses LMHD embedded with its EPOLL mechanism. Part of that code > deals with upgrading to websocket. It then calls, somewhere: > > response = MHD_create_response_for_upgrade( > upgrade_to_websocket, memo); > > and the callback function upgrade_to_websocket looks like this: > > void upgrade_to_websocket( > void *cls, > struct MHD_Connection *connection, > void *con_cls, > const char *extra_in, > size_t extra_in_size, > MHD_socket sock, > struct MHD_UpgradeResponseHandle *urh > ) { > struct memo *memo = cls; > struct ws *ws = ws_create(sock, memo, close_websocket, urh); > if (ws == NULL) close_websocket(urh); > } > > void close_websocket(struct MHD_UpgradeResponseHandle *urh) { > MHD_upgrade_action (urh, MHD_UPGRADE_ACTION_CLOSE); > } > > Thank you for reading this far. So far, so good. > > The issue now: when the function ws_create returns NULL, the program > returns to some polling and waits for events BUT DOES NOT CLOSE THE > SOCKET, leading to starvation of the client. 
> > I guess that calling some function after calling MHD_upgrade_action > (urh, MHD_UPGRADE_ACTION_CLOSE) could unblock the situation by > performing a correct close. However, the called function should not be > MHD_run because it dispatches events, which is not expected here. > > I attach a sample demo. When I connect to it via websocket, the client > starves. I recorded and attached the output of strace. > > Best regards > José Bollo > >
#include <stdlib.h> #include <assert.h> #include <stdio.h> #include <stdint.h> #include <string.h> #include <errno.h> #include <fcntl.h> #include <poll.h> #include <sys/types.h> #include <microhttpd.h> #include <sys/eventfd.h> static int efd; /**************** SHA ****************************/ #define SHA1_DIGEST_LENGTH 20 /* The SHA1 structure: */ typedef struct { uint32_t state[5]; uint64_t length; union { uint8_t buffer[64]; uint32_t blocks[16]; }; } SHA1_t; #define rol(value, bits) (((value) << (bits)) | ((value) >> (32 - (bits)))) /* blk0() and blk() perform the initial expand. */ /* I got the idea of expanding during the round function from SSLeay */ #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ # define blk0(i) (block[i] = (rol (block[i],24) & (uint32_t) 0xFF00FF00) \ | (rol (block[i],8) & (uint32_t) 0x00FF00FF)) #elif __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ # define blk0(i) block[i] #else # error "unsupported byte order" #endif #define blk(i) (block[i & 15] = rol (block[(i + 13) & 15] ^ block[(i + 8) & 15] \ ^ block[(i + 2) & 15] ^ block[i & 15],1)) /* (R0+R1), R2, R3, R4 are the different operations used in SHA1 */ #define R0(v,w,x,y,z,i) z += ((w & (x ^ y)) ^ y) + blk0 (i) + 0x5A827999 + rol ( \ v,5); w = rol (w,30); #define R1(v,w,x,y,z,i) z += ((w & (x ^ y)) ^ y) + blk (i) + 0x5A827999 + rol ( \ v,5); w = rol (w,30); #define R2(v,w,x,y,z,i) z += (w ^ x ^ y) + blk (i) + 0x6ED9EBA1 + rol (v,5); w = \ rol (w,30); #define R3(v,w,x,y,z,i) z += (((w | x) & y) | (w & x)) + blk (i) + 0x8F1BBCDC \ + rol (v,5); w = rol (w,30); #define R4(v,w,x,y,z,i) z += (w ^ x ^ y) + blk (i) + 0xCA62C1D6 + rol (v,5); w = \ rol (w,30); #define WIPE 1 /* Hash a single 512-bit block. This is the core of the algorithm. */ static void transform (uint32_t state[5], uint32_t block[16]) { uint32_t a, b, c, d, e; /* Copy context->state[] to working vars */ a = state[0]; b = state[1]; c = state[2]; d = state[3]; e = state[4]; /* 4 rounds of 20 operations each. Loop unrolled. 
*/ R0 (a,b,c,d,e, 0); R0 (e,a,b,c,d, 1); R0 (d,e,a,b,c, 2); R0 (c,d,e,a,b, 3); R0 (b,c,d,e,a, 4); R0 (a,b,c,d,e, 5); R0 (e,a,b,c,d, 6); R0 (d,e,a,b,c, 7); R0 (c,d,e,a,b, 8); R0 (b,c,d,e,a, 9); R0 (a,b,c,d,e,10); R0 (e,a,b,c,d,11); R0 (d,e,a,b,c,12); R0 (c,d,e,a,b,13); R0 (b,c,d,e,a,14); R0 (a,b,c,d,e,15); R1 (e,a,b,c,d,16); R1 (d,e,a,b,c,17); R1 (c,d,e,a,b,18); R1 (b,c,d,e,a,19); R2 (a,b,c,d,e,20); R2 (e,a,b,c,d,21); R2 (d,e,a,b,c,22); R2 (c,d,e,a,b,23); R2 (b,c,d,e,a,24); R2 (a,b,c,d,e,25); R2 (e,a,b,c,d,26); R2 (d,e,a,b,c,27); R2 (c,d,e,a,b,28); R2 (b,c,d,e,a,29); R2 (a,b,c,d,e,30); R2 (e,a,b,c,d,31); R2 (d,e,a,b,c,32); R2 (c,d,e,a,b,33); R2 (b,c,d,e,a,34); R2 (a,b,c,d,e,35); R2 (e,a,b,c,d,36); R2 (d,e,a,b,c,37); R2 (c,d,e,a,b,38); R2 (b,c,d,e,a,39); R3 (a,b,c,d,e,40); R3 (e,a,b,c,d,41); R3 (d,e,a,b,c,42); R3 (c,d,e,a,b,43); R3 (b,c,d,e,a,44); R3 (a,b,c,d,e,45); R3 (e,a,b,c,d,46); R3 (d,e,a,b,c,47); R3 (c,d,e,a,b,48); R3 (b,c,d,e,a,49); R3 (a,b,c,d,e,50); R3 (e,a,b,c,d,51); R3 (d,e,a,b,c,52); R3 (c,d,e,a,b,53); R3 (b,c,d,e,a,54); R3 (a,b,c,d,e,55); R3 (e,a,b,c,d,56); R3 (d,e,a,b,c,57); R3 (c,d,e,a,b,58); R3 (b,c,d,e,a,59); R4 (a,b,c,d,e,60); R4 (e,a,b,c,d,61); R4 (d,e,a,b,c,62); R4 (c,d,e,a,b,63); R4 (b,c,d,e,a,64); R4 (a,b,c,d,e,65); R4 (e,a,b,c,d,66); R4 (d,e,a,b,c,67); R4 (c,d,e,a,b,68); R4 (b,c,d,e,a,69); R4 (a,b,c,d,e,70); R4 (e,a,b,c,d,71); R4 (d,e,a,b,c,72); R4 (c,d,e,a,b,73); R4 (b,c,d,e,a,74); R4 (a,b,c,d,e,75); R4 (e,a,b,c,d,76); R4 (d,e,a,b,c,77); R4 (c,d,e,a,b,78); R4 (b,c,d,e,a,79); /* Add the working vars back into context.state[] */ state[0] += a; state[1] += b; state[2] += c; state[3] += d; state[4] += e; #if WIPE /* Wipe variables */ a = b = c = d = e = 0; #endif } /* SHA1_Init - Initialize new context */ void SHA1_init (SHA1_t*context) { /* SHA1 initialization constants */ context->state[0] = 0x67452301; context->state[1] = 0xEFCDAB89; context->state[2] = 0x98BADCFE; context->state[3] = 0x10325476; context->state[4] = 0xC3D2E1F0; context->length 
= 0; } /* Run your data through this. */ void SHA1_update (SHA1_t *context, const void *buffer, size_t len) { size_t i, j; const uint8_t *data = buffer; if (len) { j = (size_t) (context->length & 63); context->length += (uint64_t) len; if (j + len < 64) memcpy (&context->buffer[j], data, len); else { i = 64 - j; memcpy (&context->buffer[j], data, i); transform (context->state, context->blocks); for ( ; i + 64 <= len; i += 64) { memcpy (context->buffer, &data[i], 64); transform (context->state, context->blocks); } if (len - i) memcpy (context->buffer, &data[i], len - i); } } } /* Add padding and return the message digest. */ void SHA1_final (SHA1_t *context, uint8_t digest[SHA1_DIGEST_LENGTH]) { uint32_t i; uint64_t len; uint8_t finalcount[8], c128 = 0200, c0 = 0; len = context->length; SHA1_update (context, &c128, 1); while ((context->length & 63) != 56) SHA1_update (context, &c0, 1); /* Should cause a transform() */ len <<= 3; for (i = 0; i < 8; i++) finalcount[i] = (uint8_t) ((len >> ((7 - i) << 3)) & 255); SHA1_update (context, finalcount, 8); /* Get the result */ for (i = 0; i < SHA1_DIGEST_LENGTH; i++) { digest[i] = (uint8_t) ((context->state[i >> 2] >> ((3 - (i & 3)) << 3)) & 255); } #if WIPE /* Wipe variables */ memset (&finalcount, 0, sizeof finalcount); memset (context, 0, sizeof *context); #endif } /**************** WebSocket connection upgrade ****************************/ static const char websocket_s[] = "websocket"; static const char sec_websocket_key_s[] = "Sec-WebSocket-Key"; static const char sec_websocket_version_s[] = "Sec-WebSocket-Version"; static const char sec_websocket_accept_s[] = "Sec-WebSocket-Accept"; static const char sec_websocket_protocol_s[] = "Sec-WebSocket-Protocol"; static const char websocket_guid[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; static void enc64 (unsigned char *in, char *out) { static const char tob64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz" "0123456789+/"; out[0] = tob64[in[0] >> 2]; out[1] = 
tob64[((in[0] & 0x03) << 4) | ((in[1] & 0xf0) >> 4)]; out[2] = tob64[((in[1] & 0x0f) << 2) | ((in[2] & 0xc0) >> 6)]; out[3] = tob64[in[2] & 0x3f]; } static void make_accept_value (const char *key, char result[29]) { SHA1_t sha1; unsigned char md[SHA1_DIGEST_LENGTH + 1]; size_t len = strlen (key); char *buffer = alloca (len + sizeof websocket_guid - 1); memcpy (buffer, key, len); memcpy (buffer + len, websocket_guid, sizeof websocket_guid - 1); SHA1_init (&sha1); SHA1_update (&sha1, buffer, len + sizeof websocket_guid - 1); SHA1_final (&sha1, md); assert (SHA1_DIGEST_LENGTH == 20); md[20] = 0; enc64 (&md[0], &result[0]); enc64 (&md[3], &result[4]); enc64 (&md[6], &result[8]); enc64 (&md[9], &result[12]); enc64 (&md[12], &result[16]); enc64 (&md[15], &result[20]); enc64 (&md[18], &result[24]); result[27] = '='; result[28] = 0; } static const char vseparators[] = " \t,"; static int headerhas (const char *header, const char *needle) { size_t len, n; n = strlen (needle); for (;;) { header += strspn (header, vseparators); if (! *header) return 0; len = strcspn (header, vseparators); if ((n == len) && (0 == strncasecmp (needle, header, n))) return 1; header += len; } } static void upgrade_to_websocket ( void *cls, struct MHD_Connection *connection, void *con_cls, const char *extra_in, size_t extra_in_size, MHD_socket sock, struct MHD_UpgradeResponseHandle *urh) { static long long s = 1; MHD_upgrade_action (urh, MHD_UPGRADE_ACTION_CLOSE); write (efd, &s, sizeof (s)); } static int check_websocket_upgrade ( struct MHD_Connection *con ) { struct MHD_Response *response; const char *connection, *upgrade, *key, *version, *protocols; char acceptval[29]; int vernum; /* is an upgrade to websocket ? */ upgrade = MHD_lookup_connection_value (con, MHD_HEADER_KIND, MHD_HTTP_HEADER_UPGRADE); if ((upgrade == NULL) || strcasecmp (upgrade, websocket_s)) return 0; /* is a connection for upgrade ? 
*/ connection = MHD_lookup_connection_value (con, MHD_HEADER_KIND, MHD_HTTP_HEADER_CONNECTION); if ((connection == NULL) || ! headerhas (connection, MHD_HTTP_HEADER_UPGRADE)) return 0; /* has a key and a version ? */ key = MHD_lookup_connection_value (con, MHD_HEADER_KIND, sec_websocket_key_s); version = MHD_lookup_connection_value (con, MHD_HEADER_KIND, sec_websocket_version_s); if ((key == NULL) || (version == NULL)) return 0; /* is a supported version ? */ vernum = atoi (version); if (vernum != 13) { response = MHD_create_response_from_buffer (0, NULL, MHD_RESPMEM_PERSISTENT); MHD_add_response_header (response, sec_websocket_version_s, "13"); MHD_queue_response (con, MHD_HTTP_UPGRADE_REQUIRED, response); MHD_destroy_response (response); return 1; } /* is the protocol supported ? */ protocols = MHD_lookup_connection_value (con, MHD_HEADER_KIND, sec_websocket_protocol_s); /* send the accept connection */ response = MHD_create_response_for_upgrade (upgrade_to_websocket, NULL); make_accept_value (key, acceptval); MHD_add_response_header (response, sec_websocket_accept_s, acceptval); MHD_add_response_header (response, sec_websocket_protocol_s, protocols); MHD_add_response_header (response, MHD_HTTP_HEADER_UPGRADE, websocket_s); MHD_queue_response (con, MHD_HTTP_SWITCHING_PROTOCOLS, response); MHD_destroy_response (response); return 1; } static enum MHD_Result access_handler ( void *cls, struct MHD_Connection *con, const char *url, const char *method, const char *version, const char *upload_data, size_t *upload_data_size, void **ptr ) { struct MHD_Response *response; if (check_websocket_upgrade (con)) return MHD_YES; response = MHD_create_response_from_buffer (0, NULL, MHD_RESPMEM_PERSISTENT); MHD_queue_response (con, MHD_HTTP_FORBIDDEN, response); MHD_destroy_response (response); return MHD_NO; } int main (int argc, char *const *argv) { const union MHD_DaemonInfo *di; struct MHD_Daemon *d; uint16_t port; struct pollfd pfd[2]; MHD_UNSIGNED_LONG_LONG to; int ret; int 
tox; if (argc != 2) { printf ("%s PORT\n", argv[0]); return 1; } port = atoi (argv[1]); efd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK); d = MHD_start_daemon ( MHD_USE_EPOLL | MHD_ALLOW_UPGRADE | MHD_USE_TCP_FASTOPEN | MHD_USE_DEBUG, port, /* port */ NULL, NULL, /* Tcp Accept call back + extra attribute */ access_handler, NULL, /* Http Request Call back + extra attribute */ MHD_OPTION_END); /* options-end */ if (NULL == d) return 1; di = MHD_get_daemon_info (d, MHD_DAEMON_INFO_EPOLL_FD); if (di == NULL) { MHD_stop_daemon (d); return 1; } pfd[0].fd = di->epoll_fd; pfd[0].events = POLLIN; pfd[1].fd = efd; pfd[1].events = POLLIN; tox = -1; while (-1 != (ret = poll (pfd, 2, tox))) { if (0 != pfd[1].revents) { long long d; read (efd, &d, sizeof (d)); } MHD_run (d); if (MHD_get_timeout (d, &to) == MHD_YES) tox = to; else tox = -1; } MHD_stop_daemon (d); close (efd); return 0; }
signature.asc
Description: OpenPGP digital signature