From: Liu Ping Fan <pingf...@linux.vnet.ibm.com> Use nc->transfer_lock to protect nc->peer->send_queue. The deleter and all senders synchronize on this lock, so sending remains safe even when the peer is unplugged concurrently.
Signed-off-by: Liu Ping Fan <pingf...@linux.vnet.ibm.com> --- include/net/net.h | 4 +++ include/net/queue.h | 1 + net/hub.c | 21 +++++++++++++- net/net.c | 72 ++++++++++++++++++++++++++++++++++++++++++++++++--- net/queue.c | 15 +++++++++- 5 files changed, 105 insertions(+), 8 deletions(-) diff --git a/include/net/net.h b/include/net/net.h index 24563ef..3e4b9df 100644 --- a/include/net/net.h +++ b/include/net/net.h @@ -63,6 +63,8 @@ typedef struct NetClientInfo { } NetClientInfo; struct NetClientState { + /* protect peer's send_queue */ + QemuMutex transfer_lock; NetClientInfo *info; int link_down; QTAILQ_ENTRY(NetClientState) next; @@ -78,6 +80,7 @@ struct NetClientState { typedef struct NICState { NetClientState ncs[MAX_QUEUE_NUM]; + NetClientState *pending_peer[MAX_QUEUE_NUM]; NICConf *conf; void *opaque; bool peer_deleted; @@ -105,6 +108,7 @@ NetClientState *qemu_find_vlan_client_by_name(Monitor *mon, int vlan_id, const char *client_str); typedef void (*qemu_nic_foreach)(NICState *nic, void *opaque); void qemu_foreach_nic(qemu_nic_foreach func, void *opaque); +int qemu_can_send_packet_nolock(NetClientState *sender); int qemu_can_send_packet(NetClientState *nc); ssize_t qemu_sendv_packet(NetClientState *nc, const struct iovec *iov, int iovcnt); diff --git a/include/net/queue.h b/include/net/queue.h index f60e57f..0ecd23b 100644 --- a/include/net/queue.h +++ b/include/net/queue.h @@ -67,6 +67,7 @@ ssize_t qemu_net_queue_send_iov(NetQueue *queue, NetPacketSent *sent_cb); void qemu_net_queue_purge(NetQueue *queue, NetClientState *from); +void qemu_net_queue_purge_all(NetQueue *queue); bool qemu_net_queue_flush(NetQueue *queue); #endif /* QEMU_NET_QUEUE_H */ diff --git a/net/hub.c b/net/hub.c index 81d2a04..97c3ac3 100644 --- a/net/hub.c +++ b/net/hub.c @@ -53,9 +53,14 @@ static ssize_t net_hub_receive(NetHub *hub, NetHubPort *source_port, if (port == source_port) { continue; } - + qemu_mutex_lock(&port->nc.transfer_lock); + if (!port->nc.peer) { + 
qemu_mutex_unlock(&port->nc.transfer_lock); + continue; + } qemu_net_queue_append(port->nc.peer->send_queue, &port->nc, QEMU_NET_PACKET_FLAG_NONE, buf, len, NULL); + qemu_mutex_unlock(&port->nc.transfer_lock); event_notifier_set(&port->e); } return len; @@ -65,7 +70,13 @@ static void hub_port_deliver_packet(void *opaque) { NetHubPort *port = (NetHubPort *)opaque; + qemu_mutex_lock(&port->nc.transfer_lock); + if (!port->nc.peer) { + qemu_mutex_unlock(&port->nc.transfer_lock); + return; + } qemu_net_queue_flush(port->nc.peer->send_queue); + qemu_mutex_unlock(&port->nc.transfer_lock); } static ssize_t net_hub_receive_iov(NetHub *hub, NetHubPort *source_port, @@ -78,10 +89,16 @@ static ssize_t net_hub_receive_iov(NetHub *hub, NetHubPort *source_port, if (port == source_port) { continue; } - + qemu_mutex_lock(&port->nc.transfer_lock); + if (!port->nc.peer) { + qemu_mutex_unlock(&port->nc.transfer_lock); + continue; + } qemu_net_queue_append_iov(port->nc.peer->send_queue, &port->nc, QEMU_NET_PACKET_FLAG_NONE, iov, iovcnt, NULL); + qemu_mutex_unlock(&port->nc.transfer_lock); event_notifier_set(&port->e); + } return len; } diff --git a/net/net.c b/net/net.c index 544542b..0acb933 100644 --- a/net/net.c +++ b/net/net.c @@ -207,6 +207,7 @@ static void qemu_net_client_setup(NetClientState *nc, nc->peer = peer; peer->peer = nc; } + qemu_mutex_init(&nc->transfer_lock); QTAILQ_INSERT_TAIL(&net_clients, nc, next); nc->send_queue = qemu_new_net_queue(nc); @@ -285,6 +286,7 @@ void *qemu_get_nic_opaque(NetClientState *nc) static void qemu_cleanup_net_client(NetClientState *nc) { + /* This is the place where may be out of big lock, when dev finalized */ QTAILQ_REMOVE(&net_clients, nc, next); if (nc->info->cleanup) { @@ -307,6 +309,28 @@ static void qemu_free_net_client(NetClientState *nc) } } +/* exclude race with rx/tx path, flush out peer's queue */ +static void qemu_flushout_net_client(NetClientState *nc) +{ + NetClientState *peer; + + /* sync on receive path */ + peer = nc->peer; 
+ if (peer) { + qemu_mutex_lock(&peer->transfer_lock); + peer->peer = NULL; + qemu_mutex_unlock(&peer->transfer_lock); + } + + /* sync on send from this nc */ + qemu_mutex_lock(&nc->transfer_lock); + nc->peer = NULL; + if (peer) { + qemu_net_queue_purge(peer->send_queue, nc); + } + qemu_mutex_unlock(&nc->transfer_lock); +} + void qemu_del_net_client(NetClientState *nc) { NetClientState *ncs[MAX_QUEUE_NUM]; @@ -337,7 +361,9 @@ void qemu_del_net_client(NetClientState *nc) } for (i = 0; i < queues; i++) { + qemu_flushout_net_client(ncs[i]); qemu_cleanup_net_client(ncs[i]); + nic->pending_peer[i] = ncs[i]; } return; @@ -346,6 +372,7 @@ void qemu_del_net_client(NetClientState *nc) assert(nc->info->type != NET_CLIENT_OPTIONS_KIND_NIC); for (i = 0; i < queues; i++) { + qemu_flushout_net_client(ncs[i]); qemu_cleanup_net_client(ncs[i]); qemu_free_net_client(ncs[i]); } @@ -358,16 +385,19 @@ void qemu_del_nic(NICState *nic) /* If this is a peer NIC and peer has already been deleted, free it now. */ if (nic->peer_deleted) { for (i = 0; i < queues; i++) { - qemu_free_net_client(qemu_get_subqueue(nic, i)->peer); + qemu_free_net_client(nic->pending_peer[i]); } } for (i = queues - 1; i >= 0; i--) { + NetClientState *nc = qemu_get_subqueue(nic, i); + qemu_flushout_net_client(nc); qemu_cleanup_net_client(nc); qemu_free_net_client(nc); } + } void qemu_foreach_nic(qemu_nic_foreach func, void *opaque) @@ -383,7 +413,7 @@ void qemu_foreach_nic(qemu_nic_foreach func, void *opaque) } } -int qemu_can_send_packet(NetClientState *sender) +int qemu_can_send_packet_nolock(NetClientState *sender) { if (!sender->peer) { return 1; @@ -398,6 +428,29 @@ int qemu_can_send_packet(NetClientState *sender) return 1; } +int qemu_can_send_packet(NetClientState *sender) +{ + int ret = 1; + + qemu_mutex_lock(&sender->transfer_lock); + if (!sender->peer) { + ret = 1; + goto unlock; + } + + if (sender->peer->receive_disabled) { + ret = 0; + goto unlock; + } else if (sender->peer->info->can_receive && + 
!sender->peer->info->can_receive(sender->peer)) { + ret = 0; + goto unlock; + } +unlock: + qemu_mutex_unlock(&sender->transfer_lock); + return ret; +} + ssize_t qemu_deliver_packet(NetClientState *sender, unsigned flags, const uint8_t *data, @@ -455,19 +508,24 @@ static ssize_t qemu_send_packet_async_with_flags(NetClientState *sender, NetPacketSent *sent_cb) { NetQueue *queue; + ssize_t rslt; #ifdef DEBUG_NET printf("qemu_send_packet_async:\n"); hex_dump(stdout, buf, size); #endif + qemu_mutex_lock(&sender->transfer_lock); if (sender->link_down || !sender->peer) { + qemu_mutex_unlock(&sender->transfer_lock); return size; } queue = sender->peer->send_queue; - return qemu_net_queue_send(queue, sender, flags, buf, size, sent_cb); + rslt = qemu_net_queue_send(queue, sender, flags, buf, size, sent_cb); + qemu_mutex_unlock(&sender->transfer_lock); + return rslt; } ssize_t qemu_send_packet_async(NetClientState *sender, @@ -535,16 +593,21 @@ ssize_t qemu_sendv_packet_async(NetClientState *sender, NetPacketSent *sent_cb) { NetQueue *queue; + ssize_t rslt; + qemu_mutex_lock(&sender->transfer_lock); if (sender->link_down || !sender->peer) { + qemu_mutex_unlock(&sender->transfer_lock); return iov_size(iov, iovcnt); } queue = sender->peer->send_queue; - return qemu_net_queue_send_iov(queue, sender, + rslt = qemu_net_queue_send_iov(queue, sender, QEMU_NET_PACKET_FLAG_NONE, iov, iovcnt, sent_cb); + qemu_mutex_unlock(&sender->transfer_lock); + return rslt; } ssize_t @@ -984,6 +1047,7 @@ void do_info_network(Monitor *mon, const QDict *qdict) print_net_client(mon, peer); } } + } void qmp_set_link(const char *name, bool up, Error **errp) diff --git a/net/queue.c b/net/queue.c index d4fb965..ad65523 100644 --- a/net/queue.c +++ b/net/queue.c @@ -24,6 +24,7 @@ #include "net/queue.h" #include "qemu/queue.h" #include "net/net.h" +#include "qom/object.h" /* The delivery handler may only return zero if it will call * qemu_net_queue_flush() when it determines that it is once again able @@ 
-172,7 +173,7 @@ ssize_t qemu_net_queue_send(NetQueue *queue, { ssize_t ret; - if (queue->delivering || !qemu_can_send_packet(sender)) { + if (queue->delivering || !qemu_can_send_packet_nolock(sender)) { qemu_net_queue_append(queue, sender, flags, data, size, sent_cb); return 0; } @@ -197,7 +198,7 @@ ssize_t qemu_net_queue_send_iov(NetQueue *queue, { ssize_t ret; - if (queue->delivering || !qemu_can_send_packet(sender)) { + if (queue->delivering || !qemu_can_send_packet_nolock(sender)) { qemu_net_queue_append_iov(queue, sender, flags, iov, iovcnt, sent_cb); return 0; } @@ -225,6 +226,16 @@ void qemu_net_queue_purge(NetQueue *queue, NetClientState *from) } } +void qemu_net_queue_purge_all(NetQueue *queue) +{ + NetPacket *packet, *next; + + QTAILQ_FOREACH_SAFE(packet, &queue->packets, entry, next) { + QTAILQ_REMOVE(&queue->packets, packet, entry); + g_free(packet); + } +} + bool qemu_net_queue_flush(NetQueue *queue) { while (!QTAILQ_EMPTY(&queue->packets)) { -- 1.7.4.4