目录
一、Row对象常见操作
二、Row、RDD、DataFrame互相转换
1、RDD—>DataFrame
2、DataFrame—>RDD
3、DataFrame—>Row
4、Row—>DataFrame
一、Row对象常见操作
python">from pyspark.sql import Row
# 创建一个Row对象
row = Row(name="张三", age=25)
# 使用索引、字段名访问字段
print(row[0], row.name)
# 修改Row对象(通过转换为字典的方式进行修改)
dict_ = row.asDict()
dict_['age'] = 26
del dict_['name']
dict_['姓名'] = "李四"
new_row = Row(**dict_)
# 值迭代
for field in row:
print(field)
#判断是否包含某个字段
print("name" in row)
# 获取字段数量
len(row)
二、Row、RDD、DataFrame互相转换
1、RDD—>DataFrame
python">from pyspark.sql import SparkSession
from pyspark.sql import Row
# 初始化SparkSession
spark = SparkSession.builder.appName("test").getOrCreate()
# 创建一个RDD
rdd = sc.parallelize([("Alice", 25), ("Bob", 30)])
# 将RDD的元素转换为Row对象
row_rdd = rdd.map(lambda x: Row(name=x[0], age=x[1]))
# 将Row RDD转换为DataFrame
df = spark.createDataFrame(row_rdd)
df.show()
2、DataFrame—>RDD
python"># 从DataFrame获取RDD
rdd_from_df = df.rdd
# 进一步将RDD的元素转换为元组或其他格式
rdd_as_tuples = rdd_from_df.map(lambda row: (row.name, row.age))
rdd_as_tuples.collect()
3、DataFrame—>Row
DataFrame的每一行都是一个Row对象。
python"># 迭代DataFrame获取Row
for row in df.collect():
print(f"name:{row.name} age:{row.age}")
# 以下都会生成Row对象
df.limit(1)
df.first
4、Row—>DataFrame
python"># Row对象列表
rows = [Row(name="Alice", age=25), Row(name="Bob", age=30)]
# 创建DataFrame
df = spark.createDataFrame(rows)
df.show()