利用Spark将Kafka数据流写入HDFS

news/2024/5/20 4:51:42 标签: spark, kafka, hdfs, 大数据, 大作业

利用Spark将Kafka数据流写入HDFS

在当今的大数据时代,实时数据处理和分析变得越来越重要。Apache Kafka作为一个分布式流处理平台,已经成为处理实时数据的事实标准。而Apache Spark则是一个强大的大数据处理框架,它提供了对数据进行复杂处理的能力。
本篇博客将介绍如何使用Spark来读取Kafka中的数据流,并将这些数据以CSV格式写入到HDFS中。
环境准备
在开始之前,确保你的开发环境中已经安装了以下软件:

Apache Kafka

#启动zookeeper
zkServer start
#启动kafka服务
kafka-server-start /opt/homebrew/etc/kafka/server.properties

Apache Spark

<properties>
      <scala.version>2.12.17</scala.version>
      <spark.version>3.0.0</spark.version>
 <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- Kafka Streaming dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

Hadoop HDFS

#启动hdfs
start-dfs.sh

Java开发环境
此外,你需要在项目中包含Spark和Kafka的依赖库。

代码实现
首先,我们定义一个Scala case class Job 来表示从Kafka读取的每条记录的数据结构。

case class Job(
  Position: String,
  Company: String,
  Salary: String,
  Location: String,
  Experience: String,
  Education: String,
  Detail: String
)

接下来,我们编写一个Kafka2Hdfs对象,并在其中实现main方法。这个方法将创建一个SparkSession,配置Kafka读取选项,并从Kafka中读取数据流。

object Kafka2Hdfs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Kafka2Hdfs")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val kafkaOptions = Map[String, String](
      "kafka.bootstrap.servers" -> "127.0.0.1:9092",
      "subscribe" -> "flume",
      "startingOffsets" -> "earliest"
    )

    val stream = spark.readStream
      .format("kafka")
      .options(kafkaOptions)
      .load()

我们使用subscribe选项指定Kafka中的topic名称,这里我们使用的是flume。startingOffsets选项设置为earliest,意味着我们从最早的记录开始读取数据。

接下来,我们将Kafka中的数据转换成DataFrame。我们首先将每条记录的value字段转换为字符串,然后使用map函数将每条记录解析为Job对象。

val jobDs = stream.selectExpr("CAST(value AS STRING)")
  .as[String]
  .map(line => {
    val fields = line.split(",")
    Job(
      Position = fields(0),
      Company = fields(1).trim,
      Salary = fields(2).trim,
      Location = fields(3).trim,
      Experience = fields(4).trim,
      Education = fields(5).trim,
      Detail = fields(6).trim
    )
  }).toDF()

现在,我们已经有了一个包含Job对象的DataFrame。接下来,我们将这个DataFrame以CSV格式写入到HDFS中。我们使用writeStream方法,并设置format为csv,同时指定输出路径和检查点位置。

val query: StreamingQuery = jobDs.writeStream
  .format("csv")
  .option("header", "false")
  .option("path", "/")
  .option("checkpointLocation", "/ck")
  .start()

注意,我们在这里将header选项设置为false,因为我们不打算在CSV文件中包含列名。path选项指定了输出文件的存储路径,而checkpointLocation选项指定了检查点的存储路径,这对于流处理的可靠性非常重要。

最后,我们调用awaitTermination方法来等待流处理的结束。在实际的生产环境中,你可能希望将这个流处理任务部署到一个集群上,并让它持续运行。

query.awaitTermination()

总结
在这篇博客中,我们介绍了如何使用Spark读取Kafka中的数据流,并将这些数据以CSV格式写入到HDFS中。这种方法可以用于各种实时数据处理场景,例如日志分析、事件监控等。通过这种方式,我们可以将实时数据转换为静态数据,以便进行更深入的分析和处理。

完整代码:

package com.lhy.sparkkafka2hdfs

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Row, SparkSession}



case class Job(Position:String,Company:String,Salary:String,Location:String,Experience:String,Education:String,Detail:String)
object Kafka2Hdfs{
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Kafka2Hdfs")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val kafkaOptions = Map[String, String](
      "kafka.bootstrap.servers" -> "127.0.0.1:9092",
      "subscribe" -> "flume",
      "startingOffsets" -> "earliest"
    )

    val stream = spark.readStream
      .format("kafka")
      .options(kafkaOptions)
      .load()


    val jobDs = stream.selectExpr("CAST(value AS STRING)")
      .as[String]
      .map(line => {
        val fields = line.split(",")
        Job(
          Position = fields(0),
          Company = fields(1).trim,
          Salary = fields(2).trim,
          Location = fields(3).trim,
          Experience = fields(4).trim,
          Education = fields(5).trim,
          Detail = fields(6).trim
        )
      }).toDF()
//    val query = jobDs.writeStream.format("console").start()

    val query: StreamingQuery = jobDs.writeStream
      .format("csv")
      .option("header", "false")
      .option("path", "/")
      .option("checkpointLocation", "/ck")
      .start()

    query.awaitTermination()

  }

在这里插入图片描述
如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。
在这里插入图片描述


http://www.niftyadmin.cn/n/5469861.html

相关文章

代码随想录算法训练营DAY15|C++二叉树Part.2|102.二叉树的层序遍历、226.翻转二叉树、101. 对称二叉树

文章目录 102.二叉树的层序遍历思路伪代码迭代法递归法 CPP代码拓展题 226.翻转二叉树思路CPP代码 101. 对称二叉树伪代码CPP代码 102.二叉树的层序遍历 力扣题目链接 文章讲解&#xff1a;102.二叉树的层序遍历 视频讲解&#xff1a;讲透二叉树的层序遍历 | 广度优先搜索 | Le…

希尔排序算法(Java实现)

1.希尔排序&#xff08;Shell Sort&#xff09; &#xff08;1&#xff09;算法思想 先追求表中元素部分有序&#xff0c;再逐渐逼近全局有序。先将待排序表分割成若干形如 L [ i , i d , i 2 d , . . . , i k d ] L[i,id,i2d,...,ikd] L[i,id,i2d,...,ikd]的子表&#xff…

基于单片机地铁自动报站系统

**单片机设计介绍&#xff0c;基于单片机地铁自动报站系统 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机的地铁自动报站系统概要主要涉及利用单片机技术实现地铁列车的自动报站功能。以下是关于该设计的详细概述&…

电商技术揭秘六:前端技术与用户体验优化

文章目录 引言一、前端技术在电商中的重要性1.1 前端技术概述1.2 用户体验与前端技术的关系 二、响应式设计与移动优化2.1 响应式设计的原则2.2 移动设备优化策略2.3 响应式设计的工具和框架 三、交互设计与用户体验提升3.1 交互设计的重要性3.2 用户体验的量化与优化3.3 通过前…

基于单片机的全自动洗衣机系统仿真设计

**单片机设计介绍&#xff0c;基于单片机的全自动洗衣机系统仿真设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机的全自动洗衣机系统仿真设计概要是关于利用单片机技术实现全自动洗衣机控制功能的系统设计概述。以…

图DP

目录 有向无环图DP 力扣 329. 矩阵中的最长递增路径 力扣 2192. 有向无环图中一个节点的所有祖先 有向有环图DP 力扣 1306. 跳跃游戏 III 有向无环图DP 力扣 329. 矩阵中的最长递增路径 给定一个 m x n 整数矩阵 matrix &#xff0c;找出其中 最长递增路径 的长度。 对…

ubuntu20.04执行sudo apt-get update失败的解决方法

参考&#xff1a;执行sudo apt-get update失败的解决方案 1、换源型错误 &#xff08;1&#xff09;编辑/etc/apt/sources.list文件 在命令行中输入&#xff1a; sudo vim /etc/apt/sources.list 或者 sudo gedit /etc/apt/sources.list 推荐使用后者 &#xff08;2&#xf…

基于单片机的智能报站系统仿真设计

**单片机设计介绍&#xff0c;基于单片机的智能报站系统仿真设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机的智能报站系统仿真设计概要是关于采用单片机技术实现公交车报站功能的系统设计概述。以下是对该设计的…