3
votes

scala futures - comment obtenir le résultat ou l'échec des deux futurs

J'utilise for pour exécuter 2 futures en parallèle. Je veux savoir lequel a réussi et lequel a échoué dans tous les cas (tous devraient fonctionner jusqu'à la fin avec un résultat ou un état d'échec). Actuellement, je ne peux récupérer qu'un résultat de succès combiné

J'ai pris l'inspection d'ici mais cela ne suffit pas car je n'obtiens pas les statuts de réussite lorsqu'un seul échoue ni les échecs des deux en cas d'échec des deux échec dans le futur de Scala pour la compréhension

def toFutureTry[A](future: Future[A]):Future[Try[A]] = future.map(Success(_)).recover {case t: Throwable => Failure(t)}

    val fa: Future[Try[Blah]] = toFutureTry(f1)
    val fb: Future[Try[Foo]] = toFutureTry(f2)

    val combinedRes = for {
      ra <- fa
      rb <- fb
    } yield (ra,rb)

    combinedRes.onComplete {
      case Success(successRes: (Try[Blah], Try[Foo])) => // all of these cases are success or fails
      case Failure(f: Throwable) => // i think it is unused right?
    }

J'essaye d'éviter ce désordre:

var countCompleted = 0 ... or some other atomic way to count 
f1 onComplete {
  case Success(value) => {
    ... countCompleted increment ... 
    // handle success
    if both completed {
       // handle returning a status
    }
  }
  case Failure(error) => {
    ... countCompleted increment ... 
    // handle failure
    if both completed {
       // handle returning a status
    }
  }
}

f2 onComplete {
  case Success(value) => {
    ... countCompleted increment ... 
    // handle success
    if both completed {
       // handle returning a status
    }
  }
  case Failure(error) => {
    ... countCompleted increment ... 
    // handle failure
    if both completed {
       // handle returning a status
    }
  }
}

Edit: Une autre version - Est-ce une approche valide?

case class TaggedException(context:String, val throwable: Throwable) extends Exception(throwable.getMessage)

val f1 = Future {...}.recoverWith {case e:Throwable => Future.Failed(new TaggedException("first one failed", e))}
val f2 = Future {...}.recoverWith {case e: Throwable => Future.Failed(new TaggedException("second one failed", e))}

val combinedResult = for {
  r1 <- f1
  r2 <- f2
} yield (r1,r2)

combinedResult.onFailure {
case e : TaggedException => ... // if both fail I only get the first line in the for
// in case where single fails I only know fail status without the success of the second
}


0 commentaires

4 Réponses :


0
votes

Avec for-comprehensions, dès qu'une ligne échoue, le code s'arrête juste là et toute exception est levée. Si r1 lance un Throwable, r2 ne sera jamais touché en premier lieu.

Personnellement, je les jetterais tous les deux dans un Soit [Throwable, peu importe] , plutôt que de mettre chaque .recoverWith dans un Future.Failed (...) . De cette façon, plutôt que de simplement faire un onFailure , vous pouvez faire quelque chose avec toutes les valeurs Left que vous obtenez et faire autre chose avec toutes les valeurs Right que vous obtenez . Ou vous pouvez utiliser Try / Success / Failure ... Cela dépend de ce que vous voulez faire avec les erreurs.

Je ne connais pas votre cas d'utilisation spécifique, mais si vous voulez gérer chaque cas de réussite ou d'échec individuellement, vous pouvez faire quelque chose comme ceci:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

val f1 = Future(Try(1 / 1))
val f2 = Future(Try(1 / 0))

// for-comprehensions won't fall over when reading a Failure
// as failures don't extend Throwable
val combinedResult = for {
  r1 <- f1 // Success(1)
  r2 <- f2 // Failure(java.lang.ArithmeticException: / by zero)
} yield List(r1,r2)

combinedResult.map { // get inside Future
  f =>
    f.map { // get inside List
      case Success(a) => // do something with success
      case Failure(e: IndexOutOfBoundsException) => // do something with failure
      case Failure(e: ArithmeticException) => // do something with failure
      case Failure(e) => // do something with failure
    }
}

Personnellement, je n'aime pas utiliser onComplete; Je préfère conserver les données dans un futur aussi longtemps que possible en mappant à l'intérieur. C'est juste une préférence personnelle.


5 commentaires

pouvez-vous élaborer avec un exemple?


J'ai mis à jour la réponse avec un exemple générique d'une façon dont j'aime traiter les exceptions dans Futures ... les envelopper dans un Try si vous n'avez pas une sorte de classe de type ou de trait ou quelque chose peut être vraiment utile si vous ne veux traiter les exceptions que plus tard.


Non, cela ne fonctionne pas. Le résultat de la pour-compréhension est un avenir raté, vous n'avez donc rien à tracer.


@Dima non ce n'est pas le cas, le résultat de cette for-compréhension est un Future [List [Try [Int]]] .


Oh, j'ai manqué le Essayez ... Désolé



1
votes

L'utilisation de flatMap sur Future [A] n'aidera pas, car elle sera toujours court-circuitée lors du premier échec produit par l'un d'entre eux, là où vous voulez vraiment accumuler les erreurs.

Une solution utilisant Future.traverse qui fonctionnera sur de nombreuses instances Future [A] arbitraires:

Oh no, err: TaggedException$3: 42
Received num: 42

Nous pouvons également utiliser un peu d'aide des chats avec son type Validé :

import cats.data.Validated.{Invalid, Valid}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import cats.implicits._
import scala.util.{Failure, Success}

def main(args: Array[String]): Unit = {
  case class TaggedException(context: String, throwable: Throwable)
    extends Exception(throwable.getMessage)

  val f1 = Future.failed[Int](new Exception("42")).recoverWith {
    case e: Throwable => Future.failed(TaggedException("first one failed", e))
  }

  val f2 = Future(42).recoverWith {
    case e: Throwable => Future.failed(TaggedException("second one failed", e))
  }

  val res: Future[List[Validated[Throwable, Int]]] = 
    List(f1, f2)
     .traverse(eventualInt => eventualInt
                       .map(i => Valid(i))
                       .recover { case err => Invalid(err) })

  res.onComplete {
    case Failure(exception) =>
      println(exception)
    case Success(value) =>
      value.foreach {
        case Valid(int) => println(s"Received num: $int")
        case Invalid(err) => println(s"Oh no, err: $err")
      }
  }

  Await.result(res, Duration.Inf)
}

Donnera:

val f1 = Future.failed[Int](new Exception("42")).recoverWith {
  case e: Throwable => Future.failed(TaggedException("first one failed", e))
}

val f2 = Future(42).recoverWith {
  case e: Throwable =>
    Future.failed(TaggedException("second one failed", e))
}

val res: Future[List[Either[Throwable, Int]]] = 
  Future
   .traverse(List(f1, f2)) {
      eventualInt => eventualInt
       .map(i => Right(i))
       .recover { case err => Left(err) }
   }

res.onComplete {
  case Failure(exception) =>
    println(exception)
  case Success(value) =>
    value.foreach {
      case Right(int) => println(s"Received num: $int")
      case Left(err) => println(s"Oh no, err: $err")
    }
}

Await.result(res, Duration.Inf)


7 commentaires

et sans catz:) _


L'approche @AvnerBarr James fonctionnera, car il encapsule le thunk interne avec un Try [A] , puis flatMap dessus qui ne court-circuitera pas.


mais j'obtiens un avenir à la suite d'un appel de fonction - je dois "insérer" l'essai dedans


Je pourrais fournir une implémentation spécifique de List [Future [A]] à Future [List [A]] si vous vouliez l'utiliser ... mais ce sera très spécifique.


Est-ce une approche valable? : `` `def returnsAFuture () = Future {1/0} val f3 = returnsAFuture () def toFutureTry [T] (future: Future [T]) = future.map (Success ()). recover {case t: Throwable => Failure (t)} val sdf: Future [Try [Int] with Product with Serializable] = toFutureTry (f3) val successFail = f3.map {Success ()} .recover {cas t: Throwable => Échec (t)} `` `


@AvnerBarr J'ai mis à jour ma réponse pour utiliser les types et fonctions Scala standard. Voyez si cela fonctionne pour vous.


pouvez-vous valider ma réponse modifiée mise à jour - j'ai testé localement, semblait donner le résultat souhaité



0
votes

~~ Tout d'abord, vous ne dirigez pas vos futurs en parallèle (pour-compréhension les exécutera séquentiellement).~~

Mettre à jour ce qui précède n'est pas vrai, comme mentionné dans les commentaires. J'ai raté le fait que les futurs sont créés en dehors de la pour-compréhension.

Et deuxièmement, comme vous l'avez remarqué, si l'un des futurs échoue, l'autre est perdu.

Pour contourner le fait que si l'un des contrats à terme échoue, le résultat de l'autre est perdu, vous pouvez "élever" vos futures valeurs à Essayer :

 (f1.liftToTry zip f2.liftToTry).map { 
    case (Success(foo), Success(bar)) => ... 
    ...
 }

Maintenant, vous pouvez faire quelque chose comme:

 object FutureSyntax { 
   implicit class FutureOps[A](val fu: Future[A]) extends AnyVal {
      def liftToTry: Future[Try[A]] = fu.transform(Success(_))
   }
 }

Si vous vous retrouvez à faire cela souvent, vous pourriez trouver cela utile, "souteneur "vos futurs avec l'opération lift :

 (lifted1 zip lifted2).map { 
   case (Success(foo), Success(bar)) => // both succeeded!
   case (Success(foo), Failure(t)) => // foo succeeded, bar failed with t
   case (Failure(t), Success(bar)) => // guess what!
   case (Failure(t1), Failure(t2)) => // oops
 }

Donc, maintenant, si vous importez FutureSyntax._ , ce qui précède peut être écrit comme

val lifted1: Future[Try[Foo]] = f1.transform(Success(_))
val lifted2: Future[Try[Bar]] = f1.transform(Success(_))

Vous pourriez aussi écrire la même chose avec un pour-compréhension, ce serait juste plus verbeux. La pour-compréhension ne serait pas mon choix dans ce cas: ils sont bons lors du traitement d'une séquence de futurs, où ceux plus tard dans la séquence dépendent des résultats des précédents. Pour traiter des contrats à terme indépendants, zip ou sequence est généralement un meilleur choix.


7 commentaires

vous ne dirigez pas vos futurs en parallèle (la for-compréhension les exécutera séquentiellement). Il crée les futurs avec impatience avant de les utiliser dans la for-compréhension. Il n'y a pas de séquentialité ici.


oui, ils doivent fonctionner en parallèle depuis leur création en dehors de la boucle


Ah oui. J'ai manqué ça. Oops.


vous pouvez faire: fu.transform (Success (_)) au lieu de fu.map (Success.apply) .recover {case NonFatal (t) => Failure (t)


@ViktorKlang Je pense que vous avez besoin du deuxième argument, mais fu.transform (Success (_), identity) fonctionne aussi, oui.


@Dima Je fais référence à la méthode qui a un seul argument.


Ah, ouais ... je suis toujours sur 2.11 ici: /



1
votes

Vous pouvez combiner transform et zip comme ceci:

combinedResult map {
  case (Success(v1), Success(v2)) =>
  case (Success(v1), Failure(f2)) =>
  case (Failure(f1), Success(v2)) =>
  case (Failure(f1), Failure(f2)) =>
}

Ensuite, vous pouvez faire:

val combinedResult: Future[(Try[T], Try[T])] =
  f1.transform(Success(_)).zip(f2.transform(Success(_)))


0 commentaires