说明
- 谁发起的写数据
DFSClient通过调用Sender触发写操作 - 如何建立连接
NN应该知道所有的DN情况
Sender和Receiver创建TCP连接 - 如何接收请求确认,谁来拆分
写请求传送到管道中的每一个节点,最后一个返回确认
DFSClient需要做切分,依次发送 - 数据流,落盘
先落盘,再向后一个节点发送 - 如何确认结束
最后一个节点发送ACK消息
DFSClient发送空包表明发送完成
写数据流程
Receiver在Datanode的服务器是DataXceiverServer,它通过创建daemon线程DataXceiver来实现读写数据。
级联的写数据。
数据流管道
DataXceiver.writeBlock()
- 如何控制管道流程
isDatanode //是否是Datanode发起的
isClient //是不是Client发起的
inTransfer //是否是数据块触发的复制操作。比如TRANSFER_FINALIZED
啥时候是Datanode发起的?
- 如何建立管道,如何确认管道已经成功
Writeblock创建管道,通过构造mirrorOut,mirrorIn。维护一个到下游的socket
writeBlock参数
@Override
public void writeBlock(final ExtendedBlock block,
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientname,
final DatanodeInfo[] targets, //所有的datanode信息
final StorageType[] targetStorageTypes,
final DatanodeInfo srcDataNode,
final BlockConstructionStage stage,
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
boolean allowLazyPersist,
final boolean pinning,
final boolean[] targetPinnings,
final String storageId,
final String[] targetStorageIds) throws IOException
构造连接
mirrorNode = targets[0].getXferAddr();
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
mirrorSocket = datanode.newSocket();
//向下游发送写请求
new Sender(mirrorOut)
//接收下游的ack请求
connectAck = BlockOpResponseProto.parseFrom(XX(mirrorIn))
mirrorInStatus = connectAck.getStatus();
firstBadLink = connectAck.getFirstBadLink();
连接是否成功
ACK状态成功说明创建成功,出现异常说明连接创建失败
失败处理逻辑
关闭下游节点的socket,输入流和输出流,向reployOut汇报情况
BlockOpResponseProto.newBuilder()
.writeDelimitedTo(replyOut);
- 发送和接收数据块
BlockReceiver是在创建连接时创建的
它负责接收上游的数据块,发送给下游
从下游节点接收确认消息,转发给上游节点
blockReceiver.receiverBlock();
- 后续处理
更新副本的时间戳,副本长度信息 - 和NN的交互
数据块复制操作 通过BPOfferService和NN交互
客户端请求 response关闭数据块,关闭线程
数据块接收
blockReceiver负责接收上游的数据块
- 接收数据块
启动daemon线程,packetResponser
循环调用receiverpacket方法,接收上游包,写到下游。
关闭packetResponser线程 - 保存数据块
receiver过程中保存 - 转发数据块
receiver过程中转发 - 响应数据块
独立线程packetResponser响应
数据包接收
- 何时切分
packageReceiver.receiveNextPackage从数据流中读取一个packet,放到缓冲区中。
数据切分成packet,客户端切分吗?
- 何时写盘
receivePacket时判断是不是最后一个dn,或者sync。这时候先写盘,再response。否则立即response ack。
其他的是先转发,再写盘。(写盘需要考虑cache) - 何时转发
receivePacket之后就要转发,除非需要同步落盘。 - 何时响应ACK
一般情况下是先构造responser,转发响应,再落盘。
sync,最后一个dn,不转发,落盘,构造responser。 - 如何响应
加入到responser的队列中。
数据包的响应
BlockReceiver从下游接收数据包的ACK,然后传给上游。packetResponser独立线程实现。
- 数据结构
enqueue,队列。存放需要下游确认的数据包。 - 校验线程
packetResponser.run
循环检查块是否异常,异常中断线程
下游输入流读取响应
构造响应,发送给上游 - 移除ack
从队列中移除数据包
响应逻辑
- Datanode写操作重启
OOB异常,直接向上游响应 - ack异常
如果ack异常,可能会中断循环线程 - 最后一块的判断
直接向NN汇报。
finalizeBlock(startTime) - 向上游响应
sendAckUpstream
OOB直接返回
读取异常,那么中断线程
其他情况,构建数据包响应
上报NN
BlockReceiver.finalizeBlock()结束接收
调用datanode.closeBlock()
最终调用BlockPoolOfferService.notifyNamenodeReceivedBloce()通知NM
小结
client提交
首先构造pipeline
然后发送数据块
receivepacket处理具体的落盘和转发
packetresponser处理ack
结束后上报nn