Parse CSV as DataFrame/DataSet with Apache Spark and Java


I am new to Spark, and I want to use group-by and reduce to find the following from a CSV (one line per employee):

  department, designation, costtocompany, state
  sales, trainee, 12000,
  sales, lead, 32000, ap
  sales, lead, 32000, la
  sales, lead, 32000, tn
  sales, lead, 32000, ap
  sales, lead, 32000, tn
  sales, lead, 32000, la
  sales, lead, 32000, la
  marketing, associate, 18000, tn
  marketing, associate, 18000, tn
  hr, manager, 58000, tn

I would like to simplify the CSV by grouping on department, designation, and state, with additional columns for sum(costtocompany) and totalemployeecount.

The result should look like:

  dept, desg, state, empcount, totalcost
  sales,lead,ap,2,64000
  sales,lead,la,3,96000
  sales,lead,tn,2,64000

Is there a way to achieve this using transformations and actions, or should I go for RDD operations?

Procedure

  • Create a class (schema) to encapsulate your structure (it’s not required for approach B, but it makes the code easier to read if you are using Java)

    import java.io.Serializable;

    public class Record implements Serializable {
      String department;
      String designation;
      long costtocompany;
      String state;
      // constructor, getters and setters
    }

  • Load the CSV (or JSON) file

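    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.api.java.JavaSQLContext; // Spark 1.0 to 1.2 Java API

    JavaSparkContext sc; // assumed to be initialized elsewhere
    JavaRDD<String> data = sc.textFile("path/input.csv");
    JavaSQLContext sqlContext = new JavaSQLContext(sc);

    // map each text line to a Record (the file was already read into data above)
    JavaRDD<Record> rdd_records = data.map(
      new Function<String, Record>() {
          public Record call(String line) throws Exception {
             // Here you can use JSON
             // Gson gson = new Gson();
             // gson.fromJson(line, Record.class);
             String[] fields = line.split(",");
             // costtocompany is a long, so parse it (constructor assumed to take a long)
             Record sd = new Record(fields[0], fields[1], Long.parseLong(fields[2].trim()), fields[3]);
             return sd;
          }
    });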
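The map function above assumes every line has four fields and that there is no header. Against the sample file it would fail on the header row (the cost column is not a number) and on a row whose state is empty, because `split(",")` drops trailing empty fields. Here is a minimal plain-Java sketch of a more defensive parse, with no Spark involved. The names `CsvLineParser` and `ParsedRecord` are hypothetical, and the sketch assumes the header is always the first line.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original answer.
class CsvLineParser {

    // Simple holder mirroring the fields of the Record class.
    static class ParsedRecord {
        final String department;
        final String designation;
        final long costtocompany;
        final String state;

        ParsedRecord(String department, String designation, long costtocompany, String state) {
            this.department = department;
            this.designation = designation;
            this.costtocompany = costtocompany;
            this.state = state;
        }
    }

    // Parses one data line. A limit of -1 keeps trailing empty fields,
    // so a line ending in a comma yields an empty state instead of failing.
    static ParsedRecord parseLine(String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != 4) {
            throw new IllegalArgumentException("expected 4 fields: " + line);
        }
        return new ParsedRecord(fields[0].trim(), fields[1].trim(),
                Long.parseLong(fields[2].trim()), fields[3].trim());
    }

    // Skips the header (assumed to be the first line) and any blank lines.
    static List<ParsedRecord> parseAll(List<String> lines) {
        List<ParsedRecord> out = new ArrayList<>();
        for (int i = 1; i < lines.size(); i++) {
            String line = lines.get(i);
            if (line.trim().isEmpty()) {
                continue;
            }
            out.add(parseLine(line));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "department, designation, costtocompany, state",
                "sales, lead, 32000, ap",
                "sales, trainee, 12000,");
        for (ParsedRecord r : parseAll(lines)) {
            System.out.println(r.department + "|" + r.designation + "|"
                    + r.costtocompany + "|" + r.state);
        }
    }
}
```

The body of `parseLine` could be moved into the Spark `call` method; in Spark the header would have to be filtered out separately, since an RDD has no notion of a first line inside `map`.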

At this point you have 2 approaches:

A. SparkSQL

  • Register a table (using your defined schema class)

    import org.apache.spark.sql.api.java.JavaSchemaRDD;

    JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
    table.registerAsTable("record_table");
    table.printSchema();

  • Query the table with your desired group-by query

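    // Java has no multi-line string literals here, so the query is concatenated
    JavaSchemaRDD res = sqlContext.sql(
        "select department, designation, state, sum(costtocompany), count(*) "
      + "from record_table "
      + "group by department, designation, state");

  • Here you would also be able to run any other query you desire, using a SQL approach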
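To sanity-check what the group-by query should return on the sample rows without starting Spark, the same aggregation can be sketched in plain Java with streams. This is only a stand-in for the SQL, not Spark code; `GroupBySketch` and `summarize` are hypothetical names, and the sketch assumes every input line has all four fields (the sample row with an empty state is left out).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical class name; plain Java, no Spark involved.
class GroupBySketch {

    // Returns one "dept,desg,state,empcount,totalcost" line per group, sorted by key.
    static List<String> summarize(List<String> dataLines) {
        Map<String, LongSummaryStatistics> groups = dataLines.stream()
                // split on commas, dropping the spaces around them
                .map((String line) -> line.trim().split("\\s*,\\s*"))
                .collect(Collectors.groupingBy(
                        // group by department, designation, state
                        (String[] f) -> f[0] + "," + f[1] + "," + f[3],
                        TreeMap::new,
                        // tracks count(*) and sum(costtocompany) per group
                        Collectors.summarizingLong((String[] f) -> Long.parseLong(f[2]))));

        List<String> out = new ArrayList<>();
        for (Map.Entry<String, LongSummaryStatistics> e : groups.entrySet()) {
            out.add(e.getKey() + "," + e.getValue().getCount() + "," + e.getValue().getSum());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "sales, lead, 32000, ap",
                "sales, lead, 32000, la",
                "sales, lead, 32000, tn",
                "sales, lead, 32000, ap",
                "sales, lead, 32000, tn",
                "sales, lead, 32000, la",
                "sales, lead, 32000, la",
                "marketing, associate, 18000, tn",
                "marketing, associate, 18000, tn",
                "hr, manager, 58000, tn");
        summarize(lines).forEach(System.out::println);
    }
}
```

For the sales/lead rows this prints `sales,lead,ap,2,64000`, `sales,lead,la,3,96000` and `sales,lead,tn,2,64000`, which matches the result asked for in the question.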

B. Spark

  • Map using a composite key: department, designation, state

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    JavaPairRDD<String, Tuple2<Long, Integer>> records_jprdd =
      rdd_records.mapToPair(new PairFunction<Record, String, Tuple2<Long, Integer>>() {
        public Tuple2<String, Tuple2<Long, Integer>> call(Record record) {
          // key: department + designation + state; value: (costtocompany, 1)
          Tuple2<String, Tuple2<Long, Integer>> t2 =
            new Tuple2<String, Tuple2<Long, Integer>>(
              record.department + record.designation + record.state,
              new Tuple2<Long, Integer>(record.costtocompany, 1)
            );
          return t2;
        }
    });

  • reduceByKey using the composite key, summing the costtocompany column and accumulating the number of records per key

    import org.apache.spark.api.java.function.Function2;

    JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
      records_jprdd.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
        public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
            Tuple2<Long, Integer> v2) throws Exception {
          // (sum1 + sum2, count1 + count2)
          return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2 + v2._2);
        }
    });
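To see how the (cost, 1) pairs fold into (total cost, count) per key, here is a small plain-Java sketch of the same map and reduce steps using `Map.merge` instead of Spark. `ReduceByKeySketch` and its methods are hypothetical names. Unlike the code above, the sketch puts a comma between the key parts, an assumption I would suggest for the Spark version too: plain concatenation lets different field combinations (for example "ab" + "c" and "a" + "bc") collapse into the same key, and a delimited key can be split back into its columns for output.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical class name; a plain-Java stand-in for mapToPair + reduceByKey.
class ReduceByKeySketch {

    // Composite key with an explicit separator between the parts.
    static String compositeKey(String department, String designation, String state) {
        return department + "," + designation + "," + state;
    }

    // Same combine rule as the reduceByKey function: add the sums, add the counts.
    static long[] combine(long[] v1, long[] v2) {
        return new long[] {v1[0] + v2[0], v1[1] + v2[1]};
    }

    // Each input row is {department, designation, costtocompany, state}.
    static Map<String, long[]> reduceByKey(List<String[]> rows) {
        Map<String, long[]> result = new LinkedHashMap<>();
        for (String[] r : rows) {
            long[] pair = {Long.parseLong(r[2]), 1L}; // (cost, 1), as in mapToPair
            result.merge(compositeKey(r[0], r[1], r[3]), pair, ReduceByKeySketch::combine);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"sales", "lead", "32000", "ap"},
                new String[] {"sales", "lead", "32000", "la"},
                new String[] {"sales", "lead", "32000", "ap"});
        // Print in the requested column order: dept, desg, state, empcount, totalcost
        for (Map.Entry<String, long[]> e : reduceByKey(rows).entrySet()) {
            System.out.println(e.getKey() + "," + e.getValue()[1] + "," + e.getValue()[0]);
        }
    }
}
```

In Spark, the reduced pair RDD would still need an action such as `collect()` or `saveAsTextFile(...)` before any output is produced.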
