Flink+hadoop部署及Demo

news/2024/5/20 1:30:21 标签: hadoop, flink, hdfs

Hadoop集群高可用部署

hadoop_1">下载hadoop包地址

https://dlcdn.apache.org/hadoop/common/hadoop-3.2.4/hadoop-3.2.4.tar.gz
上传并解压到3台服务器
配置3台主机的hosts和免密登录
在这里插入图片描述

1.修改.bash_profile

vi .bash_profile
# HADOOP_HOME
export HADOOP_HOME=/apps/svr/hadoop-3.2.4
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

source .bash_profile
hadoop version查看hadoop下载版本
在这里插入图片描述

架构如下:

hadoop01hadoop02hadoop03
HDFSNameNode,DataNodeDataNodeNameNode,DataNode
YARNResouceManager,NodeManagerNodeManagerResouceManager,NodeManager

NameNode 通过rpc高可用
ResouceManager通过zk高可用
主备目录

在集群各节点创建目录

mkdir -p /apps/svr/hadoop-3.2.4/tmp
mkdir -p /apps/svr/hadoop-3.2.4/dfs/name
mkdir -p /apps/svr/hadoop-3.2.4/dfs/data
mkdir -p /apps/svr/hadoop-3.2.4/journaldata

2.配置core-site.xml

        <property>
         <name>fs.defaultFS</name>
         <value>hdfs://mycluster/</value>
        </property>
        
        <!-- 指定hadoop工作目录 -->
        <property>
          <name>hadoop.tmp.dir</name>
          <value>/apps/svr/hadoop-3.2.4/tmp</value>
        </property>
      
        <!-- 指定zookeeper集群访问地址 -->
        <property>
          <name>ha.zookeeper.quorum</name>
          <value>10.251.75.112:2181,10.251.75.113:2181,10.251.75.114:2181</value>
        </property>
      
        <!-- 配置为了解决以后其他组件连接HDFS集群  -->
        <property>
          <name>hadoop.proxyuser.bigdata.hosts</name>
          <value>*</value>
        </property>
      
        <property>
          <name>hadoop.proxyuser.bigdata.groups</name>
          <value>*</value>
        </property>

hdfssitexml_67">3.修改配置文件hdfs-site.xml

<!-- NameNode 存放的位置 -->
  <property>
        <name>dfs.namenode.name.dir</name>
        <value>/apps/svr/hadoop-3.2.4/dfs/name</value>
    </property>
  <!-- DataNode 存放的位置 -->
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/apps/svr/hadoop-3.2.4/dfs/data</value>
    </property>
<!-- 指定HDFS的nameservices为mycluster,需要跟core-site.xml中保持一致 -->
  <property>

    <name>dfs.replication</name>

    <value>2</value>

  </property>
<!-- 指定HDFS的nameservices为mycluster,需要跟core-site.xml中保持一致 -->
  <property>
          <name>dfs.nameservices</name>
          <value>mycluster</value>
  </property>
      
        <!-- 设置mycluster集群有两个namenode, 分别为nn1,nn2 -->
  <property>
          <name>dfs.ha.namenodes.mycluster</name>
          <value>nn1,nn2</value>
  </property>
      
        <!-- 配置nn1 的RPC通信地址 -->
   <property>
          <name>dfs.namenode.rpc-address.mycluster.nn1</name>
          <value>10.251.75.112:9000</value>
   </property>
        
        <!-- 配置nn1的http通信地址 -->
   <property>
          <name>dfs.namenode.http-address.mycluster.nn1</name>
          <value>10.251.75.112:50070</value>
   </property>
 
 <!-- 配置nn2 的RPC通信地址 -->
   <property>
          <name>dfs.namenode.rpc-address.mycluster.nn2</name>
          <value>10.251.75.114:9000</value>
   </property>
      
        <!-- 配置nn2的http通信地址 -->
   <property>
          <name>dfs.namenode.http-address.mycluster.nn2</name>
          <value>10.251.75.114:50070</value>
   </property>
      
        <!-- 指定JournalNode 在本地磁盘存放数据的位置 -->
   <property>
          <name>dfs.journalnode.edits.dir</name>
          <value>/apps/svr/hadoop-3.2.4/journaldata</value>
   </property>
      
        <!-- 指定NameNode的edits元数据在journalNode上的服务器 -->
   <property>
          <name>dfs.namenode.shared.edits.dir</name>
          <value>qjournal://10.251.75.112:8485;10.251.75.113:8485;10.251.75.114:8485/mycluster</value>
   </property>
 
 <!-- 开启NameNode 自动切换 -->
   <property>
          <name>dfs.ha.automatic-failover.enabled</name>
          <value>true</value>
   </property>
      
        <!-- 配置nameNode失败自动切换的实现方式 -->
   <property>
          <name>dfs.client.failover.proxy.provider.mycluster</name>
          <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
   </property>
      
        <!-- 配置隔离机制方法 -->
   <property>
          <name>dfs.ha.fencing.methods</name>
          <value>
            sshfence
            shell(/bin/true)
          </value>
   </property>
      
        <!-- 使用sshfence隔离机制时需要ssh免密登陆 -->
   <property>
          <name>dfs.ha.fencing.ssh.private-key-files</name>
          <value>/apps/.ssh/id_rsa</value>
   </property>
 
  <!-- 配置sshfence隔离机制超时时间 -->
   <property>
          <name>dfs.ha.fencing.ssh.connect-timeout</name>
          <value>30000</value>
   </property>
     
   <property>
         <name>dfs.webhdfs.enabled</name>
         <value>true</value>
   </property>

4.配置mapred-site.xml

    <!-- 指定mapreduce运算时资源调度为 yarn 模式 -->
       <property>
          <name>mapreduce.framework.name</name>
         <value>yarn</value>
        </property>
      
        <!-- 配置mapreduce历史服务器地址 端口号 -->
        <property>
          <name>mapreduce.jobhistory.address</name>
          <value>10.251.75.112:10020</value>
        </property>
      
        <!-- 配置mapreduce历史服务器WEB访问地址 -->
        <property>
          <name>mapreduce.jobhistory.webapp.address</name>
          <value>10.251.75.112:19888</value>
        </property>

5.配置yarn-site.xml

<!-- Site specific YARN configuration properties -->
 
 <!-- 开启高可用 -->
  <property>
    <name>yarn.resourcemanager.ha.enabled</name>
    <value>true</value>
  </property>
 
  <!-- 指定ResourceManager的标识:yrc -->
  <property>
    <name>yarn.resourcemanager.cluster-id</name>
    <value>yrc</value>
  </property>
 
  <!-- 指定RM的名字-->
  <property>
    <name>yarn.resourcemanager.ha.rm-ids</name>
    <value>rm1,rm2</value>
  </property>
 
 <!-- 指定rm1服务器 -->
  <property>
    <name>yarn.resourcemanager.hostname.rm1</name>
    <value>10.251.75.112</value>
  </property>
 
  <!-- 指定rm2服务器 -->
  <property>
    <name>yarn.resourcemanager.hostname.rm2</name>
    <value>10.251.75.114</value>
  </property>
 
  <!-- 指定rm 被管理的zk 地址 -->
  <property>
    <name>yarn.resourcemanager.zk-address</name>
    <value>10.251.75.112:2181,10.251.75.113:2181,10.251.75.114:2181</value>
  </property>
 
<!-- 运行mapreduce任务需要使用的服务 -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
 
  <!-- 开启yarn集群的日志聚合功能 -->
  <property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
  </property>
 
  <!-- 设置日志保存时间 -->
  <property>
    <name>yarn.log-aggregation.retain-seconds</name>
    <value>86400</value>
  </property>
 
  <!-- 启动rm自动恢复功能 -->
  <property>
    <name>yarn.resourcemanager.recovery.enabled</name>
    <value>true</value>
  </property>
 
 <!-- 制定rm 状态信息存储在zookeeper集群上 -->
  <property>
    <name>yarn.resourcemanager.store.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
  </property>
  <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
  <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
  <property>
          <name>yarn.nodemanager.pmem-check-enabled</name>
          <value>false</value>
  </property>
 <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
  <property>
          <name>yarn.nodemanager.vmem-check-enabled</name>
          <value>false</value>
  </property>

6.修改文件workers文件

10.251.75.112
10.251.75.113
10.251.75.114

hadoopenvsh_281">7.修改hadoop-env.sh

配置JAVA_HOME和主机上的jdk环境变量一样

hdfssitexmlmapredsitexmlyarnsitexml2_284">6.将core-site.xml,hdfs-site.xml,mapred-site.xml,yarn-site.xml传到另外2台主机

scp core-site.xml apps@10.251.75.113:/apps/svr/hadoop-3.2.4/etc/hadoop
scp hdfs-site.xml apps@10.251.75.113:/apps/svr/hadoop-3.2.4/etc/hadoop
scp mapred-site.xml apps@10.251.75.113:/apps/svr/hadoop-3.2.4/etc/hadoop
scp yarn-site.xml apps@10.251.75.113:/apps/svr/hadoop-3.2.4/etc/hadoop
scp workers apps@10.251.75.113:/apps/svr/hadoop-3.2.4/etc/hadoop
scp hadoop-env.sh apps@10.251.75.113:/apps/svr/hadoop-3.2.4/etc/hadoop

scp core-site.xml apps@10.251.75.114:/apps/svr/hadoop-3.2.4/etc/hadoop
scp hdfs-site.xml apps@10.251.75.114:/apps/svr/hadoop-3.2.4/etc/hadoop
scp mapred-site.xml apps@10.251.75.114:/apps/svr/hadoop-3.2.4/etc/hadoop
scp yarn-site.xml apps@10.251.75.114:/apps/svr/hadoop-3.2.4/etc/hadoop
scp workers apps@10.251.75.114:/apps/svr/hadoop-3.2.4/etc/hadoop
scp hadoop-env.sh apps@10.251.75.114:/apps/svr/hadoop-3.2.4/etc/hadoop

启动集群各个节点监控NameNode的管理日志的JournalNode
在各节点执行
hdfs --daemon start journalnode
在这里插入图片描述
在node01上格式化namenode
hdfs namenode -format
在node1上启动namenode
hdfs --daemon start namenode
在这里插入图片描述
同步nn2 及node03上同步nn1及node01的源数据
hdfs namenode -bootstrapStandby
在node1节点上格式化ZKFC
hdfs zkfc -formatZK

node1节点上启动HDFS和Yarn
start-dfs.sh
在这里插入图片描述
查看zk,namenode在node03上
在这里插入图片描述

查看hdfs管理页面

http://10.251.75.112:50070/
http://10.251.75.114:50070/
在这里插入图片描述
在这里插入图片描述

start-yarn.sh
进程在node01和node03上
在这里插入图片描述

查看HA zk节点数据 resourceMaster在rm2上
在这里插入图片描述
启动后查看yarn页面 node03页面
http://10.251.75.114:8088/cluster
在这里插入图片描述

Flink部署

下载包地址

flink下载地址
https://www.apache.org/dyn/closer.lua/flink/flink-1.16.1/flink-1.16.1-bin-scala_2.12.tgz

节点服务器hadoop01hadoop02hadoop03
角色JobManager

解压后
配置source

export FLINK_HOME=/apps/svr/flink-1.16.1
export PATH=export PATH=$FLINK_HOME/bin:$PATH

source .bash_profile

Flink Per-Job模式

Per-Job 模式是指每个Flink Job都是一组独立集群,即有自己的JobManager和TaskManager。提交任务后,YARN首先会为该任务分派一个AM容器,该容器内会运行一个JobManager进程,之后JobManager会向Yarn申请运行TaskManager所需要的container,container的数量和container的配置(CPU、内存)会基于客户端的需求来确定,当JobManager和TaskManager进程都拉起来之后,则会执行相应的Flink Job。这样,与Standalone以及yarn-session不同的是,我们不需要准备一个常驻的Flink 集群进程,只需要保证本地有一个Flink环境即可,Flink集群是在我们提交Job时动态创建出来的。
这种方式的优势在于每个任务独立运行,相互不会收到干扰,这个不干扰还包括了运行日志也是隔离的。另外,借助YARN集群的能力,提供了对Flink Job的全方位保障,包括JobManager的高可用,TaskManager的恢复,然后再结合Flink自身提供的健壮性,包括检查点、保存点机制,从而能够很好的保障Flink Job的高可用和健壮性。劣势的话,就是保证了资源隔离的同时也占用了更多的资源,因为每个Job都需要一个JobManager,每个JobManager都会消耗一个AM进程资源。

-yn,--container <arg> 表示分配容器的数量,也就是 TaskManager 的数量。
-d,--detached:设置在后台运行。
-yjm,--jobManagerMemory<arg>:设置 JobManager 的内存,单位是 MB。
-ytm,--taskManagerMemory<arg>:设置每个 TaskManager 的内存,单位是 MB。
-ynm,--name:给当前 Flink application 在 Yarn 上指定名称。
-yq,--query:显示 yarn 中可用的资源(内存、cpu 核数)
-yqu,--queue<arg> :指定 yarn 资源队列
-ys,--slots<arg> :每个 TaskManager 使用的 Slot 数量。
-yz,--zookeeperNamespace<arg>:针对 HA 模式在 Zookeeper 上创建 NameSpace
-yid,--applicationID<yarnAppId> : 指定 Yarn 集群上的任务 ID,附着到一个后台独 立运行的 Yarn Session 中。

使用flink run -m yarn-cluster --help 可查看可用命令

在这里插入图片描述

Demo01

flink上执行命令跑demo
flink run -m yarn-cluster -t yarn-per-job -yjm 1024 -ytm 1024 /apps/svr/flink-1.16.1/examples/streaming/WordCount.jar
任务执行完成并且
在这里插入图片描述
通过yarn页面查看到任务已完成,并且hdfs上有记录
在这里插入图片描述
在这里插入图片描述

Demo02

登录10.251.75.112 nc -lk 9999开启服务端端口监听
nc -kl 9999
在这里插入图片描述
flink run -m yarn-cluster -t yarn-per-job -yjm 1024 -ytm 1024 /apps/svr/flink-1.16.1/examples/streaming/SocketWindowWordCount.jar --hostname 10.251.75.112 --port 9999
在这里插入图片描述
打开flink集群管理页面
在这里插入图片描述
在服务端nc窗口输入hello word即可看到flink管理页面有对应输出
在这里插入图片描述

flink taskmanager即可看到对应输出
在这里插入图片描述

将任务关闭

查看管理页面
在这里插入图片描述
在这里插入图片描述
yarn application -kill application_1684908112422_0008
在这里插入图片描述
查看管理页面任务已被关闭
在这里插入图片描述


http://www.niftyadmin.cn/n/359389.html

相关文章

SeaTunnel本地运行以及kafka发送到redis说明

下载 Seatunnel2.3.1源码 Idea中的目录结构 编译 通过maven进行代码编译 编译命令 mvn clean package -pl seatunnel-dist -am -Dmaven.test.skiptrue 编译单个模块命令 mvn clean package -pl seatunnel-examples/seatunnel-engine-examples -am -Dmaven.test.skiptrue …

Linux - 第18节 - 网络基础(传输层)

1.传输层 在学习HTTP等应用层协议时&#xff0c;为了便于理解&#xff0c;可以简单的认为HTTP协议是将请求和响应直接发送到了网络当中。但实际应用层需要先将数据交给传输层&#xff0c;由传输层对数据做进一步处理后再将数据继续向下进行交付&#xff0c;该过程贯穿整个网络协…

使用自己的数据利用pytorch搭建自编码器提取特征

使用自己的数据利用pytorch搭建自编码器提取特征 1、定义编码器&#xff08;Encoder&#xff09;和解码器&#xff08;Decoder&#xff09;模型2、定义自编码器&#xff08;Autoencoder&#xff09;模型3、定义自定义数据集&#xff08;MyDataset&#xff09;4、参数初始化5、加…

Redis7实战加面试题-基础篇(Redis复制(replica),Redis哨兵(sentinel),Redis集群(cluster))

Redis复制(replica) 就是主从复制&#xff0c;master以写为主&#xff0c;Slave以读为主。当master数据变化的时候&#xff0c;自动将新的数据异步同步到其它slave数据库。 能干嘛&#xff1a;读写分离&#xff0c;容灾恢复&#xff0c;数据备份&#xff0c;水平扩容支撑高并发…

《论文阅读》验证离散提示模板的鲁棒性 EACL2023

《论文阅读》验证离散提示模板的鲁棒性 EACL2023 前言动机手工 prompt 和 离散 prompt质疑实验设置模型数据集评估指标实验验证实验1: 训练数据集大小实验2: 离散 prompt 的泛化能力实验3: 干扰之字符重新排序实验4: 干扰之删除字符实验5: 干扰之对抗性干扰前言 你是否也对于理…

Tensorflow2基础代码实战系列之CNN文本分类实战

深度学习框架Tensorflow2系列 注&#xff1a;大家觉得博客好的话&#xff0c;别忘了点赞收藏呀&#xff0c;本人每周都会更新关于人工智能和大数据相关的内容&#xff0c;内容多为原创&#xff0c;Python Java Scala SQL 代码&#xff0c;CV NLP 推荐系统等&#xff0c;Spark …

STM8、STM8S003F3P6 双机串口通信(片上串口)

背景 这里为什么要写串口通信&#xff0c;因为实际项目上使用了串口&#xff0c;STM8S003F3P6的串口简单啊&#xff0c;不值得一提。本文写的串口确实简单&#xff0c;因为这里我想先从简单的写起来&#xff0c;慢慢的把难的引出来。这里呢&#xff0c;做个提纲说明&#xff0…

ASEMI代理长电MCR100-6可控硅的性能与应用分析

编辑-Z 本文主要介绍了新型MCR100-6晶闸管的性能与应用。首先&#xff0c;从晶闸管的基本原理和结构出发&#xff0c;分析了MCR100-6晶闸管的性能特点&#xff1b;其次&#xff0c;探讨了MCR100-6晶闸管在各种电子电路中的应用&#xff1b;最后&#xff0c;对MCR100-6晶闸管的…