io_uring/rsrc: move node refs == 0 checking outside of hot path
One thing that is slightly suboptimal in the current resource node
handling is that putting a node necessitates decrementing the
ref count AND checking if it hit zero. While this isn't an atomic
operation (as we're under the ctx lock), it does mean the decrement
cannot be pipelined.
Move the dead node checking to unregistration time. The holder of a
node will still decrement the reference count, but it need not check
if it hit zero. It will, by definition, never hit zero until it gets
unregistered, as the table itself holds a reference.
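The fast-path put then becomes a plain decrement, roughly as in the
io_uring/rsrc.h hunk below:

	static inline void io_put_rsrc_node(struct io_rsrc_node *node)
	{
		/* no zero check - reaping happens at unregistration time */
		if (node)
			node->refs--;
	}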
At unregistration time, if the node ref count isn't zero when the
table reference is dropped, then it's known that someone else holds a
reference to it and will put it at some point. For this situation,
store an entry for later reaping in ctx->rsrc_free. Whenever a batch
of requests is flushed, ctx->rsrc_free is checked and any previously
released nodes are freed.
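Condensed from the io_uring/rsrc.c hunk below, the unregistration-time
release path then looks roughly like this:

	void io_release_rsrc_node(struct io_rsrc_node *node)
	{
		struct io_ring_ctx *ctx = io_rsrc_node_ctx(node);

		/* last reference - nobody else can find it, free it now */
		if (!--node->refs) {
			io_free_rsrc_node(node);
			return;
		}
		/* still in use elsewhere - stash it for a later reap */
		ctx->rsrc_free_nr++;
		xa_store(&ctx->rsrc_free, (unsigned long) node, node, GFP_NOWAIT);
	}

with io_reap_rsrc_nodes() walking ctx->rsrc_free and freeing any node
whose ref count has since hit zero.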
With that, the overhead of putting a node in the hot path is reduced
quite a bit, as everything pipelines nicely and doesn't stall on
bringing in the node cacheline.
Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index ad50011..e458ebe 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -320,6 +320,13 @@ struct io_ring_ctx {
unsigned cq_entries;
struct io_ev_fd __rcu *io_ev_fd;
unsigned cq_extra;
+
+ /*
+ * Deferred free of io_rsrc_node entries that were busy
+ * at the time of unregistration.
+ */
+ unsigned int rsrc_free_nr;
+ struct xarray rsrc_free;
} ____cacheline_aligned_in_smp;
/*
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index f34fa1e..efbc3b59 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -293,6 +293,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
return NULL;
xa_init(&ctx->io_bl_xa);
+ xa_init(&ctx->rsrc_free);
/*
* Use 5 bits less than the max cq entries, that should give us around
@@ -366,6 +367,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
io_futex_cache_free(ctx);
kvfree(ctx->cancel_table.hbs);
xa_destroy(&ctx->io_bl_xa);
+ xa_destroy(&ctx->rsrc_free);
kfree(ctx);
return NULL;
}
@@ -1562,6 +1564,8 @@ static void io_free_batch_list(struct io_ring_ctx *ctx,
node = req->comp_list.next;
io_req_add_to_cache(req, ctx);
} while (node);
+
+ io_reap_rsrc_nodes(ctx);
}
void __io_submit_flush_completions(struct io_ring_ctx *ctx)
@@ -3009,6 +3013,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
io_futex_cache_free(ctx);
io_destroy_buffers(ctx);
io_unregister_cqwait_reg(ctx);
+ io_reap_rsrc_nodes(ctx);
mutex_unlock(&ctx->uring_lock);
if (ctx->sq_creds)
put_cred(ctx->sq_creds);
@@ -3034,6 +3039,9 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
io_napi_free(ctx);
kvfree(ctx->cancel_table.hbs);
xa_destroy(&ctx->io_bl_xa);
+ WARN_ON_ONCE(!xa_empty(&ctx->rsrc_free));
+ WARN_ON_ONCE(ctx->rsrc_free_nr);
+ xa_destroy(&ctx->rsrc_free);
kfree(ctx);
}
diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c
index 2fb1791..2effdd3 100644
--- a/io_uring/rsrc.c
+++ b/io_uring/rsrc.c
@@ -118,14 +118,67 @@ static void io_buffer_unmap(struct io_ring_ctx *ctx, struct io_rsrc_node *node)
}
}
+void __io_reap_rsrc_nodes(struct io_ring_ctx *ctx)
+{
+ struct io_rsrc_node *node;
+ unsigned long index;
+
+ /* Find and reap nodes that hit zero refs */
+ xa_for_each(&ctx->rsrc_free, index, node) {
+ if (!node->refs)
+ io_free_rsrc_node(node);
+ }
+}
+
+void io_release_rsrc_node(struct io_rsrc_node *node)
+{
+ struct io_ring_ctx *ctx = io_rsrc_node_ctx(node);
+ unsigned long index = (unsigned long) node;
+
+ /*
+ * If ref dropped to zero, nobody else has a reference to it and we
+ * can safely free it. If it's non-zero, then someone else has a
+ * reference and will put it at some point. As the put side is the
+ * fast path, no checking is done there for the ref dropping to zero.
+ * It just means that nobody else has it, and also that nobody else
+ * can find it as it's been removed from the lookup table prior to
+ * that. A ref dropping to zero as part of fast path node put cannot
+ * happen without the unregister side having already stored the node
+ * in ctx->rsrc_free.
+ */
+ if (!--node->refs) {
+ io_free_rsrc_node(node);
+ return;
+ }
+
+ /*
+ * If the free list already has an entry for this node, then it was
+ * already stashed for cleanup. Just let the normal cleanup reap it.
+ */
+ if (xa_load(&ctx->rsrc_free, index))
+ return;
+
+ /* Slot was reserved at registration time */
+ ctx->rsrc_free_nr++;
+ if (xa_store(&ctx->rsrc_free, index, node, GFP_NOWAIT))
+ WARN_ON_ONCE(1);
+}
+
struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx, int type)
{
struct io_rsrc_node *node;
node = kzalloc(sizeof(*node), GFP_KERNEL);
if (node) {
- node->ctx_ptr = (unsigned long) ctx | type;
- node->refs = 1;
+ unsigned long index = (unsigned long) node;
+
+ if (!xa_reserve(&ctx->rsrc_free, index, GFP_KERNEL)) {
+ node->ctx_ptr = (unsigned long) ctx | type;
+ node->refs = 1;
+ return node;
+ }
+ kfree(node);
+ node = NULL;
}
return node;
}
@@ -134,10 +187,8 @@ __cold void io_rsrc_data_free(struct io_rsrc_data *data)
{
if (!data->nr)
return;
- while (data->nr--) {
- if (data->nodes[data->nr])
- io_put_rsrc_node(data->nodes[data->nr]);
- }
+ while (data->nr--)
+ io_reset_rsrc_node(data, data->nr);
kvfree(data->nodes);
data->nodes = NULL;
data->nr = 0;
@@ -465,6 +516,8 @@ void io_free_rsrc_node(struct io_rsrc_node *node)
break;
}
+ if (xa_erase(&ctx->rsrc_free, (unsigned long) node))
+ ctx->rsrc_free_nr--;
kfree(node);
}
@@ -475,6 +528,7 @@ int io_sqe_files_unregister(struct io_ring_ctx *ctx)
io_free_file_tables(&ctx->file_table);
io_file_table_set_alloc_range(ctx, 0, 0);
+ io_reap_rsrc_nodes(ctx);
return 0;
}
@@ -552,6 +606,7 @@ int io_sqe_buffers_unregister(struct io_ring_ctx *ctx)
if (!ctx->buf_table.nr)
return -ENXIO;
io_rsrc_data_free(&ctx->buf_table);
+ io_reap_rsrc_nodes(ctx);
return 0;
}
@@ -788,7 +843,7 @@ static struct io_rsrc_node *io_sqe_buffer_register(struct io_ring_ctx *ctx,
if (ret) {
kvfree(imu);
if (node)
- io_put_rsrc_node(node);
+ io_release_rsrc_node(node);
node = ERR_PTR(ret);
}
kvfree(pages);
@@ -1038,8 +1093,8 @@ static int io_clone_buffers(struct io_ring_ctx *ctx, struct io_ring_ctx *src_ctx
out_put_free:
i = data.nr;
while (i--) {
- io_buffer_unmap(src_ctx, data.nodes[i]);
- kfree(data.nodes[i]);
+ if (data.nodes[i])
+ io_free_rsrc_node(data.nodes[i]);
}
out_unlock:
io_rsrc_data_free(&data);
diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h
index bc3a863..10e06e2 100644
--- a/io_uring/rsrc.h
+++ b/io_uring/rsrc.h
@@ -45,6 +45,8 @@ struct io_imu_folio_data {
};
struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx, int type);
+void io_release_rsrc_node(struct io_rsrc_node *node);
+void __io_reap_rsrc_nodes(struct io_ring_ctx *ctx);
void io_free_rsrc_node(struct io_rsrc_node *node);
void io_rsrc_data_free(struct io_rsrc_data *data);
int io_rsrc_data_alloc(struct io_rsrc_data *data, unsigned nr);
@@ -76,10 +78,20 @@ static inline struct io_rsrc_node *io_rsrc_node_lookup(struct io_rsrc_data *data
return NULL;
}
+static inline void io_reap_rsrc_nodes(struct io_ring_ctx *ctx)
+{
+ if (ctx->rsrc_free_nr)
+ __io_reap_rsrc_nodes(ctx);
+}
+
+/*
+ * Reaping done at unregistration/removal time, hence no checking needed
+ * here - just drop the held reference.
+ */
static inline void io_put_rsrc_node(struct io_rsrc_node *node)
{
- if (node && !--node->refs)
- io_free_rsrc_node(node);
+ if (node)
+ node->refs--;
}
static inline bool io_reset_rsrc_node(struct io_rsrc_data *data, int index)
@@ -88,7 +100,7 @@ static inline bool io_reset_rsrc_node(struct io_rsrc_data *data, int index)
if (!node)
return false;
- io_put_rsrc_node(node);
+ io_release_rsrc_node(node);
data->nodes[index] = NULL;
return true;
}