pyspark 检测任务输出目录是否空,避免读取报错

news/2024/5/20 4:03:12 标签: python, java, hadoop, hdfs

前言

在跑调度任务时候,有时候子任务需要依赖前置任务的输出,但类似读取 Parquet 或者 Orc 文件时,如果不判断目录是否为空,在输出为空时会报错,所以需要 check 一下,此外Hadoop通常在写入数据时会在目录中生成一个名为_SUCCESS的文件来表示写入操作已成功完成,我们在检测时要排除这个文件

HDFS API 判断

from py4j.java_gateway import java_import
from pyspark.sql import SparkSession

# 初始化SparkSession
spark = SparkSession.builder.appName("Example").getOrCreate()

# 导入Hadoop FileSystem类
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')
java_import(spark._jvm, 'org.apache.hadoop.fs.FileSystem')

# 定义要检查的路径
FEATURE_OUTPUT_PATH = "your_path_here"

# 获取Hadoop Configuration
hadoop_conf = spark._jsc.hadoopConfiguration()

# 获取FileSystem对象
fs = spark._jvm.FileSystem.get(hadoop_conf)

# 检查路径是否存在
path = spark._jvm.Path(FEATURE_OUTPUT_PATH)

if fs.exists(path):
    # 获取目录下所有的文件和子目录
    status_list = fs.listStatus(path)
    non_success_files = [file_status.getPath().getName() for file_status in status_list if
                         file_status.getPath().getName() != "_SUCCESS"]

    # 检查除_SUCCESS文件外是否还有其他文件
    if non_success_files:
        # 读取Parquet文件
        table = spark.read.format('parquet').option('header', 'true').load(FEATURE_OUTPUT_PATH)
    else:
        print("The directory is empty or only contains a _SUCCESS file.")
else:
    print("The path does not exist.")

本地 Shell 判断

注意这段脚本能使用的前提是,执行的机器上已经安装和配置了 HDFS 的 shell 命令

import subprocess

out=subprocess.check_output("hadoop fs -ls /tmp/file.txt",shell=True)

out=out.strip()

out=out.split("\n")

for l in out:

if l.endswith(".txt"):

print "file exit"
    else:
        print "file not exit"

http://www.niftyadmin.cn/n/5057915.html

相关文章

源码编译postgresql

没什么好研究的了,就试试编译Postgresql源码,按照网站查的资料一步步测试的,方便后期定制数据库时候用,也算是开源的大优势了,只要你愿意折腾,可以自己定制或改进一个数据库来满足特殊业务。后面研究一下他…

K8S-CNI

CNI的设计思想即为:Kubernetes在启动Pod的pause容器之后,直接调用CNI网络插件,从而实现为Pod内部应用容器月在的Network Namespace配置符合预期的网络信息。 这里面需要特别关注两个方面:Container必须有自己的网络命名空间的环境,也就是end…

【RabbitMQ】常用消息模型详解

文章目录 AMQP协议的回顾RabbitMQ支持的消息模型第一种模型(直连)开发生产者开发消费者生产者、消费者开发优化API参数细节 第二种模型(work quene)开发生产者开发消费者消息自动确认机制 第三种模型(fanout)开发生产者开发消费者 第四种模型(Routing)开发生产者开发消费者 第五…

Flutter笔记:手写一个简单的画板工具

Flutter笔记 手写一个简单的画板工具 作者:李俊才 (jcLee95):https://blog.csdn.net/qq_28550263 邮箱 :291148484163.com 本文地址:https://blog.csdn.net/qq_28550263/article/details/133418742 目 录 1…

白盒 SDK 加密 —— Go 语言中直调 C 动态库实现

文章目录 1.背景2.实现方式2.1.C 库 .so 文件生成2.2.C 库 .h 文件2.3.Goland 调用实现2.3.1 整体2.3.2 注释块部分2.3.3 逻辑实现部分 3.小结 1.背景 在重构的历史项目中,有一点是语言转换:从 PHP 转至 Goland ,在压缩资源的同时&#xff0…

Linux性能优化--性能工具-系统CPU

2.0.概述 本章概述了系统级的Linux性能工具。这些工具是你追踪性能问题时的第一道防线。 它们能展示整个系统的性能情况和哪些部分表现不好。 1.理解系统级性能的基本指标,包括CPU的使用情况。 2.明白哪些工具可以检索这些系统级性能指标。2.1CPU性能统计信息 为了…

【模拟实现C语言库函数】atoi的模拟实现

#include <stdio.h> #include <assert.h> #include <string.h> #include <math.h> int my_atoi(const char* str) {assert(str);size_t len strlen(str);size_t j len - 1;// 个位&#xff08;1234中的4&#xff09;int ret str[j--] - 0;// 十位百…

Unity实现设计模式——解释器模式

Unity实现设计模式——解释器模式 解释器模式&#xff08;Interpreter Pattern&#xff09;是一种按照规定语法进行解析的模式&#xff0c;现实项目中用得较少。 给定一门语言&#xff0c;定义它的文法的一种表示&#xff0c;并定义一个解释器&#xff0c;该解释器使用该表示来…