[Hadoop实现Springboot之HDFS数据查询和插入 ]

news/2024/5/20 3:39:51 标签: hdfs, hive, 数据库

目录

🎃前言: 

🎃Spring Boot项目中添加Hadoop和HDFS的依赖。可以使用Apache Hadoop的Java API或者使用Spring Hadoop来简化操作。

🎃 需要配置Hadoop和HDFS的连接信息,包括Hadoop的配置文件和HDFS的连接地址等。

🎃可以使用Hadoop的API来实现数据的查询和添加。例如,使用HDFS的FileSystem API来读取和写入文件,使用MapReduce来处理数据等。

🎃MapReduce处理数据:

创建一个MapReduce任务,用于统计HDFS中文本文件中单词的出现次数

🎃实现MapReduce任务的API

🎃main方法测试

api请求:


🎃前言: 

  🎃这只是一个笔记而已

🎃Spring Boot项目中添加Hadoop和HDFS的依赖。可以使用Apache Hadoop的Java API或者使用Spring Hadoop来简化操作。

根据情况选取依赖

 
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.3.1</version>
    </dependency>
</dependencies>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.1</version>
</dependency>

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>2.5.0.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>3.3.1</version>
</dependency>


🎃 需要配置Hadoop和HDFS的连接信息,包括Hadoop的配置文件和HDFS的连接地址等。

# Hadoop configuration
spring.hadoop.config.fs.defaultFS=hdfs://localhost:9000
spring.hadoop.config.dfs.replication=1
spring.hadoop.config.dfs.blocksize=128m
spring.hadoop.config.dfs.client.use.datanode.hostname=true
spring.hadoop.config.dfs.client.read.shortcircuit=true
spring.hadoop.config.dfs.domain.socket.path=/var/run/hadoop-hdfs/dn._PORT

# HDFS configuration
spring.hadoop.fsUri=hdfs://localhost:9000
spring.hadoop.fsUser=root

🎃可以使用Hadoop的API来实现数据的查询和添加。例如,使用HDFS的FileSystem API来读取和写入文件,使用MapReduce来处理数据等。

@RestController
@RequestMapping("/hdfs")
public class HdfsController {

    @Autowired
    private FileSystem fileSystem;

    @GetMapping("/read/{path}")
    public ResponseEntity<String> read(@PathVariable String path) throws IOException {
        Path filePath = new Path(path);
        if (!fileSystem.exists(filePath)) {
            return ResponseEntity.notFound().build();
        }
        FSDataInputStream inputStream = fileSystem.open(filePath);
        String content = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        inputStream.close();
        return ResponseEntity.ok(content);
    }

    @PostMapping("/write/{path}")
    public ResponseEntity<Void> write(@PathVariable String path, @RequestBody String content) throws IOException {
        Path filePath = new Path(path);
        if (fileSystem.exists(filePath)) {
            return ResponseEntity.badRequest().build();
        }
        FSDataOutputStream outputStream = fileSystem.create(filePath);
        IOUtils.write(content, outputStream, StandardCharsets.UTF_8);
        outputStream.close();
        return ResponseEntity.ok().build();
    }
}

使用curl或其他HTTP客户端发送GET和POST请求来测试API: 或者postman去测试

# 读取文件
curl http://localhost:8080/hdfs/read/test.txt

# 写入文件
curl -X POST -H "Content-Type: text/plain" -d "Hello, HDFS!" http://localhost:8080/hdfs/write/test.txt
 

🎃MapReduce处理数据:

创建一个MapReduce任务,用于统计HDFS中文本文件中单词的出现次数

public class WordCount {

    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}

🎃实现MapReduce任务的API

@RestController
@RequestMapping("/hdfs")
public class HdfsController {

    @Autowired
    private FileSystem fileSystem;

    @Autowired
    private Configuration configuration;

    @PostMapping("/wordcount")
    public ResponseEntity<Void> wordCount(@RequestParam String inputPath, @RequestParam String outputPath) throws Exception {
        Job job = Job.getInstance(configuration, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCount.WordCountMapper.class);
        job.setCombinerClass(WordCount.WordCountReducer.class);
        job.setReducerClass(WordCount.WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        boolean success = job.waitForCompletion(true);
        return success ? ResponseEntity.ok().build() : ResponseEntity.badRequest().build();
    }
}

🎃main方法测试

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public FileSystem fileSystem() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://localhost:9000");
        return FileSystem.get(configuration);
    }

    @Bean
    public Configuration configuration() {
        return new Configuration();
    }
}

api请求:

curl -X POST http://localhost:8080/hdfs/wordcount?inputPath=/user/input&outputPath=/user/output

最后,需要注意Hadoop和HDFS的安全性和性能问题,例如数据的加密和压缩,数据的分片和并行处理等。可以使用Hadoop的安全和性能优化工具来提高系统的稳定性和效率。


http://www.niftyadmin.cn/n/441572.html

相关文章

用GPT4写2023高考语文作文,新课标I卷,Ⅱ卷

文章目录 新课标Ⅰ卷新课标Ⅱ卷总结 每年的高考语文题目都会是热议的话题&#xff0c;今年同样也不例外。但是今年讨论的话题除了作文题目本身之外&#xff0c;对于chatgpt写出的作文会是什么样子的也​让广大网友同样期待 新课标Ⅰ卷 好的故事&#xff0c;可以帮我们更好地表达…

揭密ChatGPT背后团队鲜为人知的小秘密

ChatGPT引领的人工智能技术浪潮还在持续火爆&#xff0c;可是做出这款产品的OpenAI公司&#xff0c;熬得住多年的冷板凳&#xff0c;最终一飞冲天&#xff0c;他们是怎么做到的呢&#xff1f; 因此&#xff0c;我对这家企业的组织建设产生了浓厚的兴趣。我找啊找&#xff0c;最…

金融大数据平台是怎么构建的?

大数据对银行业的价值不言而喻。 在业务上,如何去挖掘客户的内在需求,为客户提供更有价值的服务是目前金融机构的战略转型和业务创新的关键。大数据技术正是金融机构深挖数据资产、实现差异化竞争、推动业务创新的重要工具。 在运营上,通过大数据应用和分析,金融机构能够定位…

一文了解Docker安装和入门

一文详解Docker 什么是Docker? Docker 是一种开源的容器化平台&#xff0c;旨在简化应用程序的部署、运行和管理过程。它基于容器化技术&#xff0c;可以将应用程序及其依赖项打包到一个独立的、可移植的容器中&#xff0c;从而实现应用程序在不同环境中的一致性运行。 容器…

【Dart】Dart学习(三)函数

函数 Dart 是一种真正面向对象的语言&#xff0c;所以即便函数也是对象并且类型为 Function&#xff0c;这意味着函数可以被赋值给变量或者作为其它函数的参数。你也可以像调用函数一样调用 Dart 类的实例。 下面是定义一个函数的例子&#xff1a; bool isNoble(int atomicN…

【Biomechanics】1 Biomechanics as an Interdiscipline

无回到目录第2章 文章目录 1.0 Introduction1.1 Measurement, Description, Analysis, and Assessment1.1.1 Measurement, Description, and Monitoring1.1.2 Analysis1.1.3 Assessment and Interpretation 1.2 Biomechanics and its Relationship with Physiology and Anatomy…

数组的原型方法-es6

数组的原型方法-es6Array.form()Array.of() find() 和 findIndex()copyWithin()fill()entries(),keys()和values()includes()flat()和flatMap()扩展运算符at()reduce()和reduceRight()some()判断数组中是否存在满足条件的项 18、Array.form() Array.from方法用于将两类对象转…

栈和队列(队列的应用)[三]

文章目录 一、滑动窗口最大值滑动窗口最大值(leetcode 239. ) 二、求前 K 个高频元素前 K 个高频元素(leetcode 347.) 一、滑动窗口最大值 思想&#xff1a;这道题属于困难题&#xff0c;不容易想到解决办法。对于“最大值”&#xff0c;我们可以想到一种非常合适的数据结构&a…