HDFS Read Analysis (Part 1): Opening the File Stream

This post walks through how HDFS reads a file. The overall flow is shown in the following diagram:
[Figure: HDFS read flow diagram]

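As the diagram shows:

  1. The HDFS client calls FileSystem.get() to obtain a FileSystem instance.
  2. The client calls FileSystem.open() to open an FSDataInputStream on the file. open() issues the getBlockLocations RPC to fetch block locations. By default it fetches location info only for the first prefetchSize bytes of the file, where prefetchSize is conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 10 * defaultBlockSize).
  3. The client reads each block through the FSDataInputStream from the DataNode closest to it. The replica locations obtained in step 2 are already sorted by network-topology distance from the client; this sorting is done on the NameNode.
  4. After reading the current block, the client closes the connection to that DataNode and picks the best DataNode for the next block.
  5. When the client has read every block in the list and the file is not yet finished, it asks the NameNode for the next batch of block locations.
  6. Each block that is read is checksum-verified. If an error occurs while reading from a DataNode, the client notifies the NameNode and continues reading from the next DataNode that holds a replica of the block.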

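Related concepts

  • inode
    File data is stored in "blocks", so the file's metadata (creator, creation date, size, and so on) has to be stored somewhere too. The region that holds this metadata is called an inode ("index node"). Every file has a corresponding inode containing information about that file.
    In HDFS, an INode is the in-memory abstraction of a file and holds that file's metadata.
    In the code, INode is the abstract base class at the bottom of this hierarchy.
  • INodeFile
    The file node class. It extends INode and represents a file. Its key field is private BlockInfo[] blocks, which stores the file's blocks as BlockInfo objects. Each BlockInfo holds an array named triplets of size 3 * replicas, where replicas is the number of replicas of the block, i.e. three reference slots per replica:
    triplets[3*i]: the DataNode A holding the block (a DatanodeStorageInfo object);
    triplets[3*i+1]: the previous block on DataNode A (a reference to that block's BlockInfo);
    triplets[3*i+2]: the next block on DataNode A (a reference to that block's BlockInfo).
    Here i is the index of the block's replica, with i in [0, replicas).
  • INodeDirectory
    The directory node class, which also extends INode. Its children can be files or other directories.
  • INodeDirectoryWithQuota
    A directory with quota limits, which supports the HDFS quota policy.
  • INodeFileUnderConstruction
    A file that is still under construction. It can be converted from an INodeFile.
  • Related attributes in FSNamesystem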
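
The triplets layout described for INodeFile above can be sketched in plain Java. This is only an illustration under assumptions: TripletsSketch and its accessor names are hypothetical, and String values stand in for the real DatanodeStorageInfo and BlockInfo references.

```java
// Hypothetical sketch of the triplets layout; this is not the Hadoop BlockInfo class.
class TripletsSketch {
  // Three slots per replica: [storage, previous block, next block].
  private final Object[] triplets;

  TripletsSketch(int replicas) {
    this.triplets = new Object[3 * replicas];
  }

  // Number of replicas the array has room for.
  int capacity() {
    return triplets.length / 3;
  }

  void setStorage(int i, String storage) { triplets[3 * i] = storage; }
  void setPrevious(int i, String block) { triplets[3 * i + 1] = block; }
  void setNext(int i, String block) { triplets[3 * i + 2] = block; }

  String getStorage(int i) { return (String) triplets[3 * i]; }
  String getPrevious(int i) { return (String) triplets[3 * i + 1]; }
  String getNext(int i) { return (String) triplets[3 * i + 2]; }

  public static void main(String[] args) {
    TripletsSketch blk = new TripletsSketch(3);
    // Replica 1 lives on "storage-A", between two other blocks on that storage.
    blk.setStorage(1, "storage-A");
    blk.setPrevious(1, "blk_1");
    blk.setNext(1, "blk_3");
    System.out.println(blk.getStorage(1) + " " + blk.getPrevious(1) + " " + blk.getNext(1));
  }
}
```

Keeping the previous/next references per storage lets each storage's blocks form a doubly linked list without allocating separate list nodes.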

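HDFS Read: Opening the File Stream

Reading a file from HDFS is similar to reading a file in plain Java: a stream has to be opened first. HDFS opens the stream through a FileSystem object. FileSystem.get(conf) returns a FileSystem, and calling open(Path) on it returns an FSDataInputStream. Here is the open code: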

public FSDataInputStream open(Path f) throws IOException {
  // io.file.buffer.size: the size of buffer for use in sequence files.
  return open(f, getConf().getInt("io.file.buffer.size", 4096));
}

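FileSystem is an abstract class and leaves the actual open operation to subclasses such as DistributedFileSystem and WebHdfsFileSystem; each file system opens files in its own way. Taking DistributedFileSystem as the example, its open method is implemented as follows: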

// bufferSize determines how much data is buffered during read/write
public FSDataInputStream open(Path f, final int bufferSize)
    throws IOException {
  statistics.incrementReadOps(1);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataInputStream>() {
    @Override
    public FSDataInputStream doCall(final Path p)
        throws IOException, UnresolvedLinkException {
      final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
      return dfs.createWrappedInputStream(dfsis);
    }
    @Override
    public FSDataInputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.open(p, bufferSize);
    }
  }.resolve(this, absF);
}

// FileSystemLinkResolver.resolve
public T resolve(final FileSystem filesys, final Path path)
    throws IOException {
  int count = 0;
  T in = null;
  Path p = path;
  // Assumes path belongs to this FileSystem.
  // Callers validate this by passing paths through FileSystem#checkPath
  FileSystem fs = filesys;
  for (boolean isLink = true; isLink;) {
    try {
      in = doCall(p);
      isLink = false;
    } catch (UnresolvedLinkException e) {
      ...
      // Have to call next if it's a new FS
      if (!fs.equals(filesys)) {
        return next(fs, p);
      }
      // Else, we keep resolving with this filesystem
    }
  }
  // Successful call, path was fully resolved
  return in;
}

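DistributedFileSystem.open() creates an anonymous subclass of FileSystemLinkResolver, and resolve() calls the doCall() method overridden in that anonymous class. If doCall throws an UnresolvedLinkException, resolve catches it. When the resolved link points into a different FileSystem, resolve calls next() to open the file there; otherwise it keeps resolving with the same FileSystem and calls doCall again.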
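
The doCall/next/resolve pattern described above can be sketched without Hadoop. Everything in this sketch is hypothetical: paths are plain strings of the form "fs:/path", a map stands in for symlinks, and the hop limit of 32 is an assumed value added to keep the loop finite (the listing elides that part).

```java
import java.util.Map;

// Hypothetical stand-in for UnresolvedLinkException.
class UnresolvedLinkSketch extends Exception {
  final String target;
  UnresolvedLinkSketch(String target) { this.target = target; }
}

// Hypothetical sketch of the resolver pattern; this is not the Hadoop class.
abstract class LinkResolverSketch<T> {
  static final int MAX_HOPS = 32; // assumed limit, for illustration only

  /** Tries the operation against the current file system. */
  abstract T doCall(String path) throws UnresolvedLinkSketch;

  /** Runs the operation against a different file system. */
  abstract T next(String fs, String path);

  T resolve(String fs, String path) {
    String p = path;
    for (int hops = 0; hops < MAX_HOPS; hops++) {
      try {
        return doCall(p);
      } catch (UnresolvedLinkSketch e) {
        p = e.target;
        String targetFs = p.substring(0, p.indexOf(':'));
        if (!targetFs.equals(fs)) {
          return next(targetFs, p); // the link leaves this file system
        }
        // otherwise keep resolving with the same file system
      }
    }
    throw new IllegalStateException("Too many links: " + path);
  }
}

class LinkResolverDemo {
  static String open(final Map<String, String> links, String fs, String path) {
    return new LinkResolverSketch<String>() {
      @Override
      String doCall(String p) throws UnresolvedLinkSketch {
        String target = links.get(p);
        if (target != null) {
          throw new UnresolvedLinkSketch(target);
        }
        return "opened " + p;
      }
      @Override
      String next(String otherFs, String p) {
        return "opened " + p + " via " + otherFs;
      }
    }.resolve(fs, path);
  }
}
```

The caller only supplies the two callbacks; the loop that follows links lives in one place and is shared by every operation that takes a path.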

doCall() calls dfs.open(getPathName(p), bufferSize, verifyChecksum), which returns a DFSInputStream. dfs.createWrappedInputStream(dfsis) then wraps it in an HdfsDataInputStream, which is returned as an FSDataInputStream (FSDataInputStream is the parent class of HdfsDataInputStream). That is how FileSystem.open(path) opens a file stream.

The dfs used in doCall is the DFSClient member of DistributedFileSystem. Its open method creates a new DFSInputStream instance:

public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
    throws IOException, UnresolvedLinkException {
  checkOpen();
  // Get block info from namenode
  return new DFSInputStream(this, src, buffersize, verifyChecksum);
}

// DFSInputStream constructor
DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum
    ) throws IOException, UnresolvedLinkException {
  this.dfsClient = dfsClient;
  this.verifyChecksum = verifyChecksum;
  this.buffersize = buffersize;
  this.src = src;
  // The client's caching strategy: CachingStrategy(readDropBehind, readahead);
  // Boolean readDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?
  //     null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
  // Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
  //     null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0);
  this.cachingStrategy =
      dfsClient.getDefaultReadCachingStrategy();
  openInfo();
}

synchronized void openInfo() throws IOException, UnresolvedLinkException {
  // fetchLocatedBlocksAndGetLastBlockLength does two things:
  // it assigns locatedBlocks, and
  // it returns the length of the last block that is still under construction
  lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
  int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
  // If the fetch fails, retry; the default is 3 times, configured by
  // dfs.client.retry.times.get-last-block-length.
  // After a cluster restart, DNs may not have reported their blocks yet, so
  // the locations of some blocks may not be readable from the NN.
  while (retriesForLastBlockLength > 0) {
    // Getting last block length as -1 is a special case. When cluster
    // restarts, DNs may not report immediately. At this time partial block
    // locations will not be available with NN for getting the length. Lets
    // retry for 3 times to get the length.
    if (lastBlockBeingWrittenLength == -1) {
      DFSClient.LOG.warn("Last block locations not available. "
          + "Datanodes might not have reported blocks completely."
          + " Will retry for " + retriesForLastBlockLength + " times");
      // Wait dfs.client.retry.interval-ms.get-last-block-length ms before
      // retrying; the default is 4000 ms
      waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
      lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
    } else {
      break;
    }
    retriesForLastBlockLength--;
  }
  if (retriesForLastBlockLength == 0) {
    throw new IOException("Could not obtain the last block locations.");
  }
}
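
The retry loop in openInfo can be isolated into a small runnable sketch. The class and parameter names here are hypothetical, and a LongSupplier stands in for fetchLocatedBlocksAndGetLastBlockLength; the control flow mirrors the listing above.

```java
import java.io.IOException;
import java.util.function.LongSupplier;

// Hypothetical sketch of the openInfo retry loop; this is not the Hadoop DFSInputStream.
class LastBlockLengthRetry {
  /**
   * fetch returns the last block length, or -1 when the block locations
   * are not available yet.
   */
  static long openInfo(LongSupplier fetch, int retries, long intervalMs)
      throws IOException, InterruptedException {
    long length = fetch.getAsLong();
    while (retries > 0) {
      if (length == -1) {
        Thread.sleep(intervalMs); // wait before asking again
        length = fetch.getAsLong();
      } else {
        break;
      }
      retries--;
    }
    if (retries == 0) {
      throw new IOException("Could not obtain the last block locations.");
    }
    return length;
  }

  public static void main(String[] args) throws Exception {
    final long[] answers = {-1, -1, 512};
    final int[] next = {0};
    // The first two fetches fail; the third one succeeds.
    long length = openInfo(() -> answers[next[0]++], 3, 10L);
    System.out.println("last block length = " + length);
  }
}
```

One detail follows directly from the loop shape: if a valid length only arrives on the very last permitted retry, the counter has already reached 0 and the exception is still thrown.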

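openInfo calls only one method, fetchLocatedBlocksAndGetLastBlockLength. Its job is to make sure that lastBlockBeingWrittenLength, the length of the last block that is still being written, is obtained correctly. After a cluster restart, DataNodes may not have reported their blocks yet, so the locations of some blocks may not be readable from the NameNode. When lastBlockBeingWrittenLength cannot be obtained (the method returns -1), the client waits 4000 ms and retries. The default retry count is 3, controlled by dfs.client.retry.times.get-last-block-length. The main logic is in fetchLocatedBlocksAndGetLastBlockLength: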

private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
  // DFSClient calls FSNamesystem.getBlockLocations over RPC to get the
  // locations of part of the file's blocks
  final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
  ...
  if (locatedBlocks != null) {
    Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
    Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
    while (oldIter.hasNext() && newIter.hasNext()) {
      if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
        throw new IOException("Blocklist for " + src + " has changed!");
      }
    }
  }
  locatedBlocks = newInfo;
  long lastBlockBeingWrittenLength = 0;
  // If src has a block under construction, return its current length;
  // otherwise return 0. -1 means failure.
  if (!locatedBlocks.isLastBlockComplete()) {
    final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
    if (last != null) {
      if (last.getLocations().length == 0) {
        if (last.getBlockSize() == 0) {
          // if the length is zero, then no data has been written to
          // datanode. So no need to wait for the locations.
          return 0;
        }
        return -1;
      }
      final long len = readBlockLength(last);
      last.getBlock().setNumBytes(len);
      lastBlockBeingWrittenLength = len;
    }
  }
  fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
  currentNode = null;
  return lastBlockBeingWrittenLength;
}

fetchLocatedBlocksAndGetLastBlockLength mainly obtains the LocatedBlocks of src through DFSClient.getLocatedBlocks.

LocatedBlocks holds the block locations and the length of the src file. Its important fields are:

// length of the file
private final long fileLength;
// the file's blocks, each with its DataNode locations and block metadata
private final List<LocatedBlock> blocks; // array of blocks with prioritized locations
// whether src has a block that is still being created
private final boolean underConstruction;
// the last block of src
private LocatedBlock lastLocatedBlock = null;
// whether the last block is complete
private boolean isLastBlockComplete = false;
// encryption info of the file
private FileEncryptionInfo fileEncryptionInfo = null;

LocatedBlocks contains a list of LocatedBlock. Each LocatedBlock stores a block, the DataNodes holding its replicas, and the block's metadata. Its more important fields are:

// one block of the file
private final ExtendedBlock b;
// offset of the block's first byte within the file
private long offset;
// the DataNodes holding replicas of the block
private final DatanodeInfo[] locs;
// storage ID of each replica
private final String[] storageIDs;
// storage type of each replica
private final StorageType[] storageTypes;
// corrupt flag is true if all of the replicas of a block are corrupt.
// else false. If block has few corrupt replicas, they are filtered and
// their locations are not part of this object
private boolean corrupt;
private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
/**
 * List of cached datanode locations
 */
private DatanodeInfo[] cachedLocs;

DFSClient.getLocatedBlocks gets its result over RPC. The call sequence is:
dfsClient.getLocatedBlocks(src, 0) -> getLocatedBlocks(src, start, dfsClientConf.prefetchSize) -> callGetBlockLocations(namenode, src, start, length) -> namenode.getBlockLocations(src, start, length) -> NameNodeRpcServer.getBlockLocations(src, start, length) -> namesystem.getBlockLocations(getClientMachine(), src, offset, length)

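This call path involves the variable prefetchSize, which is controlled by the property dfs.client.read.prefetch.size. Its default value is 10 * defaultBlockSize, which means that opening a file stream loads the location info of the first 10 blocks into memory by default. This is a read optimization.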
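
The effect of prefetchSize can be checked with a little arithmetic. The sketch below is only an illustration: the class and method names are hypothetical, the 128 MB block size is an assumed example value, and the request is assumed to start at offset 0 as it does when a stream is opened.

```java
// Hypothetical arithmetic sketch; 128 MB is an assumed example block size.
class PrefetchSketch {
  static final long MB = 1024L * 1024L;

  /** Default prefetch size: ten times the default block size. */
  static long prefetchSize(long defaultBlockSize) {
    return 10 * defaultBlockSize;
  }

  /**
   * Number of blocks whose locations are returned by one request of
   * `length` bytes starting at offset 0, for a file of fileLength bytes.
   */
  static long blocksCovered(long fileLength, long blockSize, long length) {
    long bytes = Math.min(fileLength, length);
    return (bytes + blockSize - 1) / blockSize; // ceiling division
  }

  public static void main(String[] args) {
    long blockSize = 128 * MB;
    long prefetch = prefetchSize(blockSize);
    // A 25-block file: only the first 10 blocks are located on open.
    System.out.println(blocksCovered(25 * blockSize, blockSize, prefetch));
    // A file of 3.5 blocks: all 4 blocks are located on open.
    System.out.println(blocksCovered(3 * blockSize + 64 * MB, blockSize, prefetch));
  }
}
```

For files smaller than the prefetch size, a single RPC at open time therefore covers every block, and no further location requests are needed while reading.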

Now look at the code of FSNamesystem.getBlockLocations:

LocatedBlocks getBlockLocations(String clientMachine, String src,
    long offset, long length) throws AccessControlException,
    FileNotFoundException, UnresolvedLinkException, IOException {
  // get the block info for the requested byte range
  LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true, true);
  // sort the DataNodes holding each block's replicas by their
  // network-topology distance from the client
  if (blocks != null) {
    blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
        blocks.getLocatedBlocks());
    // lastBlock is not part of getLocatedBlocks(), might need to sort it too
    LocatedBlock lastBlock = blocks.getLastLocatedBlock();
    if (lastBlock != null) {
      ArrayList<LocatedBlock> lastBlockList =
          Lists.newArrayListWithCapacity(1);
      lastBlockList.add(lastBlock);
      blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
          lastBlockList);
    }
  }
  // return the topology-sorted blocks to the client
  return blocks;
}

getBlockLocations in turn calls getBlockLocationsInt():

private LocatedBlocks getBlockLocationsInt(String src, long offset,
    long length, boolean doAccessTime, boolean needBlockToken,
    boolean checkSafeMode)
    throws FileNotFoundException, UnresolvedLinkException, IOException {
  // validate offset and length; throw an exception if they are invalid
  final LocatedBlocks ret = getBlockLocationsUpdateTimes(src,
      offset, length, doAccessTime, needBlockToken);
  logAuditEvent(true, "open", src);
  if (checkSafeMode && isInSafeMode()) {
    for (LocatedBlock b : ret.getLocatedBlocks()) {
      // if safemode & no block locations yet then throw safemodeException
      if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
        SafeModeException se = new SafeModeException(
            "Zero blocklocations for " + src, safeMode);
        if (haEnabled && haContext != null &&
            haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
          throw new RetriableException(se);
        } else {
          throw se;
        }
      }
    }
  }
  return ret;
}
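
The safe-mode check in getBlockLocationsInt boils down to a small predicate, sketched here with hypothetical names. Arrays of DataNode names stand in for LocatedBlock locations, and the exception choice is reduced to returning the exception's name.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the safe-mode check; this is not the Hadoop FSNamesystem.
class SafeModeCheckSketch {
  /**
   * Returns true when the request has to be rejected: the NameNode is in
   * safe mode and at least one block has no known location.
   */
  static boolean mustReject(boolean inSafeMode, List<String[]> locationsPerBlock) {
    if (!inSafeMode) {
      return false;
    }
    for (String[] locations : locationsPerBlock) {
      if (locations == null || locations.length == 0) {
        return true;
      }
    }
    return false;
  }

  /** Which exception the listing throws, depending on the HA state. */
  static String exceptionKind(boolean haEnabled, boolean active) {
    return (haEnabled && active) ? "RetriableException" : "SafeModeException";
  }

  public static void main(String[] args) {
    List<String[]> blocks = Arrays.asList(
        new String[] {"dn1", "dn2"},
        new String[] {}); // the second block has not been reported yet
    System.out.println(mustReject(true, blocks));
    System.out.println(mustReject(false, blocks));
  }
}
```

Wrapping the error in a RetriableException on an active HA NameNode tells the client that trying again later is worthwhile, since the missing locations may simply not have been reported yet.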

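getBlockLocationsInt mainly handles safe mode: when the NameNode is in safe mode and a block has no locations (null or empty), it throws a SafeModeException, wrapped in a RetriableException if HA is enabled and this NameNode is active. getBlockLocationsInt obtains the LocatedBlocks by calling getBlockLocationsUpdateTimes, shown below: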

private LocatedBlocks getBlockLocationsUpdateTimes(final String srcArg,
    long offset, long length, boolean doAccessTime, boolean needBlockToken)
    throws FileNotFoundException,
    UnresolvedLinkException, IOException {
  String src = srcArg;
  FSPermissionChecker pc = getPermissionChecker();
  // for an ordinary src (e.g. /user/hadoop/xx), pathComponents is null
  byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
  // Decide whether the access time needs updating: if the access-time
  // precision has been exceeded, update it. The update needs the writeLock.
  for (int attempt = 0; attempt < 2; attempt++) {
    boolean isReadOp = (attempt == 0);
    if (isReadOp) { // first attempt is with readlock
      checkOperation(OperationCategory.READ);
      // reentrant lock
      readLock();
    } else { // second attempt is with write lock
      checkOperation(OperationCategory.WRITE);
      writeLock(); // writelock is needed to set accesstime
    }
    try {
      src = resolvePath(src, pathComponents);
      if (isReadOp) {
        checkOperation(OperationCategory.READ);
      } else {
        checkOperation(OperationCategory.WRITE);
      }
      if (isPermissionEnabled) {
        checkPathAccess(pc, src, FsAction.READ);
      }
      // if the namenode is in safemode, then do not update access time
      if (isInSafeMode()) {
        doAccessTime = false;
      }
      // resolve the src path into its INodes
      final INodesInPath iip = dir.getINodesInPath(src, true);
      final INode[] inodes = iip.getINodes();
      // convert the last INode in the path (the file) to an INodeFile
      final INodeFile inode = INodeFile.valueOf(
          inodes[inodes.length - 1], src);
      if (isPermissionEnabled) {
        checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
      }
      if (!iip.isSnapshot() //snapshots are readonly, so don't update atime.
          && doAccessTime && isAccessTimeSupported()) {
        final long now = now();
        // the time since the last access exceeds the access-time
        // precision, so update the access time
        if (now > inode.getAccessTime() + getAccessTimePrecision()) {
          // if we have to set access time but we only have the readlock, then
          // restart this entire operation with the writeLock.
          if (isReadOp) {
            continue;
          }
          boolean changed = dir.setTimes(inode, -1, now, false,
              iip.getLatestSnapshotId());
          if (changed) {
            getEditLog().logTimes(src, -1, now);
          }
        }
      }
      // if src is not a snapshot path, the file length excludes the last
      // block that is still under construction
      final long fileSize = iip.isSnapshot() ?
          inode.computeFileSize(iip.getPathSnapshotId())
          : inode.computeFileSizeNotIncludingLastUcBlock();
      boolean isUc = inode.isUnderConstruction();
      if (iip.isSnapshot()) {
        // if src indicates a snapshot file, we need to make sure the returned
        // blocks do not exceed the size of the snapshot file.
        length = Math.min(length, fileSize - offset);
        isUc = false;
      }
      final FileEncryptionInfo feInfo =
          FSDirectory.isReservedRawName(srcArg) ?
          null : dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(),
              iip);
      // build LocatedBlocks from the INodeFile's blocks via blockManager
      final LocatedBlocks blocks =
          blockManager.createLocatedBlocks(inode.getBlocks(), fileSize,
              isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo);
      // Set caching information for the located blocks.
      for (LocatedBlock lb: blocks.getLocatedBlocks()) {
        cacheManager.setCachedLocations(lb);
      }
      return blocks;
    } finally {
      if (isReadOp) {
        readUnlock();
      } else {
        writeUnlock();
      }
    }
  }
  return null; // can never reach here
}

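getBlockLocationsUpdateTimes contains a for loop. On the first pass attempt is 0 and the method takes the readLock. If access times are supported and the time since the file's last access exceeds the access-time precision configured for HDFS (dfs.namenode.accesstime.precision), the access time has to be updated. The update requires the writeLock, so if the method currently holds the readLock it executes continue, and the second pass takes the writeLock and performs the update. (A file's access time can be used to judge whether its data is hot or cold.)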
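
The two-attempt locking pattern can be reproduced with java.util.concurrent alone. This is a minimal sketch under assumptions: AccessTimeSketch and its fields are hypothetical, the current time is passed in as a parameter, and the returned string stands in for the LocatedBlocks result.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the read-lock-then-write-lock retry; not the Hadoop FSNamesystem.
class AccessTimeSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final long precisionMs;
  private long accessTime;
  int writeLockAcquisitions = 0; // only for observing the behaviour

  AccessTimeSketch(long precisionMs, long initialAccessTime) {
    this.precisionMs = precisionMs;
    this.accessTime = initialAccessTime;
  }

  long getAccessTime() {
    return accessTime;
  }

  /** Serves a read and refreshes the access time only when it is too old. */
  String read(long now) {
    for (int attempt = 0; attempt < 2; attempt++) {
      boolean isReadOp = (attempt == 0);
      if (isReadOp) {
        lock.readLock().lock();
      } else {
        lock.writeLock().lock();
        writeLockAcquisitions++;
      }
      try {
        if (now > accessTime + precisionMs) {
          if (isReadOp) {
            continue; // restart the whole operation with the write lock
          }
          accessTime = now;
        }
        return "blocks";
      } finally {
        // runs on both return and continue, so the lock is always released
        if (isReadOp) {
          lock.readLock().unlock();
        } else {
          lock.writeLock().unlock();
        }
      }
    }
    throw new IllegalStateException("unreachable");
  }
}
```

The common case, where the access time is still fresh, never touches the write lock, so concurrent readers do not block each other; only the occasional stale access time pays for exclusive locking.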

(FSDirectory) dir.getINodesInPath returns the INodes for src, and the last INode is then converted to an INodeFile (an INodeFile corresponds to one file). Finally blockManager.createLocatedBlocks turns the blocks of the INodeFile into a LocatedBlocks:

public LocatedBlocks createLocatedBlocks(final BlockInfo[] blocks,
    final long fileSizeExcludeBlocksUnderConstruction,
    final boolean isFileUnderConstruction, final long offset,
    final long length, final boolean needBlockToken,
    final boolean inSnapshot, FileEncryptionInfo feInfo)
    throws IOException {
  assert namesystem.hasReadLock();
  if (blocks == null) {
    return null;
  } else if (blocks.length == 0) {
    return new LocatedBlocks(0, isFileUnderConstruction,
        Collections.<LocatedBlock>emptyList(), null, false, feInfo);
  } else {
    if (LOG.isDebugEnabled()) {
      LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
    }
    final AccessMode mode = needBlockToken? AccessMode.READ: null;
    // create a LocatedBlock for each block in the range [offset, offset + length)
    // Note: LocatedBlock, not LocatedBlocks; LocatedBlocks holds a list of LocatedBlock
    final List<LocatedBlock> locatedblocks = createLocatedBlockList(
        blocks, offset, length, Integer.MAX_VALUE, mode);
    final LocatedBlock lastlb;
    final boolean isComplete;
    if (!inSnapshot) {
      final BlockInfo last = blocks[blocks.length - 1];
      final long lastPos = last.isComplete()?
          fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
          : fileSizeExcludeBlocksUnderConstruction;
      // create a LocatedBlock for the last block of the file
      // Note: LocatedBlock, not LocatedBlocks; LocatedBlocks holds a list of LocatedBlock
      lastlb = createLocatedBlock(last, lastPos, mode);
      isComplete = last.isComplete();
    } else {
      lastlb = createLocatedBlock(blocks,
          fileSizeExcludeBlocksUnderConstruction, mode);
      isComplete = true;
    }
    // instantiate the LocatedBlocks object
    return new LocatedBlocks(
        fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
        locatedblocks, lastlb, isComplete, feInfo);
  }
}

createLocatedBlocks uses two methods, createLocatedBlockList and createLocatedBlock. As the names suggest, one returns a list of LocatedBlock and the other returns a single LocatedBlock. The source is as follows:

private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
    final long offset, final long length, final int nrBlocksToReturn,
    final AccessMode mode) throws IOException {
  int curBlk = 0;
  long curPos = 0, blkSize = 0;
  int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
  // find the block that contains offset
  for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
    blkSize = blocks[curBlk].getNumBytes();
    assert blkSize > 0 : "Block of size 0";
    if (curPos + blkSize > offset) {
      break;
    }
    curPos += blkSize;
  }
  if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file
    return Collections.<LocatedBlock>emptyList();
  // when opening a stream, length defaults to 10 * the default block size
  // (e.g. 10 * 128 MB), controlled by dfs.client.read.prefetch.size
  long endOff = offset + length;
  List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
  do {
    // call createLocatedBlock to create each LocatedBlock
    results.add(createLocatedBlock(blocks[curBlk], curPos, mode));
    curPos += blocks[curBlk].getNumBytes();
    curBlk++;
  } while (curPos < endOff
      && curBlk < blocks.length
      && results.size() < nrBlocksToReturn);
  return results;
}
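
The range selection performed by createLocatedBlockList can be tried out on plain numbers. In this sketch, which uses hypothetical names, an array of block sizes stands in for BlockInfo[], the result is a list of block indexes, and the special case for an empty first block and the nrBlocksToReturn cap are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the range selection; block sizes stand in for BlockInfo objects.
class BlockRangeSketch {
  /** Returns the indexes of the blocks overlapping [offset, offset + length). */
  static List<Integer> select(long[] blockSizes, long offset, long length) {
    List<Integer> result = new ArrayList<>();
    int cur = 0;
    long curPos = 0; // file offset of the first byte of block `cur`
    // skip the blocks that end at or before offset
    while (cur < blockSizes.length && curPos + blockSizes[cur] <= offset) {
      curPos += blockSizes[cur];
      cur++;
    }
    if (cur == blockSizes.length) {
      return result; // offset is at or past the end of the file
    }
    long endOff = offset + length;
    do {
      result.add(cur);
      curPos += blockSizes[cur];
      cur++;
    } while (curPos < endOff && cur < blockSizes.length);
    return result;
  }

  public static void main(String[] args) {
    long[] sizes = {100, 100, 100, 50};
    System.out.println(select(sizes, 0, 200));   // prints [0, 1]
    System.out.println(select(sizes, 150, 100)); // prints [1, 2]
  }
}
```

Because of the do/while shape, the block containing offset is always returned, even when length is 0.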

createLocatedBlockList itself calls createLocatedBlock to build each LocatedBlock and adds it to the list. The source of createLocatedBlock is:

private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos,
    final BlockTokenSecretManager.AccessMode mode) throws IOException {
  final LocatedBlock lb = createLocatedBlock(blk, pos);
  if (mode != null) {
    setBlockToken(lb, mode);
  }
  return lb;
}

private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
    ) throws IOException {
  // blk is under construction
  if (blk instanceof BlockInfoUnderConstruction) {
    if (blk.isComplete()) {
      throw new IOException(
          "blk instanceof BlockInfoUnderConstruction && blk.isComplete()"
          + ", blk=" + blk);
    }
    final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
    final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
    final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
    return new LocatedBlock(eb, storages, pos, false);
  }
  // get block locations
  // number of DataNodes whose replica of this block cannot be read,
  // counted from the block's replicas
  final int numCorruptNodes = countNodes(blk).corruptReplicas();
  // number of DataNodes whose replica of this block cannot be read,
  // according to the in-memory corrupt-replicas map
  final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
  if (numCorruptNodes != numCorruptReplicas) {
    LOG.warn("Inconsistent number of corrupt replicas for "
        + blk + " blockMap has " + numCorruptNodes
        + " but corrupt replicas map has " + numCorruptReplicas);
  }
  // number of DataNodes storing this block
  final int numNodes = blocksMap.numNodes(blk);
  // if the corrupt count equals the replica count, mark the block as corrupt
  final boolean isCorrupt = numCorruptNodes == numNodes;
  final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
  final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
  int j = 0;
  if (numMachines > 0) {
    for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
      final DatanodeDescriptor d = storage.getDatanodeDescriptor();
      final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
      // a corrupt replica is not added to machines;
      // corrupt replicas are added only when the whole block is corrupt
      if (isCorrupt || (!replicaCorrupt))
        machines[j++] = storage;
    }
  }
  assert j == machines.length :
      "isCorrupt: " + isCorrupt +
      " numMachines: " + numMachines +
      " numNodes: " + numNodes +
      " numCorrupt: " + numCorruptNodes +
      " numCorruptRepls: " + numCorruptReplicas;
  final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
  // instantiate a LocatedBlock; isCorrupt marks whether the block is corrupt
  return new LocatedBlock(eb, machines, pos, isCorrupt);
}

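The methods above belong to the BlockManager class, which has two fields, blocksMap and corruptReplicas. Briefly, their roles are:
blocksMap is an instance of BlocksMap. It represents the mapping Block→{INode, datanodes}, i.e. which DataNodes store each block, and internally it holds BlockInfo objects.
CorruptReplicasMap corruptReplicas = new CorruptReplicasMap(): corruptReplicas is an instance of CorruptReplicasMap and keeps track of all corrupt blocks in the file system. Note that a block is considered corrupt only when all of its replicas are corrupt. Each entry is stored as a block together with the DataNodes holding it, Block→TreeSet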
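
The replica filtering in createLocatedBlock can be sketched with a plain map. The names are hypothetical: the map from DataNode name to a corrupt flag stands in for the combination of blocksMap and corruptReplicas.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the replica filtering; this is not the Hadoop BlockManager.
class ReplicaFilterSketch {
  /** A block counts as corrupt when the corrupt count equals the replica count. */
  static boolean isBlockCorrupt(Map<String, Boolean> replicas) {
    int corrupt = 0;
    for (boolean replicaCorrupt : replicas.values()) {
      if (replicaCorrupt) {
        corrupt++;
      }
    }
    return corrupt == replicas.size();
  }

  /**
   * Returns the DataNodes reported to the client: corrupt replicas are
   * hidden unless the whole block is corrupt, in which case all are kept.
   */
  static List<String> locations(Map<String, Boolean> replicas) {
    boolean blockCorrupt = isBlockCorrupt(replicas);
    List<String> machines = new ArrayList<>();
    for (Map.Entry<String, Boolean> e : replicas.entrySet()) {
      if (blockCorrupt || !e.getValue()) {
        machines.add(e.getKey());
      }
    }
    return machines;
  }

  public static void main(String[] args) {
    Map<String, Boolean> replicas = new LinkedHashMap<>();
    replicas.put("dn1", false);
    replicas.put("dn2", true); // this replica is corrupt
    replicas.put("dn3", false);
    System.out.println(locations(replicas)); // prints [dn1, dn3]
  }
}
```

Hiding individual corrupt replicas keeps clients away from bad copies, while a fully corrupt block is still returned with its corrupt flag set so the client can report a meaningful error.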

Once the LocatedBlocks has been built, control returns to FSNamesystem.getBlockLocations, which sorts each block's locations by network-topology distance from the client:

public void sortLocatedBlocks(final String targethost,
    final List<LocatedBlock> locatedblocks) {
  //sort the blocks
  // As it is possible for the separation of node manager and datanode,
  // here we should get node but not datanode only .
  Node client = getDatanodeByHost(targethost);
  if (client == null) {
    List<String> hosts = new ArrayList<String> (1);
    hosts.add(targethost);
    String rName = dnsToSwitchMapping.resolve(hosts).get(0);
    if (rName != null)
      client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + targethost);
  }
  Comparator<DatanodeInfo> comparator = avoidStaleDataNodesForRead ?
      new DFSUtil.DecomStaleComparator(staleInterval) :
      DFSUtil.DECOM_COMPARATOR;
  for (LocatedBlock b : locatedblocks) {
    DatanodeInfo[] di = b.getLocations();
    // move decommissioned or stale DNs to the end
    Arrays.sort(di, comparator);
    int lastActiveIndex = di.length - 1;
    while (lastActiveIndex > 0 && isInactive(di[lastActiveIndex])) {
      --lastActiveIndex;
    }
    int activeLen = lastActiveIndex + 1;
    networktopology.sortByDistance(client, b.getLocations(), activeLen);
  }
}

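Sorting a block's locations takes two steps. The first step sorts the locations by state, moving decommissioned or stale DataNodes to the end.
Among the DataNodes holding a block, a DataNode that has not sent a heartbeat to the NameNode within a given interval (the default comes from the constant DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT, which is 30 s) is in the STALE state, and replicas on such DataNodes are sorted to the end. As the code shows, stale DataNodes are only taken into account when avoidStaleDataNodesForRead is enabled; otherwise only decommissioned DataNodes are moved back.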
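
The first sorting step can be sketched as follows. The Replica class and its fields are hypothetical stand-ins for DatanodeInfo, and placing stale nodes before decommissioned ones is an assumption of this sketch rather than something the listing shows.

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical sketch; Replica is illustrative and is not Hadoop's DatanodeInfo.
class StaleSortSketch {
  static final class Replica {
    final String host;
    final boolean decommissioned;
    final long lastHeartbeatMs;

    Replica(String host, boolean decommissioned, long lastHeartbeatMs) {
      this.host = host;
      this.decommissioned = decommissioned;
      this.lastHeartbeatMs = lastHeartbeatMs;
    }
  }

  // 0 = active, 1 = stale, 2 = decommissioned (the order of 1 and 2 is assumed).
  static int rank(Replica r, long nowMs, long staleIntervalMs) {
    if (r.decommissioned) {
      return 2;
    }
    return (nowMs - r.lastHeartbeatMs > staleIntervalMs) ? 1 : 0;
  }

  /** Sorts inactive replicas to the end and returns the length of the active prefix. */
  static int sortInactiveLast(Replica[] replicas, long nowMs, long staleIntervalMs) {
    Arrays.sort(replicas, Comparator.comparingInt(
        (Replica r) -> rank(r, nowMs, staleIntervalMs)));
    int lastActiveIndex = replicas.length - 1;
    while (lastActiveIndex > 0
        && rank(replicas[lastActiveIndex], nowMs, staleIntervalMs) != 0) {
      --lastActiveIndex;
    }
    return lastActiveIndex + 1;
  }

  static String[] hosts(Replica[] replicas) {
    String[] names = new String[replicas.length];
    for (int i = 0; i < replicas.length; i++) {
      names[i] = replicas[i].host;
    }
    return names;
  }
}
```

The returned prefix length plays the role of activeLen in the listing: only that leading part of the array is then sorted by distance, so inactive nodes stay at the end regardless of how close they are.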

The second step sorts by network topology, using the method sortByDistance:

public void sortByDistance(Node reader, Node[] nodes, int activeLen) {
  /** Sort weights for the nodes array */
  int[] weights = new int[activeLen];
  for (int i=0; i<activeLen; i++) {
    // 0 is local, 1 is same rack, 2 is off rack
    weights[i] = getWeight(reader, nodes[i]);
  }
  // Add weight/node pairs to a TreeMap to sort
  // a TreeMap keeps its entries sorted by key
  TreeMap<Integer, List<Node>> tree = new TreeMap<Integer, List<Node>>();
  for (int i=0; i<activeLen; i++) {
    int weight = weights[i];
    Node node = nodes[i];
    List<Node> list = tree.get(weight);
    if (list == null) {
      list = Lists.newArrayListWithExpectedSize(1);
      tree.put(weight, list);
    }
    list.add(node);
  }
  int idx = 0;
  for (List<Node> list: tree.values()) {
    if (list != null) {
      // shuffle the nodes that share a weight to avoid hot spots.
      // By default a list holds a single node; with more replicas, two
      // replicas may sit on the same rack and the list then holds two nodes
      Collections.shuffle(list, r);
      for (Node n: list) {
        nodes[idx] = n;
        idx++;
      }
    }
  }
  Preconditions.checkState(idx == activeLen,
      "Sorted the wrong number of nodes!");
}
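
The weight bucketing in sortByDistance can be tried out without a cluster. In this sketch nodes are plain "/rack/host" strings, and the weight and rackOf helpers are simplified, hypothetical versions of the topology lookup; only the first activeLen entries are reordered, as in the listing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.TreeMap;

// Hypothetical sketch of the weight bucketing; nodes are "/rack/host" strings.
class DistanceSortSketch {
  static String rackOf(String node) {
    return node.substring(0, node.lastIndexOf('/'));
  }

  /** 0 is local, 1 is same rack, 2 is off rack. */
  static int weight(String reader, String node) {
    if (reader.equals(node)) {
      return 0;
    }
    return rackOf(reader).equals(rackOf(node)) ? 1 : 2;
  }

  static void sortByDistance(String reader, String[] nodes, int activeLen, Random r) {
    // A TreeMap iterates its keys in ascending order, so buckets come out nearest first.
    TreeMap<Integer, List<String>> tree = new TreeMap<>();
    for (int i = 0; i < activeLen; i++) {
      tree.computeIfAbsent(weight(reader, nodes[i]), k -> new ArrayList<>())
          .add(nodes[i]);
    }
    int idx = 0;
    for (List<String> list : tree.values()) {
      Collections.shuffle(list, r); // spread load across equally distant nodes
      for (String n : list) {
        nodes[idx++] = n;
      }
    }
  }

  public static void main(String[] args) {
    String[] nodes = {"/r2/h5", "/r1/h2", "/r1/h1", "/r3/h9"};
    // Only the first three entries are active; the last one keeps its place.
    sortByDistance("/r1/h1", nodes, 3, new Random());
    System.out.println(String.join(" ", nodes));
  }
}
```

Shuffling inside a bucket means that readers which are equally far from several replicas do not all pick the same DataNode first.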

Finally the sorted LocatedBlocks is returned to the client, which completes the open-file phase. The next phase is reading the file.
