1. Creating Key/Value Pair RDDs
Use the map() function to return key/value pairs.
For example, given an RDD containing several lines of text, use the first word of each line as the key:
scala> val rdd=sc.textFile("/root/helloSpark.txt")
rdd: org.apache.spark.rdd.RDD[String] = /root/helloSpark.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> rdd.collect()
res0: Array[String] = Array(go to home hello java, so many to hello word kafka java, go to so)
scala> val rdd2=rdd.map(line=>(line.split(" ")(0),line))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[3] at map at <console>:26
scala> rdd2.foreach(println)
(go,go to home hello java)
(so,so many to hello word kafka java)
(go,go to so)
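The keying logic itself does not depend on Spark; this is a plain-Scala sketch of the same `line => (first word, line)` mapping, using the sample lines from the transcript above:

```scala
// Plain Scala sketch of line => (firstWord, line); no Spark required.
val lines = Seq(
  "go to home hello java",
  "so many to hello word kafka java",
  "go to so"
)
val pairs = lines.map(line => (line.split(" ")(0), line))
pairs.foreach(println)
// (go,go to home hello java)
// (so,so many to hello word kafka java)
// (go,go to so)
```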
2. Common Operations
Transformations on Key/Value pair RDDs (example: {(1,2), (3,4), (3,6)}):


scala> val rdd3=sc.parallelize(Array((1,2),(3,4),(3,6)))
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> rdd3.foreach(println)
(1,2)
(3,4)
(3,6)
scala> val rdd4=rdd3.reduceByKey((x,y)=>x+y)
rdd4: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[5] at reduceByKey at <console>:26
scala> rdd4.foreach(println)
(1,2)
(3,10)
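reduceByKey merges all values of the same key with the given function. For the example data {(1,2), (3,4), (3,6)}, the result can be reproduced with plain Scala collections (a sketch of the semantics only, not of Spark's distributed implementation):

```scala
// What reduceByKey((x, y) => x + y) computes, sketched with Scala collections.
val data = Seq((1, 2), (3, 4), (3, 6))
val reduced = data
  .groupBy(_._1)                                         // group pairs by key
  .map { case (k, vs) => (k, vs.map(_._2).reduce(_ + _)) } // sum each key's values
// reduced: Map(1 -> 2, 3 -> 10)
```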
scala> val rdd5=rdd3.groupByKey()
rdd5: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[6] at groupByKey at <console>:26
scala> rdd5.foreach(println)
(1,CompactBuffer(2))
(3,CompactBuffer(4, 6))
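Unlike reduceByKey, groupByKey only collects each key's values without combining them (so for aggregations like sums, reduceByKey is preferred because it combines values before the shuffle). A plain-Scala sketch of the grouping:

```scala
// What groupByKey() computes, sketched with Scala collections.
val data = Seq((1, 2), (3, 4), (3, 6))
val grouped = data.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }
// grouped: Map(1 -> Seq(2), 3 -> Seq(4, 6))
```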
scala> val rdd6=rdd3.keys
rdd6: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at keys at <console>:26
scala> rdd6.foreach(println)
1
3
3
scala> val rdd7=rdd3.values
rdd7: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at values at <console>:26
scala> rdd7.foreach(println)
2
4
6
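keys and values are simply projections of the pair RDD; in plain Scala they correspond to mapping out the first and second tuple element:

```scala
// Plain Scala equivalents of rdd.keys and rdd.values.
val data = Seq((1, 2), (3, 4), (3, 6))
val ks = data.map(_._1) // like rdd3.keys   -> 1, 3, 3
val vs = data.map(_._2) // like rdd3.values -> 2, 4, 6
```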
scala> val rdd8=rdd3.sortByKey()
rdd8: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[9] at sortByKey at <console>:26
scala> rdd8.foreach(println)
(1,2)
(3,4)
(3,6)
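sortByKey returns an RDD sorted by key. Note that foreach prints in whatever order the partitions are processed, so sorted console output as above is not guaranteed; use collect() to observe the global order. The sort itself can be sketched in plain Scala:

```scala
// What sortByKey() computes, sketched with Scala collections.
val data = Seq((3, 4), (1, 2), (3, 6))
val sorted = data.sortBy(_._1) // like rdd.sortByKey()
// sorted: Seq((1, 2), (3, 4), (3, 6))
```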