pcs_cs_accel added some error reporting from unserialized contexts,
which is a bug. It is not new, though: the same problem has been present
since day zero in pcs_sreq_complete, where an error is propagated from
child iochunk requests to the parent API request.

Add an atomic version of pcs_error_t, which can be set
from an arbitrary context, and move some racy error updates
to a serialized context.

Signed-off-by: Alexey Kuznetsov <kuz...@acronis.com>
---
 fs/fuse/kio/pcs/pcs_cluster.c  |  2 +-
 fs/fuse/kio/pcs/pcs_cs_accel.c | 97 +++++++++++++++++-------------------------
 fs/fuse/kio/pcs/pcs_error.h    | 44 +++++++++++++++++--
 fs/fuse/kio/pcs/pcs_map.c      |  2 +-
 fs/fuse/kio/pcs/pcs_req.h      |  1 +
 5 files changed, 83 insertions(+), 63 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_cluster.c b/fs/fuse/kio/pcs/pcs_cluster.c
index dbca68f..a19a559 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.c
+++ b/fs/fuse/kio/pcs/pcs_cluster.c
@@ -61,7 +61,7 @@ void pcs_sreq_complete(struct pcs_int_request *sreq)
                                ireq_delay(sreq);
                                return;
                        }
-                       pcs_copy_error(&ireq->error, &sreq->error);
+                       pcs_copy_error_cond_atomic(&ireq->error, &sreq->error);
                }
 
                if (sreq->type != PCS_IREQ_CUSTOM)
diff --git a/fs/fuse/kio/pcs/pcs_cs_accel.c b/fs/fuse/kio/pcs/pcs_cs_accel.c
index 33a60ca..6db4562 100644
--- a/fs/fuse/kio/pcs/pcs_cs_accel.c
+++ b/fs/fuse/kio/pcs/pcs_cs_accel.c
@@ -370,21 +370,15 @@ static void __pcs_csa_final_completion(struct pcs_aio_req 
*areq)
                struct pcs_csa_context * ctx;
                rcu_read_lock();
                ctx = rcu_dereference(cs->csa_ctx);
-               if (!ctx || !ctx->tfm || decrypt_data(ireq, ctx->tfm)) {
-                       ireq->error.remote = 1;
-                       ireq->error.offender = 
ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
-                       ireq->error.value = PCS_ERR_IO;
-               }
+               if (!ctx || !ctx->tfm || decrypt_data(ireq, ctx->tfm))
+                       pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_IO, 1, 
ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id);
                rcu_read_unlock();
        }
 
        if (areq->crc) {
                if (!pcs_if_error(&ireq->error)) {
-                       if (verify_crc(ireq, areq->crc)) {
-                               ireq->error.remote = 1;
-                               ireq->error.offender = 
ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
-                               ireq->error.value = PCS_ERR_IO;
-                       }
+                       if (verify_crc(ireq, areq->crc))
+                               pcs_set_error_cond_atomic(&ireq->error, 
PCS_ERR_IO, 1, ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id);
                }
 
                if (areq->crc && areq->crc != areq->crcb) {
@@ -483,11 +477,7 @@ static void csa_crc_work(struct work_struct *w)
                areq->crc = kmalloc(ncrc, GFP_KERNEL);
                if (areq->crc == NULL) {
 out:
-                       if (!ireq->error.value) {
-                               ireq->error.remote = 1;
-                               ireq->error.offender = 
ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
-                               ireq->error.value = PCS_ERR_NORES;
-                       }
+                       pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_NORES, 
1, ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id);
                        fput(areq->cfile);
                        if (areq->crc && areq->crc != areq->crcb) {
                                kfree(areq->crc);
@@ -524,13 +514,8 @@ static void pcs_csa_complete(struct kiocb *iocb, long ret)
        areq = container_of(iocb, struct pcs_aio_req, iocb);
        ireq = container_of(areq, struct pcs_int_request, iochunk.ar);
 
-       if (ret != ireq->iochunk.size) {
-               if (!ireq->error.value) {
-                       ireq->error.remote = 1;
-                       ireq->error.offender = 
ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
-                       ireq->error.value = PCS_ERR_IO;
-               }
-       }
+       if (ret != ireq->iochunk.size)
+               pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_IO, 1, 
ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id);
 
        if (atomic_dec_and_test(&areq->iocount)) {
                INIT_WORK(&areq->work, csa_complete_work);
@@ -585,11 +570,7 @@ static inline int csa_submit(struct file * file, struct 
file *cfile, int do_csum
 
        if (unlikely(ret != -EIOCBQUEUED)) {
                if (ret != size) {
-                       if (!ireq->error.value) {
-                               ireq->error.remote = 1;
-                               ireq->error.offender = 
ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
-                               ireq->error.value = PCS_ERR_IO;
-                       }
+                       pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_IO, 1, 
ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id);
 
                        /* Do not drop refs, we do not want to complete ireq. */
                        fput(areq->iocb.ki_filp);
@@ -696,6 +677,7 @@ static void ireq_init_acr(struct pcs_int_request * ireq)
 {
        atomic_set(&ireq->iochunk.acr.iocount, 1);
        ireq->iochunk.acr.num_awr = 0;
+       pcs_clear_error(&ireq->iochunk.acr.net_error);
        ireq->iochunk.acr.num_iotimes = 0;
 }
 
@@ -745,6 +727,17 @@ static void __complete_acr_work(struct work_struct * w)
 {
        struct pcs_int_request * ireq = container_of(w, struct pcs_int_request, 
iochunk.acr.work);
 
+       if (pcs_if_error(&ireq->iochunk.acr.net_error)) {
+               /* Error on N-request overrides any error on a D-request. */
+               pcs_copy_error(&ireq->error, &ireq->iochunk.acr.net_error);
+               ireq->flags |= IREQ_F_NO_ACCEL;
+               /* Clear ACCELERROR to deliver this error normally, through 
invalidating the map */
+               ireq->flags &= ~(IREQ_F_ACCELERROR|IREQ_F_ONCE);
+       } else if (pcs_if_error(&ireq->error)) {
+               ireq->flags |= IREQ_F_NO_ACCEL|IREQ_F_ACCELERROR;
+               ireq->flags &= ~IREQ_F_ONCE;
+       }
+
        if (pcs_if_error(&ireq->error)) {
                FUSE_KTRACE(ireq->cc->fc, "IO error %d %lu, ireq:%p : 
%llu:%u+%u",
                      ireq->error.value,
@@ -806,7 +799,6 @@ static inline void csa_complete_acr(struct pcs_int_request 
* ireq)
        }
 }
 
-
 static void __pcs_csa_write_final_completion(struct pcs_accel_write_req *areq)
 {
        struct pcs_int_request * ireq;
@@ -842,12 +834,8 @@ static void csa_sync_work(struct work_struct *w)
 
        res = vfs_fsync(areq->iocb.ki_filp, 1);
 
-       if (res) {
-               ireq->error.remote = 1;
-               ireq->error.offender = 
ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
-               ireq->error.value = PCS_ERR_IO;
-               ireq->flags |= IREQ_F_ACCELERROR;
-       }
+       if (res)
+               pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_IO, 1, 
ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id);
 
        if (atomic_dec_and_test(&areq->iocount))
                __pcs_csa_write_final_completion(areq);
@@ -869,15 +857,11 @@ static void csa_write_complete(struct kiocb *iocb, long 
ret)
        ireq = container_of(areq-areq->index, struct pcs_int_request, 
iochunk.acr.awr[0]);
 
        if (ret != ireq->iochunk.size) {
-               if (!ireq->error.value) {
-                       ireq->error.remote = 1;
-                       ireq->error.offender = 
ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
-                       ireq->error.value = PCS_ERR_IO;
-                       ireq->flags |= IREQ_F_ACCELERROR;
-               }
+               if (!pcs_if_error(&ireq->error))
+                       pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_IO, 1, 
ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id);
        }
 
-       if (!ireq->error.value) {
+       if (!pcs_if_error(&ireq->error)) {
                if ((ireq->dentry->fileinfo.attr.attrib & 
PCS_FATTR_IMMEDIATE_WRITE) ||
                    ireq->dentry->no_write_delay) {
                        INIT_WORK(&areq->work, csa_sync_work);
@@ -1044,11 +1028,7 @@ static inline int csa_submit_write(struct file * file, 
struct pcs_int_request *
 
        if (unlikely(ret != -EIOCBQUEUED)) {
                if (ret != size) {
-                       if (!ireq->error.value) {
-                               ireq->error.remote = 1;
-                               ireq->error.offender = 
ireq->iochunk.csl->cs[idx].info.id;
-                               ireq->error.value = PCS_ERR_IO;
-                       }
+                       pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_IO, 1, 
ireq->iochunk.csl->cs[idx].info.id);
 
                        /* Do not drop refs, we do not want to complete ireq. */
                        fput(areq->iocb.ki_filp);
@@ -1115,16 +1095,12 @@ static void complete_N_request(struct pcs_int_request * 
sreq)
 {
        struct pcs_int_request * ireq = sreq->iochunk.parent_N;
 
-       if (pcs_if_error(&sreq->error)) {
-               /* Error on N-request overrides any error on a D-request. */
-               ireq->error = sreq->error;
-               ireq->flags |= IREQ_F_NO_ACCEL;
-               /* Clear ACCELERROR to deliver this error normally, through 
invalidating the map */
-               ireq->flags &= ~IREQ_F_ACCELERROR;
-       }
+       if (pcs_if_error(&sreq->error))
+               pcs_copy_error(&ireq->iochunk.acr.net_error, &sreq->error);
 
        /* And free all clone resources */
-       pcs_sreq_detach(sreq);
+       if (!pcs_sreq_detach(sreq))
+               BUG();
        if (sreq->iochunk.map)
                pcs_map_put(sreq->iochunk.map);
        if (sreq->iochunk.csl)
@@ -1167,9 +1143,7 @@ struct pcs_int_request * pcs_csa_csl_write_submit(struct 
pcs_int_request * ireq)
                        /* Some D replicas are submitted. So, we have to go
                         * through error cycle.
                         */
-                       ireq->error.remote = 1;
-                       ireq->error.offender = 
ireq->iochunk.csl->cs[idx].info.id;
-                       ireq->error.value = PCS_ERR_NORES;
+                       pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_NORES, 
1, ireq->iochunk.csl->cs[idx].info.id);
                        csa_complete_acr(ireq);
                        return NULL;
                }
@@ -1193,7 +1167,6 @@ struct pcs_int_request * pcs_csa_csl_write_submit(struct 
pcs_int_request * ireq)
        }
 }
 
-
 int pcs_csa_csl_write_submit_single(struct pcs_int_request * ireq, int idx)
 {
        if (idx >= PCS_MAX_ACCEL_CS)
@@ -1360,12 +1333,20 @@ int pcs_csa_register(struct pcs_cluster_core * cc, 
PCS_NODE_ID_T cs_id, struct c
 
 int pcs_csa_init(void)
 {
+       struct _old_pcs_error_t
+       {
+               unsigned int     value : 31, remote: 1;
+               PCS_NODE_ID_T           offender;
+       };
+
        pcs_csa_cachep = kmem_cache_create("pcs_csa",
                                            sizeof(struct pcs_csa_entry),
                                            0, 
SLAB_RECLAIM_ACCOUNT|SLAB_ACCOUNT, NULL);
        if (!pcs_csa_cachep)
                return -ENOMEM;
 
+       BUILD_BUG_ON(sizeof(struct _old_pcs_error_t) != sizeof(struct 
_pcs_error_t));
+
        return 0;
 }
 
diff --git a/fs/fuse/kio/pcs/pcs_error.h b/fs/fuse/kio/pcs/pcs_error.h
index e19e8df..cbf5c15 100644
--- a/fs/fuse/kio/pcs/pcs_error.h
+++ b/fs/fuse/kio/pcs/pcs_error.h
@@ -208,9 +208,15 @@ static __inline const char *pcs_strerror(pcs_err_t errnum)
 
 struct _pcs_error_t
 {
-       unsigned int    value : 31, remote: 1;
-
-       PCS_NODE_ID_T           offender;
+       union {
+               struct {
+                       u32     value : 31, remote: 1;
+                       u32     pad;
+               };
+               atomic64_t atomic;
+       };
+
+        PCS_NODE_ID_T           offender;
 };
 typedef struct _pcs_error_t pcs_error_t;
 
@@ -244,6 +250,38 @@ static __inline void pcs_set_local_error(pcs_error_t * 
status, int err)
        status->remote = 0;
 }
 
+/* Atomicity is special here, intended only to set an error status from 
multiple contexts.
+ * Actual access to content is made only from serialized context and does not 
need atomicity.
+ * It is important that an error set once cannot be cleared or changed or even 
read until
+ * all the context which could modify error are finished.
+ */
+static __inline void pcs_set_error_cond_atomic(pcs_error_t * status, int err, 
int remote, PCS_NODE_ID_T id)
+{
+       if (!status->value) {
+               for (;;) {
+                       s64 val = err | (remote ? 1ULL << 31 : 0);
+                       s64 old_val = atomic64_read(&status->atomic);
+                       if (old_val & 0x7FFFFFFFUL)
+                               break;
+                       if (old_val == atomic64_cmpxchg(&status->atomic, 
old_val, val)) {
+                               if (remote)
+                                       status->offender = id;
+                               break;
+                       }
+               }
+       }
+}
+
+static __inline void pcs_set_local_error_cond_atomic(pcs_error_t * status, int 
err)
+{
+       pcs_set_error_cond_atomic(status, err, 0, (PCS_NODE_ID_T) { .val = 0 });
+}
+
+static __inline void pcs_copy_error_cond_atomic(pcs_error_t * dst, pcs_error_t 
* src)
+{
+       pcs_set_error_cond_atomic(dst, src->value, src->remote, src->offender);
+}
+
 int pcs_error_to_errno(pcs_error_t *);
 
 static __inline void *pcs_err_ptr(int err)
diff --git a/fs/fuse/kio/pcs/pcs_map.c b/fs/fuse/kio/pcs/pcs_map.c
index df33b52..f0520ba 100644
--- a/fs/fuse/kio/pcs/pcs_map.c
+++ b/fs/fuse/kio/pcs/pcs_map.c
@@ -3068,7 +3068,7 @@ static void pcs_flushreq_complete(struct pcs_int_request 
* sreq)
                        return;
                }
                FUSE_KTRACE(sreq->cc->fc, "flush error %d", sreq->error.value);
-               pcs_copy_error(&ireq->error, &sreq->error);
+               pcs_copy_error_cond_atomic(&ireq->error, &sreq->error);
                notify_error = 1;
        }
 
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index c677332..cdf1c97 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -80,6 +80,7 @@ struct pcs_accel_req
        atomic_t                        iocount;
        int                             num_awr;
        struct pcs_accel_write_req      awr[PCS_MAX_ACCEL_CS];
+       pcs_error_t                     net_error;
        int                             num_iotimes;
        struct fuse_tr_iotimes_cs       io_times[PCS_MAX_ACCEL_CS];
        struct work_struct              work;
-- 
1.8.3.1

_______________________________________________
Devel mailing list
Devel@openvz.org
https://lists.openvz.org/mailman/listinfo/devel

Reply via email to