5
votes

Comment exécuter une tâche asynchrone à partir d'un thread non principal dans Tokio?

thread '<unnamed>' panicked at 'must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`', src/main.rs:7:9

6 commentaires

Quel est votre objectif réel ici? Pourquoi essayez-vous de générer cette tâche sur un autre thread? Je pense que la réponse dépend vraiment de ce que vous voulez faire.


@Frxstrem J'ai besoin de démarrer un autre thread à partir d'un thread non principal (t), et le thread (t) devrait continuer. Avant cela, j'utilisais un pool de threads. Quelque chose comme repl.it/repls/AssuredWellmadeParentheses Maintenant, j'ai décidé de remplacer le pool par async / await Tokio


La vraie question est: pourquoi mélangez-vous les threads et les tâches?


Si vous utilisez tokio, il vaut mieux l'utiliser partout. les tâches ne sont pas des threads et ne peuvent pas être générées de n'importe où, uniquement à partir d'un thread géré par tokio. Vous pourrez peut-être utiliser un canal tokio pour communiquer entre les deux, mais c'est un travail supplémentaire considérable et une structure de code très différente


Vous devez obtenir un handle vers le runtime, que vous pouvez passer au thread. Vous devrez peut-être créer manuellement le runtime pour pouvoir le faire. Le handle gérera ensuite le marshaling de la tâche du thread sur lequel il est appelé vers le thread de travail tokio.


Je suppose que le thread non principal est créé par quelque chose hors de votre contrôle.


3 Réponses :


2
votes

J'ai une application de traitement des tâches qui expose une API Web pour ajouter des tâches et les traiter, mais la demande d'API ne doit pas attendre la fin du travail (cela peut prendre un certain temps). J'utilise les événements envoyés par le serveur pour diffuser le résultat du travail. Cela signifie que le serveur API principal s'exécute à l'intérieur de main avec #[tokio::main] , mais où dois-je exécuter l'exécuteur de tâches? Dans l'exécuteur de travaux, j'aurai beaucoup d'attente: des choses comme le téléchargement. Ils interféreront avec le serveur API Web. La question cruciale est de savoir comment commencer les deux exécutions en parallèle?

Dans ce scénario, vous devez créer un thread séparé avec thread::spawn intérieur duquel vous allez créer un exécuteur Tokio. L'erreur que vous obtenez est que dans votre deuxième thread, il n'y a pas d'exécuteur Tokio (runtime). Vous devez en créer un manuellement et lui dire d'exécuter vos tâches. Le moyen le plus simple consiste à utiliser l'API Runtime :

use tokio; // 0.2.23

#[tokio::main]
async fn main() {
    let (_, _) = tokio::join!(start_server_listener(), start_job_processor());
}

Dans votre thread principal, un exécuteur est déjà disponible avec l'utilisation de #[tokio::main] . Avant l'ajout de cet attribut, le runtime était créé manuellement.

Si vous souhaitez vous en tenir à la philosophie async / await, vous pouvez utiliser join :

use tokio::runtime::Runtime; // 0.2.23

// Create the runtime
let rt = Runtime::new().unwrap();

// Spawn a future onto the runtime
rt.spawn(async {
    println!("now running on a worker thread");
});

C'est pourquoi la plupart des réponses remettent en question votre approche. Bien que très rare, je pense qu'il existe des scénarios dans lesquels vous souhaitez qu'un runtime asynchrone soit sur un autre thread tout en ayant l'avantage de configurer manuellement le runtime.


0 commentaires

0
votes

Il y a 2 choses importantes :

  1. Enveloppez également un point d'entrée pour le thread avec #[tokio::main] (ou créez un runtime tokio manuellement).
  2. Ne mélangez jamais différentes versions de tokio ! Par exemple, n'utilisez pas ensemble "0.2" avec "0.3" ou des caisses qui utilisent différentes versions de tokio crate.

L'exemple de travail:

use std::thread;
use tokio::task;

#[tokio::main]
async fn main() {
    thread::spawn(entrypoint).join();
    println!("printed after 123");
}

#[tokio::main]
async fn entrypoint() {
    let handle = task::spawn(async {
        println!("123");
    });
    handle.await;
}


0 commentaires

0
votes

L'élément clé est que vous devez obtenir une Handle Tokio. Il s'agit d'une référence à un Runtime et il vous permet de générer des tâches asynchrones en dehors du runtime.

Lorsque vous utilisez #[tokio::main] , le moyen le plus simple d'obtenir un Handle est via Handle::current avant de générer un autre thread, puis de donner le handle à chaque thread qui pourrait vouloir démarrer une tâche asynchrone:

let handle = HANDLE.lock().unwrap().as_ref().unwrap().clone();

Vous pouvez également créer un singleton global et mutable d'un Mutex<Option<Handle>> , l'initialiser sur None , puis le définir sur Some début de votre fonction tokio::main . Ensuite, vous pouvez récupérer cette variable globale, la dérouler et cloner le Handle lorsque vous en avez besoin:

*HANDLE.lock().unwrap() = Some(Handle::current());
use once_cell::sync::Lazy; // 1.5.2

static HANDLE: Lazy<Mutex<Option<Handle>>> = Lazy::new(Default::default);
use std::thread;
use tokio::runtime::Handle; // 0.3.4

#[tokio::main]
async fn main() {
    let threads: Vec<_> = (0..3)
        .map(|thread_id| {
            let handle = Handle::current();

            thread::spawn(move || {
                eprintln!("Thread {} started", thread_id);

                for task_id in 0..3 {
                    handle.spawn(async move {
                        eprintln!("Thread {} / Task {}", thread_id, task_id);
                    });
                }

                eprintln!("Thread {} finished", thread_id);
            })
        })
        .collect();

    for t in threads {
        t.join().expect("Thread panicked");
    }
}

Voir également:


0 commentaires