将需要计算的数据转为rdd的数据,就可以利用spark的内存计算方法进行分布式计算操作,这些计算方法就是有rdd提供的
rdd数据的转发方法是有sparkcontext提供的,所以需要先生成sparkcontext,
SparkContext称为Spark的入口类
只要是能被遍历的,都能转化为RDD数据
# 导入sparkcontext
from pyspark import SparkContext
# 创建SparkContext对象
sc = SparkContext()
# 将Python数据转为rdd
# data_int = 10 # 数值类型不能转化rdd
# 能for循环遍历的数据都能转为rdd
data_str = 'abc'
data_list = [1, 2, 3, 4]
data_dict = {'a': 1, 'b': 2}
data_set = {1, 2, 3, 4}
data_tuple = (1, 2, 3, 4)
rdd = sc.parallelize(data_tuple)
# rdd的数据输出展示
# 获取所有rdd数据
res = rdd.collect()
print(res)
8020是namenode默认的端口号
# 将读取的hdfs文件数据转为rdd
from pyspark import SparkContext
# 生成SparkContext类对象
sc = SparkContext()
# 读取文件数据转为rdd
rdd1 = sc.textFile('hdfs://node1:8020/data') # 8020是namenode端口号?
# 读取目录下的所有文件 简写如果报错就写全写,也就是上面的内容
rdd3 = sc.textFile('/data')
# 只读取单独文件
rdd2 = sc.textFile('/data/words.txt')
# 查看数据
res = rdd1.collect()
print(res)
res = rdd2.collect()
print(res)
python数据转发的分区数指定
# RDD分区使用
# 导入sparkcontext
from pyspark import SparkContext
# 创建SparkContext对象
sc = SparkContext()
# 创建生成rdd是可以指定分区数
# Python数据转为rdd指定
# numSlices 可以指定分区数
rdd_py = sc.parallelize([1,2,3,4,5,6],numSlices=10)
# 查看rdd分区数据
res1 = rdd_py.glom().collect()
print(res1)
读取的文件数据进行分区数指定
# RDD分区使用
# 导入sparkcontext
from pyspark import SparkContext
# 创建SparkContext对象
sc = SparkContext()
# 创建生成rdd是可以指定分区数
# file文件读取数据指定分区数据
# minPartitions 指定分区
# 文件大小/分区数 = 值 -----余数
# 余数/值 * 100%=百分比 百分比大于10% 会多创建一个分区
rdd_file = sc.textFile('hdfs://node1:8020/data',minPartitions=1)
# 在spark并行度部分会讲解如何根据资源设置分区数
# rdd计算
# 查看rdd分区数据
res2 = rdd_file.glom().collect()
print(res2)
一个分区对应一个task线程,当小文件过多时,会占用大量的线程,造成资源浪费
使用wholeTextFiles方法可以解决
该方法会现将读取到的数据合并在一起,然后重新进行分区
# 导入sparkcontext
from pyspark import SparkContext
# 创建SparkContext对象
sc = SparkContext(master='yarn')
# rdd = sc.textFile('hdfs://node1:8020/data')
# rdd计算
# wholeTextFiles 会合并小文件数据
# minPartitions 指定分区数
rdd_mini = sc.wholeTextFiles('hdfs://node1:8020/data',minPartitions=1)
# 展示数据
# res1 = rdd.glom().collect()
# print(res1)
res2 = rdd_mini.glom().collect()
print(res2)
rdd中封装了各种算子方便进行计算,主要分为两类
新的rdd
,定义了一个线程任务