J'utilise PySpark (sur AWS Glue, si cela compte). J'obtiens des erreurs de temporisation: (il semble que l'écriture sur le parquet échoue)
Les journaux complets sur https: // pastebin. com / TmuAcFx7
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql.functions import UserDefinedFunction, udf, regexp_replace, to_timestamp, date_format, lit from datetime import datetime, timedelta from pyspark.sql.types import ArrayType, StringType, DateType, Row import math ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) # READ IN FLIGHTS, AIRPORTS, AGENTS TABLES # NOTE: Bookmarks enabled for flights data catalog airportsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "airports") airportsDF = airportsGDF.toDF().select("airportId", "countryName", "cityName", "airportName") airportsDF = airportsDF.withColumn("airportId", airportsDF["airportId"].cast("string")) airportsDF.createOrReplaceTempView("airports") agentsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "agents") agentsRawDF = agentsGDF.toDF().select("id", "name") agentsRawDF = agentsRawDF.withColumn("id", agentsRawDF["id"].cast("string")) agentsRawDF.createOrReplaceTempView("agents") def batch(iterable, n=1): l = len(iterable) for ndx in range(0, l, n): yield iterable[ndx:min(ndx + n, l)] arr = [13301,12929,14511,9968,15280,10193,13531,13439,16122,9498,16162,17210,12728,14534,12542,13303,16716,13311,12913,11036,17471,16240,10902,15526,17294,15671,10858,17482,12071,12337,17521,12274,10032,17396,11052,9970,12917,12195,10658,17409,13078,17416,17388,12118,10438,13113,11170,14213,9762,10871,11780,12392,15518,13536,10724,14260,16747,18490,17402,10284,10982,10431,16743,12482,10497,15168,16587,15412,17106,11017,17368,13804,15461,19461,16923,9794,12795,25396,12952,15422,10101,14147,10485,12210,25336,9449,15395,13947,11893,11109,9921,9799,15253,16945,13164,10031,17002,17152,16516,13180,16451,16437,11336,13428,10182,25405,16955,10180,12191] def generate_date_series(start, stop): return [start + timedelta(days=x) for x in range(0, (stop-start).days + 1)] spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType())) def getInterval(num, start, stop, incr): if (num is None): return "" lower = math.floor(num / incr) * incr upper = lower + incr return "(%d,%d]" % (lower, upper) spark.udf.register("getInterval", getInterval, StringType()) getIntervalUdf = udf(getInterval) # CREATE DF FOR PAST 90 DAYS EXCLUDING PAST 7 DAYS today = datetime.utcnow().date() start = today - timedelta(days = 14) # TODO: CHANGE TO 90 sevenDaysAgo = today - timedelta(days = 7) print(">>> Generate data frame for ", start, " to ", sevenDaysAgo, "... ") relaventDatesDf = spark.createDataFrame([ Row(start=start, stop=sevenDaysAgo) ]) relaventDatesDf.createOrReplaceTempView("relaventDates") relaventDatesDf = spark.sql("SELECT explode(generate_date_series(start, stop)) AS querydatetime FROM relaventDates") relaventDatesDf.createOrReplaceTempView("relaventDates") print("===LOG:Dates===") relaventDatesDf.show() flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "flights", transformation_ctx="flights", push_down_predicate=""" querydatetime BETWEEN '%s' AND '%s' AND querydestinationplace IN (%s) """ % (start.strftime("%Y-%m-%d"), today.strftime("%Y-%m-%d"), ",".join(map(lambda s: str(s), arr)))) flightsDf = flightsGDF.toDF() flightsDf.createOrReplaceTempView("flights") print("===LOG:STARTING_QUERY===") resultDf = spark.sql(""" SELECT f.*, CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key, countryName, cityName, airportName, a.name AS agentName FROM flights f INNER JOIN agents a ON f.agent = a.id INNER JOIN airports p ON f.querydestinationplace = p.airportId LIMIT 5000000 """) \ .sample(False, 0.001) resultDf.explain(True) print("===LOG:ADDING_COLUMNS===") resultDf = resultDf \ .withColumn("querydatetime", resultDf["querydatetime"].cast("date")) \ .withColumn("queryoutbounddate", resultDf["queryoutbounddate"].cast("date")) \ .withColumn("queryinbounddate", resultDf["queryinbounddate"].cast("date")) \ .withColumn("outdeparture", to_timestamp(resultDf["outdeparture"], "yyyy-MM-dd'T'HH:mm:ss")) \ .withColumn("outarrival", to_timestamp(resultDf["outarrival"], "yyyy-MM-dd'T'HH:mm:ss")) \ .withColumn("indeparture", to_timestamp(resultDf["indeparture"], "yyyy-MM-dd'T'HH:mm:ss")) \ .withColumn("inarrival", to_timestamp(resultDf["inarrival"], "yyyy-MM-dd'T'HH:mm:ss")) print("===LOG:WRITING_RAW===") print("===LOG:DONE_WRITING_RAW===") resultDf.createOrReplaceTempView("flights") # GET DISTINCT DATASET # distinctKeysDf = resultDf.select("outboundlegid", "inboundlegid", "agent").groupBy(["outboundlegid", "inboundlegid", "agent"]) distinctKeysDf = spark.sql(""" SELECT key FROM flights GROUP BY key """) distinctKeysDf.createOrReplaceTempView("distinctKeys") # GET RELAVENT DATES DATASET print("===LOG:WRITING_EXPANDED===") expandedKeyDatesDf = spark.sql(""" SELECT key, querydatetime FROM relaventDates CROSS JOIN distinctKeys """) expandedKeyDatesDf.createOrReplaceTempView("expandedKeyDates") print("===LOG:DONE_WRITING_EXPANDED===") cleanedFlightsDf = spark.sql(""" SELECT e.key AS master_key, e.querydatetime AS master_querydatetime, f.* FROM expandedKeyDates e LEFT JOIN flights f ON e.key = f.key AND e.querydatetime = f.querydatetime ORDER BY e.key, e.querydatetime """) cleanedFlightsDf = cleanedFlightsDf \ .withColumn("created_day", date_format(cleanedFlightsDf["querydatetime"], "EEEE")) \ .withColumn("created_month", date_format(cleanedFlightsDf["querydatetime"], "yyyy-MM")) \ .withColumn("created_month_m", date_format(cleanedFlightsDf["querydatetime"], "M").cast("int")) \ .withColumn("created_week", date_format(cleanedFlightsDf["querydatetime"], "w").cast("int")) \ .withColumn("out_day", date_format(cleanedFlightsDf["outdeparture"], "EEE")) \ .withColumn("out_month", date_format(cleanedFlightsDf["outdeparture"], "yyyy-MM")) \ .withColumn("out_month_m", date_format(cleanedFlightsDf["outdeparture"], "M").cast("int")) \ .withColumn("out_week", date_format(cleanedFlightsDf["outdeparture"], "w").cast("int")) \ .withColumn("out_departure_interval", getIntervalUdf(date_format(cleanedFlightsDf["outdeparture"], "H").cast("int"), lit(0), lit(24), lit(4))) \ .withColumn("out_hour", date_format(cleanedFlightsDf["outdeparture"], "k").cast("int")) \ .withColumn("in_day", date_format(cleanedFlightsDf["indeparture"], "EEE")) \ .withColumn("in_month", date_format(cleanedFlightsDf["indeparture"], "yyyy-MM")) \ .withColumn("in_month_m", date_format(cleanedFlightsDf["indeparture"], "M").cast("int")) \ .withColumn("in_week", date_format(cleanedFlightsDf["indeparture"], "w").cast("int")) \ .withColumn("in_departure_interval", getIntervalUdf(date_format(cleanedFlightsDf["indeparture"], "H").cast("int"), lit(0), lit(24), lit(4))) \ .withColumn("in_hour", date_format(cleanedFlightsDf["indeparture"], "k").cast("int")) print("===LOG:WRITING_CLEANED===") cleanedFlightsDf \ .repartition("countryName", "querydatetime") \ .write \ .mode("overwrite") \ .partitionBy(["countryName", "querydatetime"]) \ .parquet("s3://xxx-glue/cleanedFlights") print("===LOG:DONE_WRITING_CLEANED===") print("===LOG:DONE BATCH %s" % (batch)) job.commit()
Certains googlages suggèrent qu'il a échoué en raison d'un délai d'expiration pendant la diffusion?
Mon code ressemble à ci-dessous. Je pense qu'il a échoué près de la dernière partie, écrire au parquet? Mais les journaux d'explication suggèrent qu'il exécute la requête alors aussi?
File "script_2019-02-06-02-32-43.py", line 197, in <module> .parquet("s3://xxx-glue/cleanedFlights") File "/mnt/yarn/usercache/root/appcache/application_1549418793443_0001/container_1549418793443_0001_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 691, in parquet File "/mnt/yarn/usercache/root/appcache/application_1549418793443_0001/container_1549418793443_0001_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__ File "/mnt/yarn/usercache/root/appcache/application_1549418793443_0001/container_1549418793443_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco File "/mnt/yarn/usercache/root/appcache/application_1549418793443_0001/container_1549418793443_0001_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o246.parquet. : org.apache.spark.SparkException: Job aborted. at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213) Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: Exchange hashpartitioning(countryName#24, querydatetime#213, 200) +- *Project [master_key#588, master_querydatetime#589, Id#180, QueryTaskId#181, QueryOriginPlace#182, queryoutbounddate#334, queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, outdeparture#416, outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, ... 33 more fields] +- BatchEvalPython [getInterval(cast(date_format(outdeparture#416, H, Some(Zulu)) as int), 0, 24, 4), getInterval(cast(date_format(indeparture#498, H, Some(Zulu)) as int), 0, 24, 4)], [master_key#588, master_querydatetime#589, Id#180, QueryTaskId#181, QueryOriginPlace#182, queryoutbounddate#334, queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, outdeparture#416, outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, ... 21 more fields] +- *Sort [key#250 ASC NULLS FIRST, querydatetime#101 ASC NULLS FIRST], true, 0 +- Exchange rangepartitioning(key#250 ASC NULLS FIRST, querydatetime#101 ASC NULLS FIRST, 200) +- *Project [key#250 AS master_key#588, querydatetime#101 AS master_querydatetime#589, Id#180, QueryTaskId#181, QueryOriginPlace#182, queryoutbounddate#334, queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, outdeparture#416, outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, ... 19 more fields] +- *BroadcastHashJoin [key#250, querydatetime#101], [key#590, querydatetime#213], LeftOuter, BuildRight :- *Project [key#250, querydatetime#101] : +- BroadcastNestedLoopJoin BuildRight, Cross : :- Generate explode(pythonUDF0#1633), false, false, [querydatetime#101] : : +- BatchEvalPython [generate_date_series(start#94, stop#95)], [start#94, stop#95, pythonUDF0#1633] : : +- Scan ExistingRDD[start#94,stop#95] : +- BroadcastExchange IdentityBroadcastMode : +- *HashAggregate(keys=[key#250], functions=[], output=[key#250]) : +- *HashAggregate(keys=[key#250], functions=[], output=[key#250]) : +- *Sample 0.0, 0.001, false, 7736333241016522154 : +- *GlobalLimit 5000000 : +- Exchange SinglePartition : +- *LocalLimit 5000000 : +- *Project [concat(outboundlegid#190, -, inboundlegid#191, -, agent#187) AS key#250] : +- *SortMergeJoin [querydestinationplace#212], [cast(airportId#38 as int)], Inner : :- *Sort [querydestinationplace#212 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(querydestinationplace#212, 200) : : +- *Project [Agent#187, OutboundLegId#190, InboundLegId#191, querydestinationplace#212] : : +- *SortMergeJoin [agent#187], [id#89], Inner : : :- *Sort [agent#187 ASC NULLS FIRST], false, 0 : : : +- Exchange hashpartitioning(agent#187, 200) : : : +- *Project [Agent#187, OutboundLegId#190, InboundLegId#191, querydestinationplace#212] : : : +- *Filter (isnotnull(agent#187) && isnotnull(querydestinationplace#212)) : : : +- Scan ExistingRDD[Id#180,QueryTaskId#181,QueryOriginPlace#182,QueryOutboundDate#183,QueryInboundDate#184,QueryCabinClass#185,QueryCurrency#186,Agent#187,QuoteAgeInMinutes#188,Price#189,OutboundLegId#190,InboundLegId#191,OutDeparture#192,OutArrival#193,OutDuration#194,OutJourneyMode#195,OutStops#196,OutCarriers#197,OutOperatingCarriers#198,NumberOutStops#199,NumberOutCarriers#200,NumberOutOperatingCarriers#201,InDeparture#202,InArrival#203,... 10 more fields] : : +- *Sort [id#89 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(id#89, 200) : : +- *Project [cast(id#67L as string) AS id#89] : : +- *Filter (isnotnull(id#67L) && isnotnull(cast(id#67L as string))) : : +- Scan ExistingRDD[id#67L,name#68,imageurl#69,status#70,optimisedformobile#71,type#72,bookingnumber#73,createdat#74,updatedat#75] : +- *Sort [cast(airportId#38 as int) ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(cast(airportId#38 as int), 200) : +- *Project [cast(airportId#18L as string) AS airportId#38] : +- *Filter (isnotnull(airportId#18L) && isnotnull(cast(airportId#18L as string))) : +- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24] +- BroadcastExchange HashedRelationBroadcastMode(List(input[34, string, true], input[33, date, true])) +- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, cast(QueryOutboundDate#183 as date) AS queryoutbounddate#334, cast(QueryInboundDate#184 as date) AS queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, cast(unix_timestamp(OutDeparture#192, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS outdeparture#416, cast(unix_timestamp(OutArrival#193, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, cast(unix_timestamp(InDeparture#202, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS indeparture#498, cast(unix_timestamp(InArrival#203, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS inarrival#539, ... 15 more fields] +- *Sample 0.0, 0.001, false, 7736333241016522154 +- *GlobalLimit 5000000 +- Exchange SinglePartition +- *LocalLimit 5000000 +- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, QueryOutboundDate#183, QueryInboundDate#184, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, OutDeparture#192, OutArrival#193, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, InDeparture#202, InArrival#203, ... 15 more fields] +- *SortMergeJoin [querydestinationplace#212], [cast(airportId#38 as int)], Inner :- *Sort [querydestinationplace#212 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(querydestinationplace#212, 200) : +- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, QueryOutboundDate#183, QueryInboundDate#184, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, OutDeparture#192, OutArrival#193, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, InDeparture#202, InArrival#203, ... 11 more fields] : +- *SortMergeJoin [Agent#187], [id#89], Inner : :- *Sort [Agent#187 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(Agent#187, 200) : : +- *Filter (isnotnull(Agent#187) && isnotnull(querydestinationplace#212)) : : +- Scan ExistingRDD[Id#180,QueryTaskId#181,QueryOriginPlace#182,QueryOutboundDate#183,QueryInboundDate#184,QueryCabinClass#185,QueryCurrency#186,Agent#187,QuoteAgeInMinutes#188,Price#189,OutboundLegId#190,InboundLegId#191,OutDeparture#192,OutArrival#193,OutDuration#194,OutJourneyMode#195,OutStops#196,OutCarriers#197,OutOperatingCarriers#198,NumberOutStops#199,NumberOutCarriers#200,NumberOutOperatingCarriers#201,InDeparture#202,InArrival#203,... 10 more fields] : +- *Sort [id#89 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(id#89, 200) : +- *Project [cast(id#67L as string) AS id#89, name#68] : +- *Filter (isnotnull(id#67L) && isnotnull(cast(id#67L as string))) : +- Scan ExistingRDD[id#67L,name#68,imageurl#69,status#70,optimisedformobile#71,type#72,bookingnumber#73,createdat#74,updatedat#75] +- *Sort [cast(airportId#38 as int) ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(cast(airportId#38 as int), 200) +- *Project [cast(airportId#18L as string) AS airportId#38, countryName#24, cityName#23, airportName#22] +- *Filter (isnotnull(airportId#18L) && isnotnull(cast(airportId#18L as string))) +- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24] at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) ... 45 more Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: Exchange rangepartitioning(key#250 ASC NULLS FIRST, querydatetime#101 ASC NULLS FIRST, 200) +- *Project [key#250 AS master_key#588, querydatetime#101 AS master_querydatetime#589, Id#180, QueryTaskId#181, QueryOriginPlace#182, queryoutbounddate#334, queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, outdeparture#416, outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, ... 19 more fields] +- *BroadcastHashJoin [key#250, querydatetime#101], [key#590, querydatetime#213], LeftOuter, BuildRight :- *Project [key#250, querydatetime#101] : +- BroadcastNestedLoopJoin BuildRight, Cross : :- Generate explode(pythonUDF0#1633), false, false, [querydatetime#101] : : +- BatchEvalPython [generate_date_series(start#94, stop#95)], [start#94, stop#95, pythonUDF0#1633] : : +- Scan ExistingRDD[start#94,stop#95] : +- BroadcastExchange IdentityBroadcastMode : +- *HashAggregate(keys=[key#250], functions=[], output=[key#250]) : +- *HashAggregate(keys=[key#250], functions=[], output=[key#250]) : +- *Sample 0.0, 0.001, false, 7736333241016522154 : +- *GlobalLimit 5000000 : +- Exchange SinglePartition : +- *LocalLimit 5000000 : +- *Project [concat(outboundlegid#190, -, inboundlegid#191, -, agent#187) AS key#250] : +- *SortMergeJoin [querydestinationplace#212], [cast(airportId#38 as int)], Inner : :- *Sort [querydestinationplace#212 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(querydestinationplace#212, 200) : : +- *Project [Agent#187, OutboundLegId#190, InboundLegId#191, querydestinationplace#212] : : +- *SortMergeJoin [agent#187], [id#89], Inner : : :- *Sort [agent#187 ASC NULLS FIRST], false, 0 : : : +- Exchange hashpartitioning(agent#187, 200) : : : +- *Project [Agent#187, OutboundLegId#190, InboundLegId#191, querydestinationplace#212] : : : +- *Filter (isnotnull(agent#187) && isnotnull(querydestinationplace#212)) : : : +- Scan ExistingRDD[Id#180,QueryTaskId#181,QueryOriginPlace#182,QueryOutboundDate#183,QueryInboundDate#184,QueryCabinClass#185,QueryCurrency#186,Agent#187,QuoteAgeInMinutes#188,Price#189,OutboundLegId#190,InboundLegId#191,OutDeparture#192,OutArrival#193,OutDuration#194,OutJourneyMode#195,OutStops#196,OutCarriers#197,OutOperatingCarriers#198,NumberOutStops#199,NumberOutCarriers#200,NumberOutOperatingCarriers#201,InDeparture#202,InArrival#203,... 10 more fields] : : +- *Sort [id#89 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(id#89, 200) : : +- *Project [cast(id#67L as string) AS id#89] : : +- *Filter (isnotnull(id#67L) && isnotnull(cast(id#67L as string))) : : +- Scan ExistingRDD[id#67L,name#68,imageurl#69,status#70,optimisedformobile#71,type#72,bookingnumber#73,createdat#74,updatedat#75] : +- *Sort [cast(airportId#38 as int) ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(cast(airportId#38 as int), 200) : +- *Project [cast(airportId#18L as string) AS airportId#38] : +- *Filter (isnotnull(airportId#18L) && isnotnull(cast(airportId#18L as string))) : +- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24] +- BroadcastExchange HashedRelationBroadcastMode(List(input[34, string, true], input[33, date, true])) +- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, cast(QueryOutboundDate#183 as date) AS queryoutbounddate#334, cast(QueryInboundDate#184 as date) AS queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, cast(unix_timestamp(OutDeparture#192, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS outdeparture#416, cast(unix_timestamp(OutArrival#193, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, cast(unix_timestamp(InDeparture#202, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS indeparture#498, cast(unix_timestamp(InArrival#203, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS inarrival#539, ... 15 more fields] +- *Sample 0.0, 0.001, false, 7736333241016522154 +- *GlobalLimit 5000000 +- Exchange SinglePartition +- *LocalLimit 5000000 +- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, QueryOutboundDate#183, QueryInboundDate#184, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, OutDeparture#192, OutArrival#193, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, InDeparture#202, InArrival#203, ... 15 more fields] +- *SortMergeJoin [querydestinationplace#212], [cast(airportId#38 as int)], Inner :- *Sort [querydestinationplace#212 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(querydestinationplace#212, 200) : +- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, QueryOutboundDate#183, QueryInboundDate#184, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, OutDeparture#192, OutArrival#193, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, InDeparture#202, InArrival#203, ... 11 more fields] : +- *SortMergeJoin [Agent#187], [id#89], Inner : :- *Sort [Agent#187 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(Agent#187, 200) : : +- *Filter (isnotnull(Agent#187) && isnotnull(querydestinationplace#212)) : : +- Scan ExistingRDD[Id#180,QueryTaskId#181,QueryOriginPlace#182,QueryOutboundDate#183,QueryInboundDate#184,QueryCabinClass#185,QueryCurrency#186,Agent#187,QuoteAgeInMinutes#188,Price#189,OutboundLegId#190,InboundLegId#191,OutDeparture#192,OutArrival#193,OutDuration#194,OutJourneyMode#195,OutStops#196,OutCarriers#197,OutOperatingCarriers#198,NumberOutStops#199,NumberOutCarriers#200,NumberOutOperatingCarriers#201,InDeparture#202,InArrival#203,... 10 more fields] : +- *Sort [id#89 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(id#89, 200) : +- *Project [cast(id#67L as string) AS id#89, name#68] : +- *Filter (isnotnull(id#67L) && isnotnull(cast(id#67L as string))) : +- Scan ExistingRDD[id#67L,name#68,imageurl#69,status#70,optimisedformobile#71,type#72,bookingnumber#73,createdat#74,updatedat#75] +- *Sort [cast(airportId#38 as int) ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(cast(airportId#38 as int), 200) +- *Project [cast(airportId#18L as string) AS airportId#38, countryName#24, cityName#23, airportName#22] +- *Filter (isnotnull(airportId#18L) && isnotnull(cast(airportId#18L as string))) +- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24] at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) ... 60 more Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123) at
3 Réponses :
Utilisez glueContext.write_from_options () pour écrire des données
La même erreur se produit. Est-ce censé faire une différence dans la manière dont les données sont traitées?
Les méthodes d'écriture Glue sont optimisées pour l'environnement Glue et n'ont pas de délai d'expiration de 300 secondes. Donc, si vous pensez que le délai d'expiration est pendant l'écriture, je vous suggère d'utiliser l'API Glue.
Le point le plus faible de votre code est le suivant:
getIntervalUdf = spark.udf.register("getInterval", getInterval, StringType())
si vous regardez attentivement le plan d’exécution
spark.udf.register("getInterval", getInterval, StringType()) getIntervalUdf = udf(getInterval)
vous Je vais voir que l'implémentation utilise un processus en deux étapes, où des limites partielles sont collectées sur une seule partition. Quel nombre aussi grand ( LIMIT
n'est tout simplement pas conçu avec un tel scénario à l'esprit) vous pouvez facilement submerger l'exécuteur correspondant.
De plus, LIMIT
dans votre code est redondant, puisque vous le suivez de .sample (False, 0.001)
.
Je vous recommande de supprimer la clause LIMIT
et d'ajuster la fraction en conséquence:
from pyspark.sql.functions import hour ... .withColumn( "out_departure_interval", get_interval(hour("outdeparture"), 0, 24, 4))
De plus, je recommanderais de réécrire generate_date_series
from pyspark.sql.functions import concat, floor from pyspark.sql.functions import Column def get_interval(num, start, stop, incr): assert isinstance(num, Column) lower = floor(num / incr).cast("integer") * incr upper = lower + incr return concat(lit("("), lower, lit(","), upper, lit(")"))
Enfin, je recommande fortement en remplaçant getInterval
UDF par la composition des fonctions intégrées * (arguments inutilisés conservés tels quels):
from pyspark.sql.functions import lit from pyspark.sql import SparkSession def generate_date_series(start, stop): span = (stop - start).days + 1 return (SparkSession.builder.getOrCreate() .range(0, span) .withColumn("start", lit(start)) .selectExpr("date_add(start, id) AS querydatetime")) (generate_date_series(start, seven_days_ago) .createOrReplaceTempView("relaventDates"))
qui pourraient être utilisés ultérieurement comme remplacement direct de UDF, bien qu'il soit peu probable qu'il contribue directement à vos problèmes actuels.
result_full = spark.sql(""" SELECT f.*, CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key, countryName, cityName, airportName, a.name AS agentName FROM flights f INNER JOIN agents a ON f.agent = a.id INNER JOIN airports p ON f.querydestinationplace = p.airportId """) desired_size = (5000000 * 0.001) fraction = desired_size / result_full .count() assert 1 < fraction < 0 result_sample = result_full.sample(False, fraction)
Sur une note latérale UDFRegistration.register
renvoie un objet appelable pour un couple des versions maintenant, vous pourrez donc peut-être remplacer
: +- *GlobalLimit 5000000 : +- Exchange SinglePartition : +- *LocalLimit 5000000
par
LIMIT 5000000
* Vous pouvez également envisager compartimentage à l'aide de fonction window
dédiée :
Répartissez les lignes dans une ou plusieurs fenêtres horaires en fonction d'une colonne de spécification d'horodatage. Les débuts de fenêtre sont inclusifs mais les extrémités de fenêtre sont exclusives, par exemple 12:05 sera dans la fenêtre [12: 05,12: 10) mais pas dans [12: 00,12: 05).
Je trouve que cet échantillon provoque toujours un délai d'expiration pour une raison quelconque, c'est en partie pourquoi j'ai essayé de limiter aussi. La suggestion UDF est plus parce que sa meilleure pratique d'utiliser les fonctions de bibliothèque standard sur personnalisé et pas vraiment des performances correctes?
Aussi pour le fenêtrage, comment l'utiliser ici? Je pensais que c'était plus comme un groupe par remplacement? Je tiens également à mentionner qu'il semble que la meilleure amélioration soit venue de l'ajout d'un indice de requête pour diffuser la table des agents et des aéroports
pyspark.sql.function.window
n'est pas le même type d'outil que les fonctions de fenêtre. C'est juste un utilitaire pratique pour générer des seaux temporels et des fenêtres coulissantes / basculantes. À partir de l'UDF - la première suggestion (plage) est plus fondamentale. Avec udf
(`explode (generate_date (...))`) n'a aucun moyen de comprendre les données et leur montant. Les suggestions restantes liées à udf ne traitent pas directement du délai d'expiration, mais plutôt de la surcharge globale du déplacement des choses entre les contextes - si ce n'était pas clair, je suis désolé - c'était simplement un tueur de performances trop évident pour ne pas être traité.
Cela ressemble à une jointure de diffusion expirée. Le délai d'expiration par défaut est de 300 secondes, et vous pouvez commencer par l'augmenter à heures
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
Maintenant, si la diffusion prend vraiment des heures, envoyer autant de données à tous les nœuds n'est pas une bonne idée, et vous pouvez désactiver les jointures de diffusion
spark.conf.set("spark.sql.broadcastTimeout", 7200)
ou affiner le code pour éviter toute confusion du côté du planificateur de requêtes.