rust: task: Add kernel thread support
`ThreadNew` represenets a newly created thread that has not started
running yet, and `Thread` represents a started thread.
Signed-off-by: Boqun Feng <boqun.feng@gmail.com>
diff --git a/rust/kernel/task.rs b/rust/kernel/task.rs
index 927413d..57232f6 100644
--- a/rust/kernel/task.rs
+++ b/rust/kernel/task.rs
@@ -17,6 +17,11 @@
ptr,
};
+pub mod thread;
+
+#[doc(inline)]
+pub use thread::{Thread, ThreadNew};
+
/// A sentinel value used for infinite timeouts.
pub const MAX_SCHEDULE_TIMEOUT: c_long = c_long::MAX;
diff --git a/rust/kernel/task/thread.rs b/rust/kernel/task/thread.rs
new file mode 100644
index 0000000..7fd8af0
--- /dev/null
+++ b/rust/kernel/task/thread.rs
@@ -0,0 +1,314 @@
+// SPDX-License-Identifier: GPL-2.0
+
+//! A kernel thread (kthread).
+//!
+//! This module allows Rust code to create/wakeup/stop a kernel thread.
+
+use core::ffi::CStr;
+use core::ops::FnOnce;
+
+use crate::{
+ alloc::{flags::GFP_KERNEL, KBox},
+ bindings,
+ cpu::CpuId,
+ error::{code::EINTR, from_err_ptr, to_result, Result},
+ types::ARef,
+};
+
+use super::Task;
+
+/// A running kernel thread.
+pub struct Thread {
+ /// The reference to the corresponding [Task].
+ task: ARef<Task>,
+}
+
+/// A kernel thread that has not run yet.
+pub struct ThreadNew<T> {
+ /// The reference to the corresponding [Task].
+ task: ARef<Task>,
+
+ /// Pointer to the thread argument.
+ arg_ptr: *mut T,
+}
+
+/// Bridge function of type `F` from Rust ABI to C.
+extern "C" fn bridge<F>(data: *mut core::ffi::c_void) -> i32
+where
+ F: FnOnce() -> Result,
+{
+ // SAFETY: `data` is the result of `KBox::into_raw()`, therefore it's safe to
+ // `KBox::from_raw()` here.
+ let f = unsafe { KBox::from_raw(data as *mut F) };
+
+ // TODO: Make KBox<FnOnce()> impl FnOnce<Args>.
+ let f = KBox::into_inner(f);
+
+ match f() {
+ Ok(()) => 0,
+ Err(e) => e.to_errno(),
+ }
+}
+
+impl<T> ThreadNew<T> {
+ /// Binds the kernel thread to a particular cpu.
+ pub fn bind(&self, cpu: CpuId) {
+ // SAFETY: `self.task.as_ptr()` is a valid pointer to a task, and `cpu` is a valid cpu number.
+ unsafe { bindings::kthread_bind(self.task.as_ptr(), cpu.as_u32()) };
+ }
+
+ /// Starts a new thread.
+ pub fn start(self) -> Thread {
+ // Do not need to run `ThreadNew::drop()` since we are going start the thread. And we just
+ // need pass the refcount from `ThreadNew` to `Thread.
+ let task = &core::mem::ManuallyDrop::new(self).task;
+
+ // CAST: `Task` is transparent to `bindings::task_struct`.
+ let task = task.as_ptr().cast::<Task>();
+
+ // SAFETY: `task` is not null.
+ let task = unsafe { core::ptr::NonNull::new_unchecked(task) };
+
+ // SAFETY: `ThreadNew` will never be dropped, so the refcount in it can be safely passed to
+ // the `Thread`.
+ let task = unsafe { ARef::from_raw(task) };
+
+ // SAFETY: `task.as_ptr()` is a valid pointer, and `task` is newly created thread, so it's
+ // safe to start.
+ unsafe { bindings::kthread_start(task.as_ptr()) };
+
+ Thread { task }
+ }
+}
+
+impl<T> Drop for ThreadNew<T> {
+ fn drop(&mut self) {
+ // SAFETY: `self.task.as_ptr()` is a valid pointer to a task.
+ let result = to_result(unsafe { bindings::kthread_stop(self.task.as_ptr()) });
+
+ // If the thread has not run, the `result` from `kthread_stop` must be `EINTR`.
+ assert_eq!(Err(EINTR), result);
+
+ // SAFETY: `self.arg_ptr` came from a previous `KBox::into_raw()`, and the thread hasn't
+ // run or won't run either, so it's safe to call `KBox::from_raw()`.
+ drop(unsafe { KBox::from_raw(self.arg_ptr) });
+ }
+}
+
+impl Thread {
+ /// Creates a new thread.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// # use kernel::prelude::*;
+ /// use kernel::{sync::{new_condvar, new_mutex, Arc, CondVar, Mutex}, task::Thread};
+ ///
+ /// let boxed = KBox::new(42, GFP_KERNEL)?;
+ /// let lock = Arc::pin_init(new_mutex!(false), GFP_KERNEL)?;
+ /// let cv = Arc::pin_init(new_condvar!(), GFP_KERNEL)?;
+ ///
+ /// let t = Thread::new(c"rust-thread", {
+ /// let lock = lock.clone();
+ /// let cv = cv.clone();
+ /// move || {
+ /// *(lock.lock()) = true;
+ /// cv.notify_all();
+ /// Ok(())
+ /// }
+ /// })?;
+ ///
+ /// let t = t.start();
+ ///
+ /// let mut g = lock.lock();
+ /// while !*g {
+ /// cv.wait(&mut g);
+ /// }
+ ///
+ /// assert!(t.stop().is_ok());
+ ///
+ /// # Ok::<(), Error>(())
+ /// ```
+ ///
+ /// The case where the thread has not started.
+ ///
+ /// ```rust
+ /// # use kernel::prelude::*;
+ /// use kernel::{sync::{new_condvar, new_mutex, Arc, CondVar, Mutex}, task::Thread};
+ ///
+ /// let boxed = KBox::new(42, GFP_KERNEL)?;
+ /// let lock = Arc::pin_init(new_mutex!(false), GFP_KERNEL)?;
+ ///
+ /// let t = Thread::new(c"rust-thread", {
+ /// let lock = lock.clone();
+ /// move || {
+ /// *(lock.lock()) = true;
+ /// Ok(())
+ /// }
+ /// })?;
+ ///
+ /// drop(t);
+ /// assert!(!*lock.lock());
+ ///
+ /// # Ok::<(), Error>(())
+ /// ```
+ ///
+ /// # Context
+ ///
+ /// This function might sleep in `kthread_create_on_node` due to the memory allocation and
+ /// waiting for the completion, therefore do not call this in atomic contexts (i.e.
+ /// preemption-off contexts).
+ pub fn new<F>(name: &CStr, f: F) -> Result<ThreadNew<F>>
+ where
+ F: FnOnce() -> Result,
+ F: Send + 'static,
+ {
+ let boxed_fn = KBox::new(f, GFP_KERNEL)?;
+ let arg = KBox::into_raw(boxed_fn);
+
+ // SAFETY:
+ // - `bridge::<F>` is a valid pointer to a thread function.
+ // - `arg` is a valid pointer.
+ // - `kthread_create_on_node()` will copy the content of `name`, so `name` doesn't not need
+ // to live longer than this function call.
+ let task_ptr = from_err_ptr(unsafe {
+ bindings::kthread_create_on_node(
+ Some(bridge::<F>),
+ arg.cast(),
+ bindings::NUMA_NO_NODE,
+ c"%s".as_ptr(),
+ name.as_ptr(),
+ )
+ })
+ .map_err(|e| {
+ // SAFETY: `kthread_create*()` failure means no thread taking the `arg`, and therefore
+ // it is safe to convert back to a `KBox`.
+ drop(unsafe { KBox::from_raw(arg) });
+
+ e
+ })?;
+
+ // CAST: `Task` is transparent to `bindings::task_struct`.
+ // SAFETY: `task_ptr` is a valid pointer for a `task_struct` because we've checked with
+ // `from_err_ptr` above.
+ let task_ref = unsafe { &*(task_ptr.cast::<Task>()) };
+
+ // Increases the refcount of the task, so that it won't go away if it `do_exit`.
+ Ok(ThreadNew {
+ task: task_ref.into(),
+ arg_ptr: arg,
+ })
+ }
+
+ /// Creates a new thread and run it.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// # use kernel::prelude::*;
+ /// use kernel::{sync::{new_condvar, new_mutex, Arc, CondVar, Mutex}, task::Thread};
+ ///
+ /// let boxed = KBox::new(42, GFP_KERNEL)?;
+ /// let lock = Arc::pin_init(new_mutex!(false), GFP_KERNEL)?;
+ /// let cv = Arc::pin_init(new_condvar!(), GFP_KERNEL)?;
+ ///
+ /// let t = Thread::spawn(c"rust-thread", {
+ /// let lock = lock.clone();
+ /// let cv = cv.clone();
+ /// move || {
+ /// *(lock.lock()) = true;
+ /// cv.notify_all();
+ /// Ok(())
+ /// }
+ /// })?;
+ ///
+ /// let mut g = lock.lock();
+ /// while !*g {
+ /// cv.wait(&mut g);
+ /// }
+ ///
+ /// assert!(t.stop().is_ok());
+ ///
+ /// # Ok::<(), Error>(())
+ /// ```
+ ///
+ /// # Context
+ ///
+ /// This function might sleep in `kthread_create_on_node` due to the memory allocation and
+ /// waiting for the completion, therefore do not call this in atomic contexts (i.e.
+ /// preemption-off contexts).
+ pub fn spawn<F>(name: &CStr, f: F) -> Result<Self>
+ where
+ F: FnOnce() -> Result,
+ F: Send + 'static,
+ {
+ Ok(Self::new(name, f)?.start())
+ }
+
+ /// Creates a new thread and run it on a particular cpu.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// # use kernel::prelude::*;
+ /// use kernel::{sync::{new_condvar, new_mutex, Arc, CondVar, Mutex}, task::Thread};
+ /// use kernel::cpu::CpuId;
+ ///
+ /// let boxed = KBox::new(42, GFP_KERNEL)?;
+ /// let lock = Arc::pin_init(new_mutex!(false), GFP_KERNEL)?;
+ /// let cv = Arc::pin_init(new_condvar!(), GFP_KERNEL)?;
+ ///
+ /// let cpu = CpuId::from_i32(2).ok_or(kernel::error::code::EINVAL)?;
+ ///
+ /// let t = Thread::spawn_on(c"rust-thread", cpu, {
+ /// let lock = lock.clone();
+ /// let cv = cv.clone();
+ /// move || {
+ /// assert_eq!(CpuId::current().as_u32(), 2);
+ /// *(lock.lock()) = true;
+ /// cv.notify_all();
+ /// Ok(())
+ /// }
+ /// })?;
+ ///
+ /// let mut g = lock.lock();
+ /// while !*g {
+ /// cv.wait(&mut g);
+ /// }
+ ///
+ /// assert!(t.stop().is_ok());
+ ///
+ /// # Ok::<(), Error>(())
+ /// ```
+ ///
+ /// # Context
+ ///
+ /// This function might sleep in `kthread_create_on_node` due to the memory allocation and
+ /// waiting for the completion, therefore do not call this in atomic contexts (i.e.
+ /// preemption-off contexts).
+ pub fn spawn_on<F>(name: &CStr, cpu: CpuId, f: F) -> Result<Self>
+ where
+ F: FnOnce() -> Result,
+ F: Send + 'static,
+ {
+ let t = Self::new(name, f)?;
+
+ t.bind(cpu);
+
+ Ok(t.start())
+ }
+
+ /// Stops the thread.
+ ///
+ /// Waits for the closure to return or the thread `do_exit()` itself.
+ ///
+ /// Consumes the [`Thread`]. In case of error, returns the exit code of the thread.
+ ///
+ /// # Context
+ ///
+ /// This function might sleep, don't call it in atomic contexts.
+ pub fn stop(self) -> Result {
+ // SAFETY: `self.task.as_ptr()` is a valid pointer to a task.
+ to_result(unsafe { bindings::kthread_stop(self.task.as_ptr()) })
+ }
+}