This update brings new features to the following supported: * unified cache to store netchannels (IPv4 and stub for IPv6 hashes, TCP and UDP) * skb queueing mechanism * netchannel creation/removing/reading commands * netchannel's callback to allocate/free pages (for example to get data from mapped area) not only from SLAB cache * netchannel's callback to move/copy data to userspace
Added: * memory limits (soft limits, since update is not protected). * blocking reading. * two types of data reading backends (copy_to_user(), copy to (could be mapped) area). Patch against previous release is attached. Userspace application, design and implementation notes, full patchsets can be found at project homepage [1]. 1. Network channel homepage. http://tservice.net.ru/~s0mbre/old/?section=projects&item=netchannel I would like to rise a question about how netchannel object should be handled by system in general, i.e. should netchannels be associated with process or they should live by themselfs, i.e. like routes? My implementation allows netchannels to be setup permanently, so process can exit and then new one can bind to existing netchannel and read it's data, but it requires some tricks to create mapping of it's pages into process' context... Also if netchannel is created, but no process is associated with it, who will process protocol state machine? Signed-off-by: Evgeniy Polyakov <[EMAIL PROTECTED]> diff --git a/include/linux/netchannel.h b/include/linux/netchannel.h index e87a148..7ab2fa0 100644 --- a/include/linux/netchannel.h +++ b/include/linux/netchannel.h @@ -32,13 +32,20 @@ enum netchannel_commands { NETCHANNEL_DUMP, }; +enum netchannel_type { + NETCHANNEL_COPY_USER = 0, + NETCHANNEL_MMAP, + NETCHANEL_VM_HACK, +}; + struct unetchannel { __u32 src, dst; /* source/destination hashes */ __u16 sport, dport; /* source/destination ports */ __u8 proto; /* IP protocol number */ - __u8 listen; - __u8 reserved[2]; + __u8 type; /* Netchannel type */ + __u8 memory_limit_order; /* Memor limit order */ + __u8 reserved; }; struct unetchannel_control @@ -46,6 +53,8 @@ struct unetchannel_control struct unetchannel unc; __u32 cmd; __u32 len; + __u32 flags; + __u32 timeout; }; #ifdef __KERNEL__ @@ -60,9 +69,14 @@ struct netchannel struct page * (*nc_alloc_page)(unsigned int size); void (*nc_free_page)(struct page *page); - int (*nc_read_data)(struct netchannel *, 
unsigned int *len, void __user *arg); + int (*nc_read_data)(struct netchannel *, unsigned int *timeout, unsigned int *len, void *arg); + + struct sk_buff_head recv_queue; + wait_queue_head_t wait; + + unsigned int qlen; - struct sk_buff_head list; + void *priv; }; struct netchannel_cache_head @@ -71,5 +85,15 @@ struct netchannel_cache_head struct mutex mutex; }; +#define NETCHANNEL_MAX_ORDER 32 +#define NETCHANNEL_MIN_ORDER PAGE_SHIFT + +struct netchannel_mmap +{ + struct page **page; + unsigned int pnum; + unsigned int poff; +}; + #endif /* __KERNEL__ */ #endif /* __NETCHANNEL_H */ diff --git a/include/linux/skbuff.h b/include/linux/skbuff.h index accd00b..ba82aa2 100644 --- a/include/linux/skbuff.h +++ b/include/linux/skbuff.h @@ -301,7 +301,6 @@ struct sk_buff { * Handling routines are only of interest to the kernel */ #include <linux/slab.h> -#include <linux/netchannel.h> #include <asm/system.h> @@ -316,10 +315,11 @@ static inline struct sk_buff *alloc_skb( } #ifdef CONFIG_NETCHANNEL +struct unetchannel; extern struct sk_buff *netchannel_alloc(struct unetchannel *unc, unsigned int header_size, unsigned int total_size, gfp_t gfp_mask); #else -static struct sk_buff *netchannel_alloc(struct unetchannel *unc, unsigned int header_size, +static struct sk_buff *netchannel_alloc(void *unc, unsigned int header_size, unsigned int total_size, gfp_t gfp_mask) { return NULL; diff --git a/net/core/netchannel.c b/net/core/netchannel.c index 169a764..96e5e5b 100644 --- a/net/core/netchannel.c +++ b/net/core/netchannel.c @@ -27,6 +27,8 @@ #include <linux/slab.h> #include <linux/skbuff.h> #include <linux/errno.h> +#include <linux/highmem.h> +#include <linux/netchannel.h> #include <linux/in.h> #include <linux/ip.h> @@ -127,6 +129,7 @@ static void netchannel_free_rcu(struct r { struct netchannel *nc = container_of(rcu, struct netchannel, rcu_head); + netchannel_cleanup(nc); kmem_cache_free(netchannel_cache, nc); } @@ -151,8 +154,10 @@ static inline void netchannel_dump_info_ dport 
= ntohs(unc->dport); sport = ntohs(unc->sport); - printk(KERN_INFO "netchannel: %s %u.%u.%u.%u:%u -> %u.%u.%u.%u:%u, proto: %u, hit: %lu, err: %d.\n", - prefix, NIPQUAD(src), sport, NIPQUAD(dst), dport, unc->proto, hit, err); + printk(KERN_NOTICE "netchannel: %s %u.%u.%u.%u:%u -> %u.%u.%u.%u:%u, " + "proto: %u, type: %u, order: %u, hit: %lu, err: %d.\n", + prefix, NIPQUAD(src), sport, NIPQUAD(dst), dport, + unc->proto, unc->type, unc->memory_limit_order, hit, err); } static int netchannel_convert_skb_ipv6(struct sk_buff *skb, struct unetchannel *unc) @@ -197,7 +202,7 @@ static int netchannel_convert_skb_ipv4(s len = skb->len; - skb->h.raw = skb->nh.iph + iph->ihl*4; + skb->h.raw = skb->nh.raw + iph->ihl*4; switch (unc->proto) { case IPPROTO_TCP: @@ -352,35 +357,91 @@ int netchannel_recv(struct sk_buff *skb) nc->hit++; - skb_queue_tail(&nc->list, skb); + if (nc->qlen + skb->len > (1 << nc->unc.memory_limit_order)) { + kfree_skb(skb); + err = 0; + goto unlock; + } + + skb_queue_tail(&nc->recv_queue, skb); + nc->qlen += skb->len; unlock: rcu_read_unlock(); return err; } +static int netchannel_wait_for_packet(struct netchannel *nc, long *timeo_p) +{ + int error = 0; + DEFINE_WAIT(wait); + + prepare_to_wait_exclusive(&nc->wait, &wait, TASK_INTERRUPTIBLE); + + if (skb_queue_empty(&nc->recv_queue)) { + if (signal_pending(current)) + goto interrupted; + + *timeo_p = schedule_timeout(*timeo_p); + } +out: + finish_wait(&nc->wait, &wait); + return error; +interrupted: + error = (*timeo_p == MAX_SCHEDULE_TIMEOUT) ? 
-ERESTARTSYS : -EINTR; + goto out; +} + +static struct sk_buff *netchannel_get_skb(struct netchannel *nc, unsigned int *timeout, int *error) +{ + struct sk_buff *skb = NULL; + long tm = *timeout; + + *error = 0; + + while (1) { + skb = skb_dequeue(&nc->recv_queue); + if (skb) + break; + + if (*timeout) { + *error = netchannel_wait_for_packet(nc, &tm); + if (*error) { + *timeout = tm; + break; + } + tm = *timeout; + } else { + *error = -EAGAIN; + break; + } + } + + return skb; +} + /* * Actually it should be something like recvmsg(). */ -static int netchannel_copy_to_user(struct netchannel *nc, unsigned int *len, void __user *arg) +static int netchannel_copy_to_user(struct netchannel *nc, unsigned int *timeout, unsigned int *len, void *arg) { unsigned int copied; struct sk_buff *skb; struct iovec to; - int err = -EINVAL; - - to.iov_base = arg; - to.iov_len = *len; + int err; - skb = skb_dequeue(&nc->list); + skb = netchannel_get_skb(nc, timeout, &err); if (!skb) - return -EAGAIN; + return err; + + to.iov_base = arg; + to.iov_len = *len; copied = skb->len; if (copied > *len) copied = *len; - - if (skb->ip_summed==CHECKSUM_UNNECESSARY) { + + if (skb->ip_summed == CHECKSUM_UNNECESSARY) { err = skb_copy_datagram_iovec(skb, 0, &to, copied); } else { err = skb_copy_and_csum_datagram_iovec(skb,0, &to); @@ -388,56 +449,290 @@ static int netchannel_copy_to_user(struc *len = (err == 0)?copied:0; + nc->qlen -= skb->len; kfree_skb(skb); return err; } -static int netchannel_create(struct unetchannel *unc) +int netchannel_skb_copy_datagram(const struct sk_buff *skb, int offset, + void *to, int len) { - struct netchannel *nc; - int err = -ENOMEM; - struct netchannel_cache_head *bucket; + int start = skb_headlen(skb); + int i, copy = start - offset; + + /* Copy header. */ + if (copy > 0) { + if (copy > len) + copy = len; + memcpy(to, skb->data + offset, copy); + + if ((len -= copy) == 0) + return 0; + offset += copy; + to += copy; + } + + /* Copy paged appendix. Hmm... 
why does this look so complicated? */ + for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) { + int end; + + BUG_TRAP(start <= offset + len); + + end = start + skb_shinfo(skb)->frags[i].size; + if ((copy = end - offset) > 0) { + u8 *vaddr; + skb_frag_t *frag = &skb_shinfo(skb)->frags[i]; + struct page *page = frag->page; + + if (copy > len) + copy = len; + vaddr = kmap(page); + memcpy(to, vaddr + frag->page_offset + + offset - start, copy); + kunmap(page); + if (!(len -= copy)) + return 0; + offset += copy; + to += copy; + } + start = end; + } + + if (skb_shinfo(skb)->frag_list) { + struct sk_buff *list = skb_shinfo(skb)->frag_list; + + for (; list; list = list->next) { + int end; + + BUG_TRAP(start <= offset + len); + + end = start + list->len; + if ((copy = end - offset) > 0) { + if (copy > len) + copy = len; + if (netchannel_skb_copy_datagram(list, + offset - start, + to, copy)) + goto fault; + if ((len -= copy) == 0) + return 0; + offset += copy; + to += copy; + } + start = end; + } + } + if (!len) + return 0; + +fault: + return -EFAULT; +} + +static int netchannel_copy_to_mem(struct netchannel *nc, unsigned int *timeout, unsigned int *len, void *arg) +{ + struct netchannel_mmap *m = nc->priv; + unsigned int copied, skb_offset = 0; + struct sk_buff *skb; + int err; + + skb = netchannel_get_skb(nc, timeout, &err); + if (!skb) + return err; + + copied = skb->len; + + while (copied) { + int pnum = ((m->poff % PAGE_SIZE) % m->pnum); + struct page *page = m->page[pnum]; + void *page_map, *ptr; + unsigned int sz, left; + + left = PAGE_SIZE - (m->poff % (PAGE_SIZE - 1)); + sz = min_t(unsigned int, left, copied); + + if (!sz) { + err = -ENOSPC; + goto err_out; + } + + page_map = kmap_atomic(page, KM_USER0); + if (!page_map) { + err = -ENOMEM; + goto err_out; + } + ptr = page_map + (m->poff % (PAGE_SIZE - 1)); + + err = netchannel_skb_copy_datagram(skb, skb_offset, ptr, sz); + if (err) { + kunmap_atomic(page_map, KM_USER0); + goto err_out; + } + kunmap_atomic(page_map, 
KM_USER0); + + copied -= sz; + m->poff += sz; + skb_offset += sz; +#if 1 + if (m->poff >= PAGE_SIZE * m->pnum) { + //netchannel_dump_info_unc(&nc->unc, "rewind", nc->hit, 0); + m->poff = 0; + } +#endif + } + *len = skb->len; + + err = 0; + +err_out: + nc->qlen -= skb->len; + kfree_skb(skb); + + return err; +} + +static int netchannel_mmap_setup(struct netchannel *nc) +{ + struct netchannel_mmap *m; + unsigned int i, pnum; + + pnum = (1 << (nc->unc.memory_limit_order - NETCHANNEL_MIN_ORDER)); + + m = kzalloc(sizeof(struct netchannel_mmap) + sizeof(struct page *) * pnum, GFP_KERNEL); + if (!m) + return -ENOMEM; + + m->page = (struct page **)(m + 1); + m->pnum = pnum; + + for (i=0; i<pnum; ++i) { + m->page[i] = alloc_page(GFP_KERNEL); + if (!m->page[i]) + break; + } + + if (i < pnum) { + pnum = i; + goto err_out_free; + } + + nc->priv = m; + nc->nc_read_data = &netchannel_copy_to_mem; + + return 0; + +err_out_free: + for (i=0; i<pnum; ++i) + __free_page(m->page[i]); + + kfree(m); + + return -ENOMEM; - if (!netchannel_hash_table) - return -ENODEV; +} - bucket = netchannel_bucket(unc); +static void netchannel_mmap_cleanup(struct netchannel *nc) +{ + unsigned int i; + struct netchannel_mmap *m = nc->priv; - mutex_lock(&bucket->mutex); + for (i=0; i<m->pnum; ++i) + __free_page(m->page[i]); - if (netchannel_check_full(unc, bucket)) { - err = -EEXIST; - goto out_unlock; + kfree(m); +} + +static void netchannel_cleanup(struct netchannel *nc) +{ + switch (nc->unc.type) { + case NETCHANNEL_COPY_USER: + break; + case NETCHANNEL_MMAP: + netchannel_mmap_cleanup(nc); + break; + default: + break; } +} - if (unc->listen && netchannel_check_dest(unc, bucket)) { - err = -EEXIST; - goto out_unlock; +static int netchannel_setup(struct netchannel *nc) +{ + int ret = 0; + + if (nc->unc.memory_limit_order > NETCHANNEL_MAX_ORDER) + return -E2BIG; + + if (nc->unc.memory_limit_order < NETCHANNEL_MIN_ORDER) + nc->unc.memory_limit_order = NETCHANNEL_MIN_ORDER; + + switch (nc->unc.type) { + case 
NETCHANNEL_COPY_USER: + nc->nc_read_data = &netchannel_copy_to_user; + break; + case NETCHANNEL_MMAP: + ret = netchannel_mmap_setup(nc); + break; + default: + ret = -EINVAL; + break; } + return ret; +} + +static int netchannel_create(struct unetchannel *unc) +{ + struct netchannel *nc; + int err = -ENOMEM; + struct netchannel_cache_head *bucket; + + if (!netchannel_hash_table) + return -ENODEV; + nc = kmem_cache_alloc(netchannel_cache, GFP_KERNEL); if (!nc) - goto out_exit; + return -ENOMEM; memset(nc, 0, sizeof(struct netchannel)); nc->hit = 0; - skb_queue_head_init(&nc->list); + skb_queue_head_init(&nc->recv_queue); + init_waitqueue_head(&nc->wait); atomic_set(&nc->refcnt, 1); memcpy(&nc->unc, unc, sizeof(struct unetchannel)); - nc->nc_read_data = &netchannel_copy_to_user; + err = netchannel_setup(nc); + if (err) + goto err_out_free; + + bucket = netchannel_bucket(unc); + + mutex_lock(&bucket->mutex); + + if (netchannel_check_full(unc, bucket)) { + err = -EEXIST; + goto err_out_unlock; + } hlist_add_head_rcu(&nc->node, &bucket->head); err = 0; -out_unlock: mutex_unlock(&bucket->mutex); -out_exit: + netchannel_dump_info_unc(unc, "create", 0, err); return err; + +err_out_unlock: + mutex_unlock(&bucket->mutex); + + netchannel_cleanup(nc); + +err_out_free: + kmem_cache_free(netchannel_cache, nc); + + return err; } static int netchannel_remove(struct unetchannel *unc) @@ -488,11 +783,17 @@ static int netchannel_recv_data(struct u nc = netchannel_check_dest(&ctl->unc, bucket); if (!nc) - goto out_unlock; + goto err_out_unlock; + + netchannel_get(nc); + mutex_unlock(&bucket->mutex); - ret = nc->nc_read_data(nc, &ctl->len, data); + ret = nc->nc_read_data(nc, &ctl->timeout, &ctl->len, data); + + netchannel_put(nc); + return ret; -out_unlock: +err_out_unlock: mutex_unlock(&bucket->mutex); return ret; } -- Evgeniy Polyakov - To unsubscribe from this list: send the line "unsubscribe netdev" in the body of a message to [EMAIL PROTECTED] More majordomo info at 
http://vger.kernel.org/majordomo-info.html