Signed-off-by: Andy Zhou <az...@ovn.org> --- lib/jsonrpc.c | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++++++---- lib/jsonrpc.h | 7 ++++++ 2 files changed, 77 insertions(+), 4 deletions(-)
diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c index 35428a6..e8ea8b4 100644 --- a/lib/jsonrpc.c +++ b/lib/jsonrpc.c @@ -138,11 +138,15 @@ jsonrpc_run(struct jsonrpc *rpc) } /* Arranges for the poll loop to wake up when 'rpc' needs to perform - * maintenance activities. */ + * maintenance activities. + * + * This function should not be called when 'rpc' has joined a poll + * group. Use poll_group_get_events() instead. */ void jsonrpc_wait(struct jsonrpc *rpc) { if (!rpc->status) { + ovs_assert(!stream_joined(rpc->stream)); stream_run_wait(rpc->stream); if (!list_is_empty(&rpc->output)) { stream_send_wait(rpc->stream); @@ -353,11 +357,36 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) return EAGAIN; } +bool +jsonrpc_joined_poll_group(const struct jsonrpc *rpc) +{ + return rpc->status ? false : stream_joined(rpc->stream); +} + +bool +jsonrpc_has_pending_input(const struct jsonrpc *rpc) +{ + return rpc->status ? false : !byteq_is_empty(&rpc->input); +} + +void +jsonrpc_poll_group_update(struct jsonrpc *rpc, bool write) +{ + if (!rpc->status) { + ovs_assert(stream_joined(rpc->stream)); + stream_update(rpc->stream, write); + } +} + /* Causes the poll loop to wake up when jsonrpc_recv() may return a value other * than EAGAIN. 
*/ void jsonrpc_recv_wait(struct jsonrpc *rpc) { + if (!rpc->status) { + ovs_assert(!stream_joined(rpc->stream)); + } + if (rpc->status || !byteq_is_empty(&rpc->input)) { poll_immediate_wake_at(rpc->name); } else { @@ -377,6 +406,8 @@ jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg) fatal_signal_run(); + ovs_assert(!stream_joined(rpc->stream)); + error = jsonrpc_send(rpc, msg); if (error) { return error; @@ -397,6 +428,8 @@ jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg) int jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) { + ovs_assert(!stream_joined(rpc->stream)); + for (;;) { int error = jsonrpc_recv(rpc, msgp); if (error != EAGAIN) { @@ -827,7 +860,6 @@ jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp) s->stream = NULL; s->pstream = NULL; s->seqno = 0; - return s; } @@ -976,12 +1008,26 @@ jsonrpc_session_run(struct jsonrpc_session *s) } } +/* Only waits for the stream within 's'. A poll group does not + * handle pstreams or a stream's initial connection; those + * still use the poll loop. */ void jsonrpc_session_wait(struct jsonrpc_session *s) { + /* When s->rpc is set, the jsonrpc session is in the connected + * state. Check whether 's' has already registered with a poll group. + * + * s->stream is set when the stream is not yet in a connected state; + * continue to let the poll loop handle it. + * + * Poll groups currently do not work with pstreams. Let the + * poll loop handle all pstreams. */ if (s->rpc) { - jsonrpc_wait(s->rpc); - } else if (s->stream) { + if (!jsonrpc_joined_poll_group(s->rpc)) { + jsonrpc_wait(s->rpc); + } + } + if (s->stream) { stream_run_wait(s->stream); stream_connect_wait(s->stream); } @@ -1055,6 +1101,26 @@ jsonrpc_session_recv(struct jsonrpc_session *s) return NULL; } +bool +jsonrpc_session_joined_poll_group(const struct jsonrpc_session *s) +{ + return s->rpc ? 
jsonrpc_joined_poll_group(s->rpc) : false; +} + +bool +jsonrpc_session_has_pending_input(const struct jsonrpc_session *s) +{ + return s->rpc ? jsonrpc_has_pending_input(s->rpc) : false; +} + +void +jsonrpc_session_poll_group_update(struct jsonrpc_session *s, bool write) +{ + if (s->rpc) { + jsonrpc_poll_group_update(s->rpc, write); + } +} + void jsonrpc_session_recv_wait(struct jsonrpc_session *s) { diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h index 5f46e3b..c8e6690 100644 --- a/lib/jsonrpc.h +++ b/lib/jsonrpc.h @@ -53,6 +53,10 @@ size_t jsonrpc_get_backlog(const struct jsonrpc *); unsigned int jsonrpc_get_received_bytes(const struct jsonrpc *); const char *jsonrpc_get_name(const struct jsonrpc *); +bool jsonrpc_joined_poll_group(const struct jsonrpc *); +bool jsonrpc_has_pending_input(const struct jsonrpc *); +void jsonrpc_poll_group_update(struct jsonrpc *, bool); + int jsonrpc_send(struct jsonrpc *, struct jsonrpc_msg *); int jsonrpc_recv(struct jsonrpc *, struct jsonrpc_msg **); void jsonrpc_recv_wait(struct jsonrpc *); @@ -107,6 +111,9 @@ void jsonrpc_session_run(struct jsonrpc_session *); void jsonrpc_session_wait(struct jsonrpc_session *); size_t jsonrpc_session_get_backlog(const struct jsonrpc_session *); +bool jsonrpc_session_joined_poll_group(const struct jsonrpc_session *); +bool jsonrpc_session_has_pending_input(const struct jsonrpc_session *); +void jsonrpc_session_poll_group_update(struct jsonrpc_session *, bool); const char *jsonrpc_session_get_name(const struct jsonrpc_session *); int jsonrpc_session_send(struct jsonrpc_session *, struct jsonrpc_msg *); -- 1.9.1 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev