groovy - How to aggregate CSV lines with Apache Camel? -


i have csv similar this:

county  city  area  street county1 city1 area1 street1 county1 city1 area2 street2 county1 city1 area3 street7 county1 city2 area2 street2 county1 city2 area6 street1 county2 city1 area3 street3 county2 city1 area3 street2 ... 

during csv parsing, need aggregate same county/city create final structure this:

county1/city1: [ [area1, street1], [area2, street2], [area3, street7] ] county1/city2: [ [area2, street2], [area6, street1] ] county2/city1: [ [area3, street3], [area3, street2] ] 

basically grouping county/city.

i tried different things camel, latest:

class csvappender {     csvrow append(csvrow existing, csvrow next) {         next.previous = existing         next     } }  @csvrecord(separator = "\\t") class csvrow {     @datafield(pos = 1)     private string county      @datafield(pos = 2)     private string city      @datafield(pos = 3)     private string area      @datafield(pos = 4)     private string street      csvrow previous      boolean sameaggregatewithprevious() {         previous?.county == county && previous?.city == city     }      public string tostring() {         "${county} ${city} ${area} ${street}"     } }  class csvroutebuilder extends routebuilder {      void configure() {         csvappender appender = new csvappender()          closure predicate = { exchange ->             def body = exchange.getin().getbody(csvrow.class)             def currentaggregate = exchange.getin().getheader('currentaggregate')             def nextaggregate = exchange.getin().getheader('nextaggregate')              if (!currentaggregate) {                 currentaggregate = body.previous ? [ body.previous ] : []                 nextaggregate = []             } else if (exchange.getin().getheader('aggregatecomplete')) {                 currentaggregate = nextaggregate                 nextaggregate = []             }              def aggregatecomplete = body.sameaggregatewithprevious()             if (aggregatecomplete) {                 nextaggregate << body             } else {                 currentaggregate << body             }             exchange.getin().setheaders(['currentaggregate': currentaggregate,                                          'nextaggregate': nextaggregate,                                          'aggregatecomplete': aggregatecomplete])             aggregatecomplete         }          from("file:/tmp/folder?noop=true")             .split(body().tokenize('\n')).streaming()             .unmarshal().bindy(bindytype.csv, csvrow.class)                 .aggregate(constant(true), aggregationstrategies.bean(appender, "append")).completionpredicate(predicate)                 .process({                     it.getout().setbody(it.getin().getheader('currentaggregate')) })                 .convertbodyto(string.class)             .to("jms:mycsvsplitter")     } } 

anyway solution doesn't work sometime "previous" element null , code looks verbose.

any idea how aggregate csv file properly?

i've got rough code works should enough along. it's in java rather groovy, on grounds groovy isn't much. should easy enough translate though.

firstly aggregator:

public class myagregationstrategy implements aggregationstrategy {     @override     public exchange aggregate(exchange oldexchange, exchange newexchange) {         csvrow newbody = (csvrow)newexchange.getin().getbody();         map<string, list<csvrow>> map = null;         if (oldexchange == null) {             map = new hashmap<string, list<csvrow>>();             arraylist list = new arraylist<csvrow>();             list.add(newbody);             map.put(newbody.getcounty(), list);             newexchange.getin().setbody(map);             return newexchange;         } else {             map = oldexchange.getin().getbody(map.class);             list list = map.get(newbody.getcounty());             if ( list == null ) {                 list = new arraylist<csvrow>();             }             list.add(newbody);             map.put(newbody.getcounty(), list);              oldexchange.setproperty("camelsplitcomplete", newexchange.getproperty("camelsplitcomplete"));             return oldexchange;         }     } } 

this stores rows in list in map, keyed county.

then route:

public class myroutebuilder extends routebuilder {     @override     public void configure() throws exception {         from("file:/c:/dev/test?noop=true")         .split(body().tokenize("\n"))         .log("read line ${body}")         .unmarshal()         .bindy(bindytype.csv, csvrow.class)             .aggregate(constant(true), new myagregationstrategy()).completionpredicate(simple("${property.camelsplitcomplete} == true"))         .process(new processor() {             @override             public void process(exchange exchange) throws exception {                 map results = (map) exchange.getin().getbody();                 system.out.println("got results " + results.size() + " counties");             }         });     } } 

it uses camelsplitcomplete property detect when splitting finished. in processpr @ end can map. alternatively can change aggregator strategy aggregate need results.

hope helps.


Comments

Popular posts from this blog

javascript - Jquery show_hide, what to add in order to make the page scroll to the bottom of the hidden field once button is clicked -

javascript - Highcharts multi-color line -

javascript - Enter key does not work in search box -