J'ai essayé d'installer et d'exécuter un simple Java Apache Spark dans intellij sous Windows, mais j'ai une erreur que je ne peux pas résoudre. J'ai installé spark via maven. J'obtiens cette erreur:
package Spark; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.util.Arrays; import java.util.List; public class App { public static void main( String[] args ) { SparkConf conf = new SparkConf().setAppName("Spark").setMaster("local"); // conf.set("spark.driver.memory","471859200"); JavaSparkContext sc = new JavaSparkContext(conf); List<Integer> data= Arrays.asList(1,2,3,4,5,6,7,8,9,1,2,3,4,5,6,7,8,9); JavaRDD<Integer> rdd=sc.parallelize(data); JavaRDD<Integer> list=rdd.map(s->s); int totalLines=list.reduce((a,b)->a+b); System.out.println(totalLines); } }
J'ai essayé de configurer la mémoire du pilote manuellement mais cela n'a pas fonctionné. J'ai également essayé d'installer spark localement, mais changer la mémoire du pilote à partir de l'invite de commande n'a pas aidé.
Voici le code:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 19/03/20 23:53:23 INFO SparkContext: Running Spark version 2.0.0-cloudera1-SNAPSHOT 19/03/20 23:53:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 19/03/20 23:53:24 INFO SecurityManager: Changing view acls to: Drakker 19/03/20 23:53:24 INFO SecurityManager: Changing modify acls to: Drakker 19/03/20 23:53:24 INFO SecurityManager: Changing view acls groups to: 19/03/20 23:53:24 INFO SecurityManager: Changing modify acls groups to: 19/03/20 23:53:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Drakker); groups with view permissions: Set(); users with modify permissions: Set(Drakker); groups with modify permissions: Set() 19/03/20 23:53:25 INFO Utils: Successfully started service 'sparkDriver' on port 50007. 19/03/20 23:53:25 INFO SparkEnv: Registering MapOutputTracker 19/03/20 23:53:25 ERROR SparkContext: Error initializing SparkContext. java.lang.IllegalArgumentException: System memory 259522560 must be at least 471859200. Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration. at org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:212) at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:194) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:308) at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:165) at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:260) at org.apache.spark.SparkContext.<init>(SparkContext.scala:429) at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58) at Spark.App.main(App.java:16) 19/03/20 23:53:25 INFO SparkContext: Successfully stopped SparkContext Exception in thread "main" java.lang.IllegalArgumentException: System memory 259522560 must be at least 471859200. Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration. at org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:212) at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:194) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:308) at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:165) at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:260) at org.apache.spark.SparkContext.<init>(SparkContext.scala:429) at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58) at Spark.App.main(App.java:16)
J'obtiens l'erreur quand instancier JavaSparkContext. Quelqu'un a-t-il une idée de la façon de résoudre ce problème?
Merci!
4 Réponses :
Je suis un peu confus par votre code, car il mélange des constructions pré-Spark 2.x comme SparkConf
et beaucoup de RDD. Ce n'est pas faux de les utiliser, mais depuis Spark 2.x, les choses sont un peu différentes.
Voici un exemple utilisant SparkSession
et dataframes, qui est une version sur-ensemble, plus puissante, du RDD (pour faire les choses courtes).
Dans l'exemple, vous verrez plusieurs façons de faire vos opérations map / reduction, deux avec map / reduction et une avec une syntaxe simple de type SQL.
package net.jgp.books.spark.ch07.lab990_others; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; /** * Simple ingestion followed by map and reduce operations. * * @author jgp */ public class SelfIngestionApp { /** * main() is your entry point to the application. * * @param args */ public static void main(String[] args) { SelfIngestionApp app = new SelfIngestionApp(); app.start(); } /** * The processing code. */ private void start() { // Creates a session on a local master SparkSession spark = SparkSession.builder() .appName("Self ingestion") .master("local[*]") .getOrCreate(); Dataset<Row> df = createDataframe(spark); df.show(false); // map and reduce with getAs() int totalLines = df .map( (MapFunction<Row, Integer>) row -> row.<Integer>getAs("i"), Encoders.INT()) .reduce((a, b) -> a + b); System.out.println(totalLines); // map and reduce with getInt() totalLines = df .map( (MapFunction<Row, Integer>) row -> row.getInt(0), Encoders.INT()) .reduce((a, b) -> a + b); System.out.println(totalLines); // SQL-like long totalLinesL = df.selectExpr("sum(*)").first().getLong(0); System.out.println(totalLinesL); } private static Dataset<Row> createDataframe(SparkSession spark) { StructType schema = DataTypes.createStructType(new StructField[] { DataTypes.createStructField( "i", DataTypes.IntegerType, false) }); List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9); List<Row> rows = new ArrayList<>(); for (int i : data) { rows.add(RowFactory.create(i)); } return spark.createDataFrame(rows, schema); } }
long totalLinesL = df.selectExpr("sum(*)").first().getLong(0); System.out.println(totalLinesL);
C'est probablement le plus populaire.
totalLines = df .map( (MapFunction<Row, Integer>) row -> row.getInt(0), Encoders.INT()) .reduce((a, b) -> a + b); System.out.println(totalLines);
int totalLines = df .map( (MapFunction<Row, Integer>) row -> row.<Integer>getAs("i"), Encoders.INT()) .reduce((a, b) -> a + b); System.out.println(totalLines);
p >
Vous pouvez essayer d'utiliser le générateur de session Spark, et vous pouvez obtenir le contexte Spark par spark.sparkContext ()
public static SparkSession sparkSession(String master, String appName) { return SparkSession.builder().appName(appName) .master(master) .config("spark.dynamicAllocation.enabled", true) .config("spark.shuffle.service.enabled", true) .config("spark.driver.maxResultSize", "8g") .config("spark.executor.memory", "8g") .config("spark.executor.cores", "4") .config("spark.cores.max", "6") .config("spark.submit.deployMode", "client") .config("spark.network.timeout", "3600s") .config("spark.eventLog.enabled", true) .getOrCreate(); }
Si vous utilisez eclipse, vous pouvez définir Exécuter
> Configurations d'exécution ...
> Arguments
> arguments VM et définir max taille de tas comme -Xmx512m.
Dans l’idée, vous pouvez définir Configurations Exécuter \ Déboguer
> Options VM: -Xmx512m
Dans votre code, vous pouvez essayer ce conf.set ("spark.testing.memory", "2147480000")
Exception de mémoire du pilote
Cela se produit lorsque le pilote Spark manque de mémoire. C'est à ce moment que le maître d'application qui lance le pilote dépasse la limite et met fin au processus de fil.
Message d'erreur: Java.lang.OutOfMemoryError
Solution: augmentez la mémoire de vos pilotes en configurant ce qui suit:
conf spark.driver.memory = <XY>g
Bienvenue à SO. Il semble que vous essayez de démarrer avec Spark et ... il semble que vous n'allez pas vraiment dans le bon sens: (car il semble que vous mélangez un ancien code Spark avec le nouveau code Spark et
Spark version 2.0. 0-cloudera1-SNAPSHOT
n'a pas l'air très bien. Puis-je conseiller de regarder des exemples Java? Très égoïstement, je recommanderais github.com/jgperrin/net.jgp.labs.spark ou github.com/jgperrin/net.jgp.books.spark.ch01 ... Heureux de vous aider