java - Can I emit from the cleanup method in Storm?


I have a bolt that gets data from a DB, processes the data, and sends a double array to another bolt.

The new bolt needs the data of all the arrays to find a value, and I need that value only once. It should be computed after the process mentioned above is done.

How can I make sure the bolt emits its data only when the process is done?

And how can I use its output?

And here is my topology:

    //Topology definition
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("sample-split", new samplesplit(), 1);
    builder.setBolt("tuple-split", new tuplesplit(), 1)
            .shuffleGrouping("sample-split");
    builder.setBolt("read-images", new readimage(), 10)
            .shuffleGrouping("tuple-split");
    builder.setBolt("filter-image", new filterimage(), 10)
            .shuffleGrouping("read-images");
    builder.setBolt("wavelet-transformation", new wavelettransformation(), 5)
            .shuffleGrouping("filter-image");
    //builder.setBolt("pnmstd", new pnmstd(), 5)
    //        .shuffleGrouping("wavelet-transformation");

I want to implement this code on Storm:

    public static double[][] pnmstd(String ms, double[] mp, double[] mn, String trainingsetfile, String wvdecfiledir,
            String testannotationlevelfromoronly, int testannotationid, int testannotationlevel)
    {
        BufferedReader infile;
        int psamples = 0;
        int nsamples = 0;
        int n = 0;
        try {
            infile = new BufferedReader(
                    new FileReader(trainingsetfile));
            String strsqls = infile.readLine();
            String tset[] = strsqls.split(", ");
            String datafile = wvdecfiledir + "/e" + tset[1] + "_1.dat";
            double c[] = fileio.xread(datafile);
            n = c.length;
            infile.close();
        }
        catch (FileNotFoundException e)
        {
            e.printStackTrace();
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        double[] msp = new double[n];
        double[] msn = new double[n];
        double[][] meanstd = new double[2][n];
        for (int i = 0; i < n; i++)
        {
            msp[i] = 0;
            msn[i] = 0;
        }
        try {
            infile = new BufferedReader(
                    new FileReader(trainingsetfile));
            String strsqls;
            while ((strsqls = infile.readLine()) != null) {
                //String strsqls=infile.readLine();
                //System.out.println(strsqls);
                String tset[] = strsqls.split(", ");
                String datafile = wvdecfiledir + "/e" + tset[1] + "_1.dat";
                double c[] = fileio.xread(datafile);

                if (testannotationlevelfromoronly.equals("from"))
                {
                    if (Integer.parseInt(tset[testannotationid + 4]) >= testannotationlevel)
                    {
                        for (int i = 0; i < c.length; i++)
                        {
                            if (ms.equals("mean")) { msp[i] = msp[i] + c[i]; }
                            else { msp[i] = msp[i] + (c[i] - mp[i]) * (c[i] - mp[i]); }
                        }
                        psamples++;
                    }
                    else
                    {
                        for (int i = 0; i < c.length; i++)
                        {
                            if (ms.equals("mean")) { msn[i] = msn[i] + c[i]; }
                            else { msn[i] = msn[i] + (c[i] - mn[i]) * (c[i] - mn[i]); }
                        }
                        nsamples++;
                    }
                }
                else
                {
                    if (Integer.parseInt(tset[testannotationid + 4]) == testannotationlevel)
                    {
                        for (int i = 0; i < c.length; i++)
                        {
                            if (ms.equals("mean")) { msp[i] = msp[i] + c[i]; }
                            else { msp[i] = msp[i] + (c[i] - mp[i]) * (c[i] - mp[i]); }
                        }
                        psamples++;
                    }
                    else
                    {
                        for (int i = 0; i < c.length; i++)
                        {
                            if (ms.equals("mean")) { msn[i] = msn[i] + c[i]; }
                            else { msn[i] = msn[i] + (c[i] - mn[i]) * (c[i] - mn[i]); }
                        }
                        nsamples++;
                    }
                }
            }
            infile.close();
        }
        catch (FileNotFoundException e)
        {
            e.printStackTrace();
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }

        for (int i = 1; i < n; i++)
        {
            meanstd[0][i] = msp[i] / psamples;
            meanstd[1][i] = msn[i] / psamples; // meaningless, should be asked!!!
        }

        return meanstd;
    }

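For this function to work, I need the files to be created, and the previous bolt is the one processing the data and creating those files. So the next bolt needs to wait until that bolt has finished processing all of its inputs (so that I can use its output in the next bolt).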
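
One caveat about relying on cleanup() for this: Storm's documentation for IBolt.cleanup says there is no guarantee the method is called on a cluster, so it is a fragile place to emit from. A common alternative is to have the source send an explicit end-of-stream marker after the last real tuple and let the aggregating bolt emit when it sees that marker. The plain-Java sketch below shows only the accumulate-then-emit-once logic, without any Storm classes. The class name EndMarkerMean, the method onTuple, and the use of null as the marker are all hypothetical choices made for illustration. It also assumes the marker really arrives after every data array, which is not automatic when several parallel upstream tasks feed the bolt.

```java
import java.util.Optional;

// Hypothetical helper, not part of Storm: accumulates arrays and releases
// their element-wise mean exactly once, when an end marker arrives.
public class EndMarkerMean {
    private final double[] sum;
    private int count = 0;
    private boolean emitted = false;

    public EndMarkerMean(int length) {
        this.sum = new double[length];
    }

    // Plays the role of a bolt's execute(): `values` is a data array,
    // or null for the (assumed) end-of-stream marker.
    public Optional<double[]> onTuple(double[] values) {
        if (values != null) {
            if (values.length != sum.length) {
                throw new IllegalArgumentException("unexpected array length");
            }
            for (int i = 0; i < sum.length; i++) {
                sum[i] += values[i];
            }
            count++;
            return Optional.empty(); // still accumulating, nothing to emit
        }
        // End marker: emit at most once, and only if data was seen.
        if (emitted || count == 0) {
            return Optional.empty();
        }
        emitted = true;
        double[] mean = new double[sum.length];
        for (int i = 0; i < sum.length; i++) {
            mean[i] = sum[i] / count;
        }
        return Optional.of(mean);
    }

    public static void main(String[] args) {
        EndMarkerMean acc = new EndMarkerMean(2);
        acc.onTuple(new double[] {1.0, 2.0});
        acc.onTuple(new double[] {3.0, 6.0});
        double[] mean = acc.onTuple(null).get();
        System.out.println(mean[0] + " " + mean[1]); // prints "2.0 4.0"
    }
}
```

In a real bolt, the body of onTuple would sit inside execute(), and the returned array would be passed to the collector's emit call so that a downstream bolt can consume it as an ordinary tuple.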

