IT虾米网

Key-Value Pair RDDs (Part 1)

xmjava · June 19, 2022 · Big Data
1. Creating Key-Value Pair RDDs
Use the map() function to return key/value pairs.
For example, given an RDD containing several lines of text, take the first word of each line as the key:
scala> val rdd=sc.textFile("/root/helloSpark.txt") 
rdd: org.apache.spark.rdd.RDD[String] = /root/helloSpark.txt MapPartitionsRDD[1] at textFile at <console>:24 
scala> rdd.collect() 
res0: Array[String] = Array(go to home hello java, so many to hello word kafka java, go to so) 
scala> val rdd2=rdd.map(line=>(line.split(" ")(0),line)) 
rdd2: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[3] at map at <console>:26 
scala> rdd2.foreach(println) 
(go,go to home hello java) 
(so,so many to hello word kafka java) 
(go,go to so)
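The keying logic itself is plain Scala. As a sketch that runs without a SparkContext, here is the same transformation on a local collection, using the sample lines from the transcript above:

```scala
// Key each line by its first word, mirroring
// rdd.map(line => (line.split(" ")(0), line)).
val lines = Seq(
  "go to home hello java",
  "so many to hello word kafka java",
  "go to so"
)
val pairs = lines.map(line => (line.split(" ")(0), line))
pairs.foreach(println)
```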

2. Common Operations
Transformations on key-value pair RDDs (example: {(1,2), (3,4), (3,6)}):
scala> val rdd3=sc.parallelize(Array((1,2),(3,4),(3,6))) 
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:24 
scala> rdd3.foreach(println) 
(1,2) 
(3,4) 
(3,6) 
scala> val rdd4=rdd3.reduceByKey((x,y)=>x+y) 
rdd4: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[5] at reduceByKey at <console>:26 
scala> rdd4.foreach(println) 
(1,2) 
(3,10) 
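reduceByKey merges all values that share a key using the given function. As a local-collection sketch of what it computes (on a real cluster, reduceByKey also combines values within each partition before the shuffle):

```scala
// What reduceByKey((x, y) => x + y) computes, on a local collection:
// group the pairs by key, then sum each group's values.
val data = Seq((1, 2), (3, 4), (3, 6))
val reduced = data
  .groupBy { case (k, _) => k }
  .map { case (k, kvs) => (k, kvs.map(_._2).sum) }
```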
scala> val rdd5=rdd3.groupByKey() 
rdd5: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[6] at groupByKey at <console>:26 
scala> rdd5.foreach(println) 
(1,CompactBuffer(2)) 
(3,CompactBuffer(4, 6)) 
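groupByKey collects every value for a key into one sequence. Summing those groups reproduces the reduceByKey result, but in Spark groupByKey ships all values across the shuffle first, so prefer reduceByKey when a reduction is all you need. A local sketch:

```scala
// groupByKey semantics on a local collection:
val data = Seq((1, 2), (3, 4), (3, 6))
val grouped = data.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }
// Summing each group gives the same result as reduceByKey:
val summed = grouped.map { case (k, vs) => (k, vs.sum) }
```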
scala> val rdd6=rdd3.keys 
rdd6: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at keys at <console>:26 
scala> rdd6.foreach(println) 
1 
3 
3 
scala> val rdd7=rdd3.values 
rdd7: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at values at <console>:26 
scala> rdd7.foreach(println) 
2 
4 
6 
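keys and values are simple projections of each pair; locally they correspond to mapping out one tuple component:

```scala
// keys / values as projections of the pairs:
val data = Seq((1, 2), (3, 4), (3, 6))
val ks = data.map(_._1) // like rdd3.keys
val vs = data.map(_._2) // like rdd3.values
```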
scala> val rdd8=rdd3.sortByKey() 
rdd8: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[9] at sortByKey at <console>:26 
scala> rdd8.foreach(println) 
(1,2) 
(3,4) 
(3,6)
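Note that foreach on an RDD gives no ordering guarantee (it runs on the executors), so the sorted order is only reliably visible after collect(). Locally, sortByKey corresponds to sorting by the first tuple component; descending order, as with sortByKey(ascending = false), is sketched here with a reversed Ordering:

```scala
// sortByKey() sorts by the pair's key, ascending by default.
val data = Seq((3, 4), (1, 2), (3, 6))
val asc  = data.sortBy(_._1)                        // like sortByKey()
val desc = data.sortBy(_._1)(Ordering[Int].reverse) // like sortByKey(ascending = false)
```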


