数据分析实战

pyspark分析源码

%pyspark
def map_nginx(value):
    value = value.split('\t')[2]
    return eval(value)
def add_two_value(a,b):
    return a+b
def map_pv(value):
    return (value['path'],1)
def uv_combine_path_with_cookie(value):
    return '\t'.join((value['path'],value['agent']))
def uv_revert_path(value):
    return (value.split('\t')[0],1)

rdd = sc.textFile("hdfs://localhost:9000/tmp/access.log").map(map_nginx)
rdd.cache()

rdd_pv = rdd.map(map_pv)
result_pv = rdd_pv.reduceByKey(add_two_value) 
result_uv = rdd.map(uv_combine_path_with_cookie)\
                .distinct()\
                .map(uv_revert_path)\
                .reduceByKey(add_two_value)
result = result_pv.join(result_uv).map(lambda x:(x[0],x[1][0],x[1][1]))

sqlContext.createDataFrame(result,['path','pv','uv']).registerTempTable("result")

Spark-SQL源码

%sql
select * from result

在zeppelin运行结果如下:


最后用数据流来解释一下PV/UV分析的算法,如果用Hadoop我们可能最少需要50行代码,但是用pyspark我们可以最简浓缩到2行

我们知道nginx原生日志格式是这样的

172.17.9.199 - - [05/Apr/2016:12:04:55 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.110 Safari/537.36" "-"

fluntd帮我们吐到HDFS时是这样的

2016-04-05T04:04:55Z    nginx.access    {"remote_addr":"172.17.9.199","time_local":"05/Apr/2016:12:04:55 +0800","method":"GET","path":"/","status":"304","size":"0","refer":"-","agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.110 Safari/537.36","forward":"-"}

它帮我们做了序列化,用\t分隔,我们对原始数据做一次map,去除fluntd冗余数据,就得到了我们想要的数据

rdd = sc.textFile("hdfs://localhost:9000/tmp/access.log").map(lambda x:eval(x.split('\t')[2]))
print rcc.collect()
[{'path':'/','agent':'a'},{'path':'/','agent':'a'},{'path':'/','agent':'b'},...]

数据元素是一个字典,接下来我们算法分析主要关心agent和path,我们跳过其他数据,将数据简化来描述。

pv的计算比较简单,我们对path做一次map

tmp = rdd.map(lambda x:(x['path'],1))
print tmp.collect()
[('/',1),('/',1),('/',1)...]

做一次reduceByKey,我们就可以得到每个path的PV

result_pv = tmp.reduceByKey(lambda x,y:x+y)
print result_pv.collect()
[('/',3),...]

UV算法相对复杂一点,首先我们将agent和path拼接起来,由于后面我们拼完还要还原,而agent里面字符比较乱,我们要选一个不常见的字符来做拼接,这里我们用了\t

tmp = rdd.map(lambda x:'\t'.join((x['path'],x['agent'])))
print tmp.collect()
['/\ta','/\ta','/\tb',...]

做一次去重

tmp = tmp.distinct()
print tmp.collect()
['/\ta','/\tb',...]

再做一次map,这里我们就不需要agent了

tmp = tmp.map(lambda x:(x.split('\t')[0],1))
print tmp.collect()
[('/',1),('/',1)...]

做一下reduceByKey,UV就算出来了

result_uv = tmp.reduceByKey(lambda x,y:x+y)
print result_uv.collect()
[('/',2)...]

最后我们把代码整理一下,看看整体效果,最后2行与算法无关,只是把结果输出到DataFrame并保存到临时表方便查询

rdd = sc.textFile("hdfs://localhost:9000/tmp/access.log").map(lambda x:eval(x.split('\t')[2]))

result_pv = rdd.map(lambda x:(x['path'],1)).reduceByKey(lambda x,y:x+y)
result_uv =  rdd.map(lambda x:'\t'.join((x['path'],x['agent']))).distinct().map(lambda x:(x.split('\t')[0],1)).reduceByKey(lambda x,y:x+y)

sqlContext.createDataFrame(result_pv.join(result_uv).map(lambda x:(x[0],x[1][0],x[1][1])),['path','pv','uv']).registerTempTable("result")
sqlContext.sql("select * from result").show()

真正的算法是不是就2行代码?