Groovy - How to aggregate CSV lines with Apache Camel?
I have a CSV similar to this:
```
county   city   area   street
county1  city1  area1  street1
county1  city1  area2  street2
county1  city1  area3  street7
county1  city2  area2  street2
county1  city2  area6  street1
county2  city1  area3  street3
county2  city1  area3  street2
...
```
During CSV parsing, I need to aggregate the rows that share the same county/city to create a final structure like this:
```
county1/city1: [ [area1, street1], [area2, street2], [area3, street7] ]
county1/city2: [ [area2, street2], [area6, street1] ]
county2/city1: [ [area3, street3], [area3, street2] ]
```
Basically, it's grouping by county/city.
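The grouping itself does not depend on Camel. As a minimal sketch, assuming tab-separated input with the column order county, city, area, street (matching the Bindy mapping in the question), plain Java can build the desired structure with a `LinkedHashMap` keyed on `county/city`, which also keeps groups in first-seen order. The class name `GroupByCountyCity` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: groups tab-separated rows by "county/city".
public class GroupByCountyCity {

    public static Map<String, List<List<String>>> group(List<String> lines) {
        // LinkedHashMap keeps the groups in the order they first appear
        Map<String, List<List<String>>> groups = new LinkedHashMap<>();
        for (String line : lines) {
            String[] f = line.split("\t");
            if (f.length < 4) {
                continue; // skip blank or malformed lines
            }
            String key = f[0] + "/" + f[1];
            groups.computeIfAbsent(key, k -> new ArrayList<>())
                  .add(Arrays.asList(f[2], f[3])); // keep only [area, street]
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "county1\tcity1\tarea1\tstreet1",
            "county1\tcity1\tarea2\tstreet2",
            "county1\tcity1\tarea3\tstreet7",
            "county1\tcity2\tarea2\tstreet2",
            "county1\tcity2\tarea6\tstreet1",
            "county2\tcity1\tarea3\tstreet3",
            "county2\tcity1\tarea3\tstreet2");
        group(lines).forEach((key, rows) -> System.out.println(key + ": " + rows));
    }
}
```

Run on the sample rows, it prints `county1/city1: [[area1, street1], [area2, street2], [area3, street7]]`, then the `county1/city2` and `county2/city1` groups in the same shape. The whole file is held in memory, which is the same trade-off the map-based aggregator makes.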
I tried different things with Camel. This is the latest:
```groovy
// Camel 2.x package names (assumed from the Camel version used in this thread)
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.dataformat.bindy.annotation.CsvRecord
import org.apache.camel.dataformat.bindy.annotation.DataField
import org.apache.camel.model.dataformat.BindyType
import org.apache.camel.util.toolbox.AggregationStrategies

class CsvAppender {
    CsvRow append(CsvRow existing, CsvRow next) {
        next.previous = existing
        next
    }
}

@CsvRecord(separator = "\\t")
class CsvRow {
    @DataField(pos = 1)
    private String county
    @DataField(pos = 2)
    private String city
    @DataField(pos = 3)
    private String area
    @DataField(pos = 4)
    private String street

    CsvRow previous

    boolean sameAggregateWithPrevious() {
        previous?.county == county && previous?.city == city
    }

    public String toString() {
        "${county} ${city} ${area} ${street}"
    }
}

class CsvRouteBuilder extends RouteBuilder {
    void configure() {
        CsvAppender appender = new CsvAppender()

        Closure predicate = { exchange ->
            def body = exchange.getIn().getBody(CsvRow.class)
            def currentAggregate = exchange.getIn().getHeader('currentAggregate')
            def nextAggregate = exchange.getIn().getHeader('nextAggregate')

            if (!currentAggregate) {
                currentAggregate = body.previous ? [ body.previous ] : []
                nextAggregate = []
            } else if (exchange.getIn().getHeader('aggregateComplete')) {
                currentAggregate = nextAggregate
                nextAggregate = []
            }

            def aggregateComplete = body.sameAggregateWithPrevious()
            if (aggregateComplete) {
                nextAggregate << body
            } else {
                currentAggregate << body
            }

            exchange.getIn().setHeaders(['currentAggregate': currentAggregate,
                                         'nextAggregate': nextAggregate,
                                         'aggregateComplete': aggregateComplete])
            aggregateComplete
        }

        from("file:/tmp/folder?noop=true")
            .split(body().tokenize('\n')).streaming()
            .unmarshal().bindy(BindyType.Csv, CsvRow.class)
            .aggregate(constant(true), AggregationStrategies.bean(appender, "append"))
                .completionPredicate(predicate)
            .process({ it.getOut().setBody(it.getIn().getHeader('currentAggregate')) })
            .convertBodyTo(String.class)
            .to("jms:mycsvsplitter")
    }
}
```
Anyway, this solution doesn't work: sometimes the "previous" element is null, and the code looks verbose.
Any idea how to aggregate the CSV file properly?
I've got some rough code that works and should be enough to get you going. It's in Java rather than Groovy, on the grounds that my Groovy isn't up to much. It should be easy enough to translate, though.
Firstly, the aggregator:
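```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy; // Camel 2.x package

public class MyAgregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        CsvRow newBody = newExchange.getIn().getBody(CsvRow.class);

        if (oldExchange == null) {
            // First row: start a new map of county -> rows
            Map<String, List<CsvRow>> map = new HashMap<String, List<CsvRow>>();
            List<CsvRow> list = new ArrayList<CsvRow>();
            list.add(newBody);
            map.put(newBody.getCounty(), list);
            newExchange.getIn().setBody(map);
            return newExchange;
        }

        // Later rows: add to the list for this county, creating it if needed
        @SuppressWarnings("unchecked")
        Map<String, List<CsvRow>> map = oldExchange.getIn().getBody(Map.class);
        List<CsvRow> list = map.get(newBody.getCounty());
        if (list == null) {
            list = new ArrayList<CsvRow>();
            map.put(newBody.getCounty(), list);
        }
        list.add(newBody);
        // Copy the splitter's completion flag so the completion predicate can see it
        oldExchange.setProperty(Exchange.SPLIT_COMPLETE, newExchange.getProperty(Exchange.SPLIT_COMPLETE));
        return oldExchange;
    }
}
```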
This stores the rows in lists inside a map, keyed by county.
Then the route:
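Stripped of the Camel types, the strategy is a fold: the first call has no previous result (the `oldExchange == null` branch), and every later call adds the new row to the existing map. The following is a hypothetical plain-Java model of that pattern, not Camel's API. It makes the grouping key a parameter, which shows that keying by county puts both county1 cities into one list, while a `county/city` key gives the grouping asked for in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java model of the aggregation strategy above; it uses no Camel types.
public class FoldModel {

    // Stand-in for CsvRow: county, city, area, street
    public static final class Row {
        public final String county, city, area, street;

        public Row(String county, String city, String area, String street) {
            this.county = county;
            this.city = city;
            this.area = area;
            this.street = street;
        }
    }

    // Mirrors aggregate(oldExchange, newExchange): 'acc' is null on the first call
    public static Map<String, List<Row>> aggregate(Map<String, List<Row>> acc, Row next,
                                                   Function<Row, String> key) {
        if (acc == null) {
            acc = new HashMap<>();
        }
        acc.computeIfAbsent(key.apply(next), k -> new ArrayList<>()).add(next);
        return acc;
    }

    // Feeds every row through aggregate(), one call per split line
    public static Map<String, List<Row>> aggregateAll(List<Row> rows, Function<Row, String> key) {
        Map<String, List<Row>> acc = null;
        for (Row row : rows) {
            acc = aggregate(acc, row, key);
        }
        return acc == null ? new HashMap<String, List<Row>>() : acc;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
            new Row("county1", "city1", "area1", "street1"),
            new Row("county1", "city1", "area2", "street2"),
            new Row("county1", "city2", "area2", "street2"),
            new Row("county2", "city1", "area3", "street3"));
        System.out.println("county keys: " + aggregateAll(rows, r -> r.county).size());
        System.out.println("county/city keys: " + aggregateAll(rows, r -> r.county + "/" + r.city).size());
    }
}
```

With these four rows it reports 2 county keys and 3 county/city keys, so in the Camel strategy the only change needed for the question's grouping is the map key.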
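```java
import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.BindyType;

public class MyRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("file:/c:/dev/test?noop=true")
            .split(body().tokenize("\n"))
            .log("Read line ${body}")
            .unmarshal().bindy(BindyType.Csv, CsvRow.class)
            // A single aggregation group for the whole file, completed on the last split line
            .aggregate(constant(true), new MyAgregationStrategy())
                .completionPredicate(simple("${property.CamelSplitComplete} == true"))
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    Map<?, ?> results = exchange.getIn().getBody(Map.class);
                    System.out.println("Got results for " + results.size() + " counties");
                }
            });
    }
}
```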
It uses the CamelSplitComplete property to detect when the splitting has finished. In the processor at the end, you can then do whatever you need with the map. Alternatively, you can change the aggregation strategy to aggregate the results however you need.
Hope this helps.
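If the file is already sorted by county and city, as in the sample, another option is to emit each group as soon as the key changes, instead of holding the whole file in one map until CamelSplitComplete. This is roughly what the completion predicate in the question was attempting. The sketch shows only the grouping logic in plain Java, with hypothetical names (`RunGrouper`, `accept`, `finish`); it is not a Camel API, and wiring it into a route is left open. The key detail is that the last group has to be flushed explicitly when the input ends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical: groups consecutive rows that share a county/city key.
// Assumes the input is sorted by that key; unsorted input yields split groups.
public class RunGrouper {
    private final BiConsumer<String, List<String[]>> sink; // receives each finished group
    private String currentKey;                              // null until the first row
    private List<String[]> current = new ArrayList<>();

    public RunGrouper(BiConsumer<String, List<String[]>> sink) {
        this.sink = sink;
    }

    // fields: county, city, area, street
    public void accept(String[] fields) {
        String key = fields[0] + "/" + fields[1];
        if (currentKey != null && !currentKey.equals(key)) {
            flush(); // key changed, so the previous group is complete
        }
        currentKey = key;
        current.add(new String[] {fields[2], fields[3]});
    }

    // Call at end of input; otherwise the last group is never emitted
    public void finish() {
        if (currentKey != null) {
            flush();
        }
    }

    private void flush() {
        sink.accept(currentKey, current);
        current = new ArrayList<>();
        currentKey = null;
    }

    public static void main(String[] args) {
        String[] lines = {
            "county1\tcity1\tarea1\tstreet1",
            "county1\tcity1\tarea2\tstreet2",
            "county1\tcity2\tarea2\tstreet2",
            "county2\tcity1\tarea3\tstreet3"};
        RunGrouper grouper = new RunGrouper((key, rows) ->
            System.out.println(key + ": " + rows.size() + " row(s)"));
        for (String line : lines) {
            grouper.accept(line.split("\t"));
        }
        grouper.finish();
    }
}
```

This keeps only one group in memory at a time, at the cost of relying on the input order.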