Scala Spark SQLContext program throwing ArrayIndexOutOfBoundsException
I am new to Apache Spark. I am trying to create a schema and load data from HDFS. Below is my code:
// importing SQLContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

// defining the schema
case class Author1(author_key: Long, author_id: Long, author: String,
  first_name: String, last_name: String, middle_name: String, full_name: String,
  institution_full_name: String, country: String, dias_id: Int, r_id: String)

val d_authors1 = sc.textFile("hdfs:///user/d_authors.txt")
  .map(_.split("\\|"))
  .map(auth => Author1(auth(0).trim.toLong, auth(1).trim.toLong, auth(2), auth(3),
    auth(4), auth(5), auth(6), auth(7), auth(8), auth(9).trim.toInt, auth(10)))

// register the table
d_authors1.registerAsTable("d_authors1")
val auth = sqlContext.sql("select * from d_authors1")
sqlContext.sql("select * from d_authors").collect().foreach(println)
When I execute the code it throws an ArrayIndexOutOfBoundsException. Below is the error:
14/08/18 06:57:14 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/08/18 06:57:14 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/08/18 06:57:14 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/08/18 06:57:14 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
14/08/18 06:57:14 INFO FileInputFormat: Total input paths to process : 1
14/08/18 06:57:14 INFO SparkContext: Starting job: collect at <console>:24
14/08/18 06:57:14 INFO DAGScheduler: Got job 5 (collect at <console>:24) with 2 output partitions (allowLocal=false)
14/08/18 06:57:14 INFO DAGScheduler: Final stage: Stage 5 (collect at <console>:24)
14/08/18 06:57:14 INFO DAGScheduler: Parents of final stage: List()
14/08/18 06:57:14 INFO DAGScheduler: Missing parents: List()
14/08/18 06:57:14 INFO DAGScheduler: Submitting Stage 5 (SchemaRDD[26] at RDD at SchemaRDD.scala:98
== Query Plan ==
ExistingRdd [author_key#22L,author_id#23L,author#24,first_name#25,last_name#26,middle_name#27,full_name#28,institution_full_name#29,country#30,dias_id#31,r_id#32], MapPartitionsRDD[23] at mapPartitions at basicOperators.scala:174), which has no missing parents
14/08/18 06:57:14 INFO DAGScheduler: Submitting 2 missing tasks from Stage 5 (SchemaRDD[26] at RDD at SchemaRDD.scala:98
== Query Plan ==
ExistingRdd [author_key#22L,author_id#23L,author#24,first_name#25,last_name#26,middle_name#27,full_name#28,institution_full_name#29,country#30,dias_id#31,r_id#32], MapPartitionsRDD[23] at mapPartitions at basicOperators.scala:174)
14/08/18 06:57:14 INFO YarnClientClusterScheduler: Adding task set 5.0 with 2 tasks
14/08/18 06:57:14 INFO TaskSetManager: Starting task 5.0:0 as TID 38 on executor 1: orf-bat.int..com (NODE_LOCAL)
14/08/18 06:57:14 INFO TaskSetManager: Serialized task 5.0:0 as 4401 bytes in 1 ms
14/08/18 06:57:15 INFO TaskSetManager: Starting task 5.0:1 as TID 39 on executor 1: orf-bat.int..com (NODE_LOCAL)
14/08/18 06:57:15 INFO TaskSetManager: Serialized task 5.0:1 as 4401 bytes in 0 ms
14/08/18 06:57:15 WARN TaskSetManager: Lost TID 38 (task 5.0:0)
14/08/18 06:57:15 WARN TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 10
    at $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:27)
    at $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:27)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
    at scala.collection.Iterator$$anon$1.head(Iterator.scala:840)
    at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:179)
    at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:174)
    at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
    at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/08/18 06:57:15 WARN TaskSetManager: Lost TID 39 (task 5.0:1)
14/08/18 06:57:15 WARN TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 9
    at $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:27)
    at $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:27)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
    at scala.collection.Iterator$$anon$1.head(Iterator.scala:840)
    at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:179)
    at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:174)
    at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
    at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Your problem has nothing to do with Spark.
- Format your code correctly (I have corrected it here).
- Don't mix camelCase and underscore naming: use underscores for SQL fields and camelCase for Scala vals.
- When you get an exception, read it; it tells you what you are doing wrong. In this case some of the records in hdfs:///user/d_authors.txt are not in the format you expect. When you get an exception, debug it: try catching the exception and printing out the records that fail to parse (see the sketch after this list).
- _.split("\\|") ignores empty leading and trailing strings; use _.split("\\|", -1) instead.
- In Scala you don't need magic numbers to manually access elements of an array; it's ugly and more error prone. Use a pattern match instead ...
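To make the split and debugging points concrete, here is a minimal sketch. It assumes the Author1 case class and the spark-shell sc from the question; the record "1|2|smith||" is only an illustration, and the println/count are purely for debugging.

// Why the limit argument matters: without -1, trailing empty fields are silently
// dropped, which is exactly what later makes auth(9) or auth(10) blow up.
"1|2|smith||".split("\\|").length      // 3 -- trailing empty strings discarded
"1|2|smith||".split("\\|", -1).length  // 5 -- all fields kept

// Debugging sketch: wrap the parse so records that fail are printed instead of
// killing the whole task. On YARN the println output ends up in the executor logs.
val parsed = sc.textFile("hdfs:///user/d_authors.txt").flatMap { line =>
  try {
    val auth = line.split("\\|", -1)
    Seq(Author1(auth(0).trim.toLong, auth(1).trim.toLong, auth(2), auth(3), auth(4),
      auth(5), auth(6), auth(7), auth(8), auth(9).trim.toInt, auth(10)))
  } catch {
    case e: Exception =>
      println("Failed to parse record: " + line + " (" + e + ")")
      Seq.empty[Author1]
  }
}
parsed.count()  // forces the parse so the offending records get printed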
Here is a simple example of the pattern-match approach, which includes handling of unexpected records:
case class Author(author: String, authorAge: Int)

myData.map(_.split("\t", -1) match {
  case Array(author, authorAge) => Author(author, authorAge.toInt)
  case unexpectedArrayForm =>
    throw new RuntimeException("Record did not have correct number of fields: " +
      unexpectedArrayForm.mkString("\t"))
})
Now, if you had coded it like this, the exception would tell you straight away what is wrong with your data.
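Applied to the record layout from the question, that approach might look something like the following. It is only a sketch: it reuses the Author1 case class from the question, uses camelCase vals as suggested above, and the error message wording is my own.

val dAuthors1 = sc.textFile("hdfs:///user/d_authors.txt")
  .map(_.split("\\|", -1) match {
    case Array(authorKey, authorId, author, firstName, lastName, middleName,
               fullName, institutionFullName, country, diasId, rId) =>
      Author1(authorKey.trim.toLong, authorId.trim.toLong, author, firstName, lastName,
        middleName, fullName, institutionFullName, country, diasId.trim.toInt, rId)
    case unexpectedArrayForm =>
      // A malformed line now fails with a message that shows the actual record
      throw new RuntimeException("Record did not have 11 fields: " +
        unexpectedArrayForm.mkString("|"))
  })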
One final point/concern: why are you using Spark SQL at all? Your data is in text form; are you trying to transform it into, say, Parquet? If not, why not use the regular Scala API to perform the analysis? It's type checked and compile checked, unlike SQL.
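For example, assuming the parsed dAuthors1 RDD from the sketch above, the plain-RDD equivalent of the SQL in the question is simply the following (the "US" filter is only an illustration):

// Same output as select * from d_authors1
dAuthors1.collect().foreach(println)

// A typed equivalent of a WHERE clause -- checked at compile time, unlike a SQL string
dAuthors1.filter(_.country == "US").count()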