Scala: Low performance of reduceByKey() in Spark
I am writing a program in Spark that aggregates values by key. The program is pretty simple. The input data is 2 GB, and I am running it on a multi-core server (8 cores, 32 GB RAM) with the master set to local[2], i.e. using 2 cores for parallelization. However, the performance is pretty bad: the job takes 2 hours to complete. I am using KryoSerializer, and I guess the problem might be caused by the serializer. How can I solve this problem?
    import org.apache.spark.rdd.PairRDDFunctions

    // Parse each CSV line into a (home id, usage reading) pair
    val dataPoints = SparkContextManager.textFile(dataLocation)
      .map(x => {
        val delimited = x.split(",")
        (delimited(ColumnIndices.HOME_ID_COLUMN).toLong,
         delimited(ColumnIndices.USAGE_READING_COLUMN).toDouble)
      })

    def process(step: Int): Array[(Long, List[Double])] = {
      // Wrap every reading in a one-element List, then concatenate the lists per key
      val resultRDD = new PairRDDFunctions(dataPoints.map(x => (x._1, List[Double](x._2))))
      resultRDD.reduceByKey((x, y) => x ++ y).collect()
    }

The output should be:
    1, [1, 3, 13, 21, ..., 111]    // each list has 4000 elements
    2, [24, 34, 65, 24, ..., 245]
    ...
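To check the expected output shape on a small sample without a cluster, the same parse-then-group logic can be run on in-memory lines. This is a Spark-free sketch. The object name and the column positions (0 for the home id, 1 for the reading) are hypothetical, because the question does not show the values in ColumnIndices.

```scala
object LocalAggregate {
  // Hypothetical column positions: the question's ColumnIndices values are not shown.
  val HomeIdColumn = 0
  val UsageReadingColumn = 1

  // Parses CSV lines into (homeId, reading) pairs, like the question's map step.
  def parse(lines: Seq[String]): Seq[(Long, Double)] =
    lines.map { line =>
      val delimited = line.split(",")
      (delimited(HomeIdColumn).trim.toLong, delimited(UsageReadingColumn).trim.toDouble)
    }

  // Collects every reading per home id (input order kept within a key), sorted by id.
  def aggregate(lines: Seq[String]): Seq[(Long, List[Double])] =
    parse(lines)
      .groupBy(_._1)
      .toSeq
      .sortBy(_._1)
      .map { case (id, pairs) => id -> pairs.map(_._2).toList }
}
```

For example, `LocalAggregate.aggregate(Seq("1,1", "2,24", "1,3"))` gives one entry per home id with its readings in input order, matching the `id, [readings...]` layout above.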
It looks like you're trying to write a Spark job that groups together the values associated with the same key. PairRDDFunctions has a groupByKey operation that does exactly this. Spark's implementation of groupByKey takes advantage of several performance optimizations to create fewer temporary objects and shuffle less data over the network (since each value won't be wrapped in a List).
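One plausible contributor to the slowdown is the list concatenation itself. When reduceByKey((x, y) => x ++ y) combines values, it repeatedly merges the accumulated list for a key with another small list, and Scala's List `++` rebuilds the left-hand list on every call, so the work grows roughly quadratically with the number of values per key. The Spark-free sketch below (object and method names are hypothetical) counts those copies for one key and compares them with appending each value once into a buffer, which is closer to what a grouping operation can do.

```scala
import scala.collection.mutable.ArrayBuffer

object ListConcatCost {
  // Mimics merging one key's values with (x, y) => x ++ y, where x is the
  // accumulated list. Returns the merged list and the number of elements
  // copied from left operands (List's ++ rebuilds the left-hand list).
  // `values` must be non-empty, since reduceLeft fails on an empty sequence.
  def mergeByConcat(values: Seq[Double]): (List[Double], Long) = {
    var copied = 0L
    val merged = values.map(v => List(v)).reduceLeft[List[Double]] { (x, y) =>
      copied += x.length
      x ++ y
    }
    (merged, copied)
  }

  // Appends each value exactly once into a growable buffer.
  def mergeByBuffer(values: Seq[Double]): List[Double] = {
    val buf = ArrayBuffer.empty[Double]
    values.foreach(v => buf += v)
    buf.toList
  }
}
```

For a single key with 4000 readings merged in one pass, `mergeByConcat` copies 7,998,000 elements (1 + 2 + ... + 3999), while `mergeByBuffer` performs 4000 appends. Both produce the same list.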
If you import Spark's implicit conversions using

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

then you won't need to manually wrap the mapped RDD in PairRDDFunctions in order to access functions like groupByKey. This has no performance impact, but it makes large Spark programs easier to read. (Since Spark 1.3 these conversions are found automatically, so the second import is no longer required.)
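The import works through an implicit conversion: Spark defines one (rddToPairRDDFunctions) that wraps an RDD of pairs in PairRDDFunctions whenever a pair-only method such as groupByKey is called on it. The following is a Spark-free sketch of the same mechanism. LocalPairFunctions and seqToLocalPairFunctions are hypothetical stand-ins, not Spark classes.

```scala
import scala.language.implicitConversions

// Hypothetical stand-in for PairRDDFunctions: methods that only make sense
// for collections of key/value pairs.
final class LocalPairFunctions[K, V](self: Seq[(K, V)]) {
  // Groups values by key; values keep their input order within each key.
  def groupByKey(): Map[K, Seq[V]] =
    self.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
}

object LocalPairImplicits {
  // Plays the role of Spark's rddToPairRDDFunctions: once imported, the
  // compiler inserts this conversion when groupByKey is called on a Seq of pairs.
  implicit def seqToLocalPairFunctions[K, V](s: Seq[(K, V)]): LocalPairFunctions[K, V] =
    new LocalPairFunctions(s)
}
```

After `import LocalPairImplicits._`, `Seq((1L, 1.0), (1L, 3.0)).groupByKey()` compiles even though Seq has no groupByKey method. The explicit `new PairRDDFunctions(...)` in the question does the same wrapping by hand.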
using groupbykey, think process function can rewritten as
def process(step: int): array[(long, seq[double])] = { datapoints.groupbykey().collect() } i'd consider increasing degree of parallelism: both groupbykey , reducebykey take optional numtasks argument controls number of reducers; default, spark uses 8 parallel tasks groupbykey , reducebykey. described in spark scala programming guide, in scaladoc.
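To make the role of that argument concrete: when no partitioner is already set, groupByKey and reduceByKey fall back to hash partitioning, so each reducer task handles the keys whose hash maps to it. The sketch below assumes that default and reproduces the HashPartitioner rule in plain Scala. The object and method names are hypothetical.

```scala
object ReducerAssignment {
  // Same rule as Spark's HashPartitioner: the key's hashCode modulo the
  // number of partitions, shifted to be non-negative. (Spark sends null keys
  // to partition 0; null handling is omitted here.)
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Counts how many distinct keys would land on each reducer task.
  def keysPerReducer(keys: Seq[Long], numPartitions: Int): Map[Int, Int] =
    keys.distinct
      .groupBy(k => partitionFor(k, numPartitions))
      .map { case (p, ks) => p -> ks.size }
}
```

With 100 home ids (0 to 99) and 8 partitions, reducers 0 to 3 each receive 13 keys and reducers 4 to 7 each receive 12. Raising the argument spreads keys over more, smaller tasks, although under local[2] at most two of those tasks run at the same time.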