IT虾米网

SparkSQL---实战应用详解

qq123 2018年07月04日 大数据 106 0

SparkSQL---实战应用

 

数据集 :http://grouplens.org/datasets/movielens/ MovieLens 1M Datase

相关数据文件 :

users.dat ---UserID::Gender::Age::Occupation::Zip-code

movies.dat --- MovieID::Title::Genres

ratings.dat ---UserID::MovieID::Rating::Timestamp

SogouQ.mini

完成以下业务需求:

1. 年龄段在“18-24”的男性年轻人,最喜欢看哪10部

2.得分最高的10部电影;看过电影最多的前10个人;女性看多最多的10部电影;男性看过最多 的10部电影

3.利用数据集SogouQ2012.mini.tar.gz 将数据按照访问次数进行排序,求访问量前10的网站

 

代码如下:

import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.Dataset 
 
 
object hw_SparkSql { 
    case class User(uid: String, xb: String,age:Int,V4:String,V5:String) 
    case class Movie(mid:String,name:String,t:String) 
    case class Rating(uid: String, mid: String,V3:Double,V4:String) 
    case class Brower(V1: String, V2: String,V3:String,V4:String,V5:String,V6:String) 
 
    def main(args: Array[String]): Unit = { 
 
    val conf = new SparkConf().setAppName("ReadJSON").setMaster("local").set("spark.executor.memory","50g").set("spark.driver.maxResultSize","50g") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 
    
  //隐式转换 import sqlContext.implicits._ val UserInfo = sc.textFile("C:\\Users\\BIGDATA\\Desktop\\文件\\BigData\\Spark\\3.SparkCore_2\\data\\data\\users.dat").map(_.split("::")).map(p => User(p(0), p(1),p(2).trim().toInt,p(3),p(4))).toDF() UserInfo.registerTempTable("User") val MovieInfo = sc.textFile("C:\\Users\\BIGDATA\\Desktop\\文件\\BigData\\Spark\\3.SparkCore_2\\data\\data\\movies.dat").map(_.split("::")).map(p => Movie(p(0),p(1),p(2))).toDF() MovieInfo.registerTempTable("Movie") val RatingsInfo = sc.textFile("C:\\Users\\BIGDATA\\Desktop\\文件\\BigData\\Spark\\3.SparkCore_2\\data\\data\\ratings.dat").map(_.split("::")).map(p => Rating(p(0), p(1),p(2).toDouble,p(3))).toDF() RatingsInfo.registerTempTable("Rating") val BrowerInfo = sc.textFile("C:\\Users\\BIGDATA\\Desktop\\文件\\BigData\\Spark\\3.SparkCore_2\\SogouQ2012.mini\\SogouQ.mini").map(_.split("\t")).map(p =>Brower(p(0), p(1),p(2),p(3),p(4),p(5))).toDF() BrowerInfo.registerTempTable("Brower") //年龄段在“18-24”的男性年轻人,最喜欢看哪10部 val top10_M_18_24 = sqlContext.sql("select x.n as name,count(*) as count from ( select distinct Rating.mid as m, Rating.uid as u, Movie.name as n FROM Rating,User,Movie WHERE User.age>=18 and User.age<=24 and User.xb=\"M\" and User.uid=Rating.uid and Movie.mid=Rating.mid)as x group by x.n order by count desc ") top10_M_18_24.show(10) //看过电影最多的前10个人 val top10_pepole= sqlContext.sql("select uid,count(uid)as count from Rating group by uid order by count desc"); top10_pepole.show(10); //得分最高的10部电影 val top10M_score=sqlContext.sql("select mid,(sum(V3)/count(V3)) as av from Rating group by mid order by av desc") top10M_score.show(10) //女性看的最多的10部电影 val top10_Female = sqlContext.sql("select x.n,count(*) as c from ( select distinct Rating.mid as m, Rating.uid as u, Movie.name as n FROM Rating,User,Movie WHERE User.xb=\"F\" and User.uid=Rating.uid and Movie.mid=Rating.mid)as x group by x.n order by c desc ") top10_Female.show(10) //男性看的最多的10部电影 val top10_Male = sqlContext.sql("select x.n,count(*) as c from ( select distinct Rating.mid as m, Rating.uid as u, Movie.name as n FROM Rating,User,Movie WHERE User.xb=\"M\" and User.uid=Rating.uid and Movie.mid=Rating.mid)as x group by x.n order by c desc ") top10_Male.show(10) //访问量前10的网站 val Top10_brower = sqlContext.sql("select V6 as name,count(*) as count from Brower group by V6 order by count desc ") Top10_brower.show(10) } }

  

发布评论

分享到:

IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

Spark Streaming中的操作函数讲解详解
你是第一个吃螃蟹的人
发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。