scala - How does a Spark Java program work?
I am new to Spark. In Hadoop MapReduce, the map function is called once for each line of the input file, and the setup function is called once per mapper.
How does Spark's map function work?
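For context: Spark has no separate setup() call; a map function is simply applied to every element of an RDD. The closest analogue of per-mapper setup() is mapPartitions, which lets you run one-time initialization per partition before iterating over that partition's records. A minimal sketch, assuming a local master and placeholder paths:

    import org.apache.spark.SparkContext

    val sc = new SparkContext("local[2]", "setup_vs_map_demo")
    val rdd = sc.textFile("hdfs://some_input_path")

    // mapPartitions is the closest analogue of a mapper's setup():
    // code before the inner map runs once per partition,
    // while the function passed to map runs once per record.
    val parsed = rdd.mapPartitions { records =>
      val expectedColumns = 21              // one-time, per-partition initialization
      records.map(_.split(","))             // per-record work, like map()
             .filter(_.length == expectedColumns)
    }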
I have the mapper code below, which I want to implement in Spark.
    public static class PairsMapper extends Mapper<LongWritable, Text, Text, Text> {
        private String firstFilePath;
        private List<String> sourceRecords;
        private String line;
        // column position of the last name within a record
        private int POS_LAST_NAME = 0;

        // setup() runs once per mapper: load the reference records into memory
        public void setup(Context context) throws IOException, InterruptedException {
            firstFilePath = context.getConfiguration().get("firstFilePath");
            sourceRecords = new ArrayList<String>();
            FileSystem fs = FileSystem.get(context.getConfiguration());
            FileStatus[] status = fs.listStatus(new Path(firstFilePath));
            for (int i = 0; i < status.length; i++) {
                BufferedReader bufferedReader =
                    new BufferedReader(new InputStreamReader(fs.open(status[i].getPath())));
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    sourceRecords.add(line);
                }
                bufferedReader.close();
            }
        }

        // map() runs once per input line: compare it against every in-memory source record
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            line = value.toString();
            String[] target = line.split(",");
            if (target.length == 21) {
                for (String inputLine : sourceRecords) {
                    if (!inputLine.equals(line) && inputLine.length() > 0 && line.length() > 0) {
                        String[] source = inputLine.split(",");
                        if (source.length == 21) {
                            // blocking using string comparison of the last-name column
                            if (source[POS_LAST_NAME].equals(target[POS_LAST_NAME])) {
                                context.write(new Text(inputLine), new Text(line));
                            }
                        }
                    }
                }
            }
        }
    }
I tried to convert the mapper to a Spark application:
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import scala.io.Source
    import java.io.File

    val sc = new SparkContext("your_spark_master", "your_application_name")

    // read the reference records from a local directory (equivalent of setup())
    val sourceRecords = new File("your_dir_path")
      .listFiles()
      .flatMap(file => Source.fromFile(file).getLines.toList)
      .filter(_.length > 0)

    // ship the reference records to every executor once
    val sourceRecordsBC = sc.broadcast(sourceRecords)

    val inputRDD = sc.textFile("hdfs://your_input_path")

    val outputRDD = inputRDD
      .filter(_.length > 0)
      .map(line => (line, line.split(",")))
      .filter(_._2.size == 21)
      .flatMap { case (line, target) =>
        val POS_LAST_NAME = 0
        for {
          inputLine <- sourceRecordsBC.value
          if inputLine != line
          source = inputLine.split(",")
          if source.size == 21 && source(POS_LAST_NAME) == target(POS_LAST_NAME)
        } yield (inputLine, line)
      }

    outputRDD.saveAsTextFile("hdfs://your_output_path")
This is just an example; I have not tested it yet. If you find a problem, please let me know.
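One design note: broadcasting sourceRecords and scanning the whole list for every input line reproduces the mapper's nested loop, which costs O(sources × input lines). If the reference data grows large, keying both datasets by the blocking field and joining scales better. A hedged sketch under assumptions of my own (placeholder paths, both datasets on HDFS, column 0 holding the last name):

    import org.apache.spark.rdd.RDD

    val POS_LAST_NAME = 0

    // key each record by its last-name column, keeping only 21-column rows
    def keyByLastName(rdd: RDD[String]): RDD[(String, String)] =
      rdd.map(_.split(","))
         .filter(_.length == 21)
         .map(cols => (cols(POS_LAST_NAME), cols.mkString(",")))

    val sourceByKey = keyByLastName(sc.textFile("hdfs://your_source_path"))
    val targetByKey = keyByLastName(sc.textFile("hdfs://your_input_path"))

    // the join pairs up records sharing a last name; drop self-pairs afterwards
    val candidatePairs = sourceByKey.join(targetByKey)
      .values
      .filter { case (source, target) => source != target }

    candidatePairs.saveAsTextFile("hdfs://your_output_path")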