On Sun, Aug 21, 2022 at 07:07:29PM +0200, Alexander Bluhm wrote:
> On Fri, Aug 19, 2022 at 10:54:42PM +0200, Alexander Bluhm wrote:
> > This diff allows udp_input() to run in parallel.

Diff rebased to -current.

Index: kern/uipc_socket.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/kern/uipc_socket.c,v
retrieving revision 1.284
diff -u -p -r1.284 uipc_socket.c
--- kern/uipc_socket.c  21 Aug 2022 16:22:17 -0000      1.284
+++ kern/uipc_socket.c  22 Aug 2022 12:01:58 -0000
@@ -822,10 +822,10 @@ bad:
        if (mp)
                *mp = NULL;
 
-       solock(so);
+       solock_shared(so);
 restart:
        if ((error = sblock(so, &so->so_rcv, SBLOCKWAIT(flags))) != 0) {
-               sounlock(so);
+               sounlock_shared(so);
                return (error);
        }
 
@@ -893,7 +893,7 @@ restart:
                sbunlock(so, &so->so_rcv);
                error = sbwait(so, &so->so_rcv);
                if (error) {
-                       sounlock(so);
+                       sounlock_shared(so);
                        return (error);
                }
                goto restart;
@@ -962,11 +962,11 @@ dontblock:
                        sbsync(&so->so_rcv, nextrecord);
                        if (controlp) {
                                if (pr->pr_domain->dom_externalize) {
-                                       sounlock(so);
+                                       sounlock_shared(so);
                                        error =
                                            (*pr->pr_domain->dom_externalize)
                                            (cm, controllen, flags);
-                                       solock(so);
+                                       solock_shared(so);
                                }
                                *controlp = cm;
                        } else {
@@ -1040,9 +1040,9 @@ dontblock:
                        SBLASTRECORDCHK(&so->so_rcv, "soreceive uiomove");
                        SBLASTMBUFCHK(&so->so_rcv, "soreceive uiomove");
                        resid = uio->uio_resid;
-                       sounlock(so);
+                       sounlock_shared(so);
                        uio_error = uiomove(mtod(m, caddr_t) + moff, len, uio);
-                       solock(so);
+                       solock_shared(so);
                        if (uio_error)
                                uio->uio_resid = resid - len;
                } else
@@ -1126,7 +1126,7 @@ dontblock:
                        error = sbwait(so, &so->so_rcv);
                        if (error) {
                                sbunlock(so, &so->so_rcv);
-                               sounlock(so);
+                               sounlock_shared(so);
                                return (0);
                        }
                        if ((m = so->so_rcv.sb_mb) != NULL)
@@ -1171,7 +1171,7 @@ dontblock:
                *flagsp |= flags;
 release:
        sbunlock(so, &so->so_rcv);
-       sounlock(so);
+       sounlock_shared(so);
        return (error);
 }
 
Index: kern/uipc_socket2.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/kern/uipc_socket2.c,v
retrieving revision 1.127
diff -u -p -r1.127 uipc_socket2.c
--- kern/uipc_socket2.c 13 Aug 2022 21:01:46 -0000      1.127
+++ kern/uipc_socket2.c 22 Aug 2022 12:01:58 -0000
@@ -360,6 +360,24 @@ solock(struct socket *so)
        }
 }
 
+void
+solock_shared(struct socket *so)
+{
+       switch (so->so_proto->pr_domain->dom_family) {
+       case PF_INET:
+       case PF_INET6:
+               if (so->so_proto->pr_usrreqs->pru_lock != NULL) {
+                       NET_LOCK_SHARED();
+                       pru_lock(so);
+               } else
+                       NET_LOCK();
+               break;
+       default:
+               rw_enter_write(&so->so_lock);
+               break;
+       }
+}
+
 int
 solock_persocket(struct socket *so)
 {
@@ -403,6 +421,24 @@ sounlock(struct socket *so)
 }
 
 void
+sounlock_shared(struct socket *so)
+{
+       switch (so->so_proto->pr_domain->dom_family) {
+       case PF_INET:
+       case PF_INET6:
+               if (so->so_proto->pr_usrreqs->pru_unlock != NULL) {
+                       pru_unlock(so);
+                       NET_UNLOCK_SHARED();
+               } else
+                       NET_UNLOCK();
+               break;
+       default:
+               rw_exit_write(&so->so_lock);
+               break;
+       }
+}
+
+void
 soassertlocked(struct socket *so)
 {
        switch (so->so_proto->pr_domain->dom_family) {
@@ -425,7 +461,15 @@ sosleep_nsec(struct socket *so, void *id
        switch (so->so_proto->pr_domain->dom_family) {
        case PF_INET:
        case PF_INET6:
+               if (so->so_proto->pr_usrreqs->pru_unlock != NULL &&
+                   rw_status(&netlock) == RW_READ) {
+                       pru_unlock(so);
+               }
                ret = rwsleep_nsec(ident, &netlock, prio, wmesg, nsecs);
+               if (so->so_proto->pr_usrreqs->pru_lock != NULL &&
+                   rw_status(&netlock) == RW_READ) {
+                       pru_lock(so);
+               }
                break;
        default:
                ret = rwsleep_nsec(ident, &so->so_lock, prio, wmesg, nsecs);
Index: net/if_bridge.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/net/if_bridge.c,v
retrieving revision 1.364
diff -u -p -r1.364 if_bridge.c
--- net/if_bridge.c     7 Aug 2022 00:57:43 -0000       1.364
+++ net/if_bridge.c     22 Aug 2022 12:01:58 -0000
@@ -1590,7 +1590,7 @@ bridge_ipsec(struct ifnet *ifp, struct e
                            off);
                        tdb_unref(tdb);
                        if (prot != IPPROTO_DONE)
-                               ip_deliver(&m, &hlen, prot, af);
+                               ip_deliver(&m, &hlen, prot, af, 0);
                        return (1);
                } else {
                        tdb_unref(tdb);
Index: netinet/in_proto.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/in_proto.c,v
retrieving revision 1.99
diff -u -p -r1.99 in_proto.c
--- netinet/in_proto.c  15 Aug 2022 09:11:38 -0000      1.99
+++ netinet/in_proto.c  22 Aug 2022 12:01:58 -0000
@@ -185,7 +185,7 @@ const struct protosw inetsw[] = {
   .pr_type     = SOCK_DGRAM,
   .pr_domain   = &inetdomain,
   .pr_protocol = IPPROTO_UDP,
-  .pr_flags    = PR_ATOMIC|PR_ADDR|PR_SPLICE,
+  .pr_flags    = PR_ATOMIC|PR_ADDR|PR_SPLICE|PR_MPSAFE,
   .pr_input    = udp_input,
   .pr_ctlinput = udp_ctlinput,
   .pr_ctloutput        = ip_ctloutput,
Index: netinet/ip_input.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/ip_input.c,v
retrieving revision 1.380
diff -u -p -r1.380 ip_input.c
--- netinet/ip_input.c  21 Aug 2022 14:15:55 -0000      1.380
+++ netinet/ip_input.c  22 Aug 2022 12:01:58 -0000
@@ -230,6 +230,11 @@ ip_init(void)
 #endif
 }
 
+struct ip_offnxt {
+       int     ion_off;
+       int     ion_nxt;
+};
+
 /*
  * Enqueue packet for local delivery.  Queuing is used as a boundary
  * between the network layer (input/forward path) running with
@@ -246,6 +251,30 @@ ip_ours(struct mbuf **mp, int *offp, int
        if (af != AF_UNSPEC)
                return nxt;
 
+       nxt = ip_deliver(mp, offp, nxt, AF_INET, 1);
+       if (nxt == IPPROTO_DONE)
+               return IPPROTO_DONE;
+
+       /* save values for later, use after dequeue */
+       if (*offp != sizeof(struct ip)) {
+               struct m_tag *mtag;
+               struct ip_offnxt *ion;
+
+               /* mbuf tags are expensive, but only used for header options */
+               mtag = m_tag_get(PACKET_TAG_IP_OFFNXT, sizeof(*ion),
+                   M_NOWAIT);
+               if (mtag == NULL) {
+                       ipstat_inc(ips_idropped);
+                       m_freemp(mp);
+                       return IPPROTO_DONE;
+               }
+               ion = (struct ip_offnxt *)(mtag + 1);
+               ion->ion_off = *offp;
+               ion->ion_nxt = nxt;
+
+               m_tag_prepend(*mp, mtag);
+       }
+
        niq_enqueue(&ipintrq, *mp);
        *mp = NULL;
        return IPPROTO_DONE;
@@ -261,18 +290,31 @@ ipintr(void)
        struct mbuf *m;
 
        while ((m = niq_dequeue(&ipintrq)) != NULL) {
-               struct ip *ip;
+               struct m_tag *mtag;
                int off, nxt;
 
 #ifdef DIAGNOSTIC
                if ((m->m_flags & M_PKTHDR) == 0)
                        panic("ipintr no HDR");
 #endif
-               ip = mtod(m, struct ip *);
-               off = ip->ip_hl << 2;
-               nxt = ip->ip_p;
+               mtag = m_tag_find(m, PACKET_TAG_IP_OFFNXT, NULL);
+               if (mtag != NULL) {
+                       struct ip_offnxt *ion;
+
+                       ion = (struct ip_offnxt *)(mtag + 1);
+                       off = ion->ion_off;
+                       nxt = ion->ion_nxt;
+
+                       m_tag_delete(m, mtag);
+               } else {
+                       struct ip *ip;
 
-               nxt = ip_deliver(&m, &off, nxt, AF_INET);
+                       ip = mtod(m, struct ip *);
+                       off = ip->ip_hl << 2;
+                       nxt = ip->ip_p;
+               }
+
+               nxt = ip_deliver(&m, &off, nxt, AF_INET, 0);
                KASSERT(nxt == IPPROTO_DONE);
        }
 }
@@ -673,7 +715,7 @@ ip_fragcheck(struct mbuf **mp, int *offp
 #endif
 
 int
-ip_deliver(struct mbuf **mp, int *offp, int nxt, int af)
+ip_deliver(struct mbuf **mp, int *offp, int nxt, int af, int shared)
 {
        const struct protosw *psw;
        int naf = af;
@@ -681,26 +723,36 @@ ip_deliver(struct mbuf **mp, int *offp, 
        int nest = 0;
 #endif /* INET6 */
 
-       NET_ASSERT_LOCKED_EXCLUSIVE();
-
-       /* pf might have modified stuff, might have to chksum */
-       switch (af) {
-       case AF_INET:
-               in_proto_cksum_out(*mp, NULL);
-               break;
-#ifdef INET6
-       case AF_INET6:
-               in6_proto_cksum_out(*mp, NULL);
-               break;
-#endif /* INET6 */
-       }
-
        /*
         * Tell launch routine the next header
         */
        IPSTAT_INC(delivered);
 
        while (nxt != IPPROTO_DONE) {
+               switch (af) {
+               case AF_INET:
+                       psw = &inetsw[ip_protox[nxt]];
+                       break;
+#ifdef INET6
+               case AF_INET6:
+                       psw = &inet6sw[ip6_protox[nxt]];
+                       break;
+#endif /* INET6 */
+               }
+               if (shared && !ISSET(psw->pr_flags, PR_MPSAFE)) {
+                       /* delivery not finished, decrement counter, queue */
+                       switch (af) {
+                       case AF_INET:
+                               counters_dec(ipcounters, ips_delivered);
+                               break;
+#ifdef INET6
+                       case AF_INET6:
+                               counters_dec(ip6counters, ip6s_delivered);
+                               break;
+#endif /* INET6 */
+                       }
+                       break;
+               }
 #ifdef INET6
                if (af == AF_INET6 &&
                    ip6_hdrnestlimit && (++nest > ip6_hdrnestlimit)) {
@@ -737,16 +789,6 @@ ip_deliver(struct mbuf **mp, int *offp, 
                case IPPROTO_IPV6:
                        naf = AF_INET6;
                        ip6stat_inc(ip6s_delivered);
-                       break;
-#endif /* INET6 */
-               }
-               switch (af) {
-               case AF_INET:
-                       psw = &inetsw[ip_protox[nxt]];
-                       break;
-#ifdef INET6
-               case AF_INET6:
-                       psw = &inet6sw[ip6_protox[nxt]];
                        break;
 #endif /* INET6 */
                }
Index: netinet/ip_var.h
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/ip_var.h,v
retrieving revision 1.99
diff -u -p -r1.99 ip_var.h
--- netinet/ip_var.h    21 Aug 2022 22:45:55 -0000      1.99
+++ netinet/ip_var.h    22 Aug 2022 12:01:58 -0000
@@ -249,7 +249,7 @@ int  ip_sysctl(int *, u_int, void *, siz
 void    ip_savecontrol(struct inpcb *, struct mbuf **, struct ip *,
            struct mbuf *);
 int     ip_input_if(struct mbuf **, int *, int, int, struct ifnet *);
-int     ip_deliver(struct mbuf **, int *, int, int);
+int     ip_deliver(struct mbuf **, int *, int, int, int);
 void    ip_forward(struct mbuf *, struct ifnet *, struct rtentry *, int);
 int     rip_ctloutput(int, struct socket *, int, int, struct mbuf *);
 void    rip_init(void);
Index: netinet/udp_usrreq.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/udp_usrreq.c,v
retrieving revision 1.287
diff -u -p -r1.287 udp_usrreq.c
--- netinet/udp_usrreq.c        22 Aug 2022 10:37:27 -0000      1.287
+++ netinet/udp_usrreq.c        22 Aug 2022 12:01:58 -0000
@@ -122,10 +122,15 @@ u_int     udp_sendspace = 9216;           /* really m
 u_int  udp_recvspace = 40 * (1024 + sizeof(struct sockaddr_in));
                                        /* 40 1K datagrams */
 
+void   udp_lock(struct socket *);
+void   udp_unlock(struct socket *);
+
 const struct pr_usrreqs udp_usrreqs = {
        .pru_usrreq     = udp_usrreq,
        .pru_attach     = udp_attach,
        .pru_detach     = udp_detach,
+       .pru_lock       = udp_lock,
+       .pru_unlock     = udp_unlock,
        .pru_bind       = udp_bind,
        .pru_connect    = udp_connect,
 };
@@ -653,12 +658,17 @@ udp_sbappend(struct inpcb *inp, struct m
        }
 #endif
        m_adj(m, hlen);
+
+       mtx_enter(&inp->inp_mtx);
        if (sbappendaddr(so, &so->so_rcv, srcaddr, m, opts) == 0) {
+               mtx_leave(&inp->inp_mtx);
                udpstat_inc(udps_fullsock);
                m_freem(m);
                m_freem(opts);
                return;
        }
+       mtx_leave(&inp->inp_mtx);
+
        sorwakeup(so);
 }
 
@@ -1245,6 +1255,24 @@ udp_detach(struct socket *so)
 
        in_pcbdetach(inp);
        return (0);
+}
+
+void
+udp_lock(struct socket *so)
+{
+       struct inpcb *inp = sotoinpcb(so);
+
+       NET_ASSERT_LOCKED();
+       mtx_enter(&inp->inp_mtx);
+}
+
+void
+udp_unlock(struct socket *so)
+{
+       struct inpcb *inp = sotoinpcb(so);
+
+       NET_ASSERT_LOCKED();
+       mtx_leave(&inp->inp_mtx);
 }
 
 int
Index: netinet6/in6_proto.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/in6_proto.c,v
retrieving revision 1.110
diff -u -p -r1.110 in6_proto.c
--- netinet6/in6_proto.c        15 Aug 2022 09:11:39 -0000      1.110
+++ netinet6/in6_proto.c        22 Aug 2022 12:01:58 -0000
@@ -136,7 +136,7 @@ const struct protosw inet6sw[] = {
   .pr_type     = SOCK_DGRAM,
   .pr_domain   = &inet6domain,
   .pr_protocol = IPPROTO_UDP,
-  .pr_flags    = PR_ATOMIC|PR_ADDR|PR_SPLICE,
+  .pr_flags    = PR_ATOMIC|PR_ADDR|PR_SPLICE|PR_MPSAFE,
   .pr_input    = udp_input,
   .pr_ctlinput = udp6_ctlinput,
   .pr_ctloutput        = ip6_ctloutput,
Index: netinet6/ip6_input.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/ip6_input.c,v
retrieving revision 1.254
diff -u -p -r1.254 ip6_input.c
--- netinet6/ip6_input.c        21 Aug 2022 14:15:55 -0000      1.254
+++ netinet6/ip6_input.c        22 Aug 2022 12:01:58 -0000
@@ -190,6 +190,10 @@ ip6_ours(struct mbuf **mp, int *offp, in
        if (af != AF_UNSPEC)
                return nxt;
 
+       nxt = ip_deliver(mp, offp, nxt, AF_INET6, 1);
+       if (nxt == IPPROTO_DONE)
+               return IPPROTO_DONE;
+
        /* save values for later, use after dequeue */
        if (*offp != sizeof(struct ip6_hdr)) {
                struct m_tag *mtag;
@@ -248,7 +252,7 @@ ip6intr(void)
                        off = sizeof(struct ip6_hdr);
                        nxt = ip6->ip6_nxt;
                }
-               nxt = ip_deliver(&m, &off, nxt, AF_INET6);
+               nxt = ip_deliver(&m, &off, nxt, AF_INET6, 0);
                KASSERT(nxt == IPPROTO_DONE);
        }
 }
Index: sys/mbuf.h
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/sys/mbuf.h,v
retrieving revision 1.255
diff -u -p -r1.255 mbuf.h
--- sys/mbuf.h  15 Aug 2022 16:15:37 -0000      1.255
+++ sys/mbuf.h  22 Aug 2022 12:01:58 -0000
@@ -471,6 +471,8 @@ struct m_tag *m_tag_next(struct mbuf *, 
 #define PACKET_TAG_IPSEC_IN_DONE       0x0001  /* IPsec applied, in */
 #define PACKET_TAG_IPSEC_OUT_DONE      0x0002  /* IPsec applied, out */
 #define PACKET_TAG_IPSEC_FLOWINFO      0x0004  /* IPsec flowinfo */
+#define PACKET_TAG_IP_OFFNXT           0x0010  /* IPv4 offset and next proto */
+#define PACKET_TAG_IP6_OFFNXT          0x0020  /* IPv6 offset and next proto */
 #define PACKET_TAG_WIREGUARD           0x0040  /* WireGuard data */
 #define PACKET_TAG_GRE                 0x0080  /* GRE processing done */
 #define PACKET_TAG_DLT                 0x0100 /* data link layer type */
@@ -479,7 +481,6 @@ struct m_tag *m_tag_next(struct mbuf *, 
 #define PACKET_TAG_SRCROUTE            0x1000 /* IPv4 source routing options */
 #define PACKET_TAG_TUNNEL              0x2000  /* Tunnel endpoint address */
 #define PACKET_TAG_CARP_BAL_IP         0x4000  /* carp(4) ip balanced marker */
-#define PACKET_TAG_IP6_OFFNXT          0x8000  /* IPv6 offset and next proto */
 
 #define MTAG_BITS \
     ("\20\1IPSEC_IN_DONE\2IPSEC_OUT_DONE\3IPSEC_FLOWINFO" \
Index: sys/protosw.h
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/sys/protosw.h,v
retrieving revision 1.41
diff -u -p -r1.41 protosw.h
--- sys/protosw.h       22 Aug 2022 08:08:47 -0000      1.41
+++ sys/protosw.h       22 Aug 2022 12:01:58 -0000
@@ -66,6 +66,8 @@ struct pr_usrreqs {
 
        int     (*pru_attach)(struct socket *, int);
        int     (*pru_detach)(struct socket *);
+       void    (*pru_lock)(struct socket *);
+       void    (*pru_unlock)(struct socket *);
        int     (*pru_bind)(struct socket *, struct mbuf *, struct proc *);
        int     (*pru_listen)(struct socket *);
        int     (*pru_connect)(struct socket *, struct mbuf *);
@@ -116,6 +118,7 @@ struct protosw {
 #define        PR_ABRTACPTDIS  0x20            /* abort on accept(2) to disconnected
                                           socket */
 #define        PR_SPLICE       0x40            /* socket splicing is possible */
+#define        PR_MPSAFE       0x80            /* input runs with shared netlock */
 
 /*
  * The arguments to usrreq are:
@@ -263,6 +266,18 @@ static inline int
 pru_detach(struct socket *so)
 {
        return (*so->so_proto->pr_usrreqs->pru_detach)(so);
+}
+
+static inline void
+pru_lock(struct socket *so)
+{
+       (*so->so_proto->pr_usrreqs->pru_lock)(so);
+}
+
+static inline void
+pru_unlock(struct socket *so)
+{
+       (*so->so_proto->pr_usrreqs->pru_unlock)(so);
 }
 
 static inline int
Index: sys/socketvar.h
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/sys/socketvar.h,v
retrieving revision 1.108
diff -u -p -r1.108 socketvar.h
--- sys/socketvar.h     21 Aug 2022 16:22:18 -0000      1.108
+++ sys/socketvar.h     22 Aug 2022 12:01:58 -0000
@@ -349,9 +349,11 @@ int        sockargs(struct mbuf **, const void 
 
 int    sosleep_nsec(struct socket *, void *, int, const char *, uint64_t);
 void   solock(struct socket *);
+void   solock_shared(struct socket *);
 int    solock_persocket(struct socket *);
 void   solock_pair(struct socket *, struct socket *);
 void   sounlock(struct socket *);
+void   sounlock_shared(struct socket *);
 
 int    sendit(struct proc *, int, struct msghdr *, int, register_t *);
 int    recvit(struct proc *, int, struct msghdr *, caddr_t, register_t *);

Reply via email to