2
votes

Mémoire du pilote Apache Spark

J'ai essayé d'installer et d'exécuter un simple Java Apache Spark dans intellij sous Windows, mais j'ai une erreur que je ne peux pas résoudre. J'ai installé spark via maven. J'obtiens cette erreur:

package Spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class App 
{
    public static void main( String[] args )
    {
        SparkConf conf = new SparkConf().setAppName("Spark").setMaster("local");
//        conf.set("spark.driver.memory","471859200");
        JavaSparkContext sc = new JavaSparkContext(conf);


        List<Integer> data= Arrays.asList(1,2,3,4,5,6,7,8,9,1,2,3,4,5,6,7,8,9);
        JavaRDD<Integer> rdd=sc.parallelize(data);
        JavaRDD<Integer> list=rdd.map(s->s);
        int totalLines=list.reduce((a,b)->a+b);
        System.out.println(totalLines);
    }
}

J'ai essayé de configurer la mémoire du pilote manuellement mais cela n'a pas fonctionné. J'ai également essayé d'installer spark localement, mais changer la mémoire du pilote à partir de l'invite de commande n'a pas aidé.

Voici le code:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/03/20 23:53:23 INFO SparkContext: Running Spark version 2.0.0-cloudera1-SNAPSHOT
19/03/20 23:53:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/20 23:53:24 INFO SecurityManager: Changing view acls to: Drakker
19/03/20 23:53:24 INFO SecurityManager: Changing modify acls to: Drakker
19/03/20 23:53:24 INFO SecurityManager: Changing view acls groups to: 
19/03/20 23:53:24 INFO SecurityManager: Changing modify acls groups to: 
19/03/20 23:53:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(Drakker); groups with view permissions: Set(); users  with modify permissions: Set(Drakker); groups with modify permissions: Set()
19/03/20 23:53:25 INFO Utils: Successfully started service 'sparkDriver' on port 50007.
19/03/20 23:53:25 INFO SparkEnv: Registering MapOutputTracker
19/03/20 23:53:25 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: System memory 259522560 must be at least 471859200. Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration.
    at org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:212)
    at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:194)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:308)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:165)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:260)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:429)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at Spark.App.main(App.java:16)
19/03/20 23:53:25 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.lang.IllegalArgumentException: System memory 259522560 must be at least 471859200. Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration.
    at org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:212)
    at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:194)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:308)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:165)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:260)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:429)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at Spark.App.main(App.java:16)

J'obtiens l'erreur quand instancier JavaSparkContext. Quelqu'un a-t-il une idée de la façon de résoudre ce problème?

Merci!


1 commentaires

Bienvenue à SO. Il semble que vous essayez de démarrer avec Spark et ... il semble que vous n'allez pas vraiment dans le bon sens: (car il semble que vous mélangez un ancien code Spark avec le nouveau code Spark et Spark version 2.0. 0-cloudera1-SNAPSHOT n'a pas l'air très bien. Puis-je conseiller de regarder des exemples Java? Très égoïstement, je recommanderais github.com/jgperrin/net.jgp.labs.spark ou github.com/jgperrin/net.jgp.books.spark.ch01 ... Heureux de vous aider


4 Réponses :


1
votes

Je suis un peu confus par votre code, car il mélange des constructions pré-Spark 2.x comme SparkConf et beaucoup de RDD. Ce n'est pas faux de les utiliser, mais depuis Spark 2.x, les choses sont un peu différentes.

Voici un exemple utilisant SparkSession et dataframes, qui est une version sur-ensemble, plus puissante, du RDD (pour faire les choses courtes).

Dans l'exemple, vous verrez plusieurs façons de faire vos opérations map / reduction, deux avec map / reduction et une avec une syntaxe simple de type SQL.

mapper et réduire avec getAs ()

package net.jgp.books.spark.ch07.lab990_others;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Simple ingestion followed by map and reduce operations.
 * 
 * @author jgp
 */
public class SelfIngestionApp {

  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    SelfIngestionApp app = new SelfIngestionApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Self ingestion")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> df = createDataframe(spark);
    df.show(false);

    // map and reduce with getAs()
    int totalLines = df
        .map(
            (MapFunction<Row, Integer>) row -> row.<Integer>getAs("i"),
            Encoders.INT())
        .reduce((a, b) -> a + b);
    System.out.println(totalLines);

    // map and reduce with getInt()
    totalLines = df
        .map(
            (MapFunction<Row, Integer>) row -> row.getInt(0),
            Encoders.INT())
        .reduce((a, b) -> a + b);
    System.out.println(totalLines);

    // SQL-like
    long totalLinesL = df.selectExpr("sum(*)").first().getLong(0);
    System.out.println(totalLinesL);
  }

  private static Dataset<Row> createDataframe(SparkSession spark) {
    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "i",
            DataTypes.IntegerType,
            false) });

    List<Integer> data =
        Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9);
    List<Row> rows = new ArrayList<>();
    for (int i : data) {
      rows.add(RowFactory.create(i));
    }

    return spark.createDataFrame(rows, schema);
  }
}

mapper et réduire avec getInt ()

long totalLinesL = df.selectExpr("sum(*)").first().getLong(0);
System.out.println(totalLinesL);

Similaire à SQL

C'est probablement le plus populaire.

totalLines = df
    .map(
        (MapFunction<Row, Integer>) row -> row.getInt(0),
        Encoders.INT())
    .reduce((a, b) -> a + b);
System.out.println(totalLines);

Exemple complet

int totalLines = df
    .map(
        (MapFunction<Row, Integer>) row -> row.<Integer>getAs("i"),
        Encoders.INT())
    .reduce((a, b) -> a + b);
System.out.println(totalLines);

p >


0 commentaires

0
votes

Vous pouvez essayer d'utiliser le générateur de session Spark, et vous pouvez obtenir le contexte Spark par spark.sparkContext ()

public static SparkSession sparkSession(String master,
                                        String appName) {
return    SparkSession.builder().appName(appName)
                       .master(master)
                       .config("spark.dynamicAllocation.enabled", true)
                       .config("spark.shuffle.service.enabled", true)
                       .config("spark.driver.maxResultSize", "8g")
                       .config("spark.executor.memory", "8g")
                       .config("spark.executor.cores", "4")
                       .config("spark.cores.max", "6")
                       .config("spark.submit.deployMode", "client")
                       .config("spark.network.timeout", "3600s")
                       .config("spark.eventLog.enabled", true)
                       .getOrCreate();
}


0 commentaires

2
votes

Si vous utilisez eclipse, vous pouvez définir Exécuter > Configurations d'exécution ... > Arguments > arguments VM et définir max taille de tas comme -Xmx512m.

Dans l’idée, vous pouvez définir Configurations Exécuter \ Déboguer > Options VM: -Xmx512m

Dans votre code, vous pouvez essayer ce conf.set ("spark.testing.memory", "2147480000")


0 commentaires

1
votes

Exception de mémoire du pilote

Cela se produit lorsque le pilote Spark manque de mémoire. C'est à ce moment que le maître d'application qui lance le pilote dépasse la limite et met fin au processus de fil.

Message d'erreur: Java.lang.OutOfMemoryError

Solution: augmentez la mémoire de vos pilotes en configurant ce qui suit:

    conf spark.driver.memory = <XY>g


0 commentaires