2
votes

Comment utiliser un cache asynchrone avec les coroutines Kotlin?

J'ai une application serveur Kotlin JVM utilisant des coroutines et je dois mettre un cache devant un appel réseau non bloquant. Je suppose que je peux utiliser une caféine AsyncLoadingCache pour obtenir le comportement de cache non bloquant dont j'ai besoin. Interface AsyncCacheLoader J'aurais besoin d'implémenter des utilisations CompletableFuture . En attendant, la méthode que je souhaite appeler pour charger les entrées du cache est une fonction suspend .

Je peux combler le vide comme ceci:

abstract class SuspendingCacheLoader<K, V>: AsyncCacheLoader<K, V> {
    abstract suspend fun load(key: K): V

    final override fun asyncLoad(key: K, executor: Executor): CompletableFuture<V> {
        return GlobalScope.async(executor.asCoroutineDispatcher()) {
            load(key)
        }.asCompletableFuture()
    }
}

Cela lancera la fonction load sur le Executor fourni (par défaut, le ForkJoinPool ), qui du point de vue de Caffeine est le comportement correct.

Cependant, je sais que je devrais essayer de évitez d'utiliser GlobalScope pour lancer des coroutines .

J'ai envisagé de faire implémenter mon SuspendingCacheLoader CoroutineScope et gérer son propre contexte de coroutine. Mais CoroutineScope est destiné à être implémenté par des objets avec un cycle de vie géré. Ni le cache ni le AsyncCacheLoader n'ont de hooks de cycle de vie. Le cache possède les instances Executor et CompletableFuture , il contrôle donc déjà le cycle de vie des tâches de chargement de cette façon. Je ne vois pas que le fait que les tâches appartiennent à un contexte coroutine ajouterait quoi que ce soit, et je crains de ne pas pouvoir fermer correctement le contexte coroutine après que le cache a cessé d'être utilisé.

Ecrire mon propre mécanisme de mise en cache asynchrone serait extrêmement difficile, je voudrais donc l'intégrer à l'implémentation de Caffeine si je le peux.

Utilise GlobalScope la bonne approche pour implémenter AsyncCacheLoader , ou y a-t-il une meilleure solution?


0 commentaires

5 Réponses :


3
votes

Le cache possède les instances Executor et CompletableFuture, il contrôle donc déjà le cycle de vie des tâches de chargement de cette façon.

Ce n'est pas vrai, la documentation sur Caffeine spécifie qu'elle utilise un Executor fourni par l'utilisateur ou ForkJoinPool.commonPool () si aucun est fourni. Cela signifie qu'il n'y a pas de cycle de vie par défaut.

Quoi qu'il en soit, appeler directement GlobalScope semble être la mauvaise solution car il n'y a aucune raison de coder en dur un choix. Fournissez simplement un CoroutineScope via le constructeur et utilisez GlobalScope comme argument alors que vous n'avez pas de cycle de vie explicite auquel le cache doit se lier.


2 commentaires

Merci pour la réponse et pour la clarification sur l'exécuteur de la caféine. J'aime l'idée d'accepter un CoroutineScope dans le constructeur. Si je devais finir par passer un CoroutineScope de l'application plus large, quel genre d'avantages cela pourrait-il me procurer? J'imagine que dans AsyncCacheLoader , je voudrais remplacer à la fois le Dispatcher (en utilisant le Executor fourni) et le Job (car il n'aurait pas de sens que les échecs de chargement du cache se propagent en dehors du cache). Une fois que cela est fait, il semble qu'il ne reste plus grand-chose du contexte d'origine.


Je ne vois pas quels avantages vous obtiendriez d'une portée de coroutine passée car vous travaillez clairement en dehors du monde de la coroutine et vos appelants ne l'ont pas sous la main.



1
votes

Après réflexion, j'ai trouvé une solution beaucoup plus simple qui, à mon avis, utilise les coroutines de manière plus idiomatique.

L'approche fonctionne en utilisant AsyncCache.get (key, mappingFunction) , au lieu d'implémenter un AsyncCacheLoader . Cependant, il ignore le Executor que le cache est configuré pour utiliser, en suivant les conseils de certaines des autres réponses ici.

private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, executor ->
    future(executor.asCoroutineDispatcher()) { 
        loadValue(k) 
    }
}

Notez que ceci dépend de kotlinx-coroutines-jdk8 pour le futur coroutine builder et de la fonction await () .

Je pense ignorer l ' Executor est probablement le bon choix. Comme le souligne @Kiskae, le cache utilisera le ForkJoinPool par défaut. Choisir d'utiliser cela plutôt que le répartiteur de coroutine par défaut n'est probablement pas utile. Cependant, il serait facile de l'utiliser si nous le voulions, en changeant la fonction getAsync :

class SuspendingCache<K, V>(private val asyncCache: AsyncCache<K, V>) {
    suspend fun get(key: K): V = supervisorScope {
        getAsync(key).await()
    }

    private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, _ ->
        future { 
            loadValue(k) 
        }
    }

    private suspend fun loadValue(key: K): V = TODO("Load the value")
}


0 commentaires

0
votes

Voici ma solution:

Définissez une fonction d'extension de CoroutineVerticle

suspend fun doSomething() {
    val data = cache.get('key').await()

    val future = cache.get('key2')
    val data2 = future.await()
}

Créez notre cache dans CoroutineVerticle code >

val cache : AsyncLoadingCache<String, String> = buildCache({
  maximumSize(10_000)
  expireAfterWrite(10, TimeUnit.MINUTES)
}) { key ->
    // load data and return it
    delay(1000)
    "data for key: $key"
}

Utiliser le cache

fun <K, V> CoroutineVerticle.buildCache(configurator: Caffeine<Any, Any>.() -> Unit = {}, loader: suspend CoroutineScope.(K) -> V) = Caffeine.newBuilder().apply(configurator).buildAsync { key: K, _ ->
    // do not use cache's executor
    future {
        loader(key)
    }
}


0 commentaires

0
votes

Voici une solution simple. Remplacez la notation K, V par votre type.

    val cache = Caffeine.newBuilder().buildAsync<K, V> { key: K, _ ->
      val future = CompletableFuture<V>()

      launch {
        val result = someAwaitOperation(key)
        future.complete(result)
      }

      future
    }


0 commentaires

0
votes

Suggérer une méthode d'extension comme celle-ci

suspend inline fun <K: Any, V: Any> Caffeine<Any, Any>.suspendingLoadingCache(
    crossinline suspendedLoader: suspend (key: K) -> V
): AsyncLoadingCache<K, V> =
    buildAsync { key, executor: Executor ->
        CoroutineScope(executor.asCoroutineDispatcher()).future {
            suspendedLoader(key)
        }
    }

Non recommandé GlobalScope , utilisez CoroutineScope(executor.asCoroutineDispatcher())

La méthode future est définie dans le module kotlinx-coroutines-jdk8


0 commentaires