Hello,

My code embeds libmicrohttpd (LMHD) and uses its EPOLL mechanism. Part of
that code deals with upgrading to websocket. Somewhere, it then calls:

  response = MHD_create_response_for_upgrade(
               upgrade_to_websocket, memo);

and the callback function upgrade_to_websocket looks like this:

  /*
   * Upgrade callback registered through MHD_create_response_for_upgrade().
   * It hands the upgraded socket to ws_create(); when ws_create() returns
   * NULL it falls back to close_websocket() on the upgrade handle.
   */
  void upgrade_to_websocket(
             void *cls,
             struct MHD_Connection *connection,
             void *con_cls,
             const char *extra_in,
             size_t extra_in_size,
             MHD_socket sock,
             struct MHD_UpgradeResponseHandle *urh
  ) {
      /* cls is the `memo` pointer given when the response was created */
      struct memo *memo = cls;
      struct ws *ws = ws_create(sock, memo, close_websocket, urh);
      /* failure path: this is the case where the socket is reported to stay open */
      if (ws == NULL) close_websocket(urh);
  }

  /* Ask MHD to close the upgraded connection identified by urh. */
  void close_websocket(struct MHD_UpgradeResponseHandle *urh) {
      MHD_upgrade_action (urh, MHD_UPGRADE_ACTION_CLOSE);
  }

Thank you for your attention until here. So far, so good.

The issue now: when the function ws_create returns NULL, the program
returns to polling and waits for events BUT DOES NOT CLOSE THE
SOCKET, leading to starvation of the client.

I guess that calling some function after calling MHD_upgrade_action
(urh, MHD_UPGRADE_ACTION_CLOSE) could unblock the situation by
performing a correct close. However, the called function should not be
MHD_run, because it dispatches events, which is not expected here.

I attach a sample demo. When I connect to it with a websocket client, the
client starves. I also recorded and attached the output of strace.

Best regards
José Bollo


#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <sys/types.h>
#include <microhttpd.h>

/**************** SHA ****************************/

/* Size in bytes of the digest written by SHA1_final(). */
#define SHA1_DIGEST_LENGTH	20

/*
 * SHA-1 hashing context.
 *
 * state  - the five 32-bit chaining words; set by SHA1_init(), updated by
 *          transform() and serialized into the digest by SHA1_final().
 * length - total number of bytes passed to SHA1_update() so far.
 * buffer / blocks - anonymous union giving a byte view and a 32-bit word
 *          view of the same 64-byte pending input block.
 */
typedef struct {
	uint32_t   state[5];
	uint64_t   length;
	union {
		uint8_t    buffer[64];
		uint32_t   blocks[16];
	};
} SHA1_t;

/*
 * Rotate `value` left by `bits`. The complementary shift is computed as
 * 32 - bits, so this is meant for 32-bit unsigned operands; every use in
 * this file passes a rotation count between 1 and 30.
 */
#define rol(value, bits) (((value) << (bits)) | ((value) >> (32 - (bits))))

/* blk0() and blk() perform the initial expand. */
/* I got the idea of expanding during the round function from SSLeay */

/*
 * Both macros read and write a variable named `block` that must exist in
 * the expanding scope (the parameter of transform()).
 * On little-endian builds blk0() byte-swaps block[i] in place and yields
 * the swapped word; on big-endian builds it yields block[i] unchanged.
 */
#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
# define blk0(i) (block[i] = (rol(block[i],24)&(uint32_t)0xFF00FF00) \
	|(rol(block[i],8)&(uint32_t)0x00FF00FF))
#elif __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
# define blk0(i) block[i]
#else
# error "unsupported byte order"
#endif

/*
 * blk() overwrites block[i & 15] with the next expanded word, so the
 * 16-word array is reused as a circular window.
 */
#define blk(i) (block[i&15] = rol(block[(i+13)&15]^block[(i+8)&15] \
	^block[(i+2)&15]^block[i&15],1))

/* (R0+R1), R2, R3, R4 are the different operations used in SHA1 */
/*
 * Each Rn expands to two statements: it adds into z and then rotates w.
 * The arguments are evaluated several times, so pass plain variables only.
 */
#define R0(v,w,x,y,z,i) z+=((w&(x^y))^y)+blk0(i)+0x5A827999+rol(v,5);w=rol(w,30);
#define R1(v,w,x,y,z,i) z+=((w&(x^y))^y)+blk(i)+0x5A827999+rol(v,5);w=rol(w,30);
#define R2(v,w,x,y,z,i) z+=(w^x^y)+blk(i)+0x6ED9EBA1+rol(v,5);w=rol(w,30);
#define R3(v,w,x,y,z,i) z+=(((w|x)&y)|(w&x))+blk(i)+0x8F1BBCDC+rol(v,5);w=rol(w,30);
#define R4(v,w,x,y,z,i) z+=(w^x^y)+blk(i)+0xCA62C1D6+rol(v,5);w=rol(w,30);
/* When non-zero, transform() and SHA1_final() zero their working data before returning. */
#define WIPE 1

/*
 * Hash a single 512-bit block. This is the core of the algorithm.
 *
 * state - the five chaining words; the result of the 80 rounds is added
 *         into them.
 * block - the 16 input words. The array is modified in place: blk0()
 *         byte-swaps the words on little-endian builds and blk() overwrites
 *         entries with expanded words, so the contents must not be relied
 *         upon after the call.
 */
static void transform(uint32_t state[5], uint32_t block[16])
{
	uint32_t a, b, c, d, e;

	/* Copy context->state[] to working vars */
	a = state[0];
	b = state[1];
	c = state[2];
	d = state[3];
	e = state[4];
	/* 4 rounds of 20 operations each. Loop unrolled. */
	/* The five variables rotate roles by one position on every step. */
	R0(a,b,c,d,e, 0); R0(e,a,b,c,d, 1); R0(d,e,a,b,c, 2); R0(c,d,e,a,b, 3);
	R0(b,c,d,e,a, 4); R0(a,b,c,d,e, 5); R0(e,a,b,c,d, 6); R0(d,e,a,b,c, 7);
	R0(c,d,e,a,b, 8); R0(b,c,d,e,a, 9); R0(a,b,c,d,e,10); R0(e,a,b,c,d,11);
	R0(d,e,a,b,c,12); R0(c,d,e,a,b,13); R0(b,c,d,e,a,14); R0(a,b,c,d,e,15);
	R1(e,a,b,c,d,16); R1(d,e,a,b,c,17); R1(c,d,e,a,b,18); R1(b,c,d,e,a,19);
	R2(a,b,c,d,e,20); R2(e,a,b,c,d,21); R2(d,e,a,b,c,22); R2(c,d,e,a,b,23);
	R2(b,c,d,e,a,24); R2(a,b,c,d,e,25); R2(e,a,b,c,d,26); R2(d,e,a,b,c,27);
	R2(c,d,e,a,b,28); R2(b,c,d,e,a,29); R2(a,b,c,d,e,30); R2(e,a,b,c,d,31);
	R2(d,e,a,b,c,32); R2(c,d,e,a,b,33); R2(b,c,d,e,a,34); R2(a,b,c,d,e,35);
	R2(e,a,b,c,d,36); R2(d,e,a,b,c,37); R2(c,d,e,a,b,38); R2(b,c,d,e,a,39);
	R3(a,b,c,d,e,40); R3(e,a,b,c,d,41); R3(d,e,a,b,c,42); R3(c,d,e,a,b,43);
	R3(b,c,d,e,a,44); R3(a,b,c,d,e,45); R3(e,a,b,c,d,46); R3(d,e,a,b,c,47);
	R3(c,d,e,a,b,48); R3(b,c,d,e,a,49); R3(a,b,c,d,e,50); R3(e,a,b,c,d,51);
	R3(d,e,a,b,c,52); R3(c,d,e,a,b,53); R3(b,c,d,e,a,54); R3(a,b,c,d,e,55);
	R3(e,a,b,c,d,56); R3(d,e,a,b,c,57); R3(c,d,e,a,b,58); R3(b,c,d,e,a,59);
	R4(a,b,c,d,e,60); R4(e,a,b,c,d,61); R4(d,e,a,b,c,62); R4(c,d,e,a,b,63);
	R4(b,c,d,e,a,64); R4(a,b,c,d,e,65); R4(e,a,b,c,d,66); R4(d,e,a,b,c,67);
	R4(c,d,e,a,b,68); R4(b,c,d,e,a,69); R4(a,b,c,d,e,70); R4(e,a,b,c,d,71);
	R4(d,e,a,b,c,72); R4(c,d,e,a,b,73); R4(b,c,d,e,a,74); R4(a,b,c,d,e,75);
	R4(e,a,b,c,d,76); R4(d,e,a,b,c,77); R4(c,d,e,a,b,78); R4(b,c,d,e,a,79);
	/* Add the working vars back into context.state[] */
	state[0] += a;
	state[1] += b;
	state[2] += c;
	state[3] += d;
	state[4] += e;
#if WIPE
	/* Wipe variables */
	/* NOTE(review): these are plain stores to locals that are never read
	 * again; an optimizing compiler may remove them - confirm if the wipe
	 * is a real requirement. */
	a = b = c = d = e = 0;
#endif
}


/*
 * SHA1_init - prepare a context for a new hash computation.
 *
 * Loads the five SHA-1 initialization constants into context->state and
 * resets the processed byte count to zero.
 */
void SHA1_init(SHA1_t* context)
{
	static const uint32_t initial_state[5] = {
		0x67452301,
		0xEFCDAB89,
		0x98BADCFE,
		0x10325476,
		0xC3D2E1F0
	};
	unsigned int k;

	for (k = 0; k < 5; k++)
		context->state[k] = initial_state[k];
	context->length = 0;
}

/*
 * SHA1_update - feed `len` bytes starting at `buffer` into the hash.
 *
 * Bytes are accumulated in the context's 64-byte block; each time the
 * block fills up it is run through transform(). A zero `len` is a no-op.
 */
void SHA1_update(SHA1_t *context, const void *buffer, size_t len)
{
	const uint8_t *src = buffer;
	size_t fill, consumed;

	if (len == 0)
		return;

	/* number of bytes already pending in the block */
	fill = (size_t)(context->length & 63);
	context->length += (uint64_t)len;

	/* not enough to complete the block: just stash the bytes */
	if (fill + len < 64) {
		memcpy(context->buffer + fill, src, len);
		return;
	}

	/* complete the pending block and hash it */
	consumed = 64 - fill;
	memcpy(context->buffer + fill, src, consumed);
	transform(context->state, context->blocks);

	/* hash every further whole block */
	while (consumed + 64 <= len) {
		memcpy(context->buffer, src + consumed, 64);
		transform(context->state, context->blocks);
		consumed += 64;
	}

	/* keep the tail for the next call */
	if (consumed != len)
		memcpy(context->buffer, src + consumed, len - consumed);
}

/*
 * SHA1_final - append the padding and the message length, then write the
 * SHA1_DIGEST_LENGTH-byte digest, most significant byte of each state word
 * first.
 *
 * When WIPE is non-zero the context is zeroed afterwards, so it must be
 * re-initialized with SHA1_init() before being reused.
 */
void SHA1_final(SHA1_t *context, uint8_t digest[SHA1_DIGEST_LENGTH])
{
	const uint8_t pad_first = 0x80;
	const uint8_t pad_zero = 0x00;
	uint8_t bitcount[8];
	uint64_t bits;
	uint32_t k;

	/* message length in bits, captured before the padding is counted */
	bits = context->length << 3;

	/* a single 1 bit, then zeros until 8 bytes remain in the block */
	SHA1_update(context, &pad_first, 1);
	while ((context->length & 63) != 56)
		SHA1_update(context, &pad_zero, 1);

	/* big-endian 64-bit bit count; this fills the block and hashes it */
	for (k = 0; k < 8; k++)
		bitcount[k] = (uint8_t)(bits >> (56 - 8 * k));
	SHA1_update(context, bitcount, 8);

	/* serialize the five state words */
	for (k = 0; k < SHA1_DIGEST_LENGTH; k++) {
		const uint32_t word = context->state[k / 4];

		digest[k] = (uint8_t)(word >> (24 - 8 * (k % 4)));
	}
#if WIPE
	/* Wipe variables */
	memset(bitcount, 0, sizeof bitcount);
	memset(context, 0, sizeof *context);
#endif
}

/**************** WebSocket connection upgrade ****************************/

/* Expected value of the Upgrade request header; also sent back in the reply. */
static const char websocket_s[] = "websocket";
/* Header names read from the request or written to the response. */
static const char sec_websocket_key_s[] = "Sec-WebSocket-Key";
static const char sec_websocket_version_s[] = "Sec-WebSocket-Version";
static const char sec_websocket_accept_s[] = "Sec-WebSocket-Accept";
static const char sec_websocket_protocol_s[] = "Sec-WebSocket-Protocol";
/* Fixed string appended to the client key before hashing in make_accept_value(). */
static const char websocket_guid[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

/*
 * Encode the three bytes in[0..2] as the four base64 characters out[0..3].
 * No terminator is written and no '=' padding is produced.
 */
static void enc64(unsigned char *in, char *out)
{
	static const char alphabet[] =
		"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
		"abcdefghijklmnopqrstuvwxyz"
		"0123456789+/";
	/* pack the three bytes into one 24-bit group, first byte on top */
	const unsigned long group = ((unsigned long)in[0] << 16)
				  | ((unsigned long)in[1] << 8)
				  | (unsigned long)in[2];
	int k;

	/* emit the four 6-bit digits, most significant first */
	for (k = 0; k < 4; k++)
		out[k] = alphabet[(group >> (18 - 6 * k)) & 0x3f];
}

/*
 * Compute the Sec-WebSocket-Accept value for a client key: the base64
 * encoding of SHA-1(key followed by websocket_guid).
 *
 * key    - NUL-terminated value of the Sec-WebSocket-Key request header.
 * result - receives 28 base64 characters (the last one being '=') followed
 *          by a NUL terminator; must have room for 29 bytes.
 *
 * The key and the GUID are fed to the hash separately, so no temporary
 * concatenation buffer is needed whatever the length of the
 * client-supplied key.
 */
static void make_accept_value(const char *key, char result[29])
{
	SHA1_t sha1;
	unsigned char md[SHA1_DIGEST_LENGTH+1];
	size_t group;

	SHA1_init(&sha1);
	SHA1_update(&sha1, key, strlen(key));
	SHA1_update(&sha1, websocket_guid, sizeof websocket_guid - 1);
	SHA1_final(&sha1, md);

	assert(SHA1_DIGEST_LENGTH == 20);
	/* 20 digest bytes are padded with one zero byte to 7 groups of 3 */
	md[20] = 0;
	for (group = 0; group < 7; group++)
		enc64(&md[3 * group], &result[4 * group]);
	/* the last character only encodes the padding byte: replace it with '=' */
	result[27] = '=';
	result[28] = 0;
}

/* Characters separating the tokens of a header value. */
static const char vseparators[] = " \t,";

/*
 * Tell whether `needle` is one of the tokens of `header`.
 *
 * Tokens are the maximal runs of characters not in vseparators; the
 * comparison ignores case and requires the whole token to match.
 * Returns 1 when a matching token exists, 0 otherwise.
 */
static int headerhas(const char *header, const char *needle)
{
	const size_t wanted = strlen(needle);
	const char *cursor = header + strspn(header, vseparators);

	while (*cursor != '\0') {
		const size_t toklen = strcspn(cursor, vseparators);

		if (toklen == wanted && strncasecmp(needle, cursor, wanted) == 0)
			return 1;
		/* step over the token and the separators that follow it */
		cursor += toklen;
		cursor += strspn(cursor, vseparators);
	}
	return 0;
}

/*
 * Upgrade callback given to MHD_create_response_for_upgrade().
 *
 * The demo does nothing with the upgraded connection: it immediately asks
 * MHD to close it through the upgrade handle. All other arguments are
 * unused.
 */
static void upgrade_to_websocket(
		void *cls,
		struct MHD_Connection *connection,
		void *con_cls,
		const char *extra_in,
		size_t extra_in_size,
		MHD_socket sock,
		struct MHD_UpgradeResponseHandle *urh)
{
	(void)cls;
	(void)connection;
	(void)con_cls;
	(void)extra_in;
	(void)extra_in_size;
	(void)sock;

	MHD_upgrade_action(urh, MHD_UPGRADE_ACTION_CLOSE);
}

static int check_websocket_upgrade(
		struct MHD_Connection *con
) {
	struct MHD_Response *response;
	const char *connection, *upgrade, *key, *version, *protocols;
	char acceptval[29];
	int vernum;

	/* is an upgrade to websocket ? */
	upgrade = MHD_lookup_connection_value(con, MHD_HEADER_KIND, MHD_HTTP_HEADER_UPGRADE);
	if (upgrade == NULL || strcasecmp(upgrade, websocket_s))
		return 0;

	/* is a connection for upgrade ? */
	connection = MHD_lookup_connection_value(con, MHD_HEADER_KIND, MHD_HTTP_HEADER_CONNECTION);
	if (connection == NULL
	 || !headerhas (connection, MHD_HTTP_HEADER_UPGRADE))
		return 0;

	/* has a key and a version ? */
	key = MHD_lookup_connection_value(con, MHD_HEADER_KIND, sec_websocket_key_s);
	version = MHD_lookup_connection_value(con, MHD_HEADER_KIND, sec_websocket_version_s);
	if (key == NULL || version == NULL)
		return 0;

	/* is a supported version ? */
	vernum = atoi(version);
	if (vernum != 13) {
		response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
		MHD_add_response_header(response, sec_websocket_version_s, "13");
		MHD_queue_response(con, MHD_HTTP_UPGRADE_REQUIRED, response);
		MHD_destroy_response(response);
		return 1;
	}

	/* is the protocol supported ? */
	protocols = MHD_lookup_connection_value(con, MHD_HEADER_KIND, sec_websocket_protocol_s);

	/* send the accept connection */
	response = MHD_create_response_for_upgrade(upgrade_to_websocket, NULL);
	make_accept_value(key, acceptval);
	MHD_add_response_header(response, sec_websocket_accept_s, acceptval);
	MHD_add_response_header(response, sec_websocket_protocol_s, protocols);
	MHD_add_response_header(response, MHD_HTTP_HEADER_UPGRADE, websocket_s);
	MHD_queue_response(con, MHD_HTTP_SWITCHING_PROTOCOLS, response);
	MHD_destroy_response(response);

	return 1;
}


static enum MHD_Result
access_handler (
		void *cls,
		struct MHD_Connection *con,
		const char *url,
		const char *method,
		const char *version,
		const char *upload_data,
		size_t *upload_data_size,
		void **ptr
) {
	struct MHD_Response *response;

	if (check_websocket_upgrade(con))
		return MHD_YES;
	response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);
	MHD_queue_response(con, MHD_HTTP_FORBIDDEN, response);
	MHD_destroy_response(response);
	return MHD_NO;
}

/*
 * Demo entry point: start an MHD daemon in external-epoll mode on the port
 * given as the single command-line argument and drive it from a poll() loop
 * on MHD's epoll file descriptor.
 *
 * Returns 1 on a usage error, an invalid port, or when the daemon cannot be
 * started or queried; returns 0 once the poll loop has ended and the daemon
 * has been stopped.
 */
int
main (int argc, char *const *argv)
{
	const union MHD_DaemonInfo *i;
	struct MHD_Daemon *d;
	uint16_t port;
	char *end;
	long portnum;
	struct pollfd pfd[1];
	MHD_UNSIGNED_LONG_LONG to;
	int ready;

	if (argc != 2)
	{
		printf ("%s PORT\n", argv[0]);
		return 1;
	}

	/* reject anything that is not a whole number fitting a TCP port */
	errno = 0;
	portnum = strtol (argv[1], &end, 10);
	if (end == argv[1] || *end != '\0' || errno == ERANGE
	 || portnum < 0 || portnum > 65535)
	{
		fprintf (stderr, "%s: invalid port '%s'\n", argv[0], argv[1]);
		return 1;
	}
	port = (uint16_t) portnum;

	d = MHD_start_daemon(
		MHD_USE_EPOLL
		| MHD_ALLOW_UPGRADE
		| MHD_USE_TCP_FASTOPEN
		| MHD_USE_DEBUG,
		port,        /* port */
		NULL, NULL,  /* Tcp Accept call back + extra attribute */
		access_handler, NULL,  /* Http Request Call back + extra attribute */
		MHD_OPTION_END);  /* options-end */
	if (NULL == d)
		return 1;

	i = MHD_get_daemon_info(d, MHD_DAEMON_INFO_EPOLL_FD);
	if (i == NULL) {
		MHD_stop_daemon(d);
		return 1;
	}

	pfd[0].fd = i->epoll_fd;
	pfd[0].events = POLLIN;

	for (;;) {
		ready = poll(pfd, 1, -1);
		/* an interrupted wait is not an error: wait again */
		if (ready < 0 && errno == EINTR)
			continue;
		if (ready != 1)
			break;
		/* run MHD until it no longer asks to be called back immediately */
		do { MHD_run(d); } while(MHD_get_timeout(d, &to) == MHD_YES && !to);
	}
	MHD_stop_daemon (d);
	return 0;
}
poll([{fd=4, events=POLLIN}], 1, -1
)    = 1 ([{fd=4, revents=POLLIN}])
epoll_wait(4, [{EPOLLIN, {u32=17909760, u64=17909760}}], 128, 0) = 1
accept4(3, {sa_family=AF_INET, sin_port=htons(34846), 
sin_addr=inet_addr("127.0.0.1")}, [28->16], SOCK_CLOEXEC|SOCK_NONBLOCK) = 7
setsockopt(7, SOL_TCP, TCP_NODELAY, [1], 4) = 0
epoll_ctl(4, EPOLL_CTL_ADD, 7, {EPOLLIN|EPOLLPRI|EPOLLOUT|EPOLLET, 
{u32=17946880, u64=17946880}}) = 0
accept4(3, 0x7ffd0c13e8d0, [28], SOCK_CLOEXEC|SOCK_NONBLOCK) = -1 EAGAIN 
(Resource temporarily unavailable)
close(6)                                = 0
poll([{fd=4, events=POLLIN}], 1, -1)    = 1 ([{fd=4, revents=POLLIN}])
epoll_wait(4, [{EPOLLIN|EPOLLOUT, {u32=17946880, u64=17946880}}], 128, 0) = 1
recvfrom(7, "GET /api HTTP/1.1\r\nHost: 127.0.0.1:5555\r\nUpgrade: 
websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Vers"..., 16384, 0, NULL, 
NULL) = 215
sendmsg(7, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="HTTP/1.1 101 
Switching Protocols\r\nUpgrade: websocket\r\nSec-WebSocket-Protocol: 
x-afb-ws-json1\r\nSec-We"..., iov_len=206}, {iov_base=NULL, iov_len=0}], 
msg_iovlen=2, msg_controllen=0, msg_flags=0}, MSG_NOSIGNAL) = 206
epoll_ctl(4, EPOLL_CTL_DEL, 7, NULL)    = 0
poll([{fd=4, events=POLLIN}], 1, -1

Reply via email to