rust: task: Add kernel thread support

`ThreadNew` represents a newly created thread that has not started
running yet, and `Thread` represents a started thread.

Signed-off-by: Boqun Feng <boqun.feng@gmail.com>
diff --git a/rust/kernel/task.rs b/rust/kernel/task.rs
index 927413d..57232f6 100644
--- a/rust/kernel/task.rs
+++ b/rust/kernel/task.rs
@@ -17,6 +17,11 @@
     ptr,
 };
 
+pub mod thread;
+
+#[doc(inline)]
+pub use thread::{Thread, ThreadNew};
+
 /// A sentinel value used for infinite timeouts.
 pub const MAX_SCHEDULE_TIMEOUT: c_long = c_long::MAX;
 
diff --git a/rust/kernel/task/thread.rs b/rust/kernel/task/thread.rs
new file mode 100644
index 0000000..7fd8af0
--- /dev/null
+++ b/rust/kernel/task/thread.rs
@@ -0,0 +1,314 @@
+// SPDX-License-Identifier: GPL-2.0
+
+//! A kernel thread (kthread).
+//!
+//! This module allows Rust code to create, wake up, and stop a kernel thread.
+
+use core::ffi::CStr;
+use core::ops::FnOnce;
+
+use crate::{
+    alloc::{flags::GFP_KERNEL, KBox},
+    bindings,
+    cpu::CpuId,
+    error::{code::EINTR, from_err_ptr, to_result, Result},
+    types::ARef,
+};
+
+use super::Task;
+
+/// A kernel thread that has been started (see [`ThreadNew::start`]).
+pub struct Thread {
+    /// The reference to the corresponding [`Task`].
+    task: ARef<Task>,
+}
+
+/// A kernel thread that has been created but has not run yet.
+pub struct ThreadNew<T> {
+    /// The reference to the corresponding [`Task`].
+    task: ARef<Task>,
+
+    /// Pointer to the thread argument, obtained from `KBox::into_raw()` in [`Thread::new`].
+    arg_ptr: *mut T,
+}
+
+/// Adapts a Rust closure of type `F` to the C ABI thread function that a kthread runs.
+extern "C" fn bridge<F>(data: *mut core::ffi::c_void) -> i32
+where
+    F: FnOnce() -> Result,
+{
+    // SAFETY: `data` is the result of `KBox::into_raw()` on a `KBox<F>`, therefore it's safe to
+    // convert it back with `KBox::from_raw()` here.
+    let f = unsafe { KBox::from_raw(data as *mut F) };
+
+    // TODO: Make KBox<FnOnce()> impl FnOnce<Args>.
+    let f = KBox::into_inner(f);
+
+    match f() {
+        Ok(()) => 0,
+        Err(e) => e.to_errno(),
+    }
+}
+
+impl<T> ThreadNew<T> {
+    /// Binds the kernel thread to a particular CPU.
+    pub fn bind(&self, cpu: CpuId) {
+        // SAFETY: `self.task.as_ptr()` is a valid pointer to a task, and `cpu` is a valid cpu number.
+        unsafe { bindings::kthread_bind(self.task.as_ptr(), cpu.as_u32()) };
+    }
+
+    /// Starts the thread, consuming the [`ThreadNew`] and returning a [`Thread`].
+    pub fn start(self) -> Thread {
+        // `ThreadNew::drop()` must not run since we are going to start the thread. We just need
+        // to pass the refcount from `ThreadNew` to `Thread`.
+        let task = &core::mem::ManuallyDrop::new(self).task;
+
+        // CAST: `Task` is transparent to `bindings::task_struct`.
+        let task = task.as_ptr().cast::<Task>();
+
+        // SAFETY: `task` is not null.
+        let task = unsafe { core::ptr::NonNull::new_unchecked(task) };
+
+        // SAFETY: `ThreadNew` will never be dropped, so the refcount in it can be safely passed to
+        // the `Thread`.
+        let task = unsafe { ARef::from_raw(task) };
+
+        // SAFETY: `task.as_ptr()` is a valid pointer, and `task` is a newly created thread, so
+        // it's safe to start.
+        unsafe { bindings::kthread_start(task.as_ptr()) };
+
+        Thread { task }
+    }
+}
+
+impl<T> Drop for ThreadNew<T> {
+    fn drop(&mut self) {
+        // SAFETY: `self.task.as_ptr()` is a valid pointer to a task.
+        let result = to_result(unsafe { bindings::kthread_stop(self.task.as_ptr()) });
+
+        // If the thread has not run, the `result` from `kthread_stop` must be `EINTR`.
+        assert_eq!(Err(EINTR), result);
+
+        // SAFETY: `self.arg_ptr` came from a previous `KBox::into_raw()`, and the thread has not
+        // run and will never run, so it's safe to call `KBox::from_raw()`.
+        drop(unsafe { KBox::from_raw(self.arg_ptr) });
+    }
+}
+
+impl Thread {
+    /// Creates a new thread without starting it.
+    ///
+    /// # Examples
+    ///
+    /// ```rust
+    /// # use kernel::prelude::*;
+    /// use kernel::{sync::{new_condvar, new_mutex, Arc, CondVar, Mutex}, task::Thread};
+    ///
+    /// let boxed = KBox::new(42, GFP_KERNEL)?;
+    /// let lock = Arc::pin_init(new_mutex!(false), GFP_KERNEL)?;
+    /// let cv = Arc::pin_init(new_condvar!(), GFP_KERNEL)?;
+    ///
+    /// let t = Thread::new(c"rust-thread", {
+    ///     let lock = lock.clone();
+    ///     let cv = cv.clone();
+    ///     move || {
+    ///         *(lock.lock()) = true;
+    ///         cv.notify_all();
+    ///         Ok(())
+    ///     }
+    /// })?;
+    ///
+    /// let t = t.start();
+    ///
+    /// let mut g = lock.lock();
+    /// while !*g {
+    ///     cv.wait(&mut g);
+    /// }
+    ///
+    /// assert!(t.stop().is_ok());
+    ///
+    /// # Ok::<(), Error>(())
+    /// ```
+    ///
+    /// The case where the thread has not started.
+    ///
+    /// ```rust
+    /// # use kernel::prelude::*;
+    /// use kernel::{sync::{new_condvar, new_mutex, Arc, CondVar, Mutex}, task::Thread};
+    ///
+    /// let boxed = KBox::new(42, GFP_KERNEL)?;
+    /// let lock = Arc::pin_init(new_mutex!(false), GFP_KERNEL)?;
+    ///
+    /// let t = Thread::new(c"rust-thread", {
+    ///     let lock = lock.clone();
+    ///     move || {
+    ///         *(lock.lock()) = true;
+    ///         Ok(())
+    ///     }
+    /// })?;
+    ///
+    /// drop(t);
+    /// assert!(!*lock.lock());
+    ///
+    /// # Ok::<(), Error>(())
+    /// ```
+    ///
+    /// # Context
+    ///
+    /// This function might sleep in `kthread_create_on_node` due to the memory allocation and
+    /// waiting for the completion, therefore do not call this in atomic contexts (i.e.
+    /// preemption-off contexts).
+    pub fn new<F>(name: &CStr, f: F) -> Result<ThreadNew<F>>
+    where
+        F: FnOnce() -> Result,
+        F: Send + 'static,
+    {
+        let boxed_fn = KBox::new(f, GFP_KERNEL)?;
+        let arg = KBox::into_raw(boxed_fn);
+
+        // SAFETY:
+        // - `bridge::<F>` is a valid pointer to a thread function.
+        // - `arg` is a valid pointer.
+        // - `kthread_create_on_node()` will copy the content of `name`, so `name` doesn't need
+        //   to live longer than this function call.
+        let task_ptr = from_err_ptr(unsafe {
+            bindings::kthread_create_on_node(
+                Some(bridge::<F>),
+                arg.cast(),
+                bindings::NUMA_NO_NODE,
+                c"%s".as_ptr(),
+                name.as_ptr(),
+            )
+        })
+        .map_err(|e| {
+            // SAFETY: `kthread_create*()` failure means no thread taking the `arg`, and therefore
+            // it is safe to convert back to a `KBox`.
+            drop(unsafe { KBox::from_raw(arg) });
+
+            e
+        })?;
+
+        // CAST: `Task` is transparent to `bindings::task_struct`.
+        // SAFETY: `task_ptr` is a valid pointer for a `task_struct` because we've checked with
+        // `from_err_ptr` above.
+        let task_ref = unsafe { &*(task_ptr.cast::<Task>()) };
+
+        // Increases the refcount of the task, so that it won't go away if it calls `do_exit()`.
+        Ok(ThreadNew {
+            task: task_ref.into(),
+            arg_ptr: arg,
+        })
+    }
+
+    /// Creates a new thread and runs it.
+    ///
+    /// # Examples
+    ///
+    /// ```rust
+    /// # use kernel::prelude::*;
+    /// use kernel::{sync::{new_condvar, new_mutex, Arc, CondVar, Mutex}, task::Thread};
+    ///
+    /// let boxed = KBox::new(42, GFP_KERNEL)?;
+    /// let lock = Arc::pin_init(new_mutex!(false), GFP_KERNEL)?;
+    /// let cv = Arc::pin_init(new_condvar!(), GFP_KERNEL)?;
+    ///
+    /// let t = Thread::spawn(c"rust-thread", {
+    ///     let lock = lock.clone();
+    ///     let cv = cv.clone();
+    ///     move || {
+    ///         *(lock.lock()) = true;
+    ///         cv.notify_all();
+    ///         Ok(())
+    ///     }
+    /// })?;
+    ///
+    /// let mut g = lock.lock();
+    /// while !*g {
+    ///     cv.wait(&mut g);
+    /// }
+    ///
+    /// assert!(t.stop().is_ok());
+    ///
+    /// # Ok::<(), Error>(())
+    /// ```
+    ///
+    /// # Context
+    ///
+    /// This function might sleep in `kthread_create_on_node` due to the memory allocation and
+    /// waiting for the completion, therefore do not call this in atomic contexts (i.e.
+    /// preemption-off contexts).
+    pub fn spawn<F>(name: &CStr, f: F) -> Result<Self>
+    where
+        F: FnOnce() -> Result,
+        F: Send + 'static,
+    {
+        Ok(Self::new(name, f)?.start())
+    }
+
+    /// Creates a new thread and runs it on a particular CPU.
+    ///
+    /// # Examples
+    ///
+    /// ```rust
+    /// # use kernel::prelude::*;
+    /// use kernel::{sync::{new_condvar, new_mutex, Arc, CondVar, Mutex}, task::Thread};
+    /// use kernel::cpu::CpuId;
+    ///
+    /// let boxed = KBox::new(42, GFP_KERNEL)?;
+    /// let lock = Arc::pin_init(new_mutex!(false), GFP_KERNEL)?;
+    /// let cv = Arc::pin_init(new_condvar!(), GFP_KERNEL)?;
+    ///
+    /// let cpu = CpuId::from_i32(2).ok_or(kernel::error::code::EINVAL)?;
+    ///
+    /// let t = Thread::spawn_on(c"rust-thread", cpu, {
+    ///     let lock = lock.clone();
+    ///     let cv = cv.clone();
+    ///     move || {
+    ///         assert_eq!(CpuId::current().as_u32(), 2);
+    ///         *(lock.lock()) = true;
+    ///         cv.notify_all();
+    ///         Ok(())
+    ///     }
+    /// })?;
+    ///
+    /// let mut g = lock.lock();
+    /// while !*g {
+    ///     cv.wait(&mut g);
+    /// }
+    ///
+    /// assert!(t.stop().is_ok());
+    ///
+    /// # Ok::<(), Error>(())
+    /// ```
+    ///
+    /// # Context
+    ///
+    /// This function might sleep in `kthread_create_on_node` due to the memory allocation and
+    /// waiting for the completion, therefore do not call this in atomic contexts (i.e.
+    /// preemption-off contexts).
+    pub fn spawn_on<F>(name: &CStr, cpu: CpuId, f: F) -> Result<Self>
+    where
+        F: FnOnce() -> Result,
+        F: Send + 'static,
+    {
+        let t = Self::new(name, f)?;
+
+        t.bind(cpu);
+
+        Ok(t.start())
+    }
+
+    /// Stops the thread.
+    ///
+    /// Waits for the closure to return or for the thread to `do_exit()` itself.
+    ///
+    /// Consumes the [`Thread`]. In case of error, returns the exit code of the thread.
+    ///
+    /// # Context
+    ///
+    /// This function might sleep; do not call it in atomic contexts.
+    pub fn stop(self) -> Result {
+        // SAFETY: `self.task.as_ptr()` is a valid pointer to a task.
+        to_result(unsafe { bindings::kthread_stop(self.task.as_ptr()) })
+    }
+}