编程那点事编程那点事

专注编程入门及提高
探究程序员职业规划之道!

与Spark Core整合之每日top3热点搜索词统计案例实战

案例需求

  • 数据格式:

    日期 用户 搜索词 城市 平台 版本

  • 需求:

  1. 筛选出符合查询条件(城市、平台、版本)的数据

  2. 统计出每天搜索uv排名前3的搜索词

  3. 按照每天的top3搜索词的uv搜索总次数,倒序排序

  4. 将数据保存到hive表中

实现思路分析

  • 针对原始数据(HDFS文件),获取输入的RDD

  • 使用filter算子,去针对输入RDD中的数据,进行数据过滤,过滤出符合查询条件的数据。

    • 普通的做法:直接在fitler算子函数中,使用外部的查询条件(Map),但是,这样做的话,是不是查询条件Map,会发送到每一个task上一份副本。(性能并不好)

    • 优化后的做法:将查询条件,封装为Broadcast广播变量,在filter算子中使用Broadcast广播变量进行数据筛选。

  • 将数据转换为“(日期_搜索词, 用户)”格式,然后呢,对它进行分组,然后再次进行映射,对每天每个搜索词的搜索用户进行去重操作,并统计去重后的数量,即为每天每个搜索词的uv。最后,获得“(日期_搜索词, uv)”

  • 将得到的每天每个搜索词的uv,RDD,映射为元素类型为Row的RDD,将该RDD转换为DataFrame

  • 将DataFrame注册为临时表,使用Spark SQL的开窗函数,来统计每天的uv数量排名前3的搜索词,以及它的搜索uv,最后获取,是一个DataFrame

  • 将DataFrame转换为RDD,继续操作,按照每天日期来进行分组,并进行映射,计算出每天的top3搜索词的搜索uv的总数,然后将uv总数作为key,将每天的top3搜索词以及搜索次数,拼接为一个字符串

  • 按照每天的top3搜索总uv,进行排序,倒序排序

  • 将排好序的数据,再次映射回来,变成“日期_搜索词_uv”的格式

  • 再次映射为DataFrame,并将数据保存到Hive中即可

实战

  • 用Java来实现,是因为整个案例过于复杂,如果再用Scala来实现的话,那么时间会耗费的很长,而且意义并不大

  • 而且,我们通过Java开讲解,已经把数据格式、需求、具体实现思路、如何优化(broatcast),都讲解的非常清晰了,而且开发过程中,也做了大量的讲解;相信大家通过目前为止的讲解,已经知道应该如何开发这个复杂的案例和类似的需求了

  • 所以,更好的一个做法,是将Scala版本实现,留给大家自己去做,作为课后作业

  • 实际上,如果大家之前能够掌握我们讲的所有的内容,应该完全,是可以用Scala开发出这个程序的

这个案例,是完全从实际企业需求改造了一点点,抽取出来的,完全企业级实战。这种需求在实际工作中,可能并不是某个大数据分析系统的模块。但是更多的是,PM或老大的需求,要求每个月,跑一次,统计上个月,每天搜索uv前3的热词。那么其实,你可以用crontab来定时调度该shell脚本,每个月1号跑一次,跑上个月的数据。只要在filter中,过滤数据,即可。

Java版本实例代码

package cn.spark.study.sql;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
/**
 * 每日top3热点搜索词统计案例
 * @author Administrator
 *
 */
public class DailyTop3Keyword {
@SuppressWarnings("deprecation")
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("DailyTop3Keyword");  
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext sqlContext = new HiveContext(sc.sc());  
// 伪造出一份数据,查询条件
// 备注:实际上,在实际的企业项目开发中,很可能,这个查询条件,是通过J2EE平台插入到某个MySQL表中的
// 然后,这里呢,实际上,通常是会用Spring框架和ORM框架(MyBatis)的,去提取MySQL表中的查询条件
Map<String, List> queryParamMap = new HashMap<String, List>();
queryParamMap.put("city", Arrays.asList("beijing"));  
queryParamMap.put("platform", Arrays.asList("android"));  
queryParamMap.put("version", Arrays.asList("1.0", "1.2", "1.5", "2.0"));  
// 根据我们实现思路中的分析,这里最合适的方式,是将该查询参数Map封装为一个Broadcast广播变量
// 这样可以进行优化,每个Worker节点,就拷贝一份数据即可
final Broadcast<Map<String, List>> queryParamMapBroadcast = 
sc.broadcast(queryParamMap);
// 针对HDFS文件中的日志,获取输入RDD
JavaRDD rawRDD = sc.textFile("hdfs://spark1:9000/spark-study/keyword.txt"); 
// 使用查询参数Map广播变量,进行筛选
JavaRDD filterRDD = rawRDD.filter(new Function() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(String log) throws Exception {
// 切割原始日志,获取城市、平台和版本
String[] logSplited = log.split("\t");  
String city = logSplited[3];
String platform = logSplited[4];
String version = logSplited[5];
// 与查询条件进行比对,任何一个条件,只要该条件设置了,且日志中的数据没有满足条件
// 则直接返回false,过滤该日志
// 否则,如果所有设置的条件,都有日志中的数据,则返回true,保留日志
Map<String, List> queryParamMap = queryParamMapBroadcast.value();
List cities = queryParamMap.get("city");  
if(cities.size() > 0 && !cities.contains(city)) {
return false;
}
List platforms = queryParamMap.get("platform");  
if(platforms.size() > 0 && !platforms.contains(platform)) {
return false;
}
List versions = queryParamMap.get("version");  
if(versions.size() > 0 && !versions.contains(version)) {
return false;
}
return true;
}
});
// 过滤出来的原始日志,映射为(日期_搜索词, 用户)的格式
JavaPairRDD dateKeywordUserRDD = filterRDD.mapToPair(
new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(String log) throws Exception {
String[] logSplited = log.split("\t");  
String date = logSplited[0];
String user = logSplited[1];
String keyword = logSplited[2];
return new Tuple2(date + "_" + keyword, user);
}
});
// 进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)
JavaPairRDD<String, Iterable> dateKeywordUsersRDD = dateKeywordUserRDD.groupByKey();
// 对每天每个搜索词的搜索用户,执行去重操作,获得其uv
JavaPairRDD dateKeywordUvRDD = dateKeywordUsersRDD.mapToPair(
new PairFunction<Tuple2<String,Iterable>, String, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(
Tuple2<String, Iterable> dateKeywordUsers) throws Exception {
String dateKeyword = dateKeywordUsers._1;
Iterator users = dateKeywordUsers._2.iterator();
// 对用户进行去重,并统计去重后的数量
List distinctUsers = new ArrayList();
while(users.hasNext()) {
String user = users.next();
if(!distinctUsers.contains(user)) {
distinctUsers.add(user);
}
}
// 获取uv
long uv = distinctUsers.size();
return new Tuple2(dateKeyword, uv);  
}
});
// 将每天每个搜索词的uv数据,转换成DataFrame
JavaRDD dateKeywordUvRowRDD = dateKeywordUvRDD.map(
new Function<Tuple2, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Row call(Tuple2 dateKeywordUv) throws Exception {
String date = dateKeywordUv._1.split("_")[0];
String keyword = dateKeywordUv._1.split("_")[1];
long uv = dateKeywordUv._2;
return RowFactory.create(date, keyword, uv);
}
});
List structFields = Arrays.asList(
DataTypes.createStructField("date", DataTypes.StringType, true),
DataTypes.createStructField("keyword", DataTypes.StringType, true),
DataTypes.createStructField("uv", DataTypes.LongType, true));
StructType structType = DataTypes.createStructType(structFields);
DataFrame dateKeywordUvDF = sqlContext.createDataFrame(dateKeywordUvRowRDD, structType);
// 使用Spark SQL的开窗函数,统计每天搜索uv排名前3的热点搜索词
dateKeywordUvDF.registerTempTable("daily_keyword_uv");  
DataFrame dailyTop3KeywordDF = sqlContext.sql(""
+ "SELECT date,keyword,uv "
+ "FROM ("
+ "SELECT "
+ "date,"
+ "keyword,"
+ "uv,"
+ "row_number() OVER (PARTITION BY date ORDER BY uv DESC) rank "
+ "FROM daily_keyword_uv"  
+ ") tmp "
+ "WHERE rank<=3");  
// 将DataFrame转换为RDD,然后映射,计算出每天的top3搜索词的搜索uv总数
JavaRDD dailyTop3KeywordRDD = dailyTop3KeywordDF.javaRDD();
JavaPairRDD top3DateKeywordUvRDD = dailyTop3KeywordRDD.mapToPair(
new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(Row row)
throws Exception {
String date = String.valueOf(row.get(0));  
String keyword = String.valueOf(row.get(1));  
Long uv = Long.valueOf(String.valueOf(row.get(2)));  
return new Tuple2(date, keyword + "_" + uv);
}
});
JavaPairRDD<String, Iterable> top3DateKeywordsRDD = top3DateKeywordUvRDD.groupByKey();
JavaPairRDD uvDateKeywordsRDD = top3DateKeywordsRDD.mapToPair(
new PairFunction<Tuple2<String,Iterable>, Long, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(
Tuple2<String, Iterable> tuple)
throws Exception {
String date = tuple._1;
Long totalUv = 0L;
String dateKeywords = date;  
Iterator keywordUvIterator = tuple._2.iterator();
while(keywordUvIterator.hasNext()) {
String keywordUv = keywordUvIterator.next();
Long uv = Long.valueOf(keywordUv.split("_")[1]);  
totalUv += uv;
dateKeywords += "," + keywordUv;
}
return new Tuple2(totalUv, dateKeywords);
}
});
// 按照每天的总搜索uv进行倒序排序
JavaPairRDD sortedUvDateKeywordsRDD = uvDateKeywordsRDD.sortByKey(false);
// 再次进行映射,将排序后的数据,映射回原始的格式,IterableJavaRDD sortedRowRDD = sortedUvDateKeywordsRDD.flatMap(
new FlatMapFunction<Tuple2, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable call(Tuple2 tuple)
throws Exception {
String dateKeywords = tuple._2;
String[] dateKeywordsSplited = dateKeywords.split(",");  
String date = dateKeywordsSplited[0];
List rows = new ArrayList();
rows.add(RowFactory.create(date, 
dateKeywordsSplited[1].split("_")[0],
Long.valueOf(dateKeywordsSplited[1].split("_")[1]))); 
rows.add(RowFactory.create(date, 
dateKeywordsSplited[2].split("_")[0],
Long.valueOf(dateKeywordsSplited[2].split("_")[1]))); 
rows.add(RowFactory.create(date, 
dateKeywordsSplited[3].split("_")[0],
Long.valueOf(dateKeywordsSplited[3].split("_")[1]))); 
return rows;
}
});
// 将最终的数据,转换为DataFrame,并保存到Hive表中
DataFrame finalDF = sqlContext.createDataFrame(sortedRowRDD, structType);
finalDF.saveAsTable("daily_top3_keyword_uv");
sc.close();
}
}


未经允许不得转载: 技术文章 » 大数据 » 与Spark Core整合之每日top3热点搜索词统计案例实战