Spark解析JSON文件,写入hdfs

news/2024/5/20 1:56:25 标签: spark, json, hdfs, Gson, rdd转Dataframe

一、用Sparkcontext读入文件,map逐行用Gson解析,输出转成一个caseclass类,填充各字段,输出。

解析JSON这里没有什么问题。

RDD覆盖写的时候碰到了一些问题 :

1.直接saveAsTextFile没有覆盖true参数;

2.转dataframe时,还得一个一个字段显化才能转成dataframe;

3.write时,一开始打算写text,说字段里不能含有long和int,换成string后,说不能有多个字段,只能有一个string。刚好用parquet存储省空间,就存parquet了。

跑通代码如下:

package XXX

import com.google.gson.JsonParser
import org.apache.spark.sql.SparkSession
import schema.caseClass1

object ParsecaseClass1Json {
  def main(args: Array[String]): Unit = {
    val inputPath = args(0)
    val outputPath = args(1)
    val sparkSession = SparkSession.builder().appName(this.getClass.getSimpleName).getOrCreate()
    import sparkSession.implicits._

    val lines = sparkSession.sparkContext.textFile(inputPath)
    val result = lines.map(=> parseJsonStr(str))

    val df = result.map(
      x => (
        x.adId
        , x.campaignId
        , x.settlementType
        , x.billingType
        , x.billingTypeCode
        , x.packageName
      ))
      .toDF()
    df.coalesce(1).write.format("parquet").mode("overwrite").save(outputPath)

  }

  def parseJsonStr(str: String): caseClass1 = {
      val inputJson = new JsonParser().parse(str).getAsJsonObject
      val object = new caseClass1

      //1.campaignId
      if (inputJson.has("campaign")) {
        val campaign = inputJson.getAsJsonObject("campaign")
        var campaignId: Long = 0
        if (campaign.has("id"))
          campaignId = campaign.getAsJsonPrimitive("id").getAsLong
        else if (campaign.has("campaignId")) {
          campaignId = campaign.getAsJsonPrimitive("campaignId").getAsLong
        }
        else {
          System.err.println("No campaignId, inputJson: {}", str)
        }
        object.campaignId = campaignId
      }

      //2.creativeId
      if (inputJson.has("creative")) {
        val creative = inputJson.getAsJsonObject("creative")
        var adId: Long = 0
        if (creative.has("id"))
          adId = creative.getAsJsonPrimitive("id").getAsLong
        else if (creative.has("creativeId"))
          adId = creative.getAsJsonPrimitive("creativeId").getAsLong
        else
          System.err.println("No adId, inputJson: {}", str)
        object.adId = adId

      }

      /*
      3.settlementType
      4.billingType
      5.billingTypeCode
      6.appId -> packageName
      */
      if (inputJson.has("group")) {
        val group = inputJson.getAsJsonObject("group")

        object.settlementType = group.getAsJsonPrimitive("settlementType").getAsString
        object.billingType = group.getAsJsonPrimitive("billingType").getAsString
        object.billingTypeCode = group.getAsJsonPrimitive("billingTypeCode").getAsInt
        object.packageName = group.getAsJsonPrimitive("appId").getAsString
      }
    System.err.println(object.toString)
    object
  }

}

caseClass如下

package XXX;

import java.io.Serializable;

public class caseClass1 implements Serializable {

    private static final long serialVersionUID = ***;

    public long adId = 0;
    public long campaignId = 0;
    public String settlementType;
    public String billingType;
    public Integer billingTypeCode;
    public String packageName;

    @Override
    public String toString() {
        return "caseClass1{" +
                "adId=" + adId +
                ", campaignId=" + campaignId +
                ", settlementType='" + settlementType + '\'' +
                ", billingType='" + billingType + '\'' +
                ", billingTypeCode=" + billingTypeCode +
                ", packageName='" + packageName + '\'' +
                '}';
    }
    public caseClass1() {

    }
}


http://www.niftyadmin.cn/n/5348200.html

相关文章

MySQL十部曲之四:MySQL中的数据类型

文章目录 前言概述数字类型数字类型语法数字类型字面量十六进制字面量位字面量布尔字面量 数字类型的属性超出范围和溢出处理 时间和日期类型时间和日期类型语法DATE、DATETIME和TIMESTAMP的异同TIMESTAMP和DATETIME的自动初始化和更新时间和日期字面量 字符串类型字符串类型语…

Go 的命令行解析 flag 包如何扩展新类型呢?

上篇文章 说到,除布尔类型 Flag,flag 支持的还有整型(int、int64、uint、uint64)、浮点型(float64)、字符串(string)和时长(duration)。 flag 内置支持能满足…

本地Vscode使用SSH连接Linux虚拟机循环输入密码,无法登陆

今天在工作的时候没有在本地关闭Vscode的前提下,重启了虚拟机后,发现ssh连接不上了,症状就是反复输入密码就是进不去系统,查了很多网上的教程都没啥用; 最后就一招彻底解决问题: 第一步:打开虚…

渲染农场哪家好?渲染农场怎么用?

渲染农场也可以叫做分布式并行集群计算系统,这是一种利用现成的CPU、以太网和操作系统构建的超级计算机,它使用主流的商业计算机硬件设备达到或接近超级计算机的计算能力,提供动画、电影、视觉效果以及建筑可视化等渲染服务。 渲染农场哪家…

提高多旋翼无人机的悬停控制精度

要提高多旋翼无人机的悬停控制精度,可以从以下几个方面进行优化: 优化传感器配置:选用高精度的传感器,如激光雷达、红外传感器等,可以提供更准确的姿态和位置信息。同时,对传感器进行定期标定和校准&#…

Vite+Electron快速构建一个VUE3桌面应用(二)——动态模块热重载

一. 简介 在上一篇文章ViteElectron快速构建一个VUE3桌面应用中,我们了解了如何使用Vite和Electron来快速构建一个Vue3桌面应用。但是,之前构建的应用仅仅是一个简单的版本。在开发过程中,为了更好的开发体验,在开发electron的时…

windows环境下配置tensorflow_gpu版本——无需更改本地的cuda

大家可以在tensorflow学习中,可能会遇到使用tensorflow_gpu版本的安装,但是一般涉及到gpu的安装,就需要配置cuda,这个过程很麻烦且浪费时间,下面给出一个简单的方法配置环境。 假设已经创建好虚拟环境,这里…

《动手学深度学习(PyTorch版)》笔记3.5

注:书中对代码的讲解并不详细,本文对很多细节做了详细注释。另外,书上的源代码是在Jupyter Notebook上运行的,较为分散,本文将代码集中起来,并加以完善,全部用vscode在python 3.9.18下测试通过。…