On Wed, Jan 24, 2018 at 12:45 PM, Sowmini Varadhan <sowmini.varad...@oracle.com> wrote: > RDS removes a datagram (rds_message) from the retransmit queue when > an ACK is received. The ACK indicates that the receiver has queued > the RDS datagram, so that the sender can safely forget the datagram. > When all references to the rds_message are quiesced, rds_message_purge > is called to release resources used by the rds_message. > > If the datagram to be removed had pinned pages set up, add > an entry to the rs->rs_znotify_queue so that the notification > will be sent up via rds_rm_zerocopy_callback() when the > rds_message is eventually freed by rds_message_purge. > > rds_rm_zerocopy_callback() attempts to batch the number of cookies > sent with each notification to a max of SO_EE_ORIGIN_MAX_ZCOOKIES. > Each time a cookie is released by rds_message_purge(), the > rs_znotify_queue is checked to see if the MAX_ZCOOKIES batch limit > has been exceeded (in which case we send up a notification). If the > limit has not been exceeded, the cookie is added to the rs_znotify_queue > and a timer is set up
An alternative that does not require a timer is to batch on the sk error queue itself, like tcp zerocopy. That queues the first notification skb on the error queue without any notification latency. Then, if a subsequent notification comes in while another is pending with < MAX zcookies, it coalesces the new notification onto the pending skb and consumes the other. For RDS notifications, the implementation is an extra skb_put + uint32_t assignment. Optionally, the socket can trigger another sk_error_report on each new notification. > +static void rds_rm_zerocopy_callback(struct rds_sock *rs, > + struct rds_znotifier *znotifier, > + bool force) > +{ > + struct sock *sk = rds_rs_to_sk(rs); > + struct sk_buff *skb; > + struct sock_exterr_skb *serr; > + unsigned long flags; > + u32 *ptr; > + int ncookies = 0, i; > + struct rds_znotifier *znotif, *ztmp, *first; > + LIST_HEAD(tmp_list); > + > + spin_lock_irqsave(&rs->rs_lock, flags); > + ncookies = rs->rs_ncookies; > + if (ncookies < SO_EE_ORIGIN_MAX_ZCOOKIES && !force) { > + if (znotifier) { /* add this cookie to the list and return */ can be checked before taking lock. More importantly, when is this ever NULL? This function is a callback for a zerocopy struct of type znotifier. Is it doing double duty to flush any outstanding if znotifier == NULL && force == true? If so, the first condition probably never occurs unless force == true and thus the second is redundant. > + list_add_tail(&znotifier->z_list, > + &rs->rs_znotify_queue); > + rs->rs_ncookies++; > + } > + spin_unlock_irqrestore(&rs->rs_lock, flags); > + return; > + } > + if (!ncookies) { /* timer finds a reaped list */ > + spin_unlock_irqrestore(&rs->rs_lock, flags); > + return; > + } > + /* reap existing cookie list if we have hit the max, then add > + * new cookie to the list for next round of reaping. 
> + */ > + list_splice(&rs->rs_znotify_queue, &tmp_list); /* reap now */ > + INIT_LIST_HEAD(&rs->rs_znotify_queue); > + rs->rs_ncookies = 0; > + if (znotifier) { /* for next round */ This adds unnecessary notification latency to delivery of current notification. The latest notification can be appended to tmp_list and sent up immediately. > + list_add_tail(&znotifier->z_list, &rs->rs_znotify_queue); > + rs->rs_ncookies++; > + } > + spin_unlock_irqrestore(&rs->rs_lock, flags); > + > + first = list_first_entry(&tmp_list, struct rds_znotifier, z_list); > + znotif = list_next_entry(first, z_list); > + list_del(&first->z_list); > + > + skb = rds_skb_from_znotifier(first); > + ptr = skb_put(skb, ncookies * sizeof(u32)); > + i = 0; > + ptr[i++] = first->z_cookie; > + > + list_for_each_entry_safe(znotif, ztmp, &tmp_list, z_list) { > + list_del(&znotif->z_list); > + ptr[i++] = znotif->z_cookie; > + mm_unaccount_pinned_pages(&znotif->z_mmp); > + consume_skb(rds_skb_from_znotifier(znotif)); > + } > + WARN_ON(!list_empty(&tmp_list)); > + > + serr = SKB_EXT_ERR(skb); > + serr->ee.ee_errno = 0; > + serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE; > + serr->ee.ee_data = ncookies; > + serr->ee.ee_info = 0; > + serr->ee.ee_code |= SO_EE_CODE_ZEROCOPY_COPIED; > + > + if (sock_queue_err_skb(sk, skb)) > + consume_skb(skb); > +} > + > +void rs_zcopy_notify(struct timer_list *t) > +{ > + struct rds_sock *rs = from_timer(rs, t, rs_cookie_timer); > + > + rds_rm_zerocopy_callback(rs, NULL, true); > +} > + > /* > * This relies on dma_map_sg() not touching sg[].page during merging. 
> */ > static void rds_message_purge(struct rds_message *rm) > { > unsigned long i, flags; > + bool zcopy = false; > > if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags))) > return; > > + spin_lock_irqsave(&rm->m_rs_lock, flags); > + if (rm->data.op_mmp_znotifier && rm->m_rs) { > + struct rds_sock *rs = rm->m_rs; > + > + zcopy = true; > + rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier, > false); > + rm->data.op_mmp_znotifier = NULL; > + (void)mod_timer(&rs->rs_cookie_timer, RDS_REAP_TIMEOUT); Resetting timeout on each queued notification causes unbounded notification latency for previous notifications on the queue. > + > + sock_put(rds_rs_to_sk(rs)); > + rm->m_rs = NULL; These two lines are now executed only if znotifier is set, but used to be executed whenever rm->m_rs != NULL. Intentional? > + } > + spin_unlock_irqrestore(&rm->m_rs_lock, flags); > + > for (i = 0; i < rm->data.op_nents; i++) { > rdsdebug("putting data page %p\n", (void > *)sg_page(&rm->data.op_sg[i])); > /* XXX will have to put_page for page refs */ > - __free_page(sg_page(&rm->data.op_sg[i])); > + if (!zcopy) > + __free_page(sg_page(&rm->data.op_sg[i])); > + else > + put_page(sg_page(&rm->data.op_sg[i])); > } > rm->data.op_nents = 0; > - spin_lock_irqsave(&rm->m_rs_lock, flags); > - if (rm->m_rs) { > - sock_put(rds_rs_to_sk(rm->m_rs)); > - rm->m_rs = NULL; > - } > - spin_unlock_irqrestore(&rm->m_rs_lock, flags); > > if (rm->rdma.op_active) > rds_rdma_free_op(&rm->rdma);