If we have a chain of CSes:

D1 -> D2 -> ... -> Dk -> Nk+1 -> ... -> Nn

where the first k CSes (D1 .. Dk) are accessible locally and Nk+1 .. Nn are
accessible only through the network (though some of them may happen to be
local as well), we can fan out direct local writes to D1 .. Dk and send a
single network write down the remaining tail Nk+1 -> ... -> Nn.

It is up to the user-space CS to decide when this is a legal thing to do.
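
Schematically, the new submit path behaves as outlined below. This is only an
illustrative sketch of the control flow added by this patch, not the code
itself; the helper name make_N_request() is invented for the sketch, the real
code clones the ireq via pcs_ireq_split() inside pcs_csa_csl_write_submit():

	/* Fan out local aio writes to the D-CSes at the head of the chain */
	for (idx = 0; idx < csl->nsrv; idx++)
		if (!csa_cs_submit_write(ireq, idx))
			break;

	if (idx == 0)
		return ireq;		/* nothing went locally, submit as usual */
	if (idx == csl->nsrv)
		return NULL;		/* all replicas written locally, no network IO */

	/* Otherwise clone the ireq into an N-request covering CSes idx..n-1
	 * and let the normal path transmit it to the tail of the chain.
	 */
	return make_N_request(ireq, idx);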

Signed-off-by: Alexey Kuznetsov <kuz...@acronis.com>
---
 fs/fuse/kio/pcs/pcs_cs.c       |  32 ++-
 fs/fuse/kio/pcs/pcs_cs.h       |   2 +
 fs/fuse/kio/pcs/pcs_cs_accel.c | 518 +++++++++++++++++++++++++++++++++++++++--
 fs/fuse/kio/pcs/pcs_map.c      |  18 +-
 fs/fuse/kio/pcs/pcs_map.h      |  16 ++
 fs/fuse/kio/pcs/pcs_req.h      |  38 +++
 6 files changed, 592 insertions(+), 32 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index c518cc9..fffbce6 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -291,6 +291,11 @@ void cs_log_io_times(struct pcs_int_request * ireq, struct pcs_msg * resp, unsig
        struct pcs_cs_iohdr * h = (struct pcs_cs_iohdr *)msg_inline_head(resp);
        int reqt = h->hdr.type != PCS_CS_SYNC_RESP ? ireq->iochunk.cmd : PCS_REQ_T_SYNC;
 
+       if (ireq->iochunk.parent_N && h->hdr.type != PCS_CS_READ_RESP && h->hdr.type != PCS_CS_FIEMAP_RESP) {
+               pcs_csa_relay_iotimes(ireq->iochunk.parent_N, h, resp->rpc->peer_id);
+               return;
+       }
+
        fuse_stat_observe(fc, reqt, ktime_sub(ktime_get(), ireq->ts_sent));
        if (fc->ktrace && fc->ktrace_level >= LOG_TRACE) {
                int n = 1;
@@ -611,10 +616,28 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
 
        BUG_ON(msg->rpc);
 
-       if (ireq->iochunk.cmd == PCS_REQ_T_READ && !((ireq->iochunk.size|ireq->iochunk.offset) & 511) &&
-           !(ireq->flags & IREQ_F_NO_ACCEL)) {
-               if (pcs_csa_cs_submit(cs, ireq))
-                       return;
+       ireq->ts_sent = ktime_get();
+
+       if (!((ireq->iochunk.size|ireq->iochunk.offset) & 511) && !(ireq->flags & IREQ_F_NO_ACCEL)) {
+               if (ireq->iochunk.cmd == PCS_REQ_T_READ) {
+                       if (pcs_csa_cs_submit(cs, ireq))
+                               return;
+               } else if (ireq->iochunk.cmd == PCS_REQ_T_WRITE) {
+                       /* Synchronous writes in accel mode are still not supported */
+                       if (!(ireq->dentry->fileinfo.attr.attrib & PCS_FATTR_IMMEDIATE_WRITE) &&
+                           !ireq->dentry->no_write_delay) {
+                               struct pcs_int_request * sreq;
+
+                               sreq = pcs_csa_csl_write_submit(ireq);
+                               if (!sreq)
+                                       return;
+                               if (sreq != ireq) {
+                                       ireq = sreq;
+                                       cs = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].cslink.cs;
+                                       msg = &ireq->iochunk.msg;
+                               }
+                       }
+               }
        }
 
        msg->private = cs;
@@ -686,7 +709,6 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
                msg->timeout = csl->write_timeout;
        else
                msg->timeout = csl->read_timeout;
-       ireq->ts_sent = ktime_get();
        ireq->wait_origin.val = 0;
 
 
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index 0564ef3..0b17e924a 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -214,6 +214,8 @@ static inline bool cs_is_blacklisted(struct pcs_cs *cs)
 struct pcs_msg* pcs_alloc_cs_msg(u32 type, u32 size, u32 storage_version);
 
 int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq);
+struct pcs_int_request * pcs_csa_csl_write_submit(struct pcs_int_request * ireq);
+void pcs_csa_relay_iotimes(struct pcs_int_request * ireq,  struct pcs_cs_iohdr * h, PCS_NODE_ID_T cs_id);
 void pcs_csa_cs_detach(struct pcs_cs * cs);
 
 #endif /* _PCS_CS_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_cs_accel.c b/fs/fuse/kio/pcs/pcs_cs_accel.c
index fc6a35f..ae8562e 100644
--- a/fs/fuse/kio/pcs/pcs_cs_accel.c
+++ b/fs/fuse/kio/pcs/pcs_cs_accel.c
@@ -146,14 +146,9 @@ void pcs_csa_cs_detach(struct pcs_cs * cs)
        }
 }
 
-static inline struct pcs_csa_entry * cse_lookup(struct pcs_csa_context * ctx, u64 chunk_id)
+static inline struct pcs_csa_entry * __cse_lookup(struct pcs_csa_context * ctx, u64 chunk_id)
 {
-       struct pcs_csa_entry * cse;
-
-       rcu_read_lock();
-       cse= radix_tree_lookup(&ctx->tree, chunk_id);
-       rcu_read_unlock();
-       return cse;
+       return radix_tree_lookup(&ctx->tree, chunk_id);
 }
 
 static int csa_update(struct pcs_csa_context * ctx, PCS_CHUNK_UID_T chunk_id, u32 flags, PCS_MAP_VERSION_T * vers,
@@ -372,12 +367,15 @@ static void __pcs_csa_final_completion(struct pcs_aio_req *areq)
 
        if (!pcs_if_error(&ireq->error) && (ireq->flags & IREQ_F_CRYPT)) {
                struct pcs_cs * cs = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].cslink.cs;
-               struct pcs_csa_context * ctx = rcu_dereference(cs->csa_ctx);
+               struct pcs_csa_context * ctx;
+               rcu_read_lock();
+               ctx = rcu_dereference(cs->csa_ctx);
                if (!ctx || !ctx->tfm || decrypt_data(ireq, ctx->tfm)) {
                        ireq->error.remote = 1;
                        ireq->error.offender = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
                        ireq->error.value = PCS_ERR_IO;
                }
+               rcu_read_unlock();
        }
 
        if (areq->crc) {
@@ -564,7 +562,7 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
        BUG_ON(parent->type != PCS_IREQ_API);
        ar = parent->apireq.req;
 
-       ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, it, 0);
+       ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, it, READ);
        if (!iov_iter_is_bvec(it)) {
                FUSE_KTRACE(ireq->cc->fc, "Not a bvec, falling back");
                return -EINVAL;
@@ -581,7 +579,6 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
        /* One ref is ours, other is for AIO. If crc read is needed we will grab the third */
        atomic_set(&areq->iocount, 2);
 
-       ireq->ts_sent = ktime_get();
        ret = call_read_iter(file, iocb, it);
 
        if (unlikely(ret != -EIOCBQUEUED)) {
@@ -637,28 +634,521 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 
 int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq)
 {
-       struct pcs_csa_context * csa_ctx = rcu_dereference(cs->csa_ctx);
+       struct pcs_csa_context * csa_ctx;
+
+       rcu_read_lock();
+       csa_ctx = rcu_dereference(cs->csa_ctx);
 
        if (csa_ctx) {
                struct pcs_map_entry * map = ireq->iochunk.map;
-               struct pcs_csa_entry * csa = cse_lookup(csa_ctx, map->id);
+               struct pcs_csa_entry * csa;
+
+               csa = __cse_lookup(csa_ctx, map->id);
                if (csa && memcmp(&ireq->iochunk.csl->version, &csa->version, sizeof(PCS_MAP_VERSION_T)) == 0 &&
                    (csa->flags & PCS_CSA_FL_READ)) {
                        /* XXX Paranoia? Verify! */
                        if (!(map->state & PCS_MAP_DEAD) && map->cs_list == ireq->iochunk.csl) {
+                               struct file * file = get_file(csa->file);
+                               struct file * cfile = csa->cfile ? get_file(csa->cfile) : NULL;
+                               unsigned int flags = csa->flags;
+                               int err;
+
                                if (csa_ctx->tfm)
                                        ireq->flags |= IREQ_F_CRYPT;
-                               if (!csa_submit(csa->file, csa->cfile, csa->flags&PCS_CSA_FL_CSUM, ireq))
+
+                               rcu_read_unlock();
+                               err = csa_submit(file, cfile, flags&PCS_CSA_FL_CSUM, ireq);
+                               fput(file);
+                               if (cfile)
+                                       fput(cfile);
+                               if (!err)
                                        return 1;
+                               rcu_read_lock();
                                /* Clear state which could be rewritten by csa_submit */
                                ireq->iochunk.msg.destructor = NULL;
                                ireq->iochunk.msg.rpc = NULL;
                        }
                }
        }
+       rcu_read_unlock();
        return 0;
 }
 
+/* Write engine. It is similar to read and the code could be merged; actually the
+ * situation with nsrv=1 is exactly the same. But reads can be optimized a lot
+ * better and we do not want to lose this advantage.
+ *
+ * Terminology:
+ *  Original ireq - ireq which is supposed to be submitted to the head of the cs chain
+ *  D-request     - replicas at the head of the chain which have accelerated mappings
+ *                  and are eligible for local aio processing.
+ *                  They are represented by struct pcs_accel_write_req, stored as
+ *                  elements of the array awr[i] in struct pcs_accel_req in the
+ *                  original ireq.iochunk.acr
+ *  N-request     - request to be submitted to the tail of the cs chain following the
+ *                  last D-request. It is represented as a clone of the original ireq
+ *                  with an overridden completion callback, so that its errors are not
+ *                  processed but copied to the original ireq, to be processed on
+ *                  completion of the original.
+ */
+
+static void ireq_init_acr(struct pcs_int_request * ireq)
+{
+       ireq->iochunk.parent_N = NULL;
+       atomic_set(&ireq->iochunk.acr.iocount, 1);
+       ireq->iochunk.acr.num_awr = 0;
+       ireq->iochunk.acr.num_iotimes = 0;
+}
+
+static void ireq_clear_acr(struct pcs_int_request * ireq)
+{
+       int i, n;
+
+       for (i = 0; i < ireq->iochunk.acr.num_awr; i++) {
+               struct bio_vec * bvec = ireq->iochunk.acr.awr[i].bvec_copy;
+               if (bvec) {
+                       for (n = ireq->iochunk.acr.awr[i].num_copy_bvecs-1; n>=0; n--) {
+                               if (bvec[n].bv_page)
+                                       put_page(bvec[n].bv_page);
+                       }
+                       kfree(bvec);
+                       ireq->iochunk.acr.awr[i].bvec_copy = NULL;
+               }
+       }
+       ireq->iochunk.msg.destructor = NULL;
+       ireq->iochunk.msg.rpc = NULL;
+       ireq->iochunk.parent_N = NULL;
+       ireq->flags |= IREQ_F_NO_ACCEL;
+}
+
+void pcs_csa_relay_iotimes(struct pcs_int_request * ireq,  struct pcs_cs_iohdr * h, PCS_NODE_ID_T cs_id)
+{
+       int idx = ireq->iochunk.acr.num_awr;
+       struct pcs_cs_sync_resp * srec;
+
+       ireq->iochunk.acr.io_times[idx].cs_id = cs_id;
+       ireq->iochunk.acr.io_times[idx].sync = h->sync;
+
+       for (srec = (struct pcs_cs_sync_resp*)(h + 1), idx++;
+            (void*)(srec + 1) <= (void*)h + h->hdr.len && idx < PCS_MAX_ACCEL_CS;
+            srec++, idx++)
+               ireq->iochunk.acr.io_times[idx] = *srec;
+
+       ireq->iochunk.acr.num_iotimes = idx;
+}
+
+static void __complete_acr_work(struct work_struct * w)
+{
+       struct pcs_int_request * ireq = container_of(w, struct pcs_int_request, iochunk.acr.work);
+
+       if (pcs_if_error(&ireq->error)) {
+               FUSE_KTRACE(ireq->cc->fc, "IO error %d %lu, ireq:%p : %llu:%u+%u",
+                     ireq->error.value,
+                     ireq->error.remote ? (unsigned long)ireq->error.offender.val : 0UL,
+                     ireq, (unsigned long long)ireq->iochunk.chunk,
+                     (unsigned)ireq->iochunk.offset,
+                     (unsigned)ireq->iochunk.size);
+       } else {
+               struct fuse_conn * fc = container_of(ireq->cc, struct pcs_fuse_cluster, cc)->fc;
+
+               fuse_stat_observe(fc, PCS_REQ_T_WRITE, ktime_sub(ktime_get(), ireq->ts_sent));
+
+               if (fc->ktrace && fc->ktrace_level >= LOG_TRACE) {
+                       struct fuse_trace_hdr * t;
+                       int n = ireq->iochunk.acr.num_iotimes;
+
+                       t = FUSE_TRACE_PREPARE(fc->ktrace, FUSE_KTRACE_IOTIMES, sizeof(struct fuse_tr_iotimes_hdr) +
+                                              n*sizeof(struct fuse_tr_iotimes_cs));
+                       if (t) {
+                               struct fuse_tr_iotimes_hdr * th = (struct fuse_tr_iotimes_hdr *)(t + 1);
+                               struct fuse_tr_iotimes_cs * ch = (struct fuse_tr_iotimes_cs *)(th + 1);
+                               int i;
+
+                               th->chunk = ireq->iochunk.chunk;
+                               th->offset = ireq->iochunk.chunk + ireq->iochunk.offset;
+                               th->size = ireq->iochunk.size;
+                               th->start_time = ktime_to_us(ireq->ts);
+                               th->local_delay = ktime_to_us(ktime_sub(ireq->ts_sent, ireq->ts));
+                               th->lat = t->time - ktime_to_us(ireq->ts_sent);
+                               th->ino = ireq->dentry->fileinfo.attr.id;
+                               th->type = PCS_CS_WRITE_AL_RESP;
+                               th->cses = n;
+
+                               for (i = 0; i < n; i++) {
+                                       ch->csid = ireq->iochunk.acr.io_times[i].cs_id.val;
+                                       ch->misc = ireq->iochunk.acr.io_times[i].sync.misc;
+                                       ch->ts_net = ireq->iochunk.acr.io_times[i].sync.ts_net;
+                                       ch->ts_io = ireq->iochunk.acr.io_times[i].sync.ts_io;
+                                       ch++;
+                               }
+                       }
+               }
+               FUSE_TRACE_COMMIT(fc->ktrace);
+       }
+
+       ireq_clear_acr(ireq);
+       /* This will either complete or retry the whole request */
+       ireq_complete(ireq);
+}
+
+static inline void csa_complete_acr(struct pcs_int_request * ireq)
+{
+       if (atomic_dec_and_test(&ireq->iochunk.acr.iocount)) {
+               INIT_WORK(&ireq->iochunk.acr.work, __complete_acr_work);
+               queue_work(ireq->cc->wq, &ireq->iochunk.acr.work);
+       }
+}
+
+
+static void __pcs_csa_write_final_completion(struct pcs_accel_write_req *areq)
+{
+       struct pcs_int_request * ireq;
+
+       fput(areq->iocb.ki_filp);
+
+       ireq = container_of(areq - areq->index, struct pcs_int_request, iochunk.acr.awr[0]);
+
+       if (!pcs_if_error(&ireq->error)) {
+               struct pcs_cs_sync_resp * sresp = &ireq->iochunk.acr.io_times[areq->index];
+               sresp->cs_id.val = ireq->iochunk.csl->cs[areq->index].info.id.val | PCS_NODE_ALT_MASK;
+               sresp->sync.ts_net = 0;
+               sresp->sync.ts_io = ktime_to_us(ktime_get()) - sresp->sync.misc;
+       }
+
+       csa_complete_acr(ireq);
+}
+
+static void csa_write_complete_work(struct work_struct *w)
+{
+       struct pcs_accel_write_req * areq = container_of(w, struct pcs_accel_write_req, work);
+
+       __pcs_csa_write_final_completion(areq);
+}
+
+static void csa_write_complete(struct kiocb *iocb, long ret)
+{
+       struct pcs_accel_write_req * areq;
+       struct pcs_int_request * ireq;
+
+       areq = container_of(iocb, struct pcs_accel_write_req, iocb);
+       ireq = container_of(areq-areq->index, struct pcs_int_request, iochunk.acr.awr[0]);
+
+       if (ret != ireq->iochunk.size) {
+               if (!ireq->error.value) {
+                       ireq->error.remote = 1;
+                       ireq->error.offender = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
+                       ireq->error.value = PCS_ERR_IO;
+                       ireq->flags |= IREQ_F_ACCELERROR;
+               }
+       }
+
+       if (atomic_dec_and_test(&areq->iocount)) {
+               INIT_WORK(&areq->work, csa_write_complete_work);
+               queue_work(ireq->cc->wq, &areq->work);
+       }
+}
+
+static void encrypt_page_ctr(struct crypto_sync_skcipher * tfm, struct page * dst, struct page *src,
+                            unsigned int offset, unsigned int len, u64 pos, u64 chunk_id)
+{
+       struct scatterlist sgi, sgo;
+       struct { u64 a, b; } iv;
+       SYNC_SKCIPHER_REQUEST_ON_STACK(req, tfm);
+
+       skcipher_request_set_sync_tfm(req, tfm);
+       skcipher_request_set_callback(req, CRYPTO_TFM_REQ_MAY_SLEEP, NULL, NULL);
+       sg_init_table(&sgi, 1);
+       sg_init_table(&sgo, 1);
+
+       iv.a = chunk_id;
+       iv.b = cpu_to_be64(pos / 16);
+       sg_set_page(&sgi, src, len, offset);
+       sg_set_page(&sgo, dst, len, offset);
+       skcipher_request_set_crypt(req, &sgi, &sgo, len, &iv);
+       crypto_skcipher_alg(crypto_skcipher_reqtfm(req))->encrypt(req);
+}
+
+static void encrypt_page_xts(struct crypto_sync_skcipher * tfm, struct page * dst, struct page *src,
+                            unsigned int offset, unsigned int len, u64 pos, u64 chunk_id)
+{
+       struct scatterlist sgi, sgo;
+       struct { u64 a, b; } iv;
+       SYNC_SKCIPHER_REQUEST_ON_STACK(req, tfm);
+
+       skcipher_request_set_sync_tfm(req, tfm);
+       skcipher_request_set_callback(req, CRYPTO_TFM_REQ_MAY_SLEEP, NULL, NULL);
+       sg_init_table(&sgi, 1);
+       sg_init_table(&sgo, 1);
+
+       for ( ; len > 0; len -= 512) {
+               iv.a = pos / 512;
+               iv.b = chunk_id;
+               sg_set_page(&sgi, src, 512, offset);
+               sg_set_page(&sgo, dst, 512, offset);
+               skcipher_request_set_crypt(req, &sgi, &sgo, 512, &iv);
+               crypto_skcipher_alg(crypto_skcipher_reqtfm(req))->encrypt(req);
+               pos += 512;
+               offset += 512;
+       }
+}
+
+static int init_crypted_data(struct pcs_int_request * ireq, int idx)
+{
+       struct pcs_int_request *parent = ireq->completion_data.parent;
+       struct pcs_fuse_req * r;
+       struct bio_vec * bvec;
+       int n, nvec;
+       u64 pos;
+       u64 chunk_id;
+       struct pcs_csa_context * csa_ctx;
+       struct crypto_sync_skcipher * tfm;
+
+       BUG_ON(parent->type != PCS_IREQ_API);
+       r = parent->apireq.req->datasource;
+
+       nvec = r->exec.io.num_bvecs;
+
+       /* XXX oops, this can sleep. tfm can be destroyed. Need refcount yet?
+        * Seems, not. We just have to refetch tfm from cs after allocations,
+        * failing if it is destroyed already.
+        */
+       bvec = kmalloc(sizeof(struct bio_vec) * nvec, GFP_NOIO);
+       if (!bvec)
+               return -ENOMEM;
+
+       for (n = 0; n < nvec; n++) {
+               bvec[n] = r->exec.io.bvec[n];
+               if ((bvec[n].bv_offset|bvec[n].bv_len)&511)
+                       goto out;
+               bvec[n].bv_page = alloc_page(GFP_NOIO);
+               if (!bvec[n].bv_page)
+                       goto out;
+       }
+
+       rcu_read_lock();
+       csa_ctx = rcu_dereference(ireq->iochunk.csl->cs[idx].cslink.cs->csa_ctx);
+       if (!csa_ctx || ((tfm = rcu_dereference(csa_ctx->tfm)) == NULL)) {
+               rcu_read_unlock();
+               goto out;
+       }
+
+       pos = ireq->iochunk.offset;
+       chunk_id = ireq->iochunk.map->id;
+       for (n = 0; n < nvec; n++) {
+               if (tfm->base.base.__crt_alg->cra_priority == 400)
+                       encrypt_page_ctr(tfm, bvec[n].bv_page, r->exec.io.bvec[n].bv_page, bvec[n].bv_offset, bvec[n].bv_len, pos, chunk_id);
+               else
+                       encrypt_page_xts(tfm, bvec[n].bv_page, r->exec.io.bvec[n].bv_page, bvec[n].bv_offset, bvec[n].bv_len, pos, chunk_id);
+               pos += bvec[n].bv_len;
+       }
+       rcu_read_unlock();
+
+       ireq->iochunk.acr.awr[idx].bvec_copy = bvec;
+       ireq->iochunk.acr.awr[idx].num_copy_bvecs = n;
+       return 0;
+
+out:
+       while (--n >= 0)
+               put_page(bvec[n].bv_page);
+       kfree(bvec);
+       return -ENOMEM;
+}
+
+static inline int csa_submit_write(struct file * file, struct pcs_int_request * ireq, int idx, int do_crypt)
+{
+       struct pcs_accel_write_req * areq =  &ireq->iochunk.acr.awr[idx];
+       struct kiocb * iocb = &areq->iocb;
+       struct iov_iter iter;
+       struct iov_iter * it = &iter; /* Just to use this pointer instead of &iter */
+       unsigned int size = ireq->iochunk.size;
+       int ret;
+
+       if (do_crypt) {
+               if (init_crypted_data(ireq, idx))
+                       return -EINVAL;
+               iov_iter_bvec(it, WRITE, areq->bvec_copy, areq->num_copy_bvecs, size);
+       } else {
+               struct pcs_int_request *parent = ireq->completion_data.parent;
+               pcs_api_iorequest_t *ar;
+
+               areq->bvec_copy = NULL;
+               BUG_ON(parent->type != PCS_IREQ_API);
+               ar = parent->apireq.req;
+               ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, it, WRITE);
+               if (!iov_iter_is_bvec(it)) {
+                       FUSE_KTRACE(ireq->cc->fc, "Not a bvec, falling back");
+                       return -EINVAL;
+               }
+               iov_iter_truncate(it, size);
+       }
+
+       iocb->ki_pos = ireq->iochunk.offset;
+       iocb->ki_filp = get_file(file);
+       iocb->ki_complete = csa_write_complete;
+       iocb->ki_flags = IOCB_DIRECT;
+       iocb->ki_ioprio = IOPRIO_PRIO_VALUE(IOPRIO_CLASS_NONE, 0);
+
+       /* One ref is ours, other is for AIO. */
+       atomic_set(&areq->iocount, 2);
+       atomic_inc(&ireq->iochunk.acr.iocount);
+       areq->index = idx;
+       ireq->iochunk.acr.num_awr = idx + 1;
+
+       ireq->iochunk.acr.io_times[idx].sync.misc = ktime_to_us(ktime_get());
+
+       ret = call_write_iter(file, iocb, it);
+
+       if (unlikely(ret != -EIOCBQUEUED)) {
+               if (ret != size) {
+                       if (!ireq->error.value) {
+                               ireq->error.remote = 1;
+                               ireq->error.offender = ireq->iochunk.csl->cs[idx].info.id;
+                               ireq->error.value = PCS_ERR_IO;
+                       }
+
+                       /* Do not drop refs, we do not want to complete ireq. */
+                       fput(areq->iocb.ki_filp);
+                       FUSE_KTRACE(ireq->cc->fc, "AIO submit rejected ret=%d %lu, ireq:%p : %llu:%u+%u",
+                                   ret, ireq->error.remote ? (unsigned long)ireq->error.offender.val : 0UL,
+                                   ireq, (unsigned long long)ireq->iochunk.chunk,
+                                   (unsigned)ireq->iochunk.offset,
+                                   (unsigned)size);
+                       if (atomic_dec_and_test(&ireq->iochunk.acr.iocount))
+                               BUG();
+                       return ret >= 0 ? -EIO : ret;
+               }
+
+               /* IO already finished. Drop AIO refcnt and proceed to crc */
+               FUSE_KTRACE(ireq->cc->fc, "No good, AIO executed synchronously, ireq:%p : %llu:%u+%u",
+                           ireq, (unsigned long long)ireq->iochunk.chunk,
+                           (unsigned)ireq->iochunk.offset,
+                           (unsigned)size);
+
+               if (atomic_dec_and_test(&areq->iocount))
+                       BUG();
+       }
+
+       if (atomic_dec_and_test(&areq->iocount)) {
+               INIT_WORK(&areq->work, csa_write_complete_work);
+               queue_work(ireq->cc->wq, &areq->work);
+       }
+       return 0;
+}
+
+static int csa_cs_submit_write(struct pcs_int_request * ireq, int idx)
+{
+       struct pcs_cs * cs = ireq->iochunk.csl->cs[idx].cslink.cs;
+       struct pcs_csa_context * csa_ctx;
+
+       if (idx >= PCS_MAX_ACCEL_CS)
+               return 0;
+
+       rcu_read_lock();
+       csa_ctx = rcu_dereference(cs->csa_ctx);
+       if (csa_ctx) {
+               struct pcs_map_entry * map = ireq->iochunk.map;
+               struct pcs_csa_entry * csa = __cse_lookup(csa_ctx, map->id);
+               if (csa && memcmp(&ireq->iochunk.csl->version, &csa->version, sizeof(PCS_MAP_VERSION_T)) == 0 &&
+                   (csa->flags & PCS_CSA_FL_WRITE)) {
+                       /* XXX Paranoia? Verify! */
+                       if (!(map->state & PCS_MAP_DEAD) && map->cs_list == ireq->iochunk.csl) {
+                               struct file * file = get_file(csa->file);
+                               int do_crypt = (csa_ctx->tfm != NULL);
+                               int err;
+
+                               rcu_read_unlock();
+                               err = csa_submit_write(file, ireq, idx, do_crypt);
+                               fput(file);
+                               return !err;
+                       }
+               }
+       }
+       rcu_read_unlock();
+       return 0;
+}
+
+static void complete_N_request(struct pcs_int_request * sreq)
+{
+       struct pcs_int_request * ireq = sreq->iochunk.parent_N;
+
+       if (pcs_if_error(&sreq->error)) {
+               /* Error on N-request overrides any error on a D-request. */
+               ireq->error = sreq->error;
+               ireq->flags |= IREQ_F_NO_ACCEL;
+               /* Clear ACCELERROR to deliver this error normally, through invalidating the map */
+               ireq->flags &= ~IREQ_F_ACCELERROR;
+       }
+
+       /* And free all clone resources */
+       pcs_sreq_detach(sreq);
+       if (sreq->iochunk.map)
+               pcs_map_put(sreq->iochunk.map);
+       if (sreq->iochunk.csl)
+               cslist_put(sreq->iochunk.csl);
+       if (sreq->iochunk.flow)
+               pcs_flow_put(sreq->iochunk.flow, &sreq->cc->maps.ftab);
+       ireq_destroy(sreq);
+
+       csa_complete_acr(ireq);
+}
+
+
+struct pcs_int_request * pcs_csa_csl_write_submit(struct pcs_int_request * ireq)
+{
+       int idx;
+       struct pcs_cs_list *csl = ireq->iochunk.csl;
+
+       if (csl->nsrv > PCS_MAX_ACCEL_CS)
+               return ireq;
+
+       ireq_init_acr(ireq);
+
+       for (idx = 0; idx < csl->nsrv; idx++) {
+               if (!csa_cs_submit_write(ireq, idx))
+                       break;
+       }
+
+       if (idx == 0) {
+               /* Nothing was handled. Just proceed to normal submit */
+               ireq_clear_acr(ireq);
+               return ireq;
+       } else if (idx >= csl->nsrv) {
+               /* Everything went locally. No network at all. */
+               ireq->iochunk.acr.num_iotimes = idx;
+               csa_complete_acr(ireq);
+               return NULL;
+       } else {
+               /* Harder case. We have to transmit to tail replicas */
+               struct pcs_int_request * sreq = pcs_ireq_split(ireq, 0, 1);
+               if (sreq == NULL) {
+                       /* Some D replicas are submitted. So, we have to go
+                        * through error cycle.
+                        */
+                       ireq->error.remote = 1;
+                       ireq->error.offender = ireq->iochunk.csl->cs[idx].info.id;
+                       ireq->error.value = PCS_ERR_NORES;
+                       csa_complete_acr(ireq);
+                       return NULL;
+               }
+
+               ireq->iochunk.acr.num_iotimes = idx;
+
+               /* ireq_split does not copy size and csl */
+               sreq->iochunk.size = ireq->iochunk.size;
+               sreq->iochunk.csl = ireq->iochunk.csl;
+               cslist_get(ireq->iochunk.csl);
+               /* Yet this sreq is not actually accounted, the accounting is made for original ireq */
+               sreq->flags |= IREQ_F_NOACCT;
+               sreq->complete_cb = complete_N_request;
+               sreq->iochunk.parent_N = ireq;
+               sreq->iochunk.cs_index = idx;
+
+               /* Our original iocount ref goes to N-request,
+                * Proceed with sending sreq to the tail of cs chain
+                */
+               return sreq;
+       }
+}
+
+
 static long csa_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
 {
        struct pcs_csa_context *ctx = file->private_data;
@@ -717,6 +1207,7 @@ static int csa_release(struct inode *inode, struct file *file)
        struct pcs_cs * cs;
 
        ctx->dead = 1;
+       rcu_read_lock();
        if ((cs = rcu_dereference(ctx->cs)) != NULL) {
                spin_lock(&cs->lock);
                if (ctx->cs == cs) {
@@ -728,6 +1219,7 @@ static int csa_release(struct inode *inode, struct file *file)
                }
                spin_unlock(&cs->lock);
        }
+       rcu_read_unlock();
        wake_up_poll(&ctx->wqh, EPOLLHUP);
        pcs_csa_put(ctx);
        return 0;
diff --git a/fs/fuse/kio/pcs/pcs_map.c b/fs/fuse/kio/pcs/pcs_map.c
index 9dc1c95..634775b 100644
--- a/fs/fuse/kio/pcs/pcs_map.c
+++ b/fs/fuse/kio/pcs/pcs_map.c
@@ -70,7 +70,7 @@ static inline unsigned int pcs_sync_timeout(struct pcs_cluster_core *cc)
        return PCS_SYNC_TIMEOUT;
 }
 
-static void cslist_destroy(struct pcs_cs_list * csl)
+void cslist_destroy(struct pcs_cs_list * csl)
 {
        int i;
 
@@ -97,19 +97,6 @@ static void cslist_destroy(struct pcs_cs_list * csl)
        kfree(csl);
 }
 
-static inline void cslist_get(struct pcs_cs_list * csl)
-{
-       TRACE("csl:%p csl->map:%p refcnt:%d\n", csl, csl->map, atomic_read(&csl->refcnt));
-
-       atomic_inc(&csl->refcnt);
-}
-static inline void cslist_put(struct pcs_cs_list * csl)
-{
-       TRACE("csl:%p csl->map:%p refcnt:%d\n", csl, csl->map, atomic_read(&csl->refcnt));
-       if (atomic_dec_and_test(&csl->refcnt))
-               cslist_destroy(csl);
-}
-
 static void map_drop_cslist(struct pcs_map_entry * m)
 {
        assert_spin_locked(&m->lock);
@@ -1579,6 +1566,9 @@ void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
        unsigned long long match_id = 0;
        struct pcs_cs_list * csl, ** csl_p = 0;
 
+       if (ireq->flags & IREQ_F_NOACCT)
+               return;
+
        switch (ireq->type) {
        case PCS_IREQ_IOCHUNK:
                csl_p = &ireq->iochunk.csl;
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index e2b3c14..bfe0719 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -221,6 +221,22 @@ static inline struct pcs_cluster_core *cc_from_map(struct pcs_map_entry * m)
 
 extern unsigned int cs_io_locality;
 
+void cslist_destroy(struct pcs_cs_list * csl);
+
+static inline void cslist_get(struct pcs_cs_list * csl)
+{
+       TRACE("csl:%p csl->map:%p refcnt:%d\n", csl, csl->map, atomic_read(&csl->refcnt));
+
+       atomic_inc(&csl->refcnt);
+}
+
+static inline void cslist_put(struct pcs_cs_list * csl)
+{
+       TRACE("csl:%p csl->map:%p refcnt:%d\n", csl, csl->map, atomic_read(&csl->refcnt));
+       if (atomic_dec_and_test(&csl->refcnt))
+               cslist_destroy(csl);
+}
+
 #define MAP_FMT        "(%p) 0x%lld s:%x" DENTRY_FMT
 #define MAP_ARGS(m) (m), (long long)(m)->index,         (m)->state, DENTRY_ARGS(pcs_dentry_from_map((m)))
 
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index 68cf270..8ee32b3 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -59,6 +59,40 @@ struct pcs_aio_req
        u32                     crcb[PCS_MAX_INLINE_CRC];
 };
 
+#define PCS_MAX_ACCEL_CS       3
+
+struct pcs_accel_write_req
+{
+       int                     index;
+       struct kiocb            iocb;
+       atomic_t                iocount;
+       struct work_struct      work;
+
+       /* Crypto bits. This holds an encrypted copy of original data for use by aio writes */
+       struct bio_vec          *bvec_copy;
+       unsigned                num_copy_bvecs;
+};
+
+struct pcs_accel_req
+{
+       struct pcs_int_request          *parent;
+       atomic_t                        iocount;
+       int                             num_awr;
+       struct pcs_accel_write_req      awr[PCS_MAX_ACCEL_CS];
+       int                             num_iotimes;
+       struct pcs_cs_sync_resp         io_times[PCS_MAX_ACCEL_CS];
+       struct work_struct              work;
+};
+
+struct pcs_iochunk_req {
+       struct pcs_msg          msg;
+       struct pcs_cs_iohdr     hbuf;           /* Buffer for header.
+                                                * A little ugly
+                                                */
+       struct kvec             hbuf_kv;
+       struct pcs_int_request  *parent_N;
+};
+
 struct pcs_int_request
 {
        struct pcs_cluster_core* cc;
@@ -84,6 +118,7 @@ struct pcs_int_request
 #define IREQ_F_NO_ACCEL                0x1000
 #define IREQ_F_CRYPT           0x2000
 #define IREQ_F_ACCELERROR      0x4000
+#define IREQ_F_NOACCT          0x8000
 
        atomic_t                iocount;
 
@@ -141,8 +176,11 @@ struct pcs_int_request
                                                                                 * A little ugly
                                                                                 */
                                        struct kvec             hbuf_kv;
+                                       struct pcs_int_request  *parent_N;
                                };
+                               struct pcs_iochunk_req          ir;
                                struct pcs_aio_req              ar;
+                               struct pcs_accel_req            acr;
                        };
                } iochunk;
 
-- 
1.8.3.1
