----- Original Message ----- > Updated Branches: > refs/heads/master f47c6be30 -> 3a903f2bc > > > TS-2201: split drainIncomingChannel two thread, one handle Broadcast message > and other handle Reliable(TCP) request. > > > Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo > Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/3a903f2b > Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/3a903f2b > Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/3a903f2b > > Branch: refs/heads/master > Commit: 3a903f2bc69c0fb5623fa212f2f1ef64b5455c8b > Parents: f47c6be > Author: Chen Bin <kuo...@taobao.com> > Authored: Tue Sep 17 11:51:11 2013 +0800 > Committer: Chen Bin <kuo...@taobao.com> > Committed: Tue Sep 17 11:51:11 2013 +0800 > > ---------------------------------------------------------------------- > CHANGES | 3 ++ > mgmt/cluster/ClusterCom.cc | 109 +++++++++++++++++++++++++++------------- > 2 files changed, 77 insertions(+), 35 deletions(-) > ---------------------------------------------------------------------- > > > http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3a903f2b/CHANGES > ---------------------------------------------------------------------- > diff --git a/CHANGES b/CHANGES > index e9c056d..f2e9fd4 100644 > --- a/CHANGES > +++ b/CHANGES > @@ -1,6 +1,9 @@ > -*- coding: utf-8 > -*- > Changes with Apache Traffic Server 4.1.0 > > + *) [TS-2201] split drainIncomingChannel two thread, one handle Broadcast message > and other handle Reliable(TCP) > + request for supporting large cluster. > + > *) [TS-2144] Avoid race on e.g. "traffic_server -Cclear" which would crash > the process intermittently.
> > > http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3a903f2b/mgmt/cluster/ClusterCom.cc > ---------------------------------------------------------------------- > diff --git a/mgmt/cluster/ClusterCom.cc b/mgmt/cluster/ClusterCom.cc > index 430e01b..fa173cc 100644 > --- a/mgmt/cluster/ClusterCom.cc > +++ b/mgmt/cluster/ClusterCom.cc > @@ -48,6 +48,76 @@ > int MultiCastMessages = 0; > long LastHighestDelta = -1L; > > + > +void * > +drainIncomingChannel_broadcast(void *arg) > +{ > + char message[61440];
This number pops up > + fd_set fdlist; > + void *ret = arg; > + > + time_t t; > + time_t last_multicast_receive_time = time(NULL); > + struct timeval tv; > + > + /* Avert race condition, thread spun during constructor */ > + while (!lmgmt->ccom || !lmgmt->ccom->init) { > + mgmt_sleep_sec(1); > + } > + > + lmgmt->syslogThrInit(); > + > + for (;;) { /* Loop draining mgmt network channels */ > + // linux: set tv.tv_set in select() loop, since linux's select() > + // will update tv with the amount of time not slept (most other > + // implementations do not do this) > + tv.tv_sec = lmgmt->ccom->mc_poll_timeout; // interface > not-responding timeout > + tv.tv_usec = 0; > + > + memset(message, 0, 61440); numerous times. You may want to > + FD_ZERO(&fdlist); > + > + if (lmgmt->ccom->cluster_type != NO_CLUSTER) { > + if (lmgmt->ccom->receive_fd > 0) { > + FD_SET(lmgmt->ccom->receive_fd, &fdlist); /* Multicast fd */ > + } > + } > + > + mgmt_select(FD_SETSIZE, &fdlist, NULL, NULL, &tv); > + > + if (lmgmt->ccom->cluster_type != NO_CLUSTER) { > + // Multicast timeout considerations > + if ((lmgmt->ccom->receive_fd < 0) || > !FD_ISSET(lmgmt->ccom->receive_fd, &fdlist)) { > + t = time(NULL); > + if ((t - last_multicast_receive_time) > (tv.tv_sec - 1)) { > + // Timeout on multicast receive channel, reset channel. 
> + if (lmgmt->ccom->receive_fd > 0) { > + close(lmgmt->ccom->receive_fd); > + } > + lmgmt->ccom->receive_fd = -1; > + Debug("ccom", "Timeout, resetting multicast receive channel"); > + if (lmgmt->ccom->establishReceiveChannel(0)) { > + Debug("ccom", "establishReceiveChannel failed"); > + lmgmt->ccom->receive_fd = -1; > + } > + last_multicast_receive_time = t; // next action at next > interval > + } > + } else { > + last_multicast_receive_time = time(NULL); // valid multicast > msg > + } > + } > + > + /* Broadcast message */ > + if (lmgmt->ccom->cluster_type != NO_CLUSTER && > + lmgmt->ccom->receive_fd > 0 && > + FD_ISSET(lmgmt->ccom->receive_fd, &fdlist) && > + (lmgmt->ccom->receiveIncomingMessage(message, 61440) > 0)) { Consider replacing it with a constant. > + lmgmt->ccom->handleMultiCastMessage(message); > + } > + } > + return ret; > +} /* End drainIncomingChannel */ > + > /* > * drainIncomingChannel > * This function is blocking, it never returns. It is meant to allow for > @@ -89,8 +159,6 @@ drainIncomingChannel(void *arg) > // to reopen the channel (e.g. opening the socket would fail if the > // interface was down). In this case, the ccom->receive_fd is set > // to '-1' and the open is retried until it succeeds. 
> - time_t t; > - time_t last_multicast_receive_time = time(NULL); > struct timeval tv; > > /* Avert race condition, thread spun during constructor */ > @@ -111,43 +179,12 @@ drainIncomingChannel(void *arg) > FD_ZERO(&fdlist); > > if (lmgmt->ccom->cluster_type != NO_CLUSTER) { > - if (lmgmt->ccom->receive_fd > 0) { > - FD_SET(lmgmt->ccom->receive_fd, &fdlist); /* Multicast fd */ > - } > FD_SET(lmgmt->ccom->reliable_server_fd, &fdlist); /* TCP Server fd > */ > } > > mgmt_select(FD_SETSIZE, &fdlist, NULL, NULL, &tv); > > - if (lmgmt->ccom->cluster_type != NO_CLUSTER) { > - // Multicast timeout considerations > - if ((lmgmt->ccom->receive_fd < 0) || > !FD_ISSET(lmgmt->ccom->receive_fd, &fdlist)) { > - t = time(NULL); > - if ((t - last_multicast_receive_time) > (tv.tv_sec - 1)) { > - // Timeout on multicast receive channel, reset channel. > - if (lmgmt->ccom->receive_fd > 0) { > - close(lmgmt->ccom->receive_fd); > - } > - lmgmt->ccom->receive_fd = -1; > - Debug("ccom", "Timeout, resetting multicast receive channel"); > - if (lmgmt->ccom->establishReceiveChannel(0)) { > - Debug("ccom", "establishReceiveChannel failed"); > - lmgmt->ccom->receive_fd = -1; > - } > - last_multicast_receive_time = t; // next action at next > interval > - } > - } else { > - last_multicast_receive_time = time(NULL); // valid multicast > msg > - } > - } > - > - /* Broadcast message */ > - if (lmgmt->ccom->cluster_type != NO_CLUSTER && > - lmgmt->ccom->receive_fd > 0 && > - FD_ISSET(lmgmt->ccom->receive_fd, &fdlist) && > - (lmgmt->ccom->receiveIncomingMessage(message, 61440) > 0)) { > - lmgmt->ccom->handleMultiCastMessage(message); > - } else if (FD_ISSET(lmgmt->ccom->reliable_server_fd, &fdlist)) { > + if (FD_ISSET(lmgmt->ccom->reliable_server_fd, &fdlist)) { > /* Reliable(TCP) request */ > int clilen = sizeof(cli_addr); > int req_fd = mgmt_accept(lmgmt->ccom->reliable_server_fd, (struct > sockaddr *) &cli_addr, &clilen); > @@ -442,8 +479,10 @@ ClusterCom::ClusterCom(unsigned long oip, char 
*host, > int mcport, char *group, i > peers = ink_hash_table_create(InkHashTableKeyType_String); > mismatchLog = ink_hash_table_create(InkHashTableKeyType_String); > > - if (cluster_type != NO_CLUSTER) > + if (cluster_type != NO_CLUSTER) { > + ink_thread_create(drainIncomingChannel_broadcast, 0); /* Spin drainer > thread */ > ink_thread_create(drainIncomingChannel, 0); /* Spin drainer thread */ > + } > return; > } /* End ClusterCom::ClusterCom */ > > > -- Igor Galić Tel: +43 (0) 664 886 22 883 Mail: i.ga...@brainsware.org URL: http://brainsware.org/ GPG: 6880 4155 74BD FD7C B515 2EA5 4B1D 9E08 A097 C9AE