123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- // Copyright 2019-2021 Tauri Programme within The Commons Conservancy
- // SPDX-License-Identifier: Apache-2.0
- // SPDX-License-Identifier: MIT
- //! The singleton async runtime used by Tauri and exposed to users.
- //!
- //! Tauri uses [`tokio`] Runtime to initialize code, such as
- //! [`Plugin::initialize`](../plugin/trait.Plugin.html#method.initialize) and [`crate::Builder::setup`] hooks.
- //! This module also re-export some common items most developers need from [`tokio`]. If there's
- //! one you need isn't here, you could use types in [`tokio`] dierectly.
- //! For custom command handlers, it's recommended to use a plain `async fn` command.
- use futures_lite::future::FutureExt;
- use once_cell::sync::OnceCell;
- pub use tokio::{
- runtime::{Handle as TokioHandle, Runtime as TokioRuntime},
- sync::{
- mpsc::{channel, Receiver, Sender},
- Mutex, RwLock,
- },
- task::JoinHandle as TokioJoinHandle,
- };
- use std::{
- future::Future,
- pin::Pin,
- task::{Context, Poll},
- };
- static RUNTIME: OnceCell<GlobalRuntime> = OnceCell::new();
- struct GlobalRuntime {
- runtime: Option<Runtime>,
- handle: RuntimeHandle,
- }
- impl GlobalRuntime {
- fn handle(&self) -> RuntimeHandle {
- if let Some(r) = &self.runtime {
- r.handle()
- } else {
- self.handle.clone()
- }
- }
- fn spawn<F: Future>(&self, task: F) -> JoinHandle<F::Output>
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- if let Some(r) = &self.runtime {
- r.spawn(task)
- } else {
- self.handle.spawn(task)
- }
- }
- pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
- where
- F: FnOnce() -> R + Send + 'static,
- R: Send + 'static,
- {
- if let Some(r) = &self.runtime {
- r.spawn_blocking(func)
- } else {
- self.handle.spawn_blocking(func)
- }
- }
- fn block_on<F: Future>(&self, task: F) -> F::Output {
- if let Some(r) = &self.runtime {
- r.block_on(task)
- } else {
- self.handle.block_on(task)
- }
- }
- }
- /// A runtime used to execute asynchronous tasks.
- pub enum Runtime {
- /// The tokio runtime.
- Tokio(TokioRuntime),
- }
- impl Runtime {
- /// Gets a reference to the [`TokioRuntime`].
- pub fn inner(&self) -> &TokioRuntime {
- let Self::Tokio(r) = self;
- r
- }
- /// Returns a handle of the async runtime.
- pub fn handle(&self) -> RuntimeHandle {
- match self {
- Self::Tokio(r) => RuntimeHandle::Tokio(r.handle().clone()),
- }
- }
- /// Spawns a future onto the runtime.
- pub fn spawn<F: Future>(&self, task: F) -> JoinHandle<F::Output>
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- match self {
- Self::Tokio(r) => JoinHandle::Tokio(r.spawn(task)),
- }
- }
- /// Runs the provided function on an executor dedicated to blocking operations.
- pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
- where
- F: FnOnce() -> R + Send + 'static,
- R: Send + 'static,
- {
- match self {
- Self::Tokio(r) => JoinHandle::Tokio(r.spawn_blocking(func)),
- }
- }
- /// Runs a future to completion on runtime.
- pub fn block_on<F: Future>(&self, task: F) -> F::Output {
- match self {
- Self::Tokio(r) => r.block_on(task),
- }
- }
- }
- /// An owned permission to join on a task (await its termination).
- #[derive(Debug)]
- pub enum JoinHandle<T> {
- /// The tokio JoinHandle.
- Tokio(TokioJoinHandle<T>),
- }
- impl<T> JoinHandle<T> {
- /// Gets a reference to the [`TokioJoinHandle`].
- pub fn inner(&self) -> &TokioJoinHandle<T> {
- let Self::Tokio(t) = self;
- t
- }
- /// Abort the task associated with the handle.
- ///
- /// Awaiting a cancelled task might complete as usual if the task was
- /// already completed at the time it was cancelled, but most likely it
- /// will fail with a cancelled `JoinError`.
- pub fn abort(&self) {
- match self {
- Self::Tokio(t) => t.abort(),
- }
- }
- }
- impl<T> Future for JoinHandle<T> {
- type Output = crate::Result<T>;
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- match self.get_mut() {
- Self::Tokio(t) => t.poll(cx).map_err(|e| crate::Error::JoinError(Box::new(e))),
- }
- }
- }
- /// A handle to the async runtime
- #[derive(Clone)]
- pub enum RuntimeHandle {
- /// The tokio handle.
- Tokio(TokioHandle),
- }
- impl RuntimeHandle {
- /// Gets a reference to the [`TokioHandle`].
- pub fn inner(&self) -> &TokioHandle {
- let Self::Tokio(h) = self;
- h
- }
- /// Runs the provided function on an executor dedicated to blocking operations.
- pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
- where
- F: FnOnce() -> R + Send + 'static,
- R: Send + 'static,
- {
- match self {
- Self::Tokio(h) => JoinHandle::Tokio(h.spawn_blocking(func)),
- }
- }
- /// Spawns a future onto the runtime.
- pub fn spawn<F: Future>(&self, task: F) -> JoinHandle<F::Output>
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- match self {
- Self::Tokio(h) => JoinHandle::Tokio(h.spawn(task)),
- }
- }
- /// Runs a future to completion on runtime.
- pub fn block_on<F: Future>(&self, task: F) -> F::Output {
- match self {
- Self::Tokio(h) => h.block_on(task),
- }
- }
- }
- fn default_runtime() -> GlobalRuntime {
- let runtime = Runtime::Tokio(TokioRuntime::new().unwrap());
- let handle = runtime.handle();
- GlobalRuntime {
- runtime: Some(runtime),
- handle,
- }
- }
- /// Sets the runtime to use to execute asynchronous tasks.
- /// For convinience, this method takes a [`TokioHandle`].
- /// Note that you cannot drop the underlying [`TokioRuntime`].
- ///
- /// # Example
- ///
- /// ```rust
- /// #[tokio::main]
- /// async fn main() {
- /// // perform some async task before initializing the app
- /// do_something().await;
- /// // share the current runtime with Tauri
- /// tauri::async_runtime::set(tokio::runtime::Handle::current());
- ///
- /// // bootstrap the tauri app...
- /// // tauri::Builder::default().run().unwrap();
- /// }
- ///
- /// async fn do_something() {}
- /// ```
- ///
- /// # Panics
- ///
- /// Panics if the runtime is already set.
- pub fn set(handle: TokioHandle) {
- RUNTIME
- .set(GlobalRuntime {
- runtime: None,
- handle: RuntimeHandle::Tokio(handle),
- })
- .unwrap_or_else(|_| panic!("runtime already initialized"))
- }
- /// Returns a handle of the async runtime.
- pub fn handle() -> RuntimeHandle {
- let runtime = RUNTIME.get_or_init(default_runtime);
- runtime.handle()
- }
- /// Runs a future to completion on runtime.
- pub fn block_on<F: Future>(task: F) -> F::Output {
- let runtime = RUNTIME.get_or_init(default_runtime);
- runtime.block_on(task)
- }
- /// Spawns a future onto the runtime.
- pub fn spawn<F>(task: F) -> JoinHandle<F::Output>
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- let runtime = RUNTIME.get_or_init(default_runtime);
- runtime.spawn(task)
- }
- /// Runs the provided function on an executor dedicated to blocking operations.
- pub fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
- where
- F: FnOnce() -> R + Send + 'static,
- R: Send + 'static,
- {
- let runtime = RUNTIME.get_or_init(default_runtime);
- runtime.spawn_blocking(func)
- }
- pub(crate) fn safe_block_on<F>(task: F) -> F::Output
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- if tokio::runtime::Handle::try_current().is_ok() {
- let (tx, rx) = std::sync::mpsc::sync_channel(1);
- spawn(async move {
- tx.send(task.await).unwrap();
- });
- rx.recv().unwrap()
- } else {
- block_on(task)
- }
- }
- #[cfg(test)]
- mod tests {
- use super::*;
- #[tokio::test]
- async fn runtime_spawn() {
- let join = spawn(async { 5 });
- assert_eq!(join.await.unwrap(), 5);
- }
- #[test]
- fn runtime_block_on() {
- assert_eq!(block_on(async { 0 }), 0);
- }
- #[tokio::test]
- async fn handle_spawn() {
- let handle = handle();
- let join = handle.spawn(async { 5 });
- assert_eq!(join.await.unwrap(), 5);
- }
- #[test]
- fn handle_block_on() {
- let handle = handle();
- assert_eq!(handle.block_on(async { 0 }), 0);
- }
- #[tokio::test]
- async fn handle_abort() {
- let handle = handle();
- let join = handle.spawn(async { 5 });
- join.abort();
- if let crate::Error::JoinError(raw_box) = join.await.unwrap_err() {
- let raw_error = raw_box.downcast::<tokio::task::JoinError>().unwrap();
- assert!(raw_error.is_cancelled());
- } else {
- panic!("Abort did not result in the expected `JoinError`");
- }
- }
- }
|