6
votes

PySpark timing out trying to repartition / write to Parquet (Futures timed out after [300 seconds])?

I am using PySpark (on AWS Glue, if that matters). I am getting timeout errors (it looks like the write to Parquet is failing):

Full logs at https://pastebin.com/TmuAcFx7

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import UserDefinedFunction, udf, regexp_replace, to_timestamp, date_format, lit
from datetime import datetime, timedelta
from pyspark.sql.types import ArrayType, StringType, DateType, Row
import math

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# READ IN FLIGHTS, AIRPORTS, AGENTS TABLES
# NOTE: Bookmarks enabled for flights data catalog
airportsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "airports")
airportsDF = airportsGDF.toDF().select("airportId", "countryName", "cityName", "airportName")
airportsDF = airportsDF.withColumn("airportId", airportsDF["airportId"].cast("string"))
airportsDF.createOrReplaceTempView("airports")

agentsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "agents")
agentsRawDF = agentsGDF.toDF().select("id", "name")
agentsRawDF = agentsRawDF.withColumn("id", agentsRawDF["id"].cast("string"))
agentsRawDF.createOrReplaceTempView("agents")

def batch(iterable, n=1):
    l = len(iterable)
    for ndx in range(0, l, n):
        yield iterable[ndx:min(ndx + n, l)]

arr = [13301,12929,14511,9968,15280,10193,13531,13439,16122,9498,16162,17210,12728,14534,12542,13303,16716,13311,12913,11036,17471,16240,10902,15526,17294,15671,10858,17482,12071,12337,17521,12274,10032,17396,11052,9970,12917,12195,10658,17409,13078,17416,17388,12118,10438,13113,11170,14213,9762,10871,11780,12392,15518,13536,10724,14260,16747,18490,17402,10284,10982,10431,16743,12482,10497,15168,16587,15412,17106,11017,17368,13804,15461,19461,16923,9794,12795,25396,12952,15422,10101,14147,10485,12210,25336,9449,15395,13947,11893,11109,9921,9799,15253,16945,13164,10031,17002,17152,16516,13180,16451,16437,11336,13428,10182,25405,16955,10180,12191]

def generate_date_series(start, stop):
    return [start + timedelta(days=x) for x in range(0, (stop-start).days + 1)]    

spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType()))

def getInterval(num, start, stop, incr): 
    if (num is None):
        return ""

    lower = math.floor(num / incr) * incr
    upper = lower + incr
    return "(%d,%d]" % (lower, upper) 

spark.udf.register("getInterval", getInterval, StringType())
getIntervalUdf = udf(getInterval)

# CREATE DF FOR PAST 90 DAYS EXCLUDING PAST 7 DAYS
today = datetime.utcnow().date()
start = today - timedelta(days = 14) # TODO: CHANGE TO 90
sevenDaysAgo = today - timedelta(days = 7)
print(">>> Generate data frame for ", start, " to ", sevenDaysAgo, "... ")
relaventDatesDf = spark.createDataFrame([
    Row(start=start, stop=sevenDaysAgo)
])
relaventDatesDf.createOrReplaceTempView("relaventDates")

relaventDatesDf = spark.sql("SELECT explode(generate_date_series(start, stop)) AS querydatetime FROM relaventDates")
relaventDatesDf.createOrReplaceTempView("relaventDates")
print("===LOG:Dates===")
relaventDatesDf.show()

flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "flights", transformation_ctx="flights", push_down_predicate="""
    querydatetime BETWEEN '%s' AND '%s' 
    AND querydestinationplace IN (%s)
""" % (start.strftime("%Y-%m-%d"), today.strftime("%Y-%m-%d"), ",".join(map(lambda s: str(s), arr))))

flightsDf = flightsGDF.toDF()
flightsDf.createOrReplaceTempView("flights")
print("===LOG:STARTING_QUERY===")

resultDf = spark.sql("""
    SELECT 
        f.*, 
        CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key,
        countryName, cityName, airportName, a.name AS agentName
    FROM flights f
    INNER JOIN agents a
    ON f.agent = a.id
    INNER JOIN airports p
    ON f.querydestinationplace = p.airportId
    LIMIT 5000000
""") \
    .sample(False, 0.001)
resultDf.explain(True)

print("===LOG:ADDING_COLUMNS===")
resultDf = resultDf \
    .withColumn("querydatetime", resultDf["querydatetime"].cast("date")) \
    .withColumn("queryoutbounddate", resultDf["queryoutbounddate"].cast("date")) \
    .withColumn("queryinbounddate", resultDf["queryinbounddate"].cast("date")) \
    .withColumn("outdeparture", to_timestamp(resultDf["outdeparture"], "yyyy-MM-dd'T'HH:mm:ss")) \
    .withColumn("outarrival", to_timestamp(resultDf["outarrival"], "yyyy-MM-dd'T'HH:mm:ss")) \
    .withColumn("indeparture", to_timestamp(resultDf["indeparture"], "yyyy-MM-dd'T'HH:mm:ss")) \
    .withColumn("inarrival", to_timestamp(resultDf["inarrival"], "yyyy-MM-dd'T'HH:mm:ss"))

print("===LOG:WRITING_RAW===")
print("===LOG:DONE_WRITING_RAW===")
resultDf.createOrReplaceTempView("flights")

# GET DISTINCT DATASET
# distinctKeysDf = resultDf.select("outboundlegid", "inboundlegid", "agent").groupBy(["outboundlegid", "inboundlegid", "agent"])
distinctKeysDf = spark.sql("""
    SELECT key
    FROM flights
    GROUP BY key
""")
distinctKeysDf.createOrReplaceTempView("distinctKeys")

# GET RELAVENT DATES DATASET
print("===LOG:WRITING_EXPANDED===")
expandedKeyDatesDf = spark.sql("""
    SELECT key, querydatetime
    FROM relaventDates
    CROSS JOIN distinctKeys
""")

expandedKeyDatesDf.createOrReplaceTempView("expandedKeyDates")
print("===LOG:DONE_WRITING_EXPANDED===")

cleanedFlightsDf = spark.sql("""
    SELECT 
        e.key AS master_key, 
        e.querydatetime AS master_querydatetime, 
        f.*
    FROM expandedKeyDates e
    LEFT JOIN flights f
    ON e.key = f.key
    AND e.querydatetime = f.querydatetime
    ORDER BY e.key, e.querydatetime
""")
cleanedFlightsDf = cleanedFlightsDf \
    .withColumn("created_day", date_format(cleanedFlightsDf["querydatetime"], "EEEE")) \
    .withColumn("created_month", date_format(cleanedFlightsDf["querydatetime"], "yyyy-MM")) \
    .withColumn("created_month_m", date_format(cleanedFlightsDf["querydatetime"], "M").cast("int")) \
    .withColumn("created_week", date_format(cleanedFlightsDf["querydatetime"], "w").cast("int")) \
    .withColumn("out_day", date_format(cleanedFlightsDf["outdeparture"], "EEE")) \
    .withColumn("out_month", date_format(cleanedFlightsDf["outdeparture"], "yyyy-MM")) \
    .withColumn("out_month_m", date_format(cleanedFlightsDf["outdeparture"], "M").cast("int")) \
    .withColumn("out_week", date_format(cleanedFlightsDf["outdeparture"], "w").cast("int")) \
    .withColumn("out_departure_interval", getIntervalUdf(date_format(cleanedFlightsDf["outdeparture"], "H").cast("int"), lit(0), lit(24), lit(4))) \
    .withColumn("out_hour", date_format(cleanedFlightsDf["outdeparture"], "k").cast("int")) \
    .withColumn("in_day", date_format(cleanedFlightsDf["indeparture"], "EEE")) \
    .withColumn("in_month", date_format(cleanedFlightsDf["indeparture"], "yyyy-MM")) \
    .withColumn("in_month_m", date_format(cleanedFlightsDf["indeparture"], "M").cast("int")) \
    .withColumn("in_week", date_format(cleanedFlightsDf["indeparture"], "w").cast("int")) \
    .withColumn("in_departure_interval", getIntervalUdf(date_format(cleanedFlightsDf["indeparture"], "H").cast("int"), lit(0), lit(24), lit(4))) \
    .withColumn("in_hour", date_format(cleanedFlightsDf["indeparture"], "k").cast("int"))

print("===LOG:WRITING_CLEANED===")
cleanedFlightsDf \
    .repartition("countryName", "querydatetime") \
    .write \
    .mode("overwrite") \
    .partitionBy(["countryName", "querydatetime"]) \
    .parquet("s3://xxx-glue/cleanedFlights")
print("===LOG:DONE_WRITING_CLEANED===")

print("===LOG:DONE BATCH %s" % (batch))

job.commit()

Some googling suggests it failed because of a timeout during a broadcast?

My code is shown above. I think it fails near the last part, the write to Parquet? But the explain output suggests the query is also being executed at that point?

File "script_2019-02-06-02-32-43.py", line 197, in <module>
.parquet("s3://xxx-glue/cleanedFlights")
File "/mnt/yarn/usercache/root/appcache/application_1549418793443_0001/container_1549418793443_0001_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 691, in parquet
File "/mnt/yarn/usercache/root/appcache/application_1549418793443_0001/container_1549418793443_0001_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/mnt/yarn/usercache/root/appcache/application_1549418793443_0001/container_1549418793443_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/mnt/yarn/usercache/root/appcache/application_1549418793443_0001/container_1549418793443_0001_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o246.parquet.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(countryName#24, querydatetime#213, 200)
+- *Project [master_key#588, master_querydatetime#589, Id#180, QueryTaskId#181, QueryOriginPlace#182, queryoutbounddate#334, queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, outdeparture#416, outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, ... 33 more fields]
+- BatchEvalPython [getInterval(cast(date_format(outdeparture#416, H, Some(Zulu)) as int), 0, 24, 4), getInterval(cast(date_format(indeparture#498, H, Some(Zulu)) as int), 0, 24, 4)], [master_key#588, master_querydatetime#589, Id#180, QueryTaskId#181, QueryOriginPlace#182, queryoutbounddate#334, queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, outdeparture#416, outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, ... 21 more fields]
+- *Sort [key#250 ASC NULLS FIRST, querydatetime#101 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(key#250 ASC NULLS FIRST, querydatetime#101 ASC NULLS FIRST, 200)
+- *Project [key#250 AS master_key#588, querydatetime#101 AS master_querydatetime#589, Id#180, QueryTaskId#181, QueryOriginPlace#182, queryoutbounddate#334, queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, outdeparture#416, outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, ... 19 more fields]
+- *BroadcastHashJoin [key#250, querydatetime#101], [key#590, querydatetime#213], LeftOuter, BuildRight
:- *Project [key#250, querydatetime#101]
: +- BroadcastNestedLoopJoin BuildRight, Cross
: :- Generate explode(pythonUDF0#1633), false, false, [querydatetime#101]
: : +- BatchEvalPython [generate_date_series(start#94, stop#95)], [start#94, stop#95, pythonUDF0#1633]
: : +- Scan ExistingRDD[start#94,stop#95]
: +- BroadcastExchange IdentityBroadcastMode
: +- *HashAggregate(keys=[key#250], functions=[], output=[key#250])
: +- *HashAggregate(keys=[key#250], functions=[], output=[key#250])
: +- *Sample 0.0, 0.001, false, 7736333241016522154
: +- *GlobalLimit 5000000
: +- Exchange SinglePartition
: +- *LocalLimit 5000000
: +- *Project [concat(outboundlegid#190, -, inboundlegid#191, -, agent#187) AS key#250]
: +- *SortMergeJoin [querydestinationplace#212], [cast(airportId#38 as int)], Inner
: :- *Sort [querydestinationplace#212 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(querydestinationplace#212, 200)
: : +- *Project [Agent#187, OutboundLegId#190, InboundLegId#191, querydestinationplace#212]
: : +- *SortMergeJoin [agent#187], [id#89], Inner
: : :- *Sort [agent#187 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(agent#187, 200)
: : : +- *Project [Agent#187, OutboundLegId#190, InboundLegId#191, querydestinationplace#212]
: : : +- *Filter (isnotnull(agent#187) && isnotnull(querydestinationplace#212))
: : : +- Scan ExistingRDD[Id#180,QueryTaskId#181,QueryOriginPlace#182,QueryOutboundDate#183,QueryInboundDate#184,QueryCabinClass#185,QueryCurrency#186,Agent#187,QuoteAgeInMinutes#188,Price#189,OutboundLegId#190,InboundLegId#191,OutDeparture#192,OutArrival#193,OutDuration#194,OutJourneyMode#195,OutStops#196,OutCarriers#197,OutOperatingCarriers#198,NumberOutStops#199,NumberOutCarriers#200,NumberOutOperatingCarriers#201,InDeparture#202,InArrival#203,... 10 more fields]
: : +- *Sort [id#89 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(id#89, 200)
: : +- *Project [cast(id#67L as string) AS id#89]
: : +- *Filter (isnotnull(id#67L) && isnotnull(cast(id#67L as string)))
: : +- Scan ExistingRDD[id#67L,name#68,imageurl#69,status#70,optimisedformobile#71,type#72,bookingnumber#73,createdat#74,updatedat#75]
: +- *Sort [cast(airportId#38 as int) ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(cast(airportId#38 as int), 200)
: +- *Project [cast(airportId#18L as string) AS airportId#38]
: +- *Filter (isnotnull(airportId#18L) && isnotnull(cast(airportId#18L as string)))
: +- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[34, string, true], input[33, date, true]))
+- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, cast(QueryOutboundDate#183 as date) AS queryoutbounddate#334, cast(QueryInboundDate#184 as date) AS queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, cast(unix_timestamp(OutDeparture#192, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS outdeparture#416, cast(unix_timestamp(OutArrival#193, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, cast(unix_timestamp(InDeparture#202, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS indeparture#498, cast(unix_timestamp(InArrival#203, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS inarrival#539, ... 15 more fields]
+- *Sample 0.0, 0.001, false, 7736333241016522154
+- *GlobalLimit 5000000
+- Exchange SinglePartition
+- *LocalLimit 5000000
+- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, QueryOutboundDate#183, QueryInboundDate#184, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, OutDeparture#192, OutArrival#193, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, InDeparture#202, InArrival#203, ... 15 more fields]
+- *SortMergeJoin [querydestinationplace#212], [cast(airportId#38 as int)], Inner
:- *Sort [querydestinationplace#212 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(querydestinationplace#212, 200)
: +- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, QueryOutboundDate#183, QueryInboundDate#184, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, OutDeparture#192, OutArrival#193, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, InDeparture#202, InArrival#203, ... 11 more fields]
: +- *SortMergeJoin [Agent#187], [id#89], Inner
: :- *Sort [Agent#187 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(Agent#187, 200)
: : +- *Filter (isnotnull(Agent#187) && isnotnull(querydestinationplace#212))
: : +- Scan ExistingRDD[Id#180,QueryTaskId#181,QueryOriginPlace#182,QueryOutboundDate#183,QueryInboundDate#184,QueryCabinClass#185,QueryCurrency#186,Agent#187,QuoteAgeInMinutes#188,Price#189,OutboundLegId#190,InboundLegId#191,OutDeparture#192,OutArrival#193,OutDuration#194,OutJourneyMode#195,OutStops#196,OutCarriers#197,OutOperatingCarriers#198,NumberOutStops#199,NumberOutCarriers#200,NumberOutOperatingCarriers#201,InDeparture#202,InArrival#203,... 10 more fields]
: +- *Sort [id#89 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#89, 200)
: +- *Project [cast(id#67L as string) AS id#89, name#68]
: +- *Filter (isnotnull(id#67L) && isnotnull(cast(id#67L as string)))
: +- Scan ExistingRDD[id#67L,name#68,imageurl#69,status#70,optimisedformobile#71,type#72,bookingnumber#73,createdat#74,updatedat#75]
+- *Sort [cast(airportId#38 as int) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cast(airportId#38 as int), 200)
+- *Project [cast(airportId#18L as string) AS airportId#38, countryName#24, cityName#23, airportName#22]
+- *Filter (isnotnull(airportId#18L) && isnotnull(cast(airportId#18L as string)))
+- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24]

at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
... 45 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning(key#250 ASC NULLS FIRST, querydatetime#101 ASC NULLS FIRST, 200)
+- *Project [key#250 AS master_key#588, querydatetime#101 AS master_querydatetime#589, Id#180, QueryTaskId#181, QueryOriginPlace#182, queryoutbounddate#334, queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, outdeparture#416, outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, ... 19 more fields]
+- *BroadcastHashJoin [key#250, querydatetime#101], [key#590, querydatetime#213], LeftOuter, BuildRight
:- *Project [key#250, querydatetime#101]
: +- BroadcastNestedLoopJoin BuildRight, Cross
: :- Generate explode(pythonUDF0#1633), false, false, [querydatetime#101]
: : +- BatchEvalPython [generate_date_series(start#94, stop#95)], [start#94, stop#95, pythonUDF0#1633]
: : +- Scan ExistingRDD[start#94,stop#95]
: +- BroadcastExchange IdentityBroadcastMode
: +- *HashAggregate(keys=[key#250], functions=[], output=[key#250])
: +- *HashAggregate(keys=[key#250], functions=[], output=[key#250])
: +- *Sample 0.0, 0.001, false, 7736333241016522154
: +- *GlobalLimit 5000000
: +- Exchange SinglePartition
: +- *LocalLimit 5000000
: +- *Project [concat(outboundlegid#190, -, inboundlegid#191, -, agent#187) AS key#250]
: +- *SortMergeJoin [querydestinationplace#212], [cast(airportId#38 as int)], Inner
: :- *Sort [querydestinationplace#212 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(querydestinationplace#212, 200)
: : +- *Project [Agent#187, OutboundLegId#190, InboundLegId#191, querydestinationplace#212]
: : +- *SortMergeJoin [agent#187], [id#89], Inner
: : :- *Sort [agent#187 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(agent#187, 200)
: : : +- *Project [Agent#187, OutboundLegId#190, InboundLegId#191, querydestinationplace#212]
: : : +- *Filter (isnotnull(agent#187) && isnotnull(querydestinationplace#212))
: : : +- Scan ExistingRDD[Id#180,QueryTaskId#181,QueryOriginPlace#182,QueryOutboundDate#183,QueryInboundDate#184,QueryCabinClass#185,QueryCurrency#186,Agent#187,QuoteAgeInMinutes#188,Price#189,OutboundLegId#190,InboundLegId#191,OutDeparture#192,OutArrival#193,OutDuration#194,OutJourneyMode#195,OutStops#196,OutCarriers#197,OutOperatingCarriers#198,NumberOutStops#199,NumberOutCarriers#200,NumberOutOperatingCarriers#201,InDeparture#202,InArrival#203,... 10 more fields]
: : +- *Sort [id#89 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(id#89, 200)
: : +- *Project [cast(id#67L as string) AS id#89]
: : +- *Filter (isnotnull(id#67L) && isnotnull(cast(id#67L as string)))
: : +- Scan ExistingRDD[id#67L,name#68,imageurl#69,status#70,optimisedformobile#71,type#72,bookingnumber#73,createdat#74,updatedat#75]
: +- *Sort [cast(airportId#38 as int) ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(cast(airportId#38 as int), 200)
: +- *Project [cast(airportId#18L as string) AS airportId#38]
: +- *Filter (isnotnull(airportId#18L) && isnotnull(cast(airportId#18L as string)))
: +- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[34, string, true], input[33, date, true]))
+- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, cast(QueryOutboundDate#183 as date) AS queryoutbounddate#334, cast(QueryInboundDate#184 as date) AS queryinbounddate#375, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, cast(unix_timestamp(OutDeparture#192, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS outdeparture#416, cast(unix_timestamp(OutArrival#193, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS outarrival#457, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, cast(unix_timestamp(InDeparture#202, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS indeparture#498, cast(unix_timestamp(InArrival#203, yyyy-MM-dd'T'HH:mm:ss, Some(Zulu)) as timestamp) AS inarrival#539, ... 15 more fields]
+- *Sample 0.0, 0.001, false, 7736333241016522154
+- *GlobalLimit 5000000
+- Exchange SinglePartition
+- *LocalLimit 5000000
+- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, QueryOutboundDate#183, QueryInboundDate#184, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, OutDeparture#192, OutArrival#193, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, InDeparture#202, InArrival#203, ... 15 more fields]
+- *SortMergeJoin [querydestinationplace#212], [cast(airportId#38 as int)], Inner
:- *Sort [querydestinationplace#212 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(querydestinationplace#212, 200)
: +- *Project [Id#180, QueryTaskId#181, QueryOriginPlace#182, QueryOutboundDate#183, QueryInboundDate#184, QueryCabinClass#185, QueryCurrency#186, Agent#187, QuoteAgeInMinutes#188, Price#189, OutboundLegId#190, InboundLegId#191, OutDeparture#192, OutArrival#193, OutDuration#194, OutJourneyMode#195, OutStops#196, OutCarriers#197, OutOperatingCarriers#198, NumberOutStops#199, NumberOutCarriers#200, NumberOutOperatingCarriers#201, InDeparture#202, InArrival#203, ... 11 more fields]
: +- *SortMergeJoin [Agent#187], [id#89], Inner
: :- *Sort [Agent#187 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(Agent#187, 200)
: : +- *Filter (isnotnull(Agent#187) && isnotnull(querydestinationplace#212))
: : +- Scan ExistingRDD[Id#180,QueryTaskId#181,QueryOriginPlace#182,QueryOutboundDate#183,QueryInboundDate#184,QueryCabinClass#185,QueryCurrency#186,Agent#187,QuoteAgeInMinutes#188,Price#189,OutboundLegId#190,InboundLegId#191,OutDeparture#192,OutArrival#193,OutDuration#194,OutJourneyMode#195,OutStops#196,OutCarriers#197,OutOperatingCarriers#198,NumberOutStops#199,NumberOutCarriers#200,NumberOutOperatingCarriers#201,InDeparture#202,InArrival#203,... 10 more fields]
: +- *Sort [id#89 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#89, 200)
: +- *Project [cast(id#67L as string) AS id#89, name#68]
: +- *Filter (isnotnull(id#67L) && isnotnull(cast(id#67L as string)))
: +- Scan ExistingRDD[id#67L,name#68,imageurl#69,status#70,optimisedformobile#71,type#72,bookingnumber#73,createdat#74,updatedat#75]
+- *Sort [cast(airportId#38 as int) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cast(airportId#38 as int), 200)
+- *Project [cast(airportId#18L as string) AS airportId#38, countryName#24, cityName#23, airportName#22]
+- *Filter (isnotnull(airportId#18L) && isnotnull(cast(airportId#18L as string)))
+- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24]

at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
... 60 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123)
at 


0 comments

3 Answers:


-1
votes

Use glueContext.write_from_options() to write the data.
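
For illustration, a minimal sketch of that approach, here going through the related glueContext.write_dynamic_frame.from_options sink (the S3 path and partition keys are taken from the question; the frame name "cleanedFlights" is just an illustrative label):

from awsglue.dynamicframe import DynamicFrame

# Convert the Spark DataFrame back to a Glue DynamicFrame so the Glue writer can be used
cleanedFlightsDyf = DynamicFrame.fromDF(cleanedFlightsDf, glueContext, "cleanedFlights")

# Write Parquet to S3 through the Glue sink, partitioned the same way as before
glueContext.write_dynamic_frame.from_options(
    frame=cleanedFlightsDyf,
    connection_type="s3",
    connection_options={
        "path": "s3://xxx-glue/cleanedFlights",
        "partitionKeys": ["countryName", "querydatetime"]
    },
    format="parquet")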


2 comments

I get the same error. Is this supposed to make a difference in how the data is processed?


The Glue write methods are optimized for the Glue environment and do not have a 300-second timeout. So if you think the timeout happens during the write, I suggest using the Glue API.



3
votes

The weakest point of your code is the following:

resultDf = spark.sql("""
    SELECT 
        ...
    LIMIT 5000000
""") \
    .sample(False, 0.001)

If you take a careful look at the execution plan

: +- *GlobalLimit 5000000
: +- Exchange SinglePartition
: +- *LocalLimit 5000000

you will see that the implementation uses a two-step process, where partial limits are collected onto a single partition. With a number as large as this one (LIMIT is simply not designed with such a scenario in mind) you can easily overwhelm the corresponding executor.

Moreover, the LIMIT in your code is redundant, since you follow it with .sample(False, 0.001).

I recommend dropping the LIMIT clause and adjusting the fraction accordingly:

result_full = spark.sql("""
    SELECT 
        f.*, 
        CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key,
        countryName, cityName, airportName, a.name AS agentName
    FROM flights f
    INNER JOIN agents a
    ON f.agent = a.id
    INNER JOIN airports p
    ON f.querydestinationplace = p.airportId
""")

desired_size = 5000000 * 0.001
fraction = desired_size / result_full.count()
assert 0 < fraction < 1

result_sample = result_full.sample(False, fraction)

Additionally, I would recommend rewriting generate_date_series:

from pyspark.sql.functions import lit
from pyspark.sql import SparkSession


def generate_date_series(start, stop):
    span = (stop - start).days + 1
    return (SparkSession.builder.getOrCreate()
               .range(0, span)
               .withColumn("start", lit(start))
               .selectExpr("date_add(start, id) AS querydatetime"))


(generate_date_series(start, seven_days_ago)
    .createOrReplaceTempView("relaventDates"))

Finally, I strongly recommend replacing the getInterval UDF with a composition of built-in functions* (unused arguments kept as-is):

from pyspark.sql.functions import concat, floor, lit
from pyspark.sql import Column


def get_interval(num, start, stop, incr):
    assert isinstance(num, Column)

    lower = floor(num / incr).cast("integer") * incr
    upper = lower + incr
    return concat(lit("("), lower, lit(","), upper, lit("]"))

which could then be used as a direct replacement for the UDF, although it is unlikely to contribute directly to your current problems:

from pyspark.sql.functions import hour

...
    .withColumn(
        "out_departure_interval",
        get_interval(hour("outdeparture"), 0, 24, 4))

On a side note, UDFRegistration.register has returned a callable object for a couple of versions now, so you may be able to replace

spark.udf.register("getInterval", getInterval, StringType())
getIntervalUdf = udf(getInterval)

with

getIntervalUdf = spark.udf.register("getInterval", getInterval, StringType())

* You might also consider bucketing with the dedicated window function:

Bucketize rows into one or more time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05).
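
As a minimal sketch of what that could look like here (assuming the outdeparture column from the question; the out_departure_window column name is just illustrative):

from pyspark.sql.functions import col, window

# Bucketize each outbound departure into a 4-hour tumbling window;
# the result is a struct column with `start` and `end` timestamps.
bucketed = cleanedFlightsDf.withColumn(
    "out_departure_window",
    window(col("outdeparture"), "4 hours"))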


3 comments

I find that the sample still causes a timeout for some reason, which is partly why I tried LIMIT as well. The UDF suggestion is more about it being best practice to use the built-in functions rather than custom ones, and not really about performance, correct?


Also, for the windowing, how would it be used here? I thought it was more of a GROUP BY replacement? I should also mention that the biggest improvement seems to have come from adding a query hint to broadcast the agents and airports tables.


pyspark.sql.functions.window is not the same kind of tool as window functions. It is just a convenience utility for generating time buckets and sliding/tumbling windows. As for the UDF - the first suggestion (range) is the more fundamental one. With a udf (`explode(generate_date(...))`) Spark has no way of reasoning about the data and its volume. The remaining UDF-related suggestions do not address the timeout directly, but rather the overall overhead of moving things between contexts - if that was not clear, I apologize - it was simply a performance killer too obvious not to address.



1
votes

This looks like a broadcast join that timed out. The default timeout is 300 seconds, and you can start by increasing it, for example to a couple of hours:

spark.conf.set("spark.sql.broadcastTimeout", 7200)

Now, if the broadcast really does take hours, shipping that much data to every node is not a good idea, and you can disable broadcast joins instead:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

or rework the code to avoid confusing the query planner.
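
A minimal sketch of the latter, using an explicit broadcast hint on the small lookup tables from the question (whether this helps depends on their actual size; the joined name is just illustrative):

from pyspark.sql.functions import broadcast

# Explicitly mark the small dimension tables for broadcast so the
# planner does not have to estimate their size itself.
joined = (flightsDf
    .join(broadcast(agentsRawDF), flightsDf["agent"] == agentsRawDF["id"])
    .join(broadcast(airportsDF),
          flightsDf["querydestinationplace"] == airportsDF["airportId"]))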


0 comments