HDFS学习笔记【Datanode/写数据】

news/2024/5/20 4:29:34 标签: hdfs, hadoop

说明

  1. 谁发起的写数据
    DFSClient通过调用Sender触发写操作
  2. 如何建立连接
    NN应该知道所有的DN情况
    Sender和Receiver创建TCP连接
  3. 如何接收请求确认,谁来拆分
    写请求传送到管道中的每一个节点,最后一个返回确认
    DFSClient需要做切分,依次发送
  4. 数据流,落盘
    先落盘,再向后一个节点发送
  5. 如何确认结束
    最后一个节点发送ACK消息
    DFSClient发送空包表明发送完成
    在这里插入图片描述

写数据流程

Receiver在Datanode的服务器是DataXceiverServer,它通过创建daemon线程DataXceiver来实现读写数据。
级联的写数据。

数据流管道

DataXceiver.writeBlock()

  1. 如何控制管道流程
    isDatanode //是否是Datanode发起的
    isClient //是不是Client发起的
    inTransfer //是否是数据块触发的复制操作。比如TRANSFER_FINALIZED

啥时候是Datanode发起的?

  1. 如何建立管道,如何确认管道已经成功
    在这里插入图片描述
    Writeblock创建管道,通过构造mirrorOut,mirrorIn。维护一个到下游的socket
    writeBlock参数
@Override
  public void writeBlock(final ExtendedBlock block,
      final StorageType storageType, 
      final Token<BlockTokenIdentifier> blockToken,
      final String clientname,
      final DatanodeInfo[] targets, //所有的datanode信息
      final StorageType[] targetStorageTypes,
      final DatanodeInfo srcDataNode,
      final BlockConstructionStage stage,
      final int pipelineSize,
      final long minBytesRcvd,
      final long maxBytesRcvd,
      final long latestGenerationStamp,
      DataChecksum requestedChecksum,
      CachingStrategy cachingStrategy,
      boolean allowLazyPersist,
      final boolean pinning,
      final boolean[] targetPinnings,
      final String storageId,
      final String[] targetStorageIds) throws IOException

构造连接

mirrorNode = targets[0].getXferAddr();
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
mirrorSocket = datanode.newSocket();
//向下游发送写请求
new Sender(mirrorOut)
//接收下游的ack请求
connectAck = BlockOpResponseProto.parseFrom(XX(mirrorIn))
mirrorInStatus = connectAck.getStatus();
firstBadLink = connectAck.getFirstBadLink();

连接是否成功
ACK状态成功说明创建成功,出现异常说明连接创建失败
失败处理逻辑
关闭下游节点的socket,输入流和输出流,向reployOut汇报情况

BlockOpResponseProto.newBuilder()
.writeDelimitedTo(replyOut);
  1. 发送和接收数据块
    BlockReceiver是在创建连接时创建的
    它负责接收上游的数据块,发送给下游
    从下游节点接收确认消息,转发给上游节点
blockReceiver.receiverBlock();
  1. 后续处理
    更新副本的时间戳,副本长度信息
  2. 和NN的交互
    数据块复制操作 通过BPOfferService和NN交互
    客户端请求 response关闭数据块,关闭线程

数据块接收

blockReceiver负责接收上游的数据块

  1. 接收数据块
    启动daemon线程,packetResponser
    循环调用receiverpacket方法,接收上游包,写到下游。
    关闭packetResponser线程
  2. 保存数据块
    receiver过程中保存
  3. 转发数据块
    receiver过程中转发
  4. 响应数据块
    独立线程packetResponser响应

数据包接收

在这里插入图片描述

  1. 何时切分
    packageReceiver.receiveNextPackage从数据流中读取一个packet,放到缓冲区中。

数据切分成packet,客户端切分吗?

  1. 何时写盘
    receivePacket时判断是不是最后一个dn,或者sync。这时候先写盘,再response。否则立即response ack。
    其他的是先转发,再写盘。(写盘需要考虑cache)
  2. 何时转发
    receivePacket之后就要转发,除非需要同步落盘。
  3. 何时响应ACK
    一般情况下是先构造responser,转发响应,再落盘。
    sync,最后一个dn,不转发,落盘,构造responser。
  4. 如何响应
    加入到responser的队列中。

数据包的响应

BlockReceiver从下游接收数据包的ACK,然后传给上游。packetResponser独立线程实现。

  1. 数据结构
    enqueue,队列。存放需要下游确认的数据包。
  2. 校验线程
    packetResponser.run
    循环检查块是否异常,异常中断线程
    下游输入流读取响应
    构造响应,发送给上游
  3. 移除ack
    从队列中移除数据包

响应逻辑

  1. Datanode写操作重启
    OOB异常,直接向上游响应
  2. ack异常
    如果ack异常,可能会中断循环线程
  3. 最后一块的判断
    直接向NN汇报。
    finalizeBlock(startTime)
  4. 向上游响应
    sendAckUpstream
    OOB直接返回
    读取异常,那么中断线程
    其他情况,构建数据包响应

上报NN

BlockReceiver.finalizeBlock()结束接收
调用datanode.closeBlock()
最终调用BlockPoolOfferService.notifyNamenodeReceivedBloce()通知NM

小结

client提交
首先构造pipeline
然后发送数据块
receivepacket处理具体的落盘和转发
packetresponser处理ack
结束后上报nn


http://www.niftyadmin.cn/n/197746.html

相关文章

ToBeWritten之车载信息娱乐系统 (IVI)

也许每个人出生的时候都以为这世界都是为他一个人而存在的&#xff0c;当他发现自己错的时候&#xff0c;他便开始长大 少走了弯路&#xff0c;也就错过了风景&#xff0c;无论如何&#xff0c;感谢经历 转移发布平台通知&#xff1a;将不再在CSDN博客发布新文章&#xff0c;敬…

应届生,实力已超6年,太卷了!

你好&#xff0c;我是田哥今晚上&#xff0c;给一位朋友做模拟面试&#xff0c;原本说好的90分钟左右&#xff0c;结果整了2个多小时。很多人估计也很好奇&#xff0c;我们这两个多小时聊聊什么&#xff0c;下面我给大致总结一下&#xff1a;面试技巧面试中&#xff0c;我们回答…

归并排序(递归实现)

上一次我们说了快排的其他版本&#xff0c;还有就是快排的非递归实现 这次我们就说一哈归并排序&#xff0c;归并排序也是很厉害的一种排序&#xff0c;而且归并排序的时间复杂度可以说成标准的O(n log n) 下面我们就来看一下归并排序 我们先来看一下什么是归并排序 假设我…

MCAL知识点(十二):IRQ中断系统驱动配置详解

目录 1、概述 2、EB-tresos配置 3、注意点 1、概述 IRQ即为中断模块,中断请求既可以由cpu处理,也可以由DMA模块处理。中断请求在本文档中被称为“服务请求”而不是“中断请求”,因为它们可以由任何一个服务提供者提供服务。 TC27x中的中断系统是在中断路由器模块中实现的,…

ubuntu方便使用杂项

1. greps扩展 1.1 脚本实例 #!/bin/bashgit remote - 2> /dev/null # tmp$? # echo "tmp:$tmp" if [ "x$?" "x129" ];thengit grep -n "$" -- :^noru/busybox-* :^ext :^busybox-* elsegrep -nr "$" --colorauto --…

Java EE企业级应用开发(SSM)第3章

第3章Spring Bean装配一.预习笔记 1.Spring中的Bean 在Spring中&#xff0c;一切Java类都被视为资源&#xff0c;而这些资源都被视为Bean&#xff0c;而Spring就是管理这些Bean的容器。 Bean的配置有3种方式&#xff0c;分别是XML文件配置、Java类和注解 2.基于XML的Bean装…

线程生命周期及五种状态

文章目录一、线程生命周期及五种状态1、New(初始化状态)2、Runnable(就绪状态)3、Running(运行状态)4、Blocked(阻塞状态)5、Terminated&#xff08;终止状态&#xff09;二、线程基本方法1、线程等待&#xff08;wait&#xff09;2、线程睡眠&#xff08;sleep&#xff09;3、…

如何理解Lock

显示锁 JDK层面提供了Lock锁都是通过Java提供的接口来手动解锁和释放锁的&#xff0c;所以在某种程度上&#xff0c;JDK中提供的Lock锁也叫显示锁、JDK提供的显示锁位于java.util.concurrent.locks包下&#xff0c;Lock接口的源码如下&#xff1a; public interface Lock {vo…