11
votes

Parallèle / éviter la boucle de la boucle

J'ai écrit une classe qui obtient un Dataframe, certains calculs sur elle et peuvent exporter les résultats. Les dataframes sont générés par une liste de clés. Je sais que je fais cela de manière très inconduite en ce moment:

spark-shell --master yarn-client --num-executors 10 –driver-memory 10g
//...
var l = List(34, 32, 132, 352).par
// same code as above, just parallelized before calling foreach
// i can see the parallel execution by the console messages (my class gives some and they are printed out parallel now instead of sequentielly

scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: java.lang.IllegalStateException: SparkContext has been shutdown
org.apache.spark.SparkContext.runJob(SparkContext.scala:1816)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
    org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
.
.
.

java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext                  org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:104)
 org.apache.spark.SparkContext.broadcast(SparkContext.scala:1320)
   org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:104)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.makeBroadcastHashJoin(SparkStrategies.scala:92)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.apply(SparkStrategies.scala:104)


1 commentaires

Veuillez fournir l'erreur complète StackTrace. Aussi la ligne datacontainer.getdataframe (i) :: l n'a pas l'air juste.


3 Réponses :


14
votes

5 commentaires

Merci! Le cluster que j'utilise a plusieurs exécuteurs. Est-ce déjà le moyen le plus efficace? Que fait ma solution (voir édition)


S'il vous plaît jeter un coup d'œil à cela depuis Spark Documentation - spark.apache.org/docs/latest/... suivant est la citation pertinente: "Par défaut, Spark's Scheduler exécute des travaux de FIFO Mode. [...] Démarrage de Spark 0.8, il est également possible de configurer Le partage équitable entre les emplois. Sous le partage équitable, Spark assigne des tâches entre des emplois dans une mode "Round Robin", de sorte que tous les emplois obtiennent une part approximativement égale des ressources de cluster. Pour activer le planificateur équitable, définissez simplement la propriété SPARK.SCHEDULER.MODE à la foire lors de la configuration d'un SparkContext. "


Utilisez-vous un cluster autonome d'étincelles ou avec du fil?


Je ne suis pas en mode local, j'utilise le client de fil et j'ai 4 nœuds exécutants


Vous pouvez configurer le fil à utiliser Planification équitable aussi. Je pense que cela utilise également la FIFO par défaut.



2
votes

Vous pouvez utiliser l'avenir de Scala et Spark Fair planifie, par exemple

import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

object YourApp extends App { 
  val sc = ... // SparkContext, be sure to set spark.scheduler.mode=FAIR
  var pool = 0
  // this is to have different pools per job, you can wrap it to limit the no. of pools
  def poolId = {
    pool = pool + 1
    pool
  }
  def runner(i: Int) = Future {
    sc.setLocalProperty("spark.scheduler.pool", poolId)
    val data:DataFrame = DataContainer.getDataFrame(i) // get DataFrame
    val x = new MyClass(data)                     // initialize MyClass with new Object
    x.setSettings(...)
    x.calcSomething()
    x.saveResults()
  }

  val l = List(34, 32, 132, 352)      // Scala List
  val futures = l map(i => runner(i))

  // now you need to wait all your futures to be completed
  futures foreach(f => Await.ready(f, Duration.Inf))

}


0 commentaires

1
votes

J'ai fait cela en utilisant quelque chose comme à l'aide de list.par.foreach {objet => impression (objet)} code>. J'utilise Zeppelin sur Spark 2.3. J'ai un cas d'usage similaire où j'ai besoin d'obtenir la journée de données après jour et de le traiter séparément. Cela ne peut pas être fait en utilisant des données du mois entier en raison de certaines conditions de jointure sur les tables que j'utilise. Voici un échantillon de mon code:

import java.time.LocalDate
import java.sql.Date

var start =  LocalDate.of(2019, 1, 1)
val end   =  LocalDate.of(2019, 2, 1)
var list : List[LocalDate] = List()

var usersDf = spark.read.load("s3://production/users/")
usersDf.createOrReplaceTempView("usersDf")

while (start.isBefore(end)){
    list = start :: list
    start = start.plusDays(1)
}

list.par.foreach{ loopDate =>
    //println(start)
    var yesterday = loopDate.plusDays(-1)
    var tomorrow = loopDate.plusDays(1)
    var lastDay = yesterday.getDayOfMonth()
    var lastMonth = yesterday.getMonthValue()
    var lastYear = yesterday.getYear()

    var day = loopDate.getDayOfMonth()
    var month = loopDate.getMonthValue()
    var year = loopDate.getYear()
    var dateDay = loopDate

    var condition: String = ""
    if (month == lastMonth) {
        condition = s"where year = $year and month = $month and day in ($day, $lastDay)"
    } else {
        condition = s"""where ((year = $year and month = $month and day = $day) or
        (year = $lastYear and month = $lastMonth and day = $lastDay)) 
        """
    }

    //Get events in local timezone
    var aggPbDf = spark.sql(s"""
            with users as (
            select * from users
            where account_creation_date < '$tomorrow'
        )
        , cte as (
            select e.* date(from_utc_timestamp(to_timestamp(concat(e.year,'-', e.month, '-', e.day, ' ', e.hour), 'yyyy-MM-dd HH'), coalesce(u.timezone_name, 'UTC'))) as local_date
            from events.user_events e
            left join users u
            on u.account_id = e.account_id
            $condition)
        select * from cte
        where local_date = '$dateDay'
    """
    )
    aggPbDf.write.mode("overwrite")
        .format("parquet")
        .save(s"s3://prod-bucket/events/local-timezone/date_day=$dateDay")
}


0 commentaires