推荐系统数据处理首先是将Hive中的用户app历史下载表与app浏览信息表按照设备id进行关联,然后将关联数据使用python文件进行处理,将数据预处理为label和feature两列的临时数据,后期经过处理转换成逻辑回归 模型的训练集,进而得到模型文件。
创建处理数据时所需要的临时表
1.CREATE TABLE IF NOT EXISTS tmp_dw_rcm_hitop_prepare2train_dm
2.(
3.device_id STRING,
4.label STRING,
5.hitop_id STRING,
6.screen STRING,
7.ch_name STRING,
8.author STRING,
9.sversion STRING,
10.mnc STRING,
11.interface STRING,
12.designer STRING,
13.is_safe INT,
14.icon_count INT,
15.update_date STRING,
16.stars DOUBLE,
17.comment_num INT,
18.font STRING,
19.price INT,
20.file_size INT,
21.ischarge SMALLINT,
22.dlnum INT,
23.idlist STRING,
24.device_name STRING,
25.pay_ability STRING
26.)row format delimited fields terminated by '\t';
最终保存训练集的表
1.CREATE TABLE IF NOT EXISTS dw_rcm_hitop_prepare2train_dm
2.(
3.label STRING,
4.features STRING
5.)row format delimited fields terminated by '\t';
首先将数据从正负例样本和用户历史下载表数据加载到临时表中:
1.INSERT OVERWRITE TABLE tmp_dw_rcm_hitop_prepare2train_dm
2. SELECT
3. t2.device_id,
4. t2.label,
5. t2.hitop_id,
6. t2.screen,
7. t2.ch_name,
8. t2.author,
9. t2.sversion,
10. t2.mnc,
11. t2.interface,
12. t2.designer,
13. t2.is_safe,
14. t2.icon_count,
15. to_date(t2.update_time),
16. t2.stars,
17. t2.comment_num,
18. t2.font,
19. t2.price,
20. t2.file_size,
21. t2.ischarge,
22. t2.dlnum,
23. t1.devid_applist,
24. t1.device_name,
25. t1.pay_ability
26.FROM
27. (
28. SELECT
29. device_id,
30. devid_applist,
31. device_name,
32. pay_ability
33. FROM
34. dw_rcm_hitop_userapps_dm
35. ) t1
36. RIGHT OUTER JOIN
37. (
38. SELECT
39. device_id,
40. label,
41. hitop_id,
42. screen,
43. ch_name,
44. author,
45. sversion,
46. IF (mnc IN ('00','01','02','03','04','05','06','07'), mnc,'x') AS mnc,
47. interface,
48. designer,
49. is_safe,
50. IF (icon_count <= 5,icon_count,6) AS icon_count,
51. update_time,
52. stars,
53. IF ( comment_num IS NULL,0,
54. IF ( comment_num <= 10,comment_num,11)) AS comment_num,
55. font,
56. price,
57. IF (file_size <= 2*1024*1024,2,
58. IF (file_size <= 4*1024*1024,4,
59. IF (file_size <= 6*1024*1024,6,
60. IF (file_size <= 8*1024*1024,8,
61. IF (file_size <= 10*1024*1024,10,
62. IF (file_size <= 12*1024*1024,12,
63. IF (file_size <= 14*1024*1024,14,
64. IF (file_size <= 16*1024*1024,16,
65. IF (file_size <= 18*1024*1024,18,
66. IF (file_size <= 20*1024*1024,20,21)))))))))) AS file_size,
67. ischarge,
68. IF (dlnum IS NULL,0,
69. IF (dlnum <= 50,50,
70. IF (dlnum <= 100,100,
71. IF (dlnum <= 500,500,
72. IF (dlnum <= 1000,1000,
73. IF (dlnum <= 5000,5000,
74. IF (dlnum <= 10000,10000,
75. IF (dlnum <= 20000,20000,20001)))))))) AS dlnum
76. FROM
77. dw_rcm_hitop_sample2learn_dm
78. ) t2
79.ON (t1.device_id = t2.device_id);
针对Hive中“tmp_dw_rcm_hitop_prepare2train_dm”数据,可以使用Hive自定义函数进行预处理,得到逻辑回归模型的训练集,这种方式需要编写代码,并且打包上传集群处理数据。这里,我们也可以在Hive中直接使用python对Hive中的数据进行预处理。
将python文件加载到Hive中:
1.ADD FILE /opt/sxt/recommender/script/dw_rcm_hitop_prepare2train_dm.py;
可以通过list files;查看是不是python文件加载到了hive:
在hive中使用python脚本处理数据的原理:Hive会以输出流的形式将数据交给python脚本,python脚本以输入流的形式来接受数据,接受来数据以后,在python中就可以一行行做一系列的数据处理,处理完毕后,又以输出流的形式交给Hive,交给了hive就说明了就处理后的数据成功保存到hive表中。
1.import sys
2.
3.if __name__ == "__main__":
4. # random.seed(time.time())
5. for l in sys.stdin:
6. d = l.strip().split('\t')
7. if len(d) != 21:
8. continue
9. # Extract data from the line
10. label = d.pop(0)
11. hitop_id = d.pop(0)
12. screen = d.pop(0)
13. ch_name = d.pop(0)
14. author = d.pop(0)
15. sversion = d.pop(0)
16. mnc = d.pop(0)
17. interface = d.pop(0)
18. designer = d.pop(0)
19. icon_count = d.pop(0)
20. update_date = d.pop(0)
21. stars = d.pop(0)
22. comment_num = d.pop(0)
23. font = d.pop(0)
24. price = d.pop(0)
25. file_size = d.pop(0)
26. ischarge = d.pop(0)
27. dlnum = d.pop(0)
28.
29. hitopids = d.pop(0)
30. device_name = d.pop(0)
31. pay_ability = d.pop(0)
32.
33. # Construct feature vector
34. features = []
35. features.append(("Item.id,%s" % hitop_id, 1))
36. features.append(("Item.screen,%s" % screen, 1))
37. features.append(("Item.name,%s" % ch_name, 1))
38. features.append(("Item.author,%s" % author, 1))
39. features.append(("Item.sversion,%s" % sversion, 1))
40. features.append(("Item.network,%s" % mnc, 1))
41. features.append(("Item.dgner,%s" % designer, 1))
42. features.append(("Item.icount,%s" % icon_count, 1))
43. features.append(("Item.stars,%s" % stars, 1))
44. features.append(("Item.comNum,%s" % comment_num,1))
45. features.append(("Item.font,%s" % font,1))
46. features.append(("Item.price,%s" % price,1))
47. features.append(("Item.fsize,%s" % file_size,1))
48. features.append(("Item.ischarge,%s" % ischarge,1))
49. features.append(("Item.downNum,%s" % dlnum,1))
50.
51. #User.Item and User.Item*Item
52. idlist = hitopids.split(',')
53. flag = 0;
54. for id in idlist:
55. features.append(("User.Item*Item,%s" % id +'*'+hitop_id, 1))
56. flag += 1
57. if flag >= 3:
58. break;
59.
60. # Output
61.
62. output = "%s\t%s" % (label, ";".join([ "%s:%d" % (f, v) for f, v in features ]))
63. print(output)
1.INSERT OVERWRITE TABLE dw_rcm_hitop_prepare2train_dm
2. SELECT
3. TRANSFORM (t.*)
4. USING 'python dw_rcm_hitop_prepare2train_dm.py'
5. AS (label,features)
6. FROM
7. (
8. SELECT
9. label,
10. hitop_id,
11. screen,
12. ch_name,
13. author,
14. sversion,
15. mnc,
16. interface,
17. designer,
18. icon_count,
19. update_date,
20. stars,
21. comment_num,
22. font,
23. price,
24. file_size,
25. ischarge,
26. dlnum,
27. idlist,
28. device_name,
29. pay_ability
30. FROM
31. tmp_dw_rcm_hitop_prepare2train_dm
32. ) t;
将“dw_rcm_hitop_prepare2train_dm”表中的数据导入到本地处理,这里可以直接在集群中使用SparkMLlib直接处理,为了方便演示,将数据导入到本地处理。
1.insert overwrite local directory '/opt/data/traindata' row format delimited fields terminated by '\t' select * from dw_rcm_hitop_prepare2train_dm;
注:这里是将数据导出到本地,方便后面再本地模式跑数据,导出模型数据。这里是方便演示真正的生产环境是直接用脚本提交spark任务,从Hive中获取数据经过Spark处理得到模型文件,将模型数据写往Redis中。