scala - How does a Spark Java program work?


I am new to Spark. In Hadoop MapReduce, the map function is called once for each line of the input file, and the setup function is called once per mapper.

How does Spark's map function work?

I have the mapper code below and want to implement it in Spark.

public static class PairsMapper extends Mapper<LongWritable, Text, Text, Text> {

    private String firstFilePath;
    private List<String> sourceRecords;
    private String line;
    // column position of the last name in a record
    private int pos_last_name = 0;

    public void setup(Context context) throws IOException, InterruptedException {
        firstFilePath = context.getConfiguration().get("firstFilePath");
        sourceRecords = new ArrayList<String>();
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FileStatus[] status = fs.listStatus(new Path(firstFilePath));
        for (int i = 0; i < status.length; i++) {
            BufferedReader bufferedReader =
                new BufferedReader(new InputStreamReader(fs.open(status[i].getPath())));
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                sourceRecords.add(line);
            }
        }
    }

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        line = value.toString();
        String[] target = line.split(",");
        if (target.length == 21) {
            for (String inputLine : sourceRecords) {
                if (!inputLine.equals(line) && inputLine.length() > 0 && line.length() > 0) {
                    String[] source = inputLine.split(",");
                    if (source.length == 21) {
                        // blocking using a string comparison of last_name
                        if (source[pos_last_name].equals(target[pos_last_name])) {
                            context.write(new Text(inputLine), new Text(line));
                        }
                    }
                }
            }
        }
    }
}

I tried to convert the mapper into a Spark application:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import scala.io.Source
import java.io.File

val sc = new SparkContext("your_spark_master", "your_application_name")

// Read the "first file" set from a local directory on the driver and broadcast it;
// this plays the same role as reading firstFilePath in setup().
val sourceRecords = new File("your_dir_path").listFiles()
  .flatMap(file => Source.fromFile(file).getLines.toList)
  .filter(_.length > 0)
val sourceRecordsBC = sc.broadcast(sourceRecords)

val inputRDD = sc.textFile("hdfs://your_input_path")

val outputRDD = inputRDD
  .filter(_.length > 0)
  .map(line => (line, line.split(",")))
  .filter(_._2.size == 21)
  .flatMap { case (line, target) =>
    val pos_last_name = 0
    // same blocking rule as the mapper: pair records that share the last name
    for {
      inputLine <- sourceRecordsBC.value
      if inputLine != line
      source = inputLine.split(",")
      if source.size == 21 && source(pos_last_name) == target(pos_last_name)
    } yield (inputLine, line)
  }

outputRDD.saveAsTextFile("hdfs://your_output_path")
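A note on the design: broadcasting sourceRecords works well while the local file set stays small, because every task receives a full copy. If it grows too large to broadcast, the same blocking comparison can be expressed as a keyed join instead. This is only a rough, untested sketch that reuses the sc, sourceRecords, and inputRDD defined above:

// Untested sketch: key both sides by the blocking field (the last name) and let
// Spark shuffle-join them instead of shipping sourceRecords to every task.
val pos_last_name = 0
val sourceByName = sc.parallelize(sourceRecords)
  .filter(_.split(",").size == 21)
  .map(rec => (rec.split(",")(pos_last_name), rec))
val targetByName = inputRDD
  .filter(_.length > 0)
  .map(line => (line, line.split(",")))
  .filter(_._2.size == 21)
  .map { case (line, cols) => (cols(pos_last_name), line) }
val joinedRDD = sourceByName.join(targetByName)
  .filter { case (_, (src, tgt)) => src != tgt }   // drop self-matches, as in the mapper
  .map { case (_, pair) => pair }                  // (sourceRecord, targetRecord) pairs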

These are only examples and I have not tested them yet. If you find any problem, please let me know.
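To come back to the original question about map: in Spark, the function passed to map is also invoked once per record of the RDD, just as Hadoop calls map() once per line. There is no separate setup() hook; its role is usually played either by a broadcast variable (as above) or by mapPartitions, whose function runs once per partition, so one-time initialisation can go there. A small untested sketch, reusing inputRDD from above:

// map: the supplied function runs once per record, like Hadoop's map().
val columnsRDD = inputRDD.map(line => line.split(","))

// mapPartitions: the supplied function runs once per partition, so anything
// created here (lookup tables, parsers, connections) acts like setup().
val lastNamesRDD = inputRDD.mapPartitions { records =>
  val pos_last_name = 0              // one-time, per-partition "setup" work
  records.map(line => line.split(",")(pos_last_name))
}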

