新闻网站关键指标离线统计
背景
新闻网站
版块
新闻页面
新用户注册
用户跳出
需求分析
每天每个页面的PV:PV是Page View,是指一个页面被所有用户访问次数的总和,页面被访问一次就被记录1次PV
每天每个页面的UV:UV是User View,是指一个页面被多少个用户访问了,一个用户访问一次是1次UV,一个用户访问多次还是1次UV
新用户注册比率:当天注册用户数 / 当天未注册用户数
用户跳出率:IP只浏览了一个页面就离开网站的次数/网站总访问数(PV)
版块热度排行榜:根据每个版块每天被访问的次数,做出一个排行榜
网站日志格式
date timestamp userid pageid section action ...
date: 日期,yyyy-MM-dd格式
timestamp: 时间戳
userid: 用户id
pageid: 页面id
section: 新闻版块
action: 用户行为,两类,点击页面和注册
模拟数据生成程序模式数据演示
在hive中创建访问日志表
create table news_access ( date string, timestamp bigint, userid bigint, pageid bigint, section string, action string)
将模拟数据导入hive表中
load data local inpath '/usr/local/test/news_access.log' into table news_access;
案例代码
package cn.spark.study.sql.upgrade; import java.math.BigDecimal; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.hive.HiveContext; /** * 新闻网站关键指标离线统计Spark作业 * @author Administrator * */ public class NewsOfflineStatSpark { public static void main(String[] args) { // 拿到昨天的日期,去hive表中,针对昨天的数据执行SQL语句 String yesterday = "2016-02-20"; // String yesterday = getYesterday(); // 创建SparkConf以及Spark上下文 SparkConf conf = new SparkConf() .setAppName("NewsOfflineStatSpark") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); HiveContext hiveContext = new HiveContext(sc.sc()); // 开发第一个关键指标:页面pv统计以及排序 calculateDailyPagePv(hiveContext, yesterday); // 开发第二个关键指标:页面uv统计以及排序 calculateDailyPageUv(hiveContext, yesterday); // 开发第三个关键指标:新用户注册比率统计 calculateDailyNewUserRegisterRate(hiveContext, yesterday); // 开发第四个关键指标:用户跳出率统计 calculateDailyUserJumpRate(hiveContext, yesterday); // 开发第五个关键指标:版块热度排行榜 calculateDailySectionPvSort(hiveContext, yesterday); // 关闭Spark上下文 sc.close(); } /** * 获取昨天的字符串类型的日期 * @return 日期 */ private static String getYesterday() { Calendar cal = Calendar.getInstance(); cal.setTime(new Date()); cal.add(Calendar.DAY_OF_YEAR, -1); Date yesterday = cal.getTime(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); return sdf.format(yesterday); } /** * 计算每天每个页面的pv以及排序 * 排序的好处:排序后,插入mysql,java web系统要查询每天pv top10的页面,直接查询mysql表limit 10就可以 * 如果我们这里不排序,那么java web系统就要做排序,反而会影响java web系统的性能,以及用户响应时间 */ private static void calculateDailyPagePv( HiveContext hiveContext, String date) { String sql = "SELECT " + "date," + "pageid," + "pv " + "FROM ( " + "SELECT " + "date," + "pageid," + "count(*) pv " + "FROM news_access " + "WHERE action='view' " + "AND date='" + date + "' " + "GROUP BY date,pageid " + ") t " + "ORDER BY pv DESC "; DataFrame df = hiveContext.sql(sql); // 在这里,我们也可以转换成一个RDD,然后对RDD执行一个foreach算子 // 在foreach算子中,将数据写入mysql中 df.show(); } /** * 计算每天每个页面的uv以及排序 * Spark SQL的count(distinct)语句,有bug,默认会产生严重的数据倾斜 * 只会用一个task,来做去重和汇总计数,性能很差 * @param hiveContext * @param date */ private static void calculateDailyPageUv( HiveContext hiveContext, String date) { String sql = "SELECT " + "date," + "pageid," + "uv " + "FROM ( " + "SELECT " + "date," + "pageid," + "count(*) uv " + "FROM ( " + "SELECT " + "date," + "pageid," + "userid " + "FROM news_access " + "WHERE action='view' " + "AND date='" + date + "' " + "GROUP BY date,pageid,userid " + ") t2 " + "GROUP BY date,pageid " + ") t " + "ORDER BY uv DESC "; DataFrame df = hiveContext.sql(sql); df.show(); } /** * 计算每天的新用户注册比例 * @param hiveContext * @param date */ private static void calculateDailyNewUserRegisterRate( HiveContext hiveContext, String date) { // 昨天所有访问行为中,userid为null,新用户的访问总数 String sql1 = "SELECT count(*) FROM news_access WHERE action='view' AND date='" + date + "' AND userid IS NULL"; // 昨天的总注册用户数 String sql2 = "SELECT count(*) FROM news_access WHERE action='register' AND date='" + date + "' "; // 执行两条SQL,获取结果 Object result1 = hiveContext.sql(sql1).collect()[0].get(0); long number1 = 0L; if(result1 != null) { number1 = Long.valueOf(String.valueOf(result1)); } Object result2 = hiveContext.sql(sql2).collect()[0].get(0); long number2 = 0L; if(result2 != null) { number2 = Long.valueOf(String.valueOf(result2)); } // 计算结果 System.out.println("======================" + number1 + "======================"); System.out.println("======================" + number2 + "======================"); double rate = (double)number2 / (double)number1; System.out.println("======================" + formatDouble(rate, 2) + "======================"); } /** * 计算每天的用户跳出率 * @param hiveContext * @param date */ private static void calculateDailyUserJumpRate( HiveContext hiveContext, String date) { // 计算已注册用户的昨天的总的访问pv String sql1 = "SELECT count(*) FROM news_access WHERE action='view' AND date='" + date + "' AND userid IS NOT NULL "; // 已注册用户的昨天跳出的总数 String sql2 = "SELECT count(*) FROM ( SELECT count(*) cnt FROM news_access WHERE action='view' AND date='" + date + "' AND userid IS NOT NULL GROUP BY userid HAVING cnt=1 ) t "; // 执行两条SQL,获取结果 Object result1 = hiveContext.sql(sql1).collect()[0].get(0); long number1 = 0L; if(result1 != null) { number1 = Long.valueOf(String.valueOf(result1)); } Object result2 = hiveContext.sql(sql2).collect()[0].get(0); long number2 = 0L; if(result2 != null) { number2 = Long.valueOf(String.valueOf(result2)); } // 计算结果 System.out.println("======================" + number1 + "======================"); System.out.println("======================" + number2 + "======================"); double rate = (double)number2 / (double)number1; System.out.println("======================" + formatDouble(rate, 2) + "======================"); } /** * 计算每天的版块热度排行榜 * @param hiveContext * @param date */ private static void calculateDailySectionPvSort( HiveContext hiveContext, String date) { String sql = "SELECT " + "date," + "section," + "pv " + "FROM ( " + "SELECT " + "date," + "section," + "count(*) pv " + "FROM news_access " + "WHERE action='view' " + "AND date='" + date + "' " + "GROUP BY date,section " + ") t " + "ORDER BY pv DESC "; DataFrame df = hiveContext.sql(sql); df.show(); } /** * 格式化小数 * @param str 字符串 * @param scale 四舍五入的位数 * @return 格式化小数 */ private static double formatDouble(double num, int scale) { BigDecimal bd = new BigDecimal(num); return bd.setScale(scale, BigDecimal.ROUND_HALF_UP).doubleValue(); } }