6
votes

Récupérer un acteur Akka ou le créer s'il n'existe pas

Je développe une application qui crée des acteurs Akka pour gérer et traiter les messages provenant d'un sujet Kafka. Les messages avec la même clé sont traités par le même acteur. J'utilise également la clé de message pour nommer l'acteur correspondant.

Lorsqu'un nouveau message est lu à partir du sujet, je ne sais pas si l'acteur avec l'id égal à la clé de message a déjà été créé par l'acteur système ou pas. Par conséquent, j'essaye de résoudre l'acteur en utilisant son nom, et s'il n'existe pas encore, je le crée. J'ai besoin de gérer la concurrence en ce qui concerne la résolution des acteurs. Il est donc possible que plusieurs clients demandent au système d'acteurs si un acteur existe.

Le code que j'utilise actuellement est le suivant:

private CompletableFuture<ActorRef> getActor(String uuid) {
    return system.actorSelection(String.format("/user/%s", uuid))
                 .resolveOne(Duration.ofMillis(1000))
                 .toCompletableFuture()
                 .exceptionally(ex -> 
                     system.actorOf(Props.create(MyActor.class, uuid), uuid))
                 .exceptionally(ex -> {
                     try {
                         return system.actorSelection(String.format("/user/%s",uuid)).resolveOne(Duration.ofMillis(1000)).toCompletableFuture().get();
                     } catch (InterruptedException | ExecutionException e) {
                         throw new RuntimeException(e);
                     }
                 });
}

Le code ci-dessus n'est pas optimisé et la gestion des exceptions peut être améliorée.

Cependant, existe-t-il dans Akka un moyen plus idiomatique de résoudre un acteur, ou de le créer s'il n'existe pas? Est-ce que je manque quelque chose?


0 commentaires

3 Réponses :


4
votes

Pensez à créer un acteur qui conserve comme état une carte des identifiants de message vers les ActorRef . Cet acteur "réceptionniste" traiterait toutes les demandes pour obtenir un acteur de traitement de message. Lorsque le réceptionniste reçoit une demande d'acteur (la demande inclurait l'ID du message), il essaie de rechercher un acteur associé dans sa carte: si un tel acteur est trouvé, il renvoie le ActorRef à l'expéditeur; sinon, il crée un nouvel acteur de traitement, ajoute cet acteur à sa carte et renvoie cette référence d'acteur à l'expéditeur.


2 commentaires

Merci pour la réponse. Cela pourrait être une bonne idée. Cependant, je parle de millions d'acteurs: (je sais, il faut l'insérer dans la question ...


Dans ce cas, regardez Akka Clustering. Il fait effectivement la même chose, mais les acteurs sont répartis sur plusieurs partitions par une stratégie que vous définissez. Les messages utilisent la stratégie de partition pour acheminer vers une partition particulière, puis la partition les dirige vers l'acteur approprié. Si l'acteur n'est pas présent, il est créé. Cela vous permettra également de passer à l'échelle de plusieurs nœuds si vos millions d'acteurs en ont besoin.



1
votes

La réponse de Jeffrey Chung est en effet de manière Akka. L'inconvénient d'une telle approche est sa faible performance. La solution la plus performante consiste à utiliser Méthode ConcurrentHashMap.computeIfAbsent () .


0 commentaires

1
votes

J'envisagerais d'utiliser akka-cluster et akka-cluster-sharding . Premièrement, cela vous donne un débit , ainsi que de la fiabilité. Cependant, cela permettra également au système de gérer la création des acteurs «entité».

Mais vous devez changer la façon dont vous parlez à ces acteurs. Vous créez un acteur ShardRegion qui gère tous les messages:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.cluster.sharding.ShardRegion;
import akka.event.Logging;
import akka.event.LoggingAdapter;


public class MyEventReceiver extends AbstractActor {

    private final ActorRef shardRegion;

    public static Props props() {
        return Props.create(MyEventReceiver.class, MyEventReceiver::new);
    }

    static ShardRegion.MessageExtractor messageExtractor 
      = new ShardRegion.HashCodeMessageExtractor(100) {
            // using the supplied hash code extractor to shard
            // the actors based on the hashcode of the entityid

        @Override
        public String entityId(Object message) {
            if (message instanceof EventInput) {
                return ((EventInput) message).uuid().toString();
            }
            return null;
        }

        @Override
        public Object entityMessage(Object message) {
            if (message instanceof EventInput) {
                return message;
            }
            return message; // I don't know why they do this it's in the sample
        }
    };


    public MyEventReceiver() {
        ActorSystem system = getContext().getSystem();
        ClusterShardingSettings settings =
           ClusterShardingSettings.create(system);
        // this is setup for the money shot
        shardRegion = ClusterSharding.get(system)
                .start("EventShardingSytem",
                        Props.create(EventActor.class),
                        settings,
                        messageExtractor);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(
                EventInput.class,
                e -> {
                    log.info("Got an event with UUID {} forwarding ... ",
                            e.uuid());
                    // the money shot
                    deviceRegion.tell(e, getSender());
                }
        ).build();
    }
}

Donc, cet acteur MyEventReceiver fonctionne sur tous les nœuds de votre cluster, et encapsule l'acteur shardRegion . Vous ne envoyez plus de message directement à vos EventActor , mais, en utilisant les acteurs MyEventReceiver et deviceRegion , vous utilisez le système de partage pour savoir quel nœud dans le cluster sur lequel vit particulier EventActor . Il en créera un si aucun n'a été créé auparavant, ou acheminera les messages s'il en a. Chaque EventActor doit avoir un identifiant unique: qui est extrait du message (donc un UUID est assez bon pour cela, mais il pourrait en être un autre identifiant, comme un customerID, ou un orderID, ou autre, à condition qu'il soit unique pour l'instance Actor avec laquelle vous voulez le traiter).

(j'omets le EventActor code> code, c'est par ailleurs un acteur assez normal, selon ce que vous en faites, la «magie» est dans le code ci-dessus).

Le système de partitionnement sait automatiquement créer le EventActor et allouez-le à un fragment, en fonction de l'algorithme que vous avez choisi (dans ce cas particulier, il est basé sur le hashCode de l'ID unique, qui est tout ce que j'ai jamais utilisé) . De plus, vous n'êtes garanti que un acteur pour un identifiant unique donné. Le message est acheminé de manière transparente vers le nœud et le fragment correct où qu'il se trouve; quel que soit le nœud et le fragment envoyé.

Il y a plus d'informations et un exemple de code dans le site et la documentation d'Akka.

C'est une manière assez radieuse de s'assurer que la même entité / Actor traite toujours les messages qui lui sont destinés. Le cluster et le sharding s'occupent automatiquement de distribuer correctement les acteurs, ainsi que le basculement et autres (vous devrez ajouter akka-persistence pour obtenir la passivation, la réhydratation et le basculement si l'acteur a un tas de état qui lui est associé (qui doit être restauré)).


0 commentaires