import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/** Reads a whitespace-delimited scores file, parses each data row into a
  * [[Score]] tagged with an age bucket, and prints every record.
  *
  * Expected file layout: one header line followed by rows of
  * `classId name age gender subject score` separated by whitespace.
  */
object SparkRDD {

  // One parsed row of scores.txt. `_type` is the derived age bucket:
  // "GT20" (age > 20), "EQ20" (age == 20) or "LT20" (age < 20).
  private case class Score(classId: Int, name: String, age: Int, gender: String,
                           subject: String, score: Int, _type: String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("spark_rdd")
      .setMaster("local[4]")
    val sc = SparkContext.getOrCreate(conf)

    val path = "file:///D:\\myOwnProject\\spark_first\\data\\scores.txt"

    val scores: RDD[Score] = sc.textFile(path, 4)
      // The header line lives only in partition 0; drop it there.
      .mapPartitionsWithIndex { case (index, iterator) =>
        if (index == 0) iterator.drop(1) else iterator
      }
      .mapPartitions(
        _.map { line =>
          val a = line.split("\\s+")
          val age = a(2).toInt
          // if/else chain instead of a guard-only match: a match whose every
          // case carries a guard cannot be proven exhaustive by the compiler
          // and would compile to a potential MatchError.
          val _type =
            if (age > 20) "GT20"
            else if (age == 20) "EQ20"
            else "LT20"
          // Reuse the already-parsed `age` rather than re-parsing a(2).
          Score(a(0).toInt, a(1), age, a(3), a(4), a(5).toInt, _type)
        }
      ).cache() // NOTE(review): only one action follows, so cache() is likely
                // unnecessary; kept to preserve original behavior.

    // In local[4] mode this prints on the driver JVM; on a real cluster the
    // output would appear in executor logs instead.
    scores.foreach(println)
    sc.stop()
  }
}