Hi Jose,

I've figured out the issue.  The bug is actually in your code.

The problem is that you are using an _external_ event loop, and if you
do so, you are responsible for re-triggering MHD_run() if you do
anything that might change the state of an MHD connection.
This (mostly) applies to MHD_resume_connection() -- but also to closing
an upgraded connection.

Without this modification, MHD only closes the connection the next time
it is run, i.e. on the _next_ request: if you then connect with another
client, MHD will close the first connection at that point.

This is the only way to fix this, as when MHD is not 'run', it cannot
act. And if you are controlling the external event loop, you are
responsible for triggering MHD 'when necessary'.

I've attached a modified version of your code that adds the necessary
signalling logic to your poll()-loop. (Using eventfd(); non-portable.
Use a pipe() to do this in a more portable way.)


Happy hacking!

Christian

On 12/10/20 4:08 PM, José Bollo wrote:
> Hello,
> 
> My code uses LMHD embedded with its EPOLL mechanism. Part of that code
> deals with upgrading to websocket. Somewhere, it then calls:
> 
>   response = MHD_create_response_for_upgrade(
>                upgrade_to_websocket, memo);
> 
> and the callback function upgrade_to_websocket looks as here below:
> 
>   void upgrade_to_websocket(
>              void *cls,
>              struct MHD_Connection *connection,
>              void *con_cls,
>              const char *extra_in,
>              size_t extra_in_size,
>              MHD_socket sock,
>              struct MHD_UpgradeResponseHandle *urh
>   ) {
>       struct memo *memo = cls;
>       struct ws *ws = ws_create(sock, memo, close_websocket, urh);
>       if (ws == NULL) close_websocket(urh);
>   }
> 
>   void close_websocket(struct MHD_UpgradeResponseHandle *urh) {
>       MHD_upgrade_action (urh, MHD_UPGRADE_ACTION_CLOSE);
>   }
> 
> Thank you for your attention until here. So far, so good.
> 
> The issue now: when the function ws_create returns NULL, the program
> returns to polling and waits for events BUT DOES NOT CLOSE THE
> SOCKET, leading to starvation of the client.
> 
> I guess that calling some function after calling MHD_upgrade_action
> (urh, MHD_UPGRADE_ACTION_CLOSE) could unlock the situation by
> performing a correct close. However, the function called should not be
> MHD_run, because it dispatches events, which is not expected here.
> 
> I attach a sample demo. When I connect to it over websocket, the client
> starves. I recorded and attached the output of strace.
> 
> Best regards
> José Bollo
> 
> 
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <unistd.h>
#include <sys/types.h>
#include <microhttpd.h>
#include <sys/eventfd.h>

/* Event file descriptor shared by the whole program: created in main(),
   written from upgrade_to_websocket() and polled/read in main()'s loop so
   that MHD_run() gets called again after the upgrade callback acted. */
static int efd;

/**************** SHA ****************************/

#define SHA1_DIGEST_LENGTH  20

/* The SHA1 structure: running state of one SHA-1 computation. */
typedef struct
{
  /* Five 32-bit chaining words; seeded by SHA1_init(), updated by transform(). */
  uint32_t state[5];
  /* Number of bytes fed in so far (SHA1_final() shifts it left by 3 to get bits). */
  uint64_t length;
  /* Pending 64-byte input block, addressable as bytes or as 32-bit words. */
  union
  {
    uint8_t buffer[64];
    uint32_t blocks[16];
  };
} SHA1_t;

/* Rotate the 32-bit quantity `value` left by `bits` positions.
   Both arguments are evaluated more than once; `bits` is expected to be
   in 1..31 (all uses in this file pass 1, 5, 8, 24 or 30). */
#define rol(value, bits) (((value) << (bits)) | ((value) >> (32 - (bits))))

/* blk0() and blk() perform the initial expand. */
/* I got the idea of expanding during the round function from SSLeay */
/* Both macros read and write an array named `block` that must be in scope
   at the point of use (the parameter of transform()). */

/* blk0(i): word i of the input block as a big-endian value. On
   little-endian hosts the word is byte-swapped and stored back into
   block[i]; on big-endian hosts it is used unchanged. */
#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
# define blk0(i) (block[i] = (rol (block[i],24) & (uint32_t) 0xFF00FF00) \
                             | (rol (block[i],8) & (uint32_t) 0x00FF00FF))
#elif __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
# define blk0(i) block[i]
#else
# error "unsupported byte order"
#endif

/* blk(i): expanded word i, computed from four earlier words and stored
   back into the same 16-entry array, which is indexed modulo 16. */
#define blk(i) (block[i & 15] = rol (block[(i + 13) & 15] ^ block[(i + 8) & 15] \
                                     ^ block[(i + 2) & 15] ^ block[i & 15],1))

/* (R0+R1), R2, R3, R4 are the different operations used in SHA1 */
/* Each Rn(v,w,x,y,z,i) adds the round function of (w,x,y), schedule word i,
   the round constant and rol(v,5) into z, then rotates w left by 30.
   Each expands to two statements and is meant to be used as a statement
   inside transform(). R0 uses blk0() (steps 0..15), the others use blk(). */
#define R0(v,w,x,y,z,i) z += ((w & (x ^ y)) ^ y) + blk0 (i) + 0x5A827999 + rol ( \
    v,5); w = rol (w,30);
#define R1(v,w,x,y,z,i) z += ((w & (x ^ y)) ^ y) + blk (i) + 0x5A827999 + rol ( \
    v,5); w = rol (w,30);
#define R2(v,w,x,y,z,i) z += (w ^ x ^ y) + blk (i) + 0x6ED9EBA1 + rol (v,5); w = \
  rol (w,30);
#define R3(v,w,x,y,z,i) z += (((w | x) & y) | (w & x)) + blk (i) + 0x8F1BBCDC \
                             + rol (v,5); w = rol (w,30);
#define R4(v,w,x,y,z,i) z += (w ^ x ^ y) + blk (i) + 0xCA62C1D6 + rol (v,5); w = \
  rol (w,30);
/* When non-zero, transform() and SHA1_final() clear their working data
   before returning. */
#define WIPE 1

/* Hash a single 512-bit block. This is the core of the algorithm. */
/* state: the five chaining words; updated in place with the result.
   block: the 16 input words. The array is MODIFIED: the blk0()/blk()
          macros store byte-swapped and expanded words back into it, so
          its content is not the original input once this returns. */
static void
transform (uint32_t state[5], uint32_t block[16])
{
  uint32_t a, b, c, d, e;

  /* Copy context->state[] to working vars */
  a = state[0];
  b = state[1];
  c = state[2];
  d = state[3];
  e = state[4];
  /* 4 rounds of 20 operations each. Loop unrolled. */
  /* The variable roles rotate by one position on every step instead of
     moving the values between variables. */
  R0 (a,b,c,d,e, 0); R0 (e,a,b,c,d, 1); R0 (d,e,a,b,c, 2); R0 (c,d,e,a,b, 3);
  R0 (b,c,d,e,a, 4); R0 (a,b,c,d,e, 5); R0 (e,a,b,c,d, 6); R0 (d,e,a,b,c, 7);
  R0 (c,d,e,a,b, 8); R0 (b,c,d,e,a, 9); R0 (a,b,c,d,e,10); R0 (e,a,b,c,d,11);
  R0 (d,e,a,b,c,12); R0 (c,d,e,a,b,13); R0 (b,c,d,e,a,14); R0 (a,b,c,d,e,15);
  R1 (e,a,b,c,d,16); R1 (d,e,a,b,c,17); R1 (c,d,e,a,b,18); R1 (b,c,d,e,a,19);
  R2 (a,b,c,d,e,20); R2 (e,a,b,c,d,21); R2 (d,e,a,b,c,22); R2 (c,d,e,a,b,23);
  R2 (b,c,d,e,a,24); R2 (a,b,c,d,e,25); R2 (e,a,b,c,d,26); R2 (d,e,a,b,c,27);
  R2 (c,d,e,a,b,28); R2 (b,c,d,e,a,29); R2 (a,b,c,d,e,30); R2 (e,a,b,c,d,31);
  R2 (d,e,a,b,c,32); R2 (c,d,e,a,b,33); R2 (b,c,d,e,a,34); R2 (a,b,c,d,e,35);
  R2 (e,a,b,c,d,36); R2 (d,e,a,b,c,37); R2 (c,d,e,a,b,38); R2 (b,c,d,e,a,39);
  R3 (a,b,c,d,e,40); R3 (e,a,b,c,d,41); R3 (d,e,a,b,c,42); R3 (c,d,e,a,b,43);
  R3 (b,c,d,e,a,44); R3 (a,b,c,d,e,45); R3 (e,a,b,c,d,46); R3 (d,e,a,b,c,47);
  R3 (c,d,e,a,b,48); R3 (b,c,d,e,a,49); R3 (a,b,c,d,e,50); R3 (e,a,b,c,d,51);
  R3 (d,e,a,b,c,52); R3 (c,d,e,a,b,53); R3 (b,c,d,e,a,54); R3 (a,b,c,d,e,55);
  R3 (e,a,b,c,d,56); R3 (d,e,a,b,c,57); R3 (c,d,e,a,b,58); R3 (b,c,d,e,a,59);
  R4 (a,b,c,d,e,60); R4 (e,a,b,c,d,61); R4 (d,e,a,b,c,62); R4 (c,d,e,a,b,63);
  R4 (b,c,d,e,a,64); R4 (a,b,c,d,e,65); R4 (e,a,b,c,d,66); R4 (d,e,a,b,c,67);
  R4 (c,d,e,a,b,68); R4 (b,c,d,e,a,69); R4 (a,b,c,d,e,70); R4 (e,a,b,c,d,71);
  R4 (d,e,a,b,c,72); R4 (c,d,e,a,b,73); R4 (b,c,d,e,a,74); R4 (a,b,c,d,e,75);
  R4 (e,a,b,c,d,76); R4 (d,e,a,b,c,77); R4 (c,d,e,a,b,78); R4 (b,c,d,e,a,79);
  /* Add the working vars back into context.state[] */
  state[0] += a;
  state[1] += b;
  state[2] += c;
  state[3] += d;
  state[4] += e;
#if WIPE
  /* Wipe variables */
  /* NOTE(review): these are plain stores to locals that are never read
     again; an optimizing compiler may remove them. */
  a = b = c = d = e = 0;
#endif
}


/* SHA1_init - prepare `context` for a new hash computation: load the five
   initial chaining values and reset the byte counter. The pending-block
   buffer is left untouched. */
void
SHA1_init (SHA1_t*context)
{
  /* SHA1 initialization constants */
  static const uint32_t initial_state[5] = {
    0x67452301,
    0xEFCDAB89,
    0x98BADCFE,
    0x10325476,
    0xC3D2E1F0
  };
  unsigned int k;

  for (k = 0; k < 5; k++)
    context->state[k] = initial_state[k];
  context->length = 0;
}


/* Feed `len` bytes starting at `buffer` into the hash.
   Bytes are collected in context->buffer; each time 64 bytes are
   available the block is run through transform(). A call with len == 0
   does nothing. */
void
SHA1_update (SHA1_t *context, const void *buffer, size_t len)
{
  const uint8_t *src = buffer;
  size_t fill;   /* bytes already waiting in context->buffer */
  size_t pos;    /* number of input bytes consumed so far */

  if (0 == len)
    return;

  fill = (size_t) (context->length & 63);
  context->length += (uint64_t) len;

  /* Not enough to complete a block: just stash the bytes. */
  if (fill + len < 64)
  {
    memcpy (&context->buffer[fill], src, len);
    return;
  }

  /* Complete the pending block and hash it. */
  pos = 64 - fill;
  memcpy (&context->buffer[fill], src, pos);
  transform (context->state, context->blocks);

  /* Hash every further full block of input. */
  while (pos + 64 <= len)
  {
    memcpy (context->buffer, &src[pos], 64);
    transform (context->state, context->blocks);
    pos += 64;
  }

  /* Keep the remainder for the next call. */
  if (pos != len)
    memcpy (context->buffer, &src[pos], len - pos);
}


/* Finish the computation: append the padding and the 64-bit big-endian
   message length (in bits), then store the 20-byte digest, most
   significant byte of each state word first, into `digest`.
   When WIPE is set the context is zeroed afterwards, so it must be
   re-initialised with SHA1_init() before being used again. */
void
SHA1_final (SHA1_t *context, uint8_t digest[SHA1_DIGEST_LENGTH])
{
  const uint8_t pad_first = 0x80;
  const uint8_t pad_zero = 0x00;
  uint8_t bitcount[8];
  uint64_t nbits;
  unsigned int k;

  /* Length of the message itself, captured before padding is added. */
  nbits = context->length << 3;

  /* One 0x80 byte, then zero bytes until 8 bytes are left in the block. */
  SHA1_update (context, &pad_first, 1);
  while (56 != (context->length & 63))
    SHA1_update (context, &pad_zero, 1);

  /* Should cause a transform() */
  for (k = 0; k < 8; k++)
    bitcount[k] = (uint8_t) (nbits >> (56 - 8 * k));
  SHA1_update (context, bitcount, 8);

  /* Get the result */
  for (k = 0; k < SHA1_DIGEST_LENGTH; k++)
    digest[k] = (uint8_t) (context->state[k / 4] >> (24 - 8 * (k % 4)));

#if WIPE
  /* Wipe variables */
  memset (&bitcount, 0, sizeof bitcount);
  memset (context, 0, sizeof *context);
#endif
}


/**************** WebSocket connection upgrade ****************************/

/* Token compared against the request "Upgrade" header and sent back in
   the response. */
static const char websocket_s[] = "websocket";
/* Names of the handshake headers read from the request or added to the
   response by check_websocket_upgrade(). */
static const char sec_websocket_key_s[] = "Sec-WebSocket-Key";
static const char sec_websocket_version_s[] = "Sec-WebSocket-Version";
static const char sec_websocket_accept_s[] = "Sec-WebSocket-Accept";
static const char sec_websocket_protocol_s[] = "Sec-WebSocket-Protocol";
/* Fixed string appended to the client key before hashing in
   make_accept_value(). */
static const char websocket_guid[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

/* Base64-encode exactly three bytes from `in` into exactly four
   characters at `out`. No terminator is written and no '=' padding is
   produced; callers handle both. */
static void
enc64 (unsigned char *in, char *out)
{
  static const char alphabet[] =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "abcdefghijklmnopqrstuvwxyz"
    "0123456789+/";
  /* Pack the three bytes into one 24-bit value, first byte on top. */
  const uint32_t triple = ((uint32_t) in[0] << 16)
                          | ((uint32_t) in[1] << 8)
                          | (uint32_t) in[2];
  unsigned int k;

  /* Emit the four 6-bit groups, most significant first. */
  for (k = 0; k < 4; k++)
    out[k] = alphabet[(triple >> (18 - 6 * k)) & 0x3f];
}


/* Compute the value of the Sec-WebSocket-Accept response header.
 *
 * key:    NUL-terminated Sec-WebSocket-Key value sent by the client.
 * result: receives the 28-character Base64 text of
 *         SHA-1(key || websocket_guid) followed by a NUL (29 bytes).
 *
 * The key and the GUID are hashed in two SHA1_update() calls rather than
 * being concatenated first: `key` comes straight from a request header,
 * so its length is controlled by the client and must not size a stack
 * allocation.
 */
static void
make_accept_value (const char *key, char result[29])
{
  SHA1_t sha1;
  /* One spare byte so the last Base64 group can read three bytes. */
  unsigned char md[SHA1_DIGEST_LENGTH + 1];
  unsigned int group;

  SHA1_init (&sha1);
  SHA1_update (&sha1, key, strlen (key));
  SHA1_update (&sha1, websocket_guid, sizeof websocket_guid - 1);
  SHA1_final (&sha1, md);

  assert (SHA1_DIGEST_LENGTH == 20);
  md[20] = 0;
  /* 7 groups of 3 bytes -> 7 * 4 = 28 characters. */
  for (group = 0; group < 7; group++)
    enc64 (&md[3 * group], &result[4 * group]);
  /* 20 bytes leave two bytes in the last group: its 4th character is
     padding. */
  result[27] = '=';
  result[28] = 0;
}


/* Characters that separate tokens inside a header value. */
static const char vseparators[] = " \t,";

/* Tell whether the header value `header` contains the token `needle`.
   The value is split on spaces, tabs and commas, and each token is
   compared with `needle` ignoring case.
   Returns 1 when a token matches, 0 otherwise. */
static int
headerhas (const char *header, const char *needle)
{
  const size_t wanted = strlen (needle);
  const char *tok = header + strspn (header, vseparators);

  while ('\0' != *tok)
  {
    const size_t toklen = strcspn (tok, vseparators);

    if ((toklen == wanted) && (0 == strncasecmp (needle, tok, wanted)))
      return 1;
    /* Skip this token and the separators that follow it. */
    tok += toklen;
    tok += strspn (tok, vseparators);
  }
  return 0;
}


/* Upgrade callback registered with MHD_create_response_for_upgrade().
 *
 * This demo does not speak WebSocket: it immediately asks MHD to close
 * the upgraded connection and then signals `efd` so that the poll() loop
 * in main() wakes up and calls MHD_run() again. With an external event
 * loop MHD can only act on the close request when it is run.
 *
 * Only `urh` is used; the other parameters are part of the callback
 * signature.
 */
static void
upgrade_to_websocket (
  void *cls,
  struct MHD_Connection *connection,
  void *con_cls,
  const char *extra_in,
  size_t extra_in_size,
  MHD_socket sock,
  struct MHD_UpgradeResponseHandle *urh)
{
  /* An eventfd write takes an 8-byte integer that is added to its counter. */
  static const uint64_t one = 1;
  ssize_t written;

  (void) cls;
  (void) connection;
  (void) con_cls;
  (void) extra_in;
  (void) extra_in_size;
  (void) sock;

  MHD_upgrade_action (urh, MHD_UPGRADE_ACTION_CLOSE);

  do
    written = write (efd, &one, sizeof (one));
  while ((-1 == written) && (EINTR == errno));
  /* EAGAIN on a non-blocking eventfd means the counter is already at its
     maximum, i.e. a wake-up is pending anyway. Anything else would leave
     the loop asleep, so report it. */
  if ((-1 == written) && (EAGAIN != errno))
    perror ("write(eventfd)");
}


static int
check_websocket_upgrade (
  struct MHD_Connection *con
  )
{
  struct MHD_Response *response;
  const char *connection, *upgrade, *key, *version, *protocols;
  char acceptval[29];
  int vernum;

  /* is an upgrade to websocket ? */
  upgrade = MHD_lookup_connection_value (con, MHD_HEADER_KIND,
                                         MHD_HTTP_HEADER_UPGRADE);
  if ((upgrade == NULL) || strcasecmp (upgrade, websocket_s))
    return 0;

  /* is a connection for upgrade ? */
  connection = MHD_lookup_connection_value (con, MHD_HEADER_KIND,
                                            MHD_HTTP_HEADER_CONNECTION);
  if ((connection == NULL)
      || ! headerhas (connection, MHD_HTTP_HEADER_UPGRADE))
    return 0;

  /* has a key and a version ? */
  key = MHD_lookup_connection_value (con, MHD_HEADER_KIND, sec_websocket_key_s);
  version = MHD_lookup_connection_value (con, MHD_HEADER_KIND,
                                         sec_websocket_version_s);
  if ((key == NULL) || (version == NULL))
    return 0;

  /* is a supported version ? */
  vernum = atoi (version);
  if (vernum != 13)
  {
    response = MHD_create_response_from_buffer (0, NULL,
                                                MHD_RESPMEM_PERSISTENT);
    MHD_add_response_header (response, sec_websocket_version_s, "13");
    MHD_queue_response (con, MHD_HTTP_UPGRADE_REQUIRED, response);
    MHD_destroy_response (response);
    return 1;
  }

  /* is the protocol supported ? */
  protocols = MHD_lookup_connection_value (con, MHD_HEADER_KIND,
                                           sec_websocket_protocol_s);

  /* send the accept connection */
  response = MHD_create_response_for_upgrade (upgrade_to_websocket, NULL);
  make_accept_value (key, acceptval);
  MHD_add_response_header (response, sec_websocket_accept_s, acceptval);
  MHD_add_response_header (response, sec_websocket_protocol_s, protocols);
  MHD_add_response_header (response, MHD_HTTP_HEADER_UPGRADE, websocket_s);
  MHD_queue_response (con, MHD_HTTP_SWITCHING_PROTOCOLS, response);
  MHD_destroy_response (response);

  return 1;
}


static enum MHD_Result
access_handler (
  void *cls,
  struct MHD_Connection *con,
  const char *url,
  const char *method,
  const char *version,
  const char *upload_data,
  size_t *upload_data_size,
  void **ptr
  )
{
  struct MHD_Response *response;

  if (check_websocket_upgrade (con))
    return MHD_YES;
  response = MHD_create_response_from_buffer (0, NULL, MHD_RESPMEM_PERSISTENT);
  MHD_queue_response (con, MHD_HTTP_FORBIDDEN, response);
  MHD_destroy_response (response);
  return MHD_NO;
}


int
main (int argc, char *const *argv)
{
  const union MHD_DaemonInfo *di;
  struct MHD_Daemon *d;
  uint16_t port;
  struct pollfd pfd[2];
  MHD_UNSIGNED_LONG_LONG to;
  int ret;
  int tox;

  if (argc != 2)
  {
    printf ("%s PORT\n", argv[0]);
    return 1;
  }
  port = atoi (argv[1]);
  efd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK);
  d = MHD_start_daemon (
    MHD_USE_EPOLL
    | MHD_ALLOW_UPGRADE
    | MHD_USE_TCP_FASTOPEN
    | MHD_USE_DEBUG,
    port,        /* port */
    NULL, NULL,  /* Tcp Accept call back + extra attribute */
    access_handler, NULL,  /* Http Request Call back + extra attribute */
    MHD_OPTION_END);  /* options-end */
  if (NULL == d)
    return 1;

  di = MHD_get_daemon_info (d, MHD_DAEMON_INFO_EPOLL_FD);
  if (di == NULL)
  {
    MHD_stop_daemon (d);
    return 1;
  }

  pfd[0].fd = di->epoll_fd;
  pfd[0].events = POLLIN;
  pfd[1].fd = efd;
  pfd[1].events = POLLIN;
  tox = -1;
  while (-1 != (ret = poll (pfd, 2, tox)))
  {
    if (0 != pfd[1].revents)
    {
      long long d;

      read (efd, &d, sizeof (d));
    }
    MHD_run (d);
    if (MHD_get_timeout (d, &to) == MHD_YES)
      tox = to;
    else
      tox = -1;
  }
  MHD_stop_daemon (d);
  close (efd);
  return 0;
}

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to