[YARN] Why Reduces Keep Getting Killed: An Analysis

2016-07-27

Symptom:

We previously saw jobs where, with the queue full, the reduces kept getting killed and the job ran for a very long time, effectively stuck in a loop. The root-cause analysis follows.

Let's start with one task attempt:

2016-05-31 19:29:46,382 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from NEW to UNASSIGNED

Then it is assigned a container:

2016-05-31 19:39:50,999 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Assigned container container_e08_1464351197737_140388_01_001657 to attempt_1464351197737_140388_m_000836_0
2016-05-31 19:39:51,002 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from UNASSIGNED to ASSIGNED

Launch:

2016-05-31 19:39:51,002 INFO [ContainerLauncher #216] org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl: Processing the event EventType: CONTAINER_REMOTE_LAUNCH for container container_e08_1464351197737_140388_01_001657 taskAttempt attempt_1464351197737_140388_m_000836_0

It starts running:

2016-05-31 19:39:51,014 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from ASSIGNED to RUNNING

It finishes and cleans up. At this point the container has already exited; the AM simply records that this task attempt succeeded on that machine:

2016-05-31 19:40:00,511 INFO [IPC Server handler 20 on 34613] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Done acknowledgement from attempt_1464351197737_140388_m_000836_0
2016-05-31 19:40:00,512 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from RUNNING to SUCCESS_CONTAINER_CLEANUP
2016-05-31 19:40:00,518 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from SUCCESS_CONTAINER_CLEANUP to SUCCEEDED

Later the node it ran on is reported as unusable, and the following is logged. The rule is: when a node becomes unusable, the AM kills the TaskAttempts that ran on it:

2016-05-31 21:09:35,528 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl: TaskAttempt killed because it ran on unusable node BJM6-Decare-14057.hadoop.jd.local:50086. AttemptId:attempt_1464351197737_140388_m_000836_0
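
That message is emitted by JobImpl when it processes node updates. A paraphrased sketch of JobImpl.actOnUnusableNode in Hadoop 2.x (not verbatim source; names approximate): the AM walks the succeeded attempts that ran on the now-unusable node and sends a kill event for the map ones, since their intermediate output lives on that node and can no longer be served to the reduces:

// Paraphrased sketch of JobImpl.actOnUnusableNode (Hadoop 2.x); names approximate.
private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) {
  List<TaskAttemptId> succeeded = nodesToSucceededTaskAttempts.get(nodeId);
  if (succeeded != null) {
    String mesg = "TaskAttempt killed because it ran on unusable node " + nodeId;
    for (TaskAttemptId id : succeeded) {
      // Only MAP attempts are killed: their map output sits on the dead node
      // and reduces can no longer fetch it; reduce output is already on HDFS.
      if (TaskType.MAP == id.getTaskId().getTaskType()) {
        LOG.info(mesg + ". AttemptId:" + id);
        eventHandler.handle(new TaskAttemptKillEvent(id, mesg));
      }
    }
  }
  // Attempts still running on that node are dealt with by RMContainerAllocator.
}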

So even though the attempt had already finished, it goes from SUCCEEDED to KILLED:

2016-05-31 21:09:35,529 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: attempt_1464351197737_140388_m_000836_0 TaskAttempt Transitioned from SUCCEEDED to KILLED

In short, the task went through its whole lifecycle and its program finished; only because the NM reported itself as unhealthy did the AM kill all the tasks on that node, hence the SUCCEEDED-to-KILLED transition.

Every task completion triggers AttemptSucceededTransition, which sends an event to the AM and fires TaskCompletedTransition, where job.completedTaskCount++ does the counting. In other words, every completed task is counted, and (with slow start at 1) the reduces only run once all of the maps are done:

private static class TaskCompletedTransition implements
    MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {

  @Override
  public JobStateInternal transition(JobImpl job, JobEvent event) {
    job.completedTaskCount++;
    LOG.info("Num completed Tasks: " + job.completedTaskCount);
    JobTaskEvent taskEvent = (JobTaskEvent) event;
    Task task = job.tasks.get(taskEvent.getTaskID());
    if (taskEvent.getState() == TaskState.SUCCEEDED) {
      taskSucceeded(job, task);
    } else if (taskEvent.getState() == TaskState.FAILED) {
      taskFailed(job, task);
    } else if (taskEvent.getState() == TaskState.KILLED) {
      taskKilled(job, task);
    }

    return checkJobAfterTaskCompletion(job);
  }
  // ... (taskSucceeded/taskFailed/taskKilled and checkJobAfterTaskCompletion
  // omitted from this excerpt)
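
Inside that transition, taskSucceeded also bumps job.succeededMapTaskCount for a map task; this is exactly the counter that gets decremented again later when the map has to be rescheduled. A paraphrased sketch (names approximate):

// Paraphrased sketch of taskSucceeded in JobImpl's TaskCompletedTransition.
private void taskSucceeded(JobImpl job, Task task) {
  if (task.getType() == TaskType.MAP) {
    job.succeededMapTaskCount++;
  } else {
    job.succeededReduceTaskCount++;
  }
  job.metrics.completedTask(task);
}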

Hence the following allocator log lines:

2016-05-31 19:45:09,275 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Before Scheduling: PendingReds:100 ScheduledMaps:0 ScheduledReds:0 AssignedMaps:2 AssignedReds:0 CompletedMaps:1885 CompletedReds:0 ContAlloc:1899 ContRel:9 HostLocal:1572 RackLocal:291
2016-05-31 19:45:09,279 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Recalculating schedule, headroom=<memory:-4096, vCores:763>
2016-05-31 19:45:09,279 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Reduce slow start threshold reached. Scheduling reduces.
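
The "Reduce slow start threshold reached" line comes from the ramp-up check in RMContainerAllocator.scheduleReduces. A paraphrased sketch of that check (Hadoop 2.x, names approximate):

// Paraphrased sketch of the slow-start check in RMContainerAllocator.scheduleReduces.
if (!getIsReduceStarted()) { // reduces have not been started yet
  int completedMapsForReduceSlowstart =
      (int) Math.ceil(reduceSlowStart * totalMaps);
  if (completedMaps < completedMapsForReduceSlowstart) {
    LOG.info("Reduce slow start threshold not met. "
        + "completedMapsForReduceSlowstart " + completedMapsForReduceSlowstart);
    return;
  } else {
    LOG.info("Reduce slow start threshold reached. Scheduling reduces.");
    setIsReduceStarted(true);
  }
}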

While the reduces are running, the node that hosted a completed map becomes unhealthy, so the AM turns that attempt's SUCCEEDED state into a KILLED event and reschedules the map task:

private static class MapTaskRescheduledTransition implements
    SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    // succeeded map task is restarted back
    job.completedTaskCount--;
    job.succeededMapTaskCount--;
  }
}

The corresponding counters are decremented by one, and the heartbeat thread then re-requests resources and calls preemptReducesIfNeeded:

// inside RMContainerAllocator.heartbeat()
if (recalculateReduceSchedule) {
  preemptReducesIfNeeded();
  scheduleReduces(
      getJob().getTotalMaps(), completedMaps,
      scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
      assignedRequests.maps.size(), assignedRequests.reduces.size(),
      mapResourceRequest, reduceResourceRequest,
      pendingReduces.size(),
      maxReduceRampupLimit, reduceSlowStart);
  recalculateReduceSchedule = false;
}

If there are still unassigned maps (scheduledRequests.maps.size() > 0) and the cluster cannot supply resources for them, reduces are killed to make room:

@Private
@VisibleForTesting
void preemptReducesIfNeeded() {
  if (reduceResourceRequest.equals(Resources.none())) {
    return; // no reduces
  }
  // check if reduces have taken over the whole cluster and there are
  // unassigned maps
  if (scheduledRequests.maps.size() > 0) {
    Resource resourceLimit = getResourceLimit();
    Resource availableResourceForMap = /* ... excerpt truncated ... */
    // The remainder checks whether at least one map still fits in the
    // remaining head-room; if not, scheduled reduces are ramped down and
    // assigned reduces are preempted ("Going to preempt N due to lack of
    // space for maps").

The log spells it out:

2016-05-31 21:12:35,401 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Going to preempt 1 due to lack of space for maps

Then the rescheduled map attempt grabs a container:

2016-05-31 21:12:37,408 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Assigned container container_e08_1464351197737_140388_01_002139 to attempt_1464351197737_140388_m_000836_1

and starts running.

Summary:

With the slow-start threshold set to 1, the reduces start only after all the maps have finished. But while they run, some nodes become unusable, so the successful map attempts on those nodes go from SUCCEEDED to KILLED and the maps are rescheduled. Because all the reduces are already running and the queue is fully occupied, the AM kills its own reduces to try to free resources for the maps; if that still does not get a map container, it keeps killing, and only once the rescheduled maps have run to completion can the reduces continue. With a saturated queue this can drag on for a long time, which is the "reduces keep getting killed" symptom.
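
For reference, the slow-start threshold is the standard MRv2 property mapreduce.job.reduce.slowstart.completedmaps. A minimal sketch of a job setting it to 1.0 (reduces scheduled only after 100% of the maps complete), as the job above did; class and job names are hypothetical:

// Minimal sketch: reduces are scheduled only after all maps have completed.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SlowStartExample {                        // hypothetical class name
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // 1.0f = wait for 100% of maps before scheduling reduces
    conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 1.0f);
    Job job = Job.getInstance(conf, "slow-start-demo"); // hypothetical job name
    // ... set mapper/reducer, input/output paths, then job.waitForCompletion(true)
  }
}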