Parse CSV as DataFrame/DataSet with Apache Spark and Java
I am new to Spark, and I want to use group-by & reduce to find the following from a CSV (one line per employee):
Department, Designation, costToCompany, State
Sales, Trainee, 12000, UP
Sales, Lead, 32000, AP
Sales, Lead, 32000, LA
Sales, Lead, 32000, TN
Sales, Lead, 32000, AP
Sales, Lead, 32000, TN
Sales, Lead, 32000, LA
Sales, Lead, 32000, LA
Marketing, Associate, 18000, TN
Marketing, Associate, 18000, TN
HR, Manager, 58000, TN
I would like to simplify the CSV with a group-by on Department, Designation, State, with the additional columns sum(costToCompany) and TotalEmployeeCount.
The result should be like:
Dept, Desg, State, empCount, totalCost
Sales, Lead, AP, 2, 64000
Sales, Lead, LA, 3, 96000
Sales, Lead, TN, 2, 64000
Is there a way to achieve this using transformations and actions, or should I go for RDD operations?
Procedure
Create a class (schema) to encapsulate your structure. It is not required for approach B, but it makes the code easier to read if you are using Java.
import java.io.Serializable;

public class Record implements Serializable {
    String department;
    String designation;
    long costToCompany;
    String state;
    // constructor, getters and setters
}
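For completeness, a minimal sketch of the elided constructor, matching the argument order used in the loading step below (getters and setters would follow the usual bean pattern):

// Hypothetical constructor; the original only notes "constructor, getters and setters".
public Record(String department, String designation, long costToCompany, String state) {
    this.department = department;
    this.designation = designation;
    this.costToCompany = costToCompany;
    this.state = state;
}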
Loading the CSV (or JSON) file
JavaSparkContext sc;
JavaRDD<String> data = sc.textFile("path/input.csv");
JavaSQLContext sqlContext = new JavaSQLContext(sc);

JavaRDD<Record> rdd_records = data.map(
    new Function<String, Record>() {
        public Record call(String line) throws Exception {
            // Here you can use JSON instead:
            // Gson gson = new Gson();
            // return gson.fromJson(line, Record.class);
            String[] fields = line.split(",");
            // Trim the fields (the sample data has spaces after commas)
            // and parse the cost as a long to match the schema class.
            Record sd = new Record(fields[0].trim(), fields[1].trim(),
                    Long.parseLong(fields[2].trim()), fields[3].trim());
            return sd;
        }
    });
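One caveat with the map above: if the input file keeps its header row, Long.parseLong will throw on the word costToCompany. A minimal sketch for dropping the header first (the literal "department" prefix is an assumption based on the sample data):

// Drop the header row before mapping to Record objects (assumes the first
// column header starts with "department"; adjust to match your file).
JavaRDD<String> rows = data.filter(new Function<String, Boolean>() {
    public Boolean call(String line) {
        return !line.toLowerCase().startsWith("department");
    }
});
// Then build rdd_records from rows instead of data.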
At this point you have two approaches:
A. SparkSQL
Register a table (using your defined schema class).
JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
table.registerAsTable("record_table");
table.printSchema();
Query the table with your desired group-by:
JavaSchemaRDD res = sqlContext.sql(
    "select department, designation, state, sum(costToCompany), count(*) "
  + "from record_table "
  + "group by department, designation, state");
Here you would also be able to run any other query you desire, using a SQL approach.
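For reference, JavaSQLContext and JavaSchemaRDD belong to the early Spark 1.0 API. On Spark 2.x and later, the DataFrame/Dataset route from the title is built in: the CSV reader plus groupBy/agg replaces all of the manual steps above. A minimal sketch, assuming a header row, the same file path, and column names matching that header:

import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.sum;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("csv-group-by").getOrCreate();

// Read the CSV directly into a DataFrame; column names come from the header.
Dataset<Row> df = spark.read()
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("path/input.csv");

// Equivalent of the SQL above: group by the three columns and aggregate.
Dataset<Row> res = df.groupBy("department", "designation", "state")
        .agg(sum("costToCompany").alias("totalCost"),
             count(lit(1)).alias("empCount"));
res.show();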
B. Spark
Mapping using a composite key: department, designation, state.
JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
    rdd_records.mapToPair(
        new PairFunction<Record, String, Tuple2<Long, Integer>>() {
            public Tuple2<String, Tuple2<Long, Integer>> call(Record record) {
                // Join the key parts with a delimiter so that distinct
                // (department, designation, state) triples cannot collide.
                Tuple2<String, Tuple2<Long, Integer>> t2 =
                    new Tuple2<String, Tuple2<Long, Integer>>(
                        record.department + ";" + record.designation + ";" + record.state,
                        new Tuple2<Long, Integer>(record.costToCompany, 1));
                return t2;
            }
        });
reduceByKey on the composite key, summing the costToCompany column and accumulating the number of records per key:

JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
    records_JPRDD.reduceByKey(
        new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>,
                Tuple2<Long, Integer>>() {
            public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
                    Tuple2<Long, Integer> v2) throws Exception {
                // Sum the costs and add up the record counts.
                return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2 + v2._2);
            }
        });
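To check the result against the expected output in the question, the aggregated pairs can be collected and printed; a small sketch, assuming the ";" key delimiter from the map step above:

// Collect the (small) aggregated result to the driver and print it in the
// dept, desg, state, empCount, totalCost layout from the question.
for (Tuple2<String, Tuple2<Long, Integer>> t : final_rdd_records.collect()) {
    String[] key = t._1.split(";");
    System.out.println(key[0] + "," + key[1] + "," + key[2]
            + "," + t._2._2 + "," + t._2._1);
}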