Scala Spark SQLContext program throwing ArrayIndexOutOfBoundsException


I am new to Apache Spark. I am trying to create a schema and load data from HDFS. Below is my code:

    // importing SQLContext
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.createSchemaRDD

    // defining schema
    case class Author1(author_key: Long, author_id: Long, author: String,
                       first_name: String, last_name: String, middle_name: String,
                       full_name: String, institution_full_name: String,
                       country: String, dias_id: Int, r_id: String)

    val d_authors1 =
      sc.textFile("hdfs:///user/d_authors.txt")
        .map(_.split("\\|"))
        .map(auth => Author1(auth(0).trim.toLong, auth(1).trim.toLong, auth(2), auth(3), auth(4),
                             auth(5), auth(6), auth(7), auth(8), auth(9).trim.toInt, auth(10)))

    // register table
    d_authors1.registerAsTable("d_authors1")
    val auth = sqlContext.sql("SELECT * FROM d_authors1")
    sqlContext.sql("SELECT * FROM d_authors").collect().foreach(println)

When I execute the code, it throws an ArrayIndexOutOfBoundsException. Below is the error:

    14/08/18 06:57:14 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
    14/08/18 06:57:14 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
    14/08/18 06:57:14 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
    14/08/18 06:57:14 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
    14/08/18 06:57:14 INFO FileInputFormat: Total input paths to process : 1
    14/08/18 06:57:14 INFO SparkContext: Starting job: collect at <console>:24
    14/08/18 06:57:14 INFO DAGScheduler: Got job 5 (collect at <console>:24) with 2 output partitions (allowLocal=false)
    14/08/18 06:57:14 INFO DAGScheduler: Final stage: Stage 5(collect at <console>:24)
    14/08/18 06:57:14 INFO DAGScheduler: Parents of final stage: List()
    14/08/18 06:57:14 INFO DAGScheduler: Missing parents: List()
    14/08/18 06:57:14 INFO DAGScheduler: Submitting Stage 5 (SchemaRDD[26] at RDD at SchemaRDD.scala:98
    == Query Plan ==
    ExistingRdd [author_key#22L,author_id#23L,author#24,first_name#25,last_name#26,middle_name#27,full_name#28,institution_full_name#29,country#30,dias_id#31,r_id#32], MapPartitionsRDD[23] at mapPartitions at basicOperators.scala:174), which has no missing parents
    14/08/18 06:57:14 INFO DAGScheduler: Submitting 2 missing tasks from Stage 5 (SchemaRDD[26] at RDD at SchemaRDD.scala:98
    == Query Plan ==
    ExistingRdd [author_key#22L,author_id#23L,author#24,first_name#25,last_name#26,middle_name#27,full_name#28,institution_full_name#29,country#30,dias_id#31,r_id#32], MapPartitionsRDD[23] at mapPartitions at basicOperators.scala:174)
    14/08/18 06:57:14 INFO YarnClientClusterScheduler: Adding task set 5.0 with 2 tasks
    14/08/18 06:57:14 INFO TaskSetManager: Starting task 5.0:0 as TID 38 on executor 1: orf-bat.int..com (NODE_LOCAL)
    14/08/18 06:57:14 INFO TaskSetManager: Serialized task 5.0:0 as 4401 bytes in 1 ms
    14/08/18 06:57:15 INFO TaskSetManager: Starting task 5.0:1 as TID 39 on executor 1: orf-bat.int..com (NODE_LOCAL)
    14/08/18 06:57:15 INFO TaskSetManager: Serialized task 5.0:1 as 4401 bytes in 0 ms
    14/08/18 06:57:15 WARN TaskSetManager: Lost TID 38 (task 5.0:0)
    14/08/18 06:57:15 WARN TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
    java.lang.ArrayIndexOutOfBoundsException: 10
            at $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:27)
            at $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:27)
            at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
            at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
            at scala.collection.Iterator$$anon$1.head(Iterator.scala:840)
            at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:179)
            at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:174)
            at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
            at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
            at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
            at org.apache.spark.scheduler.Task.run(Task.scala:51)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:745)
    14/08/18 06:57:15 WARN TaskSetManager: Lost TID 39 (task 5.0:1)
    14/08/18 06:57:15 WARN TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
    java.lang.ArrayIndexOutOfBoundsException: 9
            at $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:27)
            at $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:27)
            at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
            at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
            at scala.collection.Iterator$$anon$1.head(Iterator.scala:840)
            at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:179)
            at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:174)
            at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
            at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
            at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
            at org.apache.spark.scheduler.Task.run(Task.scala:51)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:745)

Your problem has nothing to do with Spark.
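The failure can be reproduced in plain Scala, without Spark at all. This is a minimal sketch that assumes a hypothetical input line (not taken from the real file) whose last field, r_id, is empty. String.split without a limit discards trailing empty strings, so the line yields only 10 elements and index 10 fails with the same exception type seen in the log.

```scala
// Hypothetical sample line: 11 pipe-separated fields, the last one (r_id) empty.
val line = "1|100|Smith J|John|Smith||John Smith|Some University|UK|42|"

// Without a limit, trailing empty strings are discarded.
val defaultSplit = line.split("\\|")
// With a negative limit, trailing empty strings are kept.
val keepAllSplit = line.split("\\|", -1)

println(defaultSplit.length) // 10
println(keepAllSplit.length) // 11

// A leading empty string, by contrast, is kept even without a limit.
println("|a".split("\\|").length) // 2

// Index 10 on the 10-element array fails just like the Spark task did.
val failure = scala.util.Try(defaultSplit(10))
println(failure)
```

A record with its last two fields empty would likewise produce 9 elements, which would match the "9" in the second stack trace.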

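  1. Format your code correctly (I have corrected it).
  2. Don't mix camel case and underscore naming: use underscores for SQL fields and camel case for Scala vals.
  3. When you get an exception, read it; it tells you what you are doing wrong. In this case some of the records in hdfs:///user/d_authors.txt are not how you expect them: ArrayIndexOutOfBoundsException: 10 (and 9) means a split line had fewer than the 11 fields your code indexes into.
  4. When you get an exception, debug it: try catching the exception and printing out the records that fail to parse.
  5. _.split("\\|") drops trailing empty strings, so a record whose last field(s) are empty yields fewer elements than you expect; use _.split("\\|", -1) to keep them.
  6. In Scala you don't need magic numbers to manually access elements of an array; it's ugly and more prone to error. Use a pattern match instead.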
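Tips 4 and 5 could be combined roughly as in this minimal sketch. It is plain Scala over a Seq[String] so it runs without a cluster; the parseAuthor helper and the sample lines are hypothetical. Each line is split with a limit of -1 and parsed inside scala.util.Try, and the lines that fail are kept together with the exception so they can be printed.

```scala
import scala.util.{Failure, Success, Try}

// Same shape as the Author1 case class in the question.
case class Author1(author_key: Long, author_id: Long, author: String,
                   first_name: String, last_name: String, middle_name: String,
                   full_name: String, institution_full_name: String,
                   country: String, dias_id: Int, r_id: String)

// Hypothetical parser; the negative limit keeps trailing empty fields.
def parseAuthor(line: String): Author1 = {
  val a = line.split("\\|", -1)
  Author1(a(0).trim.toLong, a(1).trim.toLong, a(2), a(3), a(4), a(5),
          a(6), a(7), a(8), a(9).trim.toInt, a(10))
}

// Hypothetical input: the second line is truncated after four fields.
val lines = Seq(
  "1|100|Smith J|John|Smith||John Smith|Some University|UK|42|",
  "2|200|Doe J|Jane"
)

// Pair every line with its parse result so a failure keeps its input line.
val results = lines.map(line => line -> Try(parseAuthor(line)))

val parsed = results.collect { case (_, Success(a)) => a }
val failed = results.collect { case (line, Failure(e)) => (line, e) }

// Print the records that failed to parse, together with the reason.
failed.foreach { case (line, e) => println(s"Could not parse [$line]: $e") }
```

In the Spark shell the same Try-wrapped function could be used inside a map over the RDD, and a sample of the failures inspected with filter and take, instead of letting the first bad record kill the job.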

Here is a simple example which includes handling of unusual records:

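    case class Author(author: String, authorAge: Int)

    // myData is assumed to be an RDD[String] of tab-separated lines
    myData.map(_.split("\t", -1) match {
      // exactly two fields: build the record
      case Array(author, authorAge) => Author(author, authorAge.toInt)
      // anything else: fail with the offending record in the message
      case unexpectedArrayForm =>
        throw new RuntimeException("Record did not have correct number of fields: " +
          unexpectedArrayForm.mkString("\t"))
    })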

Now, if you had coded it like this, the exception would tell you straight away what is wrong with your data.
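Applied to the question's 11-field schema, the pattern-match approach might look like the sketch below. It is an illustration under assumptions, not a tested Spark job: the toAuthor name and the sample lines are hypothetical, and it is plain Scala so it runs on its own. Following tip 2, the case class fields keep underscores (they become SQL column names) while the pattern variables use camel case.

```scala
// Field names use underscores because they become the SQL column names.
case class Author1(author_key: Long, author_id: Long, author: String,
                   first_name: String, last_name: String, middle_name: String,
                   full_name: String, institution_full_name: String,
                   country: String, dias_id: Int, r_id: String)

// Parse one pipe-delimited line; -1 keeps trailing empty fields.
def toAuthor(line: String): Author1 = line.split("\\|", -1) match {
  case Array(key, id, author, firstName, lastName, middleName,
             fullName, institution, country, diasId, rId) =>
    Author1(key.trim.toLong, id.trim.toLong, author, firstName, lastName,
            middleName, fullName, institution, country, diasId.trim.toInt, rId)
  case unexpectedArrayForm =>
    // Report how many fields were found and the record itself.
    throw new RuntimeException(
      s"Record did not have correct number of fields (${unexpectedArrayForm.length}): " +
        unexpectedArrayForm.mkString("|"))
}

val ok = toAuthor("1|100|Smith J|John|Smith||John Smith|Some University|UK|42|")
println(ok)
```

In the Spark shell, sc.textFile(...).map(toAuthor) would then fail with a message naming the malformed line rather than a bare index.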

One final point/concern: why are you using Spark SQL? Your data is in text form; are you trying to transform it into, say, Parquet? If not, why not just use the regular Scala API to perform your analysis? It's type checked and compile checked, unlike SQL.
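To illustrate the type-checking point, here is a sketch of a hypothetical query against d_authors1 (the equivalent of SELECT full_name FROM d_authors1 WHERE country = 'UK') written with ordinary collection methods. A Seq with made-up records stands in for the parsed RDD so the snippet runs on its own; an RDD of case class instances offers the same filter and map methods. A misspelled field such as _.contry is rejected by the compiler, whereas a misspelled column inside an SQL string only fails at runtime.

```scala
case class Author1(author_key: Long, author_id: Long, author: String,
                   first_name: String, last_name: String, middle_name: String,
                   full_name: String, institution_full_name: String,
                   country: String, dias_id: Int, r_id: String)

// Hypothetical records standing in for the parsed RDD.
val authors = Seq(
  Author1(1L, 100L, "Smith J", "John", "Smith", "", "John Smith",
          "Some University", "UK", 42, ""),
  Author1(2L, 200L, "Doe J", "Jane", "Doe", "", "Jane Doe",
          "Other University", "US", 7, "")
)

// Equivalent of: SELECT full_name FROM d_authors1 WHERE country = 'UK'
// Field names and types are checked at compile time.
val ukNames = authors.filter(_.country == "UK").map(_.full_name)

ukNames.foreach(println)
```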

