The commit is pushed to "branch-rh9-5.14.0-284.25.1.vz9.30.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git after rh9-5.14.0-284.25.1.vz9.30.8

------>
commit 5472b6c50d7130397c3fa0deb72579231e5506e8
Author: Alexey Kuznetsov <kuz...@virtuozzo.com>
Date:   Fri Oct 6 18:42:55 2023 +0800

    fuse: cs acceleration for writes

    If we have a chain of CSes:

        D1 -> D2 -> ... -> Dk -> Nk+1 -> ... -> Nn

    where the k D-CSes are accessible locally and Nk+1 .. Nn are accessible
    only through the network (though some of them may happen to be local),
    then we can fan out direct writes to D1 ... Dk and send a single write
    to the remaining tail Nk+1 -> ... -> Nn. It is up to the user-space CS
    to decide when this is a legal thing to do.

    https://pmc.acronis.work/browse/VSTOR-54040

    Signed-off-by: Alexey Kuznetsov <kuz...@acronis.com>
    Feature: vStorage
---
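The idea in code form, as a minimal userspace sketch (illustrative names
only, nothing below is part of the patch; the real logic is in
pcs_csa_csl_write_submit() further down):

#include <stdio.h>

#define MAX_REPLICAS 8

struct replica {
	int local;	/* 1 if this CS has an accelerated local mapping */
};

/* Write the local prefix directly, then hand the rest of the chain one
 * network write.  Mirrors the D1..Dk / Nk+1..Nn split from the commit
 * message. */
static void fanout_write(const struct replica *chain, int n)
{
	int k = 0;

	while (k < n && chain[k].local) {
		printf("direct write to D%d\n", k + 1);
		k++;
	}
	if (k < n)
		printf("network write covering N%d..N%d\n", k + 1, n);
}

int main(void)
{
	struct replica chain[MAX_REPLICAS] = { {1}, {1}, {0} };

	fanout_write(chain, 3);	/* D1, D2 locally; N3 over the network */
	return 0;
}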
 fs/fuse/kio/pcs/pcs_cs.c       |  32 ++-
 fs/fuse/kio/pcs/pcs_cs.h       |   2 +
 fs/fuse/kio/pcs/pcs_cs_accel.c | 518 +++++++++++++++++++++++++++++++++++++++--
 fs/fuse/kio/pcs/pcs_map.c      |  18 +-
 fs/fuse/kio/pcs/pcs_map.h      |  16 ++
 fs/fuse/kio/pcs/pcs_req.h      |  38 +++
 6 files changed, 592 insertions(+), 32 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index c518cc9792a4..fffbce6110b5 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -291,6 +291,11 @@ void cs_log_io_times(struct pcs_int_request * ireq, struct pcs_msg * resp, unsig
 	struct pcs_cs_iohdr * h = (struct pcs_cs_iohdr *)msg_inline_head(resp);
 	int reqt = h->hdr.type != PCS_CS_SYNC_RESP ? ireq->iochunk.cmd : PCS_REQ_T_SYNC;
 
+	if (ireq->iochunk.parent_N && h->hdr.type != PCS_CS_READ_RESP && h->hdr.type != PCS_CS_FIEMAP_RESP) {
+		pcs_csa_relay_iotimes(ireq->iochunk.parent_N, h, resp->rpc->peer_id);
+		return;
+	}
+
 	fuse_stat_observe(fc, reqt, ktime_sub(ktime_get(), ireq->ts_sent));
 	if (fc->ktrace && fc->ktrace_level >= LOG_TRACE) {
 		int n = 1;
@@ -611,10 +616,28 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
 
 	BUG_ON(msg->rpc);
 
-	if (ireq->iochunk.cmd == PCS_REQ_T_READ && !((ireq->iochunk.size|ireq->iochunk.offset) & 511) &&
-	    !(ireq->flags & IREQ_F_NO_ACCEL)) {
-		if (pcs_csa_cs_submit(cs, ireq))
-			return;
+	ireq->ts_sent = ktime_get();
+
+	if (!((ireq->iochunk.size|ireq->iochunk.offset) & 511) && !(ireq->flags & IREQ_F_NO_ACCEL)) {
+		if (ireq->iochunk.cmd == PCS_REQ_T_READ) {
+			if (pcs_csa_cs_submit(cs, ireq))
+				return;
+		} else if (ireq->iochunk.cmd == PCS_REQ_T_WRITE) {
+			/* Synchronous writes in accel mode are still not supported */
+			if (!(ireq->dentry->fileinfo.attr.attrib & PCS_FATTR_IMMEDIATE_WRITE) &&
+			    !ireq->dentry->no_write_delay) {
+				struct pcs_int_request * sreq;
+
+				sreq = pcs_csa_csl_write_submit(ireq);
+				if (!sreq)
+					return;
+				if (sreq != ireq) {
+					ireq = sreq;
+					cs = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].cslink.cs;
+					msg = &ireq->iochunk.msg;
+				}
+			}
+		}
 	}
 
 	msg->private = cs;
@@ -686,7 +709,6 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
 		msg->timeout = csl->write_timeout;
 	else
 		msg->timeout = csl->read_timeout;
-	ireq->ts_sent = ktime_get();
 
 	ireq->wait_origin.val = 0;
 
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index 0564ef33514d..0b17e924a534 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -214,6 +214,8 @@ u32 pcs_cs_msg_size(u32 size, u32 storage_version);
 struct pcs_msg* pcs_alloc_cs_msg(u32 type, u32 size, u32 storage_version);
 
 int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq);
+struct pcs_int_request * pcs_csa_csl_write_submit(struct pcs_int_request * ireq);
+void pcs_csa_relay_iotimes(struct pcs_int_request * ireq, struct pcs_cs_iohdr * h, PCS_NODE_ID_T cs_id);
 void pcs_csa_cs_detach(struct pcs_cs * cs);
 
 #endif /* _PCS_CS_H_ */
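pcs_cs_submit() in the hunk above relies on a three-way result from
pcs_csa_csl_write_submit(). The caller-side contract, modeled as a
self-contained userspace program (illustrative types and names, not the
kernel API):

#include <stdio.h>
#include <stdlib.h>

struct request {
	int nsrv;	/* replicas in the chain */
	int nlocal;	/* length of the accelerated local prefix */
	int is_clone;
};

/* Same three-way result as the kernel function: NULL when everything was
 * written locally, the original when nothing was accelerated, a clone
 * carrying the write to the tail of the chain otherwise. */
static struct request *accel_write_submit(struct request *req)
{
	if (req->nlocal >= req->nsrv)
		return NULL;
	if (req->nlocal == 0)
		return req;

	struct request *sreq = malloc(sizeof(*sreq));
	*sreq = *req;
	sreq->is_clone = 1;
	return sreq;
}

static void submit(struct request *req)
{
	struct request *sreq = accel_write_submit(req);

	if (!sreq) {
		printf("fully local, nothing sent\n");
		return;
	}
	printf("send %s to the CS chain\n",
	       sreq->is_clone ? "N-request clone" : "original request");
	if (sreq != req)
		free(sreq);
}

int main(void)
{
	struct request a = { 3, 3, 0 }, b = { 3, 0, 0 }, c = { 3, 2, 0 };

	submit(&a);	/* fully local */
	submit(&b);	/* original goes out unchanged */
	submit(&c);	/* clone goes to the network tail */
	return 0;
}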
diff --git a/fs/fuse/kio/pcs/pcs_cs_accel.c b/fs/fuse/kio/pcs/pcs_cs_accel.c
index fc6a35fb17eb..ae8562eff822 100644
--- a/fs/fuse/kio/pcs/pcs_cs_accel.c
+++ b/fs/fuse/kio/pcs/pcs_cs_accel.c
@@ -146,14 +146,9 @@ void pcs_csa_cs_detach(struct pcs_cs * cs)
 	}
 }
 
-static inline struct pcs_csa_entry * cse_lookup(struct pcs_csa_context * ctx, u64 chunk_id)
+static inline struct pcs_csa_entry * __cse_lookup(struct pcs_csa_context * ctx, u64 chunk_id)
 {
-	struct pcs_csa_entry * cse;
-
-	rcu_read_lock();
-	cse= radix_tree_lookup(&ctx->tree, chunk_id);
-	rcu_read_unlock();
-	return cse;
+	return radix_tree_lookup(&ctx->tree, chunk_id);
 }
 
 static int csa_update(struct pcs_csa_context * ctx, PCS_CHUNK_UID_T chunk_id, u32 flags, PCS_MAP_VERSION_T * vers,
@@ -372,12 +367,15 @@ static void __pcs_csa_final_completion(struct pcs_aio_req *areq)
 
 	if (!pcs_if_error(&ireq->error) && (ireq->flags & IREQ_F_CRYPT)) {
 		struct pcs_cs * cs = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].cslink.cs;
-		struct pcs_csa_context * ctx = rcu_dereference(cs->csa_ctx);
+		struct pcs_csa_context * ctx;
+		rcu_read_lock();
+		ctx = rcu_dereference(cs->csa_ctx);
 		if (!ctx || !ctx->tfm || decrypt_data(ireq, ctx->tfm)) {
 			ireq->error.remote = 1;
 			ireq->error.offender = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
 			ireq->error.value = PCS_ERR_IO;
 		}
+		rcu_read_unlock();
 	}
 
 	if (areq->crc) {
@@ -564,7 +562,7 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 
 	BUG_ON(parent->type != PCS_IREQ_API);
 	ar = parent->apireq.req;
-	ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, it, 0);
+	ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, it, READ);
 	if (!iov_iter_is_bvec(it)) {
 		FUSE_KTRACE(ireq->cc->fc, "Not a bvec, falling back");
 		return -EINVAL;
@@ -581,7 +579,6 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 
 	/* One ref is ours, other is for AIO. If crc read is needed we will grab the third */
 	atomic_set(&areq->iocount, 2);
-	ireq->ts_sent = ktime_get();
 
 	ret = call_read_iter(file, iocb, it);
 	if (unlikely(ret != -EIOCBQUEUED)) {
@@ -637,28 +634,521 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 
 int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq)
 {
-	struct pcs_csa_context * csa_ctx = rcu_dereference(cs->csa_ctx);
+	struct pcs_csa_context * csa_ctx;
+
+	rcu_read_lock();
+	csa_ctx = rcu_dereference(cs->csa_ctx);
 
 	if (csa_ctx) {
 		struct pcs_map_entry * map = ireq->iochunk.map;
-		struct pcs_csa_entry * csa = cse_lookup(csa_ctx, map->id);
+		struct pcs_csa_entry * csa;
+
+		csa = __cse_lookup(csa_ctx, map->id);
 
 		if (csa && memcmp(&ireq->iochunk.csl->version, &csa->version, sizeof(PCS_MAP_VERSION_T)) == 0 &&
 		    (csa->flags & PCS_CSA_FL_READ)) {
 			/* XXX Paranoia? Verify! */
 			if (!(map->state & PCS_MAP_DEAD) && map->cs_list == ireq->iochunk.csl) {
+				struct file * file = get_file(csa->file);
+				struct file * cfile = csa->cfile ? get_file(csa->cfile) : NULL;
+				unsigned int flags = csa->flags;
+				int err;
+
 				if (csa_ctx->tfm)
 					ireq->flags |= IREQ_F_CRYPT;
-				if (!csa_submit(csa->file, csa->cfile, csa->flags&PCS_CSA_FL_CSUM, ireq))
+
+				rcu_read_unlock();
+				err = csa_submit(file, cfile, flags&PCS_CSA_FL_CSUM, ireq);
+				fput(file);
+				if (cfile)
+					fput(cfile);
+				if (!err)
 					return 1;
+				rcu_read_lock();
 
 				/* Clear state which could be rewritten by csa_submit */
 				ireq->iochunk.msg.destructor = NULL;
 				ireq->iochunk.msg.rpc = NULL;
 			}
 		}
 	}
+	rcu_read_unlock();
 
 	return 0;
 }
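The rework above follows a common pattern: objects found under
rcu_read_lock() may vanish once the lock is dropped, so the files are
pinned with get_file() first, and only then is the potentially blocking
csa_submit() called. The same pattern in userspace, sketched with liburcu
(assumed available, link with -lurcu; the object type stands in for
struct file):

#include <urcu.h>		/* default (memb) flavour */
#include <urcu/uatomic.h>
#include <stdio.h>
#include <stdlib.h>

struct object {
	int refcnt;
};

static struct object *current_obj;	/* published with rcu_assign_pointer() */

static void blocking_submit(struct object *obj)
{
	printf("submitting, refcnt %d\n", uatomic_read(&obj->refcnt));
}

static void lookup_and_submit(void)
{
	struct object *obj;

	rcu_read_lock();
	obj = rcu_dereference(current_obj);
	if (!obj) {
		rcu_read_unlock();
		return;
	}
	uatomic_inc(&obj->refcnt);	/* pin: the get_file() analogue */
	rcu_read_unlock();		/* from here on we may block */

	blocking_submit(obj);

	if (uatomic_sub_return(&obj->refcnt, 1) == 0)	/* fput() analogue */
		free(obj);
}

int main(void)
{
	struct object *obj = calloc(1, sizeof(*obj));

	obj->refcnt = 1;	/* publisher's reference, never dropped here */
	rcu_assign_pointer(current_obj, obj);

	rcu_register_thread();
	lookup_and_submit();
	rcu_unregister_thread();
	return 0;
}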
+/* Write engine. It is similar to read, the code could be merged. Actually the
+ * situation with nsrv=1 is exactly the same. But reads can still be optimized
+ * a lot better, and we do not want to lose this advantage.
+ *
+ * Terminology:
+ *  Original ireq - the ireq which is supposed to be submitted to the head of
+ *                  the cs chain
+ *  D-request     - replicas at the head of the chain which have accelerated
+ *                  mappings and are eligible for local aio processing.
+ *                  They are represented by struct pcs_accel_write_req, stored
+ *                  as elements of the array awr[i] in struct pcs_accel_req in
+ *                  the original ireq.iochunk.acr
+ *  N-request     - the request to be submitted to the tail of the cs chain
+ *                  following the last D-request. It is represented as a clone
+ *                  of the original ireq with an overridden completion callback,
+ *                  so that its errors are not processed but copied to the
+ *                  original ireq, to be processed on completion of the original.
+ */
+
+static void ireq_init_acr(struct pcs_int_request * ireq)
+{
+	ireq->iochunk.parent_N = NULL;
+	atomic_set(&ireq->iochunk.acr.iocount, 1);
+	ireq->iochunk.acr.num_awr = 0;
+	ireq->iochunk.acr.num_iotimes = 0;
+}
+
+static void ireq_clear_acr(struct pcs_int_request * ireq)
+{
+	int i, n;
+
+	for (i = 0; i < ireq->iochunk.acr.num_awr; i++) {
+		struct bio_vec * bvec = ireq->iochunk.acr.awr[i].bvec_copy;
+
+		if (bvec) {
+			for (n = ireq->iochunk.acr.awr[i].num_copy_bvecs-1; n >= 0; n--) {
+				if (bvec[n].bv_page)
+					put_page(bvec[n].bv_page);
+			}
+			kfree(bvec);
+			ireq->iochunk.acr.awr[i].bvec_copy = NULL;
+		}
+	}
+	ireq->iochunk.msg.destructor = NULL;
+	ireq->iochunk.msg.rpc = NULL;
+	ireq->iochunk.parent_N = NULL;
+	ireq->flags |= IREQ_F_NO_ACCEL;
+}
+
+void pcs_csa_relay_iotimes(struct pcs_int_request * ireq, struct pcs_cs_iohdr * h, PCS_NODE_ID_T cs_id)
+{
+	int idx = ireq->iochunk.acr.num_awr;
+	struct pcs_cs_sync_resp * srec;
+
+	ireq->iochunk.acr.io_times[idx].cs_id = cs_id;
+	ireq->iochunk.acr.io_times[idx].sync = h->sync;
+
+	for (srec = (struct pcs_cs_sync_resp*)(h + 1), idx++;
+	     (void*)(srec + 1) <= (void*)h + h->hdr.len && idx < PCS_MAX_ACCEL_CS;
+	     srec++, idx++)
+		ireq->iochunk.acr.io_times[idx] = *srec;
+
+	ireq->iochunk.acr.num_iotimes = idx;
+}
+
+static void __complete_acr_work(struct work_struct * w)
+{
+	struct pcs_int_request * ireq = container_of(w, struct pcs_int_request, iochunk.acr.work);
+
+	if (pcs_if_error(&ireq->error)) {
+		FUSE_KTRACE(ireq->cc->fc, "IO error %d %lu, ireq:%p : %llu:%u+%u",
+			    ireq->error.value,
+			    ireq->error.remote ? (unsigned long)ireq->error.offender.val : 0UL,
+			    ireq, (unsigned long long)ireq->iochunk.chunk,
+			    (unsigned)ireq->iochunk.offset,
+			    (unsigned)ireq->iochunk.size);
+	} else {
+		struct fuse_conn * fc = container_of(ireq->cc, struct pcs_fuse_cluster, cc)->fc;
+
+		fuse_stat_observe(fc, PCS_REQ_T_WRITE, ktime_sub(ktime_get(), ireq->ts_sent));
+
+		if (fc->ktrace && fc->ktrace_level >= LOG_TRACE) {
+			struct fuse_trace_hdr * t;
+			int n = ireq->iochunk.acr.num_iotimes;
+
+			t = FUSE_TRACE_PREPARE(fc->ktrace, FUSE_KTRACE_IOTIMES, sizeof(struct fuse_tr_iotimes_hdr) +
+					       n*sizeof(struct fuse_tr_iotimes_cs));
+			if (t) {
+				struct fuse_tr_iotimes_hdr * th = (struct fuse_tr_iotimes_hdr *)(t + 1);
+				struct fuse_tr_iotimes_cs * ch = (struct fuse_tr_iotimes_cs *)(th + 1);
+				int i;
+
+				th->chunk = ireq->iochunk.chunk;
+				th->offset = ireq->iochunk.chunk + ireq->iochunk.offset;
+				th->size = ireq->iochunk.size;
+				th->start_time = ktime_to_us(ireq->ts);
+				th->local_delay = ktime_to_us(ktime_sub(ireq->ts_sent, ireq->ts));
+				th->lat = t->time - ktime_to_us(ireq->ts_sent);
+				th->ino = ireq->dentry->fileinfo.attr.id;
+				th->type = PCS_CS_WRITE_AL_RESP;
+				th->cses = n;
+
+				for (i = 0; i < n; i++) {
+					ch->csid = ireq->iochunk.acr.io_times[i].cs_id.val;
+					ch->misc = ireq->iochunk.acr.io_times[i].sync.misc;
+					ch->ts_net = ireq->iochunk.acr.io_times[i].sync.ts_net;
+					ch->ts_io = ireq->iochunk.acr.io_times[i].sync.ts_io;
+					ch++;
+				}
+			}
+		}
+		FUSE_TRACE_COMMIT(fc->ktrace);
+	}
+
+	ireq_clear_acr(ireq);
+	/* This will either complete or retry the whole request */
+	ireq_complete(ireq);
+}
+
+static inline void csa_complete_acr(struct pcs_int_request * ireq)
+{
+	if (atomic_dec_and_test(&ireq->iochunk.acr.iocount)) {
+		INIT_WORK(&ireq->iochunk.acr.work, __complete_acr_work);
+		queue_work(ireq->cc->wq, &ireq->iochunk.acr.work);
+	}
+}
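ireq_init_acr() and csa_complete_acr() implement a standard fan-out
completion count: the counter starts at 1 for the submitter, each
submitted D- or N-request adds a reference, and whoever drops the count
to zero queues the final completion. A self-contained model
(illustrative names):

#include <stdatomic.h>
#include <stdio.h>

struct accel_req {
	atomic_int iocount;
};

static void final_completion(struct accel_req *acr)
{
	printf("complete the original request\n");
}

static void put_ref(struct accel_req *acr)	/* csa_complete_acr() analogue */
{
	if (atomic_fetch_sub(&acr->iocount, 1) == 1)
		final_completion(acr);
}

int main(void)
{
	struct accel_req acr;
	int i, submitted = 2;

	atomic_init(&acr.iocount, 1);		/* submitter's base reference */

	for (i = 0; i < submitted; i++)
		atomic_fetch_add(&acr.iocount, 1);	/* one per subrequest */

	/* Subrequests complete (sequential here; atomics make it safe
	 * when the completions run concurrently). */
	for (i = 0; i < submitted; i++)
		put_ref(&acr);

	put_ref(&acr);	/* submitter drops the base ref; count hits 0 here */
	return 0;
}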
+
+static void __pcs_csa_write_final_completion(struct pcs_accel_write_req *areq)
+{
+	struct pcs_int_request * ireq;
+
+	fput(areq->iocb.ki_filp);
+
+	ireq = container_of(areq - areq->index, struct pcs_int_request, iochunk.acr.awr[0]);
+
+	if (!pcs_if_error(&ireq->error)) {
+		struct pcs_cs_sync_resp * sresp = &ireq->iochunk.acr.io_times[areq->index];
+
+		sresp->cs_id.val = ireq->iochunk.csl->cs[areq->index].info.id.val | PCS_NODE_ALT_MASK;
+		sresp->sync.ts_net = 0;
+		sresp->sync.ts_io = ktime_to_us(ktime_get()) - sresp->sync.misc;
+	}
+
+	csa_complete_acr(ireq);
+}
+
+static void csa_write_complete_work(struct work_struct *w)
+{
+	struct pcs_accel_write_req * areq = container_of(w, struct pcs_accel_write_req, work);
+
+	__pcs_csa_write_final_completion(areq);
+}
+
+static void csa_write_complete(struct kiocb *iocb, long ret)
+{
+	struct pcs_accel_write_req * areq;
+	struct pcs_int_request * ireq;
+
+	areq = container_of(iocb, struct pcs_accel_write_req, iocb);
+	ireq = container_of(areq - areq->index, struct pcs_int_request, iochunk.acr.awr[0]);
+
+	if (ret != ireq->iochunk.size) {
+		if (!ireq->error.value) {
+			ireq->error.remote = 1;
+			ireq->error.offender = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
+			ireq->error.value = PCS_ERR_IO;
+			ireq->flags |= IREQ_F_ACCELERROR;
+		}
+	}
+
+	if (atomic_dec_and_test(&areq->iocount)) {
+		INIT_WORK(&areq->work, csa_write_complete_work);
+		queue_work(ireq->cc->wq, &areq->work);
+	}
+}
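csa_write_complete() above finds the owning request without a back
pointer: awr[] is a contiguous array inside struct pcs_accel_req, so
areq - areq->index lands on &awr[0], and container_of() does the rest.
The same arithmetic in a runnable userspace form (simplified structures):

#include <stddef.h>
#include <stdio.h>

#define MAX_CS 3

struct write_req {
	int index;	/* this element's position in awr[] */
};

struct int_req {
	int id;
	struct write_req awr[MAX_CS];
};

/* container_of analogue: recover the struct from a member pointer. */
#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

static struct int_req *req_from_awr(struct write_req *areq)
{
	/* areq - areq->index steps back to &awr[0] ... */
	return container_of(areq - areq->index, struct int_req, awr[0]);
}

int main(void)
{
	struct int_req r = { .id = 42 };
	int i;

	for (i = 0; i < MAX_CS; i++)
		r.awr[i].index = i;

	for (i = 0; i < MAX_CS; i++)
		printf("awr[%d] -> req %d\n", i, req_from_awr(&r.awr[i])->id);
	return 0;
}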
+
+static void encrypt_page_ctr(struct crypto_sync_skcipher * tfm, struct page * dst, struct page *src,
+			     unsigned int offset, unsigned int len, u64 pos, u64 chunk_id)
+{
+	struct scatterlist sgi, sgo;
+	struct { u64 a, b; } iv;
+	SYNC_SKCIPHER_REQUEST_ON_STACK(req, tfm);
+
+	skcipher_request_set_sync_tfm(req, tfm);
+	skcipher_request_set_callback(req, CRYPTO_TFM_REQ_MAY_SLEEP, NULL, NULL);
+	sg_init_table(&sgi, 1);
+	sg_init_table(&sgo, 1);
+
+	iv.a = chunk_id;
+	iv.b = cpu_to_be64(pos / 16);
+	sg_set_page(&sgi, src, len, offset);
+	sg_set_page(&sgo, dst, len, offset);
+	skcipher_request_set_crypt(req, &sgi, &sgo, len, &iv);
+	crypto_skcipher_alg(crypto_skcipher_reqtfm(req))->encrypt(req);
+}
+
+static void encrypt_page_xts(struct crypto_sync_skcipher * tfm, struct page * dst, struct page *src,
+			     unsigned int offset, unsigned int len, u64 pos, u64 chunk_id)
+{
+	struct scatterlist sgi, sgo;
+	struct { u64 a, b; } iv;
+	SYNC_SKCIPHER_REQUEST_ON_STACK(req, tfm);
+
+	skcipher_request_set_sync_tfm(req, tfm);
+	skcipher_request_set_callback(req, CRYPTO_TFM_REQ_MAY_SLEEP, NULL, NULL);
+	sg_init_table(&sgi, 1);
+	sg_init_table(&sgo, 1);
+
+	for ( ; len > 0; len -= 512) {
+		iv.a = pos / 512;
+		iv.b = chunk_id;
+		sg_set_page(&sgi, src, 512, offset);
+		sg_set_page(&sgo, dst, 512, offset);
+		skcipher_request_set_crypt(req, &sgi, &sgo, 512, &iv);
+		crypto_skcipher_alg(crypto_skcipher_reqtfm(req))->encrypt(req);
+		pos += 512;
+		offset += 512;
+	}
+}
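Both helpers derive their IV purely from the chunk id and the byte
position, so every aligned bvec can be encrypted independently of the
others; that is what allows the copy to be produced per replica. The IV
math in isolation (a sketch; only the arithmetic of the two helpers
above is reproduced):

#include <endian.h>
#include <stdint.h>
#include <stdio.h>

struct iv { uint64_t a, b; };

static struct iv ctr_iv(uint64_t chunk_id, uint64_t pos)
{
	/* one IV per request: big-endian AES block number at byte pos */
	return (struct iv){ chunk_id, htobe64(pos / 16) };
}

static struct iv xts_iv(uint64_t chunk_id, uint64_t pos)
{
	/* one IV per 512-byte sector: sector number, then chunk id */
	return (struct iv){ pos / 512, chunk_id };
}

int main(void)
{
	struct iv a = ctr_iv(7, 4096), b = xts_iv(7, 4096);

	printf("ctr: %llx/%llx  xts: %llx/%llx\n",
	       (unsigned long long)a.a, (unsigned long long)a.b,
	       (unsigned long long)b.a, (unsigned long long)b.b);
	return 0;
}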
+
+static int init_crypted_data(struct pcs_int_request * ireq, int idx)
+{
+	struct pcs_int_request *parent = ireq->completion_data.parent;
+	struct pcs_fuse_req * r;
+	struct bio_vec * bvec;
+	int n, nvec;
+	u64 pos;
+	u64 chunk_id;
+	struct pcs_csa_context * csa_ctx;
+	struct crypto_sync_skcipher * tfm;
+
+	BUG_ON(parent->type != PCS_IREQ_API);
+	r = parent->apireq.req->datasource;
+
+	nvec = r->exec.io.num_bvecs;
+
+	/* XXX oops, this can sleep. tfm can be destroyed. Need refcount yet?
+	 * Seems not. We just have to refetch tfm from cs after allocations,
+	 * failing if it is destroyed already.
+	 */
+	bvec = kmalloc(sizeof(struct bio_vec) * nvec, GFP_NOIO);
+	if (!bvec)
+		return -ENOMEM;
+
+	for (n = 0; n < nvec; n++) {
+		bvec[n] = r->exec.io.bvec[n];
+		if ((bvec[n].bv_offset|bvec[n].bv_len)&511)
+			goto out;
+		bvec[n].bv_page = alloc_page(GFP_NOIO);
+		if (!bvec[n].bv_page)
+			goto out;
+	}
+
+	rcu_read_lock();
+	csa_ctx = rcu_dereference(ireq->iochunk.csl->cs[idx].cslink.cs->csa_ctx);
+	if (!csa_ctx || ((tfm = rcu_dereference(csa_ctx->tfm)) == NULL)) {
+		rcu_read_unlock();
+		goto out;
+	}
+
+	pos = ireq->iochunk.offset;
+	chunk_id = ireq->iochunk.map->id;
+	for (n = 0; n < nvec; n++) {
+		if (tfm->base.base.__crt_alg->cra_priority == 400)
+			encrypt_page_ctr(tfm, bvec[n].bv_page, r->exec.io.bvec[n].bv_page, bvec[n].bv_offset, bvec[n].bv_len, pos, chunk_id);
+		else
+			encrypt_page_xts(tfm, bvec[n].bv_page, r->exec.io.bvec[n].bv_page, bvec[n].bv_offset, bvec[n].bv_len, pos, chunk_id);
+		pos += bvec[n].bv_len;
+	}
+	rcu_read_unlock();
+
+	ireq->iochunk.acr.awr[idx].bvec_copy = bvec;
+	ireq->iochunk.acr.awr[idx].num_copy_bvecs = n;
+	return 0;
+
+out:
+	while (--n >= 0)
+		put_page(bvec[n].bv_page);
+	kfree(bvec);
+	return -ENOMEM;
+}
+
+static inline int csa_submit_write(struct file * file, struct pcs_int_request * ireq, int idx, int do_crypt)
+{
+	struct pcs_accel_write_req * areq = &ireq->iochunk.acr.awr[idx];
+	struct kiocb * iocb = &areq->iocb;
+	struct iov_iter iter;
+	struct iov_iter * it = &iter; /* Just to use this pointer instead of &iter */
+	unsigned int size = ireq->iochunk.size;
+	int ret;
+
+	if (do_crypt) {
+		if (init_crypted_data(ireq, idx))
+			return -EINVAL;
+		iov_iter_bvec(it, WRITE, areq->bvec_copy, areq->num_copy_bvecs, size);
+	} else {
+		struct pcs_int_request *parent = ireq->completion_data.parent;
+		pcs_api_iorequest_t *ar;
+
+		areq->bvec_copy = NULL;
+
+		BUG_ON(parent->type != PCS_IREQ_API);
+		ar = parent->apireq.req;
+
+		ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, it, WRITE);
+		if (!iov_iter_is_bvec(it)) {
+			FUSE_KTRACE(ireq->cc->fc, "Not a bvec, falling back");
+			return -EINVAL;
+		}
+		iov_iter_truncate(it, size);
+	}
+
+	iocb->ki_pos = ireq->iochunk.offset;
+	iocb->ki_filp = get_file(file);
+	iocb->ki_complete = csa_write_complete;
+	iocb->ki_flags = IOCB_DIRECT;
+	iocb->ki_ioprio = IOPRIO_PRIO_VALUE(IOPRIO_CLASS_NONE, 0);
+
+	/* One ref is ours, the other is for AIO. */
+	atomic_set(&areq->iocount, 2);
+	atomic_inc(&ireq->iochunk.acr.iocount);
+	areq->index = idx;
+	ireq->iochunk.acr.num_awr = idx + 1;
+
+	ireq->iochunk.acr.io_times[idx].sync.misc = ktime_to_us(ktime_get());
+
+	ret = call_write_iter(file, iocb, it);
+
+	if (unlikely(ret != -EIOCBQUEUED)) {
+		if (ret != size) {
+			if (!ireq->error.value) {
+				ireq->error.remote = 1;
+				ireq->error.offender = ireq->iochunk.csl->cs[idx].info.id;
+				ireq->error.value = PCS_ERR_IO;
+			}
+
+			/* Do not drop refs, we do not want to complete ireq. */
+			fput(areq->iocb.ki_filp);
+			FUSE_KTRACE(ireq->cc->fc, "AIO submit rejected ret=%d %lu, ireq:%p : %llu:%u+%u",
+				    ret, ireq->error.remote ? (unsigned long)ireq->error.offender.val : 0UL,
+				    ireq, (unsigned long long)ireq->iochunk.chunk,
+				    (unsigned)ireq->iochunk.offset,
+				    (unsigned)size);
+			if (atomic_dec_and_test(&ireq->iochunk.acr.iocount))
+				BUG();
+			return ret >= 0 ? -EIO : ret;
+		}
+
+		/* IO already finished. Drop AIO refcnt and proceed to crc */
+		FUSE_KTRACE(ireq->cc->fc, "No good, AIO executed synchronously, ireq:%p : %llu:%u+%u",
+			    ireq, (unsigned long long)ireq->iochunk.chunk,
+			    (unsigned)ireq->iochunk.offset,
+			    (unsigned)size);
+
+		if (atomic_dec_and_test(&areq->iocount))
+			BUG();
+	}
+
+	if (atomic_dec_and_test(&areq->iocount)) {
+		INIT_WORK(&areq->work, csa_write_complete_work);
+		queue_work(ireq->cc->wq, &areq->work);
+	}
+	return 0;
+}
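Note the two references taken on areq->iocount before call_write_iter():
one belongs to the submitter, one to the AIO completion, so the final
work runs exactly once no matter which side finishes last, even if the
completion fires before the submit call returns. The bare pattern as a
userspace sketch (illustrative names):

#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

struct areq {
	atomic_int iocount;
};

static void finalize(struct areq *a)
{
	/* Runs exactly once, from whichever context drops the last ref. */
	printf("finalize\n");
	free(a);
}

static void put(struct areq *a)
{
	if (atomic_fetch_sub(&a->iocount, 1) == 1)
		finalize(a);
}

/* Would be called from the asynchronous completion path. */
static void on_aio_complete(struct areq *a)
{
	put(a);	/* drop the AIO reference */
}

int main(void)
{
	struct areq *a = malloc(sizeof(*a));

	atomic_init(&a->iocount, 2);	/* one ref ours, one for AIO */
	on_aio_complete(a);		/* may even run before submit returns */
	put(a);				/* submitter drops its reference */
	return 0;
}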
+
+static int csa_cs_submit_write(struct pcs_int_request * ireq, int idx)
+{
+	struct pcs_cs * cs = ireq->iochunk.csl->cs[idx].cslink.cs;
+	struct pcs_csa_context * csa_ctx;
+
+	if (idx >= PCS_MAX_ACCEL_CS)
+		return 0;
+
+	rcu_read_lock();
+	csa_ctx = rcu_dereference(cs->csa_ctx);
+	if (csa_ctx) {
+		struct pcs_map_entry * map = ireq->iochunk.map;
+		struct pcs_csa_entry * csa = __cse_lookup(csa_ctx, map->id);
+
+		if (csa && memcmp(&ireq->iochunk.csl->version, &csa->version, sizeof(PCS_MAP_VERSION_T)) == 0 &&
+		    (csa->flags & PCS_CSA_FL_WRITE)) {
+			/* XXX Paranoia? Verify! */
+			if (!(map->state & PCS_MAP_DEAD) && map->cs_list == ireq->iochunk.csl) {
+				struct file * file = get_file(csa->file);
+				int do_crypt = (csa_ctx->tfm != NULL);
+				int err;
+
+				rcu_read_unlock();
+				err = csa_submit_write(file, ireq, idx, do_crypt);
+				fput(file);
+				return !err;
+			}
+		}
+	}
+	rcu_read_unlock();
+	return 0;
+}
+
+static void complete_N_request(struct pcs_int_request * sreq)
+{
+	struct pcs_int_request * ireq = sreq->iochunk.parent_N;
+
+	if (pcs_if_error(&sreq->error)) {
+		/* An error on the N-request overrides any error on a D-request. */
+		ireq->error = sreq->error;
+		ireq->flags |= IREQ_F_NO_ACCEL;
+		/* Clear ACCELERROR to deliver this error normally, through invalidating the map */
+		ireq->flags &= ~IREQ_F_ACCELERROR;
+	}
+
+	/* And free all clone resources */
+	pcs_sreq_detach(sreq);
+	if (sreq->iochunk.map)
+		pcs_map_put(sreq->iochunk.map);
+	if (sreq->iochunk.csl)
+		cslist_put(sreq->iochunk.csl);
+	if (sreq->iochunk.flow)
+		pcs_flow_put(sreq->iochunk.flow, &sreq->cc->maps.ftab);
+	ireq_destroy(sreq);
+
+	csa_complete_acr(ireq);
+}
+
+struct pcs_int_request * pcs_csa_csl_write_submit(struct pcs_int_request * ireq)
+{
+	int idx;
+	struct pcs_cs_list *csl = ireq->iochunk.csl;
+
+	if (csl->nsrv > PCS_MAX_ACCEL_CS)
+		return ireq;
+
+	ireq_init_acr(ireq);
+
+	for (idx = 0; idx < csl->nsrv; idx++) {
+		if (!csa_cs_submit_write(ireq, idx))
+			break;
+	}
+
+	if (idx == 0) {
+		/* Nothing was handled. Just proceed to normal submit */
+		ireq_clear_acr(ireq);
+		return ireq;
+	} else if (idx >= csl->nsrv) {
+		/* Everything went locally. No network at all. */
+		ireq->iochunk.acr.num_iotimes = idx;
+		csa_complete_acr(ireq);
+		return NULL;
+	} else {
+		/* Harder case. We have to transmit to the tail replicas */
+		struct pcs_int_request * sreq = pcs_ireq_split(ireq, 0, 1);
+
+		if (sreq == NULL) {
+			/* Some D-replicas are already submitted, so we have to go
+			 * through the error cycle.
+			 */
+			ireq->error.remote = 1;
+			ireq->error.offender = ireq->iochunk.csl->cs[idx].info.id;
+			ireq->error.value = PCS_ERR_NORES;
+			csa_complete_acr(ireq);
+			return NULL;
+		}
+
+		ireq->iochunk.acr.num_iotimes = idx;
+
+		/* ireq_split does not copy size and csl */
+		sreq->iochunk.size = ireq->iochunk.size;
+		sreq->iochunk.csl = ireq->iochunk.csl;
+		cslist_get(ireq->iochunk.csl);
+		/* Yet this sreq is not actually accounted, the accounting is made for the original ireq */
+		sreq->flags |= IREQ_F_NOACCT;
+		sreq->complete_cb = complete_N_request;
+		sreq->iochunk.parent_N = ireq;
+		sreq->iochunk.cs_index = idx;
+
+		/* Our original iocount ref goes to the N-request.
+		 * Proceed with sending sreq to the tail of the cs chain.
+		 */
+		return sreq;
+	}
+}
+
 static long csa_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
 {
 	struct pcs_csa_context *ctx = file->private_data;
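The decision structure of pcs_csa_csl_write_submit() as a pure function,
for clarity (illustrative types; the real code also handles the
clone-allocation failure with PCS_ERR_NORES):

#include <stdio.h>

enum outcome { PROCEED_NORMALLY, ALL_LOCAL, SPLIT };

static enum outcome classify(const int *accel, int nsrv, int *split_at)
{
	int idx;

	for (idx = 0; idx < nsrv && accel[idx]; idx++)
		;	/* csa_cs_submit_write() succeeded for replica idx */

	*split_at = idx;
	if (idx == 0)
		return PROCEED_NORMALLY;	/* nothing accelerated */
	if (idx >= nsrv)
		return ALL_LOCAL;		/* no network i/o at all */
	return SPLIT;	/* D-writes cover [0,idx), N-request starts at idx */
}

int main(void)
{
	int chain[3] = { 1, 1, 0 }, at;

	switch (classify(chain, 3, &at)) {
	case PROCEED_NORMALLY: printf("fall back to plain submit\n"); break;
	case ALL_LOCAL:        printf("complete without network\n"); break;
	case SPLIT:            printf("split at replica %d\n", at); break;
	}
	return 0;
}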
@@ -717,6 +1207,7 @@ static int csa_release(struct inode *inode, struct file *file)
 	struct pcs_cs * cs;
 
 	ctx->dead = 1;
+	rcu_read_lock();
 	if ((cs = rcu_dereference(ctx->cs)) != NULL) {
 		spin_lock(&cs->lock);
 		if (ctx->cs == cs) {
@@ -728,6 +1219,7 @@ static int csa_release(struct inode *inode, struct file *file)
 		}
 		spin_unlock(&cs->lock);
 	}
+	rcu_read_unlock();
 	wake_up_poll(&ctx->wqh, EPOLLHUP);
 	pcs_csa_put(ctx);
 	return 0;
diff --git a/fs/fuse/kio/pcs/pcs_map.c b/fs/fuse/kio/pcs/pcs_map.c
index 9dc1c95733fd..634775b3cb9e 100644
--- a/fs/fuse/kio/pcs/pcs_map.c
+++ b/fs/fuse/kio/pcs/pcs_map.c
@@ -70,7 +70,7 @@ static inline unsigned int pcs_sync_timeout(struct pcs_cluster_core *cc)
 	return PCS_SYNC_TIMEOUT;
 }
 
-static void cslist_destroy(struct pcs_cs_list * csl)
+void cslist_destroy(struct pcs_cs_list * csl)
 {
 	int i;
 
@@ -97,19 +97,6 @@ static void cslist_destroy(struct pcs_cs_list * csl)
 	kfree(csl);
 }
 
-static inline void cslist_get(struct pcs_cs_list * csl)
-{
-	TRACE("csl:%p csl->map:%p refcnt:%d\n", csl, csl->map, atomic_read(&csl->refcnt));
-
-	atomic_inc(&csl->refcnt);
-}
-static inline void cslist_put(struct pcs_cs_list * csl)
-{
-	TRACE("csl:%p csl->map:%p refcnt:%d\n", csl, csl->map, atomic_read(&csl->refcnt));
-	if (atomic_dec_and_test(&csl->refcnt))
-		cslist_destroy(csl);
-}
-
 static void map_drop_cslist(struct pcs_map_entry * m)
 {
 	assert_spin_locked(&m->lock);
@@ -1579,6 +1566,9 @@ void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
 	unsigned long long match_id = 0;
 	struct pcs_cs_list * csl, ** csl_p = 0;
 
+	if (ireq->flags & IREQ_F_NOACCT)
+		return;
+
 	switch (ireq->type) {
 	case PCS_IREQ_IOCHUNK:
 		csl_p = &ireq->iochunk.csl;
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index e2b3c14a5b28..bfe0719eebe2 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -221,6 +221,22 @@ void ireq_drop_tokens(struct pcs_int_request * ireq);
 
 extern unsigned int cs_io_locality;
 
+void cslist_destroy(struct pcs_cs_list * csl);
+
+static inline void cslist_get(struct pcs_cs_list * csl)
+{
+	TRACE("csl:%p csl->map:%p refcnt:%d\n", csl, csl->map, atomic_read(&csl->refcnt));
+
+	atomic_inc(&csl->refcnt);
+}
+
+static inline void cslist_put(struct pcs_cs_list * csl)
+{
+	TRACE("csl:%p csl->map:%p refcnt:%d\n", csl, csl->map, atomic_read(&csl->refcnt));
+	if (atomic_dec_and_test(&csl->refcnt))
+		cslist_destroy(csl);
+}
+
 #define MAP_FMT	"(%p) 0x%lld s:%x" DENTRY_FMT
 #define MAP_ARGS(m) (m), (long long)(m)->index, (m)->state, DENTRY_ARGS(pcs_dentry_from_map((m)))
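The IREQ_F_NOACCT early return in pcs_deaccount_ireq() keeps the
congestion accounting balanced: the charge was made once for the
original ireq, and the N-request clone must not be charged or credited
a second time. A toy model of the pairing (hypothetical names):

#include <stdio.h>

#define F_NOACCT 0x8000

struct req { unsigned flags; };

static int in_flight;	/* stands in for the per-CS accounting */

static void account(struct req *r)
{
	if (!(r->flags & F_NOACCT))
		in_flight++;
}

static void deaccount(struct req *r)
{
	if (r->flags & F_NOACCT)
		return;		/* already accounted via the original request */
	in_flight--;
}

int main(void)
{
	struct req orig = { 0 }, clone = { F_NOACCT };

	account(&orig);
	/* the clone is submitted on the original's budget */
	deaccount(&clone);
	deaccount(&orig);
	printf("in_flight=%d\n", in_flight);	/* balanced: 0 */
	return 0;
}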
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index 68cf2702b2ea..8ee32b33f3f0 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -59,6 +59,40 @@ struct pcs_aio_req
 	u32			crcb[PCS_MAX_INLINE_CRC];
 };
 
+#define PCS_MAX_ACCEL_CS	3
+
+struct pcs_accel_write_req
+{
+	int			index;
+	struct kiocb		iocb;
+	atomic_t		iocount;
+	struct work_struct	work;
+
+	/* Crypto bits. This holds an encrypted copy of the original data for use by aio writes */
+	struct bio_vec		*bvec_copy;
+	unsigned		num_copy_bvecs;
+};
+
+struct pcs_accel_req
+{
+	struct pcs_int_request	*parent;
+	atomic_t		iocount;
+	int			num_awr;
+	struct pcs_accel_write_req awr[PCS_MAX_ACCEL_CS];
+	int			num_iotimes;
+	struct pcs_cs_sync_resp	io_times[PCS_MAX_ACCEL_CS];
+	struct work_struct	work;
+};
+
+struct pcs_iochunk_req {
+	struct pcs_msg		msg;
+	struct pcs_cs_iohdr	hbuf;	/* Buffer for header.
+					 * A little ugly
+					 */
+	struct kvec		hbuf_kv;
+	struct pcs_int_request	*parent_N;
+};
+
 struct pcs_int_request
 {
 	struct pcs_cluster_core* cc;
@@ -84,6 +118,7 @@ struct pcs_int_request
 #define IREQ_F_NO_ACCEL		0x1000
 #define IREQ_F_CRYPT		0x2000
 #define IREQ_F_ACCELERROR	0x4000
+#define IREQ_F_NOACCT		0x8000
 
 	atomic_t		iocount;
 
@@ -141,8 +176,11 @@ struct pcs_int_request
 						 * A little ugly
						 */
			struct kvec	hbuf_kv;
+			struct pcs_int_request *parent_N;
 		};
+		struct pcs_iochunk_req	ir;
 		struct pcs_aio_req	ar;
+		struct pcs_accel_req	acr;
 	};
 } iochunk;
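struct pcs_iochunk_req mirrors the leading members of the anonymous
struct member-for-member, so the named view ir and the in-place fields
(msg, hbuf, hbuf_kv, parent_N) alias inside the union. The pattern in
isolation, as a runnable model (simplified members):

#include <assert.h>
#include <stdio.h>

struct iochunk_view { int msg; long hbuf; };	/* mirrors the anon struct */

struct req {
	union {
		struct { int msg; long hbuf; };	/* in-place fields */
		struct iochunk_view ir;		/* same fields, named type */
		char aio_scratch[64];		/* another request flavour */
	};
};

int main(void)
{
	struct req r;

	r.msg = 7;
	r.hbuf = 42;
	assert(r.ir.msg == 7 && r.ir.hbuf == 42);	/* the views alias */
	printf("ir.msg=%d ir.hbuf=%ld\n", r.ir.msg, r.ir.hbuf);
	return 0;
}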