java - Can I emit from the cleanup method in Storm?
I have a bolt that gets data from a database, processes it, and sends a double array to another bolt.
The new bolt needs the data from all of those arrays to find a value. I need that value only once, and it should be computed after the processing mentioned above is done.
How can I make sure the bolt emits its data only when that processing is done?
And how can I use its output?
Here is my topology:
// Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sample-split", new samplesplit(), 1);
builder.setBolt("tuple-split", new tuplesplit(), 1)
       .shuffleGrouping("sample-split");
builder.setBolt("read-images", new readimage(), 10)
       .shuffleGrouping("tuple-split");
builder.setBolt("filter-image", new filterimage(), 10)
       .shuffleGrouping("read-images");
builder.setBolt("wavelet-transformation", new wavelettransformation(), 5)
       .shuffleGrouping("filter-image");
// builder.setBolt("pnmstd", new pnmstd(), 5)
//        .shuffleGrouping("wavelet-transformation");
I want to implement this code on Storm:
public static double[][] pnmstd(String ms, double[] mp, double[] mn, String trainingsetfile,
        String wvdecfiledir, String testannotationlevelfromoronly,
        int testannotationid, int testannotationlevel) {
    BufferedReader infile;
    int psamples = 0;
    int nsamples = 0;
    int n = 0;
    try {
        infile = new BufferedReader(new FileReader(trainingsetfile));
        String strsqls = infile.readLine();
        String tset[] = strsqls.split(", ");
        String datafile = wvdecfiledir + "/e" + tset[1] + "_1.dat";
        double c[] = fileio.xread(datafile);
        n = c.length;
        infile.close();
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
    double[] msp = new double[n];
    double[] msn = new double[n];
    double[][] meanstd = new double[2][n];
    for (int i = 0; i < n; i++) {
        msp[i] = 0;
        msn[i] = 0;
    }
    try {
        infile = new BufferedReader(new FileReader(trainingsetfile));
        String strsqls;
        while ((strsqls = infile.readLine()) != null) {
            //String strsqls=infile.readLine();
            //System.out.println(strsqls);
            String tset[] = strsqls.split(", ");
            String datafile = wvdecfiledir + "/e" + tset[1] + "_1.dat";
            double c[] = fileio.xread(datafile);
            if (testannotationlevelfromoronly.equals("from")) {
                if (Integer.parseInt(tset[testannotationid + 4]) >= testannotationlevel) {
                    for (int i = 0; i < c.length; i++) {
                        if (ms.equals("mean")) {
                            msp[i] = msp[i] + c[i];
                        } else {
                            msp[i] = msp[i] + (c[i] - mp[i]) * (c[i] - mp[i]);
                        }
                    }
                    psamples++;
                } else {
                    for (int i = 0; i < c.length; i++) {
                        if (ms.equals("mean")) {
                            msn[i] = msn[i] + c[i];
                        } else {
                            msn[i] = msn[i] + (c[i] - mn[i]) * (c[i] - mn[i]);
                        }
                    }
                    nsamples++;
                }
            } else {
                if (Integer.parseInt(tset[testannotationid + 4]) == testannotationlevel) {
                    for (int i = 0; i < c.length; i++) {
                        if (ms.equals("mean")) {
                            msp[i] = msp[i] + c[i];
                        } else {
                            msp[i] = msp[i] + (c[i] - mp[i]) * (c[i] - mp[i]);
                        }
                    }
                    psamples++;
                } else {
                    for (int i = 0; i < c.length; i++) {
                        if (ms.equals("mean")) {
                            msn[i] = msn[i] + c[i];
                        } else {
                            msn[i] = msn[i] + (c[i] - mn[i]) * (c[i] - mn[i]);
                        }
                    }
                    nsamples++;
                }
            }
        }
        infile.close();
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
    for (int i = 1; i < n; i++) {
        meanstd[0][i] = msp[i] / psamples;
        meanstd[1][i] = msn[i] / psamples; // meaningless, should be asked!!!
    }
    return meanstd;
}
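For this function to work, the files it reads need to have been created already. The previous bolt does the processing and creates those files, so the next bolt needs the previous bolt to finish processing all of its inputs first (so that I can use its output in the next bolt).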
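To make the requirement concrete, here is a minimal sketch of the "emit once, after the last input" behaviour in plain Java, with no Storm dependency. Everything in it is hypothetical: `MeanAggregator` only stands in for the aggregating bolt, its `execute(double[])` plays the role of a bolt's `execute`, the `emitted` list stands in for a collector, and it assumes the number of inputs is known in advance. It counts inputs rather than relying on `cleanup()` because, as far as I understand the Storm documentation, `cleanup()` is not guaranteed to be called on a cluster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EmitOnceSketch {

    /** Hypothetical stand-in for the aggregating bolt; this is not a Storm class. */
    static final class MeanAggregator {
        private final int expectedInputs;                          // assumed to be known in advance
        private final List<double[]> emitted = new ArrayList<>();  // stands in for the collector
        private double[] sums;                                     // running per-index sums
        private int seen;                                          // inputs processed so far

        MeanAggregator(int expectedInputs) {
            if (expectedInputs <= 0) {
                throw new IllegalArgumentException("expectedInputs must be positive");
            }
            this.expectedInputs = expectedInputs;
        }

        /** Accumulates one input array and emits the mean only when the last one arrives. */
        void execute(double[] values) {
            if (seen >= expectedInputs) {
                return; // the result was already emitted; ignore stray inputs
            }
            if (sums == null) {
                sums = new double[values.length];
            }
            if (values.length != sums.length) {
                throw new IllegalArgumentException("all input arrays must have the same length");
            }
            for (int i = 0; i < values.length; i++) {
                sums[i] += values[i];
            }
            seen++;
            if (seen == expectedInputs) {
                double[] mean = new double[sums.length];
                for (int i = 0; i < sums.length; i++) {
                    mean[i] = sums[i] / seen;
                }
                emitted.add(mean); // the single "emit"
            }
        }

        List<double[]> emitted() {
            return emitted;
        }
    }

    public static void main(String[] args) {
        MeanAggregator bolt = new MeanAggregator(3);
        bolt.execute(new double[] {1.0, 2.0});
        bolt.execute(new double[] {3.0, 4.0});
        System.out.println(bolt.emitted().size());                  // prints 0
        bolt.execute(new double[] {5.0, 6.0});
        System.out.println(Arrays.toString(bolt.emitted().get(0))); // prints [3.0, 4.0]
    }
}
```

In a real topology the expected count would have to come from somewhere (for example the spout), and with a parallelism hint above 1 each task sees only part of the stream, so the aggregating step would probably need to run as a single task or behind a grouping that routes every array to the same task.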