Pyspark读写csv,txt,json,xlsx,xml,avro等文件

news/2024/5/20 3:39:49 标签: spark, python, big data, hdfs, azure

1. Spark读写txt文件

读:

python">df = spark.read.text("/home/test/testTxt.txt").show()
+-------------+
|        value|
+-------------+
|      a,b,c,d|
|123,345,789,5|
|34,45,90,9878|
+-------------+

2. Spark读写csv文件

读:

python"># 文件在hdfs上的位置
file_path = r"/user/lanyue/data.csv"
# 方法一
# 推荐这种,指定什么文件格式都可以,只需要修改参数format即可
# 不同的格式其load函数会有不同,用的时候请自行搜索。
df = spark.read.format("csv").load(file_path, header=True, inferSchema=True, encoding="utf-8", sep=',') 
# sep=',',表示指定分隔符为逗号,同参数delimiter。
# header=TRUE,表示数据的第一行为列名
# inferSchema,表示是否对字段类型进行推测。=False,默认读取后都按照文本字符处理。=True表示自动推断schema。

# 或者下面这种形式。这两种形式都可以
df = spark.read.format("csv").option("encoding","utf-8").option("header",True).load(file_path, schema=schema)  # 使用指定的schema

# 方法二
df = spark.read.csv(file_path, encoding='utf-8', header=True, inferSchema=True) 
df = spark.read.csv(file_path, encoding='utf-8', header=True, schema=schema) 
# 如果想指定文件格式是json,那就是spark.read.json,其他类似

写:

python"># 保存在【hdfs上】,以csv文件的格式。指定什么文件格式都可以,只需要修改参数format即可
df.repartition(1).write.mode('append').format("csv").option("encoding","utf-8").option("header",True).save("/lanyue/data.csv") 
# mode,保存模式:ovewriter重写、append文件末尾追加、error如果文件存在抛出异常、ignore如果文件存在忽略不更新
# repartition, 在yarn模式下,Spark会根据hdfs文件的块数据大小来划分默认的分区数目,但是我们也可以自己设置分区数目,使用参数repartition。=1表示只保存成一个数据块

# 或者
df.write.csv("/lanyue/data.csv", sep="\t", encoding="utf-8", mode='overwrite') 
# 如果想指定文件格式是json,那就是df.write.json,其他类似
# 通过指定参数sep,来指定分隔符,可以是",", "\t","\x01"等。同参数delimiter。

3. Spark读写parquet文件

读:

python">file = "/user/muzili/data.parquet"
spark_df=spark.read.parquet(file)
df.show()

写:

python">spark_df.write.parquet(path=file,mode='overwrite')

4. Spark读写json文件

读:

python">file = "/user/muzili/data.json"
df = spark.read.json(file)
df.show()

写:

python">df.repartition(1).write.mode('append').format("json").option("encoding","utf-8").option("header",True).save("/user/muzili/data.json")

5. Spark读写excel文件

读:

写:

6. Spark读写xml文件

读:

写:

7. Spark读写orc文件

读:

写:

8. Spark读写avro文件

读:

写:

9. Spark读写mysql中的表

读:

python">url="jdbc:mysql://host:port/database"
table="table_name"
driver="com.mysql.jdbc.Driver"
user="XXX"
password="XXX"

df = spark.read.format("jdbc")
      .option("url",url) # database地址,格式为jdbc:mysql://主机:端口/数据库
      .option("dbtable",table) # 表名
      .option("user",user)
      .option("password",password)
      .option("driver",driver)
      .load()
      
# 或者以下形式
df = spark.read.format('jdbc').options(url="jdbc:mysql://host:port/database", # database地址
									 driver="com.mysql.jdbc.Driver",
                                     dbtable="table_name", 
                                     user="XXX",
                                     password="XXX").load()

# 或者以下形式
# mysql的相关配置
prop = {'user': 'xxx', 
        'password': 'xxx', 
        'driver': 'com.mysql.jdbc.Driver'}
url = 'jdbc:mysql://host:port/database' # database地址
df = spark.read.jdbc(url=url, table='mysql_table_name', properties=prop)

写:

python"># 会自动对齐字段,也就是说,spark_df 的列不一定要全部包含MySQL的表的全部列才行
prop = {'user': 'xxx', 
        'password': 'xxx', 
        'driver': 'com.mysql.jdbc.Driver'}
url = 'jdbc:mysql://host:port/database' # database地址
df.write.jdbc(url=url, table='table_name', mode='append', properties=prop)
# append 追加方式

# 或者以下形式
df.write.format("jdbc")
  .option("url","jdbc:mysql://host:port/database") # database地址
  .option("dbtable","table_name")
  .option("user",user)
  .option("password",password)
  .option("driver",driver)
  .option("batchsize","1000").mode("overwrite") # overwrite 清空表再导入
  .save()


http://www.niftyadmin.cn/n/5055262.html

相关文章

linux部分答案

一、所有LINUX配置 1、主机名 #hostnamectl set-hostname linux1.skills.lan 2、更改ROOT密码 #passwd root Pass-1234 Pass-1234

Python实用技术——爬虫(二):爬虫需要使用的库

一,Requests库 1,主要使用方法: 1)get()方法: 这个Response对象中包含爬虫返回的内容。 除了request方法是基础方法外,其他都是通过调用request方法来实现的。 所以,我…

CoreData/数据存储管理, CoreDataRelationships/关系型数据结构存储管理 的使用

1. CoreData 数据的增删改查操作 1.1 创建数据实体管理文件 点击菜单栏 File -> New -> File... -> Core Data 栏下,选择 Data Mode,创建 FruitsContainer.xcdatamodeld 文件 1.2 创建 FruitEntity 实体表 打开 FruitsContainer.xcdatamodeld 文…

MR混合现实在军事课堂教学中的应用演示

战场模拟:利用MR技术可以创建逼真的战场模拟环境,将学生置身于真实的战场场景中,可以体验和学习各种作战技巧和战术策略。学生可以通过佩戴MR头盔或眼镜,观察虚拟的场景,并与虚拟对象进行互动,如操作武器、…

【MATLAB源码-第39期】基于m序列/gold序列的直接扩频通信仿真,编码方式采用卷积码,调制方式采用BPSK。

1、算法描述 直接序列扩频通信系统的仿真一般包括以下几个主要步骤:信号产生、扩频、卷积编码、BPSK调制、信道传输、BPSK解调、卷积码译码和解扩。 信号产生: 首先,产生一个二进制数据序列作为待发送的信息位。 扩频: 采用m序列…

Spring Framework 学习笔记2:注解开发

Spring Framework 学习笔记2&#xff1a;注解开发 本文使用的示例项目是 spring-demo。 1.Component Component注解的用途与<bean/>标签的用途是相同的&#xff0c;都用于向 IoC 容器添加一个 Bean 定义。 比如&#xff1a; Component public class UserDaoImpl imp…

基于微信小程序的高校暑期社会实践小程序设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言系统主要功能&#xff1a;具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09;有保障的售后福利 代码参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计…

循环神经网络(Recurrent Neural Networks,RNN)模型深度学习

循环神经网络&#xff08;Recurrent Neural Networks&#xff0c;RNN&#xff09;是深度学习领域中一种重要的模型&#xff0c;尤其在处理序列数据方面具有显著的优势。本文将介绍循环神经网络的基本概念、工作原理、应用场景以及与其他神经网络的区别&#xff0c;并通过具体案…