数据分析实战
pyspark分析源码
%pyspark
def map_nginx(value):
value = value.split('\t')[2]
return eval(value)
def add_two_value(a,b):
return a+b
def map_pv(value):
return (value['path'],1)
def uv_combine_path_with_cookie(value):
return '\t'.join((value['path'],value['agent']))
def uv_revert_path(value):
return (value.split('\t')[0],1)
rdd = sc.textFile("hdfs://localhost:9000/tmp/access.log").map(map_nginx)
rdd.cache()
rdd_pv = rdd.map(map_pv)
result_pv = rdd_pv.reduceByKey(add_two_value)
result_uv = rdd.map(uv_combine_path_with_cookie)\
.distinct()\
.map(uv_revert_path)\
.reduceByKey(add_two_value)
result = result_pv.join(result_uv).map(lambda x:(x[0],x[1][0],x[1][1]))
sqlContext.createDataFrame(result,['path','pv','uv']).registerTempTable("result")
Spark-SQL源码
%sql
select * from result
在zeppelin运行结果如下:
最后用数据流来解释一下PV/UV分析的算法,如果用Hadoop我们可能最少需要50行代码,但是用pyspark我们可以最简浓缩到2行
我们知道nginx原生日志格式是这样的
172.17.9.199 - - [05/Apr/2016:12:04:55 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.110 Safari/537.36" "-"
fluntd帮我们吐到HDFS时是这样的
2016-04-05T04:04:55Z nginx.access {"remote_addr":"172.17.9.199","time_local":"05/Apr/2016:12:04:55 +0800","method":"GET","path":"/","status":"304","size":"0","refer":"-","agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.110 Safari/537.36","forward":"-"}
它帮我们做了序列化,用\t分隔,我们对原始数据做一次map,去除fluntd冗余数据,就得到了我们想要的数据
rdd = sc.textFile("hdfs://localhost:9000/tmp/access.log").map(lambda x:eval(x.split('\t')[2]))
print rcc.collect()
[{'path':'/','agent':'a'},{'path':'/','agent':'a'},{'path':'/','agent':'b'},...]
数据元素是一个字典,接下来我们算法分析主要关心agent和path,我们跳过其他数据,将数据简化来描述。
pv的计算比较简单,我们对path做一次map
tmp = rdd.map(lambda x:(x['path'],1))
print tmp.collect()
[('/',1),('/',1),('/',1)...]
做一次reduceByKey,我们就可以得到每个path的PV
result_pv = tmp.reduceByKey(lambda x,y:x+y)
print result_pv.collect()
[('/',3),...]
UV算法相对复杂一点,首先我们将agent和path拼接起来,由于后面我们拼完还要还原,而agent里面字符比较乱,我们要选一个不常见的字符来做拼接,这里我们用了\t
tmp = rdd.map(lambda x:'\t'.join((x['path'],x['agent'])))
print tmp.collect()
['/\ta','/\ta','/\tb',...]
做一次去重
tmp = tmp.distinct()
print tmp.collect()
['/\ta','/\tb',...]
再做一次map,这里我们就不需要agent了
tmp = tmp.map(lambda x:(x.split('\t')[0],1))
print tmp.collect()
[('/',1),('/',1)...]
做一下reduceByKey,UV就算出来了
result_uv = tmp.reduceByKey(lambda x,y:x+y)
print result_uv.collect()
[('/',2)...]
最后我们把代码整理一下,看看整体效果,最后2行与算法无关,只是把结果输出到DataFrame并保存到临时表方便查询
rdd = sc.textFile("hdfs://localhost:9000/tmp/access.log").map(lambda x:eval(x.split('\t')[2]))
result_pv = rdd.map(lambda x:(x['path'],1)).reduceByKey(lambda x,y:x+y)
result_uv = rdd.map(lambda x:'\t'.join((x['path'],x['agent']))).distinct().map(lambda x:(x.split('\t')[0],1)).reduceByKey(lambda x,y:x+y)
sqlContext.createDataFrame(result_pv.join(result_uv).map(lambda x:(x[0],x[1][0],x[1][1])),['path','pv','uv']).registerTempTable("result")
sqlContext.sql("select * from result").show()
真正的算法是不是就2行代码?