[HDFS] 浅谈 Hadoop Distcp 工具的 InputFormat

2020-06-22

在集群迁移或者数据跨集群同步的过程中,必要少不了数据拷贝的动作,在同一个集群内,跨NameSpace的数据拷贝,你可以使用distcp,你也可以自己实现类似facebook提供的fastcopy的拷贝(社区好像没提供),那么在使用distcp工具的过程中,其中的一些参数到底影响了什么,他是一个怎样的原理,今天就和大家简单的分享下。

我们在命令行执行hadoop distcp命令回车,就会看到他所支持的很多参数,其中在命令行拷贝策略(-strategy)选项中,有两个参数可选参数:dynamic,uniformsize。在默认情况下使用的是uniformsize,含义是distcp的每个map会相对均衡去复制数据量大小的文件。

我们通过查看源码容易可以看出,除了命令行选项之外,distcp还能默认的去加载distcp-default.xml,我们可以放置到$HADOOP_CONF_DIR下,我们可以配置相对常用的参数到这个文件中。

1
2
3
4
5
6
7
public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {
Configuration config = new Configuration(configuration);
config.addResource(DISTCP_DEFAULT_XML);
setConf(config);
this.inputOptions = inputOptions;
this.metaFolder = createMetaFolderPath();
}

除此之外,我们还从默认的配置当中,看到了两个参数:

1
2
3
4
5
6
7
8
9
10
11
<property>
<name>distcp.dynamic.strategy.impl</name>
<value>org.apache.hadoop.tools.mapred.lib.DynamicInputFormat</value>
<description>Implementation of dynamic input format</description>
</property>

<property>
<name>distcp.static.strategy.impl</name>
<value>org.apache.hadoop.tools.mapred.UniformSizeInputFormat</value>
<description>Implementation of static input format</description>
</property>

这个就是上述说的两种命令行策略的参数模式。通过命名可以很容易可以看出,其实这就是两个InputFormat的实现类,distcp任务(其实也就是MR任务),通过配置命令行或者参数指定使用不同的inputFormat生成不同的splits,从而实现不同的拷贝文件的逻辑。

然而,既然有两个选项,那他们的区别在哪呢?我们可以简单的看看下图的一个整体结构

DynamicInputFormat

对于DynamicInputFormat来说,有几个重要的相关的类:DynamicRecordReader,DynamicInputChunk,DynamicInputChunkContext。

对于distcp任务,会先生成一个copy-listing文件,该文件包含复制文件的列表等信息,DynamicInputFormat的getSplits方法就是将这些切分为不同chunk,然后分配到不同的task中。

在切分copy-listing文件到不同的chunk当中,其中有几个变量,numMaps和numRecords得到splitRatio的比例,也就是算出平均每个map处理多少个chunk,然后通过总的records数量,算出每个chunk中有多少条records

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static int getSplitRatio(int nMaps, int nRecords, Configuration conf) {
int maxChunksIdeal = getMaxChunksIdeal(conf);
int minRecordsPerChunk = getMinRecordsPerChunk(conf);
int splitRatio = getSplitRatio(conf);

if (nMaps == 1) {
LOG.warn("nMaps == 1. Why use DynamicInputFormat?");
return 1;
}

if (nMaps > maxChunksIdeal)
return splitRatio;

int nPickups = (int)Math.ceil((float)maxChunksIdeal/nMaps);
int nRecordsPerChunk = (int)Math.ceil((float)nRecords/(nMaps*nPickups));

return nRecordsPerChunk < minRecordsPerChunk ?
splitRatio : nPickups;
}

最终会将所有的record放到不同的chunk中,在hdfs上会在对应目录行程对应的文件类似fileList.seq.chunk.0000x:

1
2
3
4
5
6
7
8
drwx------ - hadoop supergroup 0 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248
drwx------ - hadoop supergroup 0 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir
-rw-r--r-- 1 hadoop supergroup 1504 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/fileList.seq.chunk.00002
-rw-r--r-- 1 hadoop supergroup 1486 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/fileList.seq.chunk.00003
-rw-r--r-- 1 hadoop supergroup 1646 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/task_1526024399954_0017_m_000000
-rw-r--r-- 1 hadoop supergroup 1524 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/task_1526024399954_0017_m_000001
-rw-r--r-- 1 hadoop supergroup 6686 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/fileList.seq
-rw-r--r-- 1 hadoop supergroup 5906 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/fileList.seq_sorted

然后map通过DynamicRecordReader去读取数据的时候就会将对应的chunk文件修改为task(task_1526024399954_0017_m_000000)名字,所以通过上面的文件夹输入可以看出,这时有两个map正在对数据进行拷贝,执行速度快的map会继续读取未被领取的chunk进行拷贝,这就让速度快的map可以对更多的数据进行拷贝。

UniformSizeInputFormat

对于默认的UniformSizeInputFormat,他的实现方式比DynamicInputFormat简单,原理其实就是得到总的totalSizeBytes,然后除以map数量得到平均每个map处理多少数据,然后当文件的大小加起来大于nBytesPerSplit的时候,就形成一个split,这样是希望每个map处理的数据差距不会太大。

带宽控制

带宽控制主要实现在ThrottledInputStream当中,他在hadoop除了在distcp之外,也用在了NameNode之间的FSImage传输等场景上的使用,原理就是,他继承了原有的InputStream并在每次读取的时候进行每秒获取字节的速率检查(throttle),如果超过,则进行sleep:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Read bytes starting from the specified position. This requires rawStream is
* an instance of {@link PositionedReadable}.
*/
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
if (!(rawStream instanceof PositionedReadable)) {
throw new UnsupportedOperationException(
"positioned read is not supported by the internal stream");
}
throttle();
int readLen = ((PositionedReadable) rawStream).read(position, buffer,
offset, length);
if (readLen != -1) {
bytesRead += readLen;
}
return readLen;
}

private void throttle() throws IOException {
while (getBytesPerSec() > maxBytesPerSec) {
try {
Thread.sleep(SLEEP_DURATION_MS);
totalSleepTime += SLEEP_DURATION_MS;
} catch (InterruptedException e) {
throw new IOException("Thread aborted", e);
}
}
}

/**
* Getter for the read-rate from this stream, since creation.
* Calculated as bytesRead/elapsedTimeSinceStart.
* @return Read rate, in bytes/sec.
*/
public long getBytesPerSec() {
long elapsed = (System.currentTimeMillis() - startTime) / 1000;
if (elapsed == 0) {
return bytesRead;
} else {
return bytesRead / elapsed;
}
}

总结:

除了本文说的参数之外,我们平时在数据拷贝的过程中,我们还可以综合的通过控制map的数量,控制带宽阈值去减少这个过程对线上系统的影响,其中还有update参数等等,我们可以按照自身的业务需求去调整自身的参数,从而达到一个相对最优的数据拷贝效果。