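Recently our cluster ran into a problem: while jobs were running, the AM would time out after 10 minutes and be killed, yet the same job succeeded when rerun. The failure appeared at random, so my initial suspicion was that either the AM's heartbeat reporting had gone wrong or the RM had hung under load, with some mechanism making the AM wait 10 minutes without sending a heartbeat. So let's first look at how the AM reports heartbeats to the RM.
In MRAppMaster, ContainerAllocatorRouter is responsible for requesting resources from the RM (that is, sending heartbeats).
RMContainerAllocator ultimately derives from RMCommunicator, which implements the RMHeartbeatHandler interface: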
```java
public interface RMHeartbeatHandler {
  long getLastHeartbeatTime(); // time of the last heartbeat
  void runOnNextHeartbeat(Runnable callback); // queue a callback to run after the next heartbeat
}
```
Each time a heartbeat returns, the callbacks registered in heartbeatCallbacks are executed once:
```java
allocatorThread = new Thread(new Runnable() {
  @Override
  public void run() {
    while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
      ......
      heartbeat();
      lastHeartbeatTime = context.getClock().getTime(); // record the time of this heartbeat
      executeHeartbeatCallbacks(); // run the queued callbacks
      ....
});
```
In the RMCommunicator class:
```java
private void executeHeartbeatCallbacks() {
  Runnable callback = null;
  while ((callback = heartbeatCallbacks.poll()) != null) {
    callback.run();
  }
}
```
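To see the callback mechanism in isolation, here is a minimal sketch that does not depend on Hadoop. The class name `HeartbeatCallbackDemo` and its `onHeartbeat` and `demo` methods are hypothetical stand-ins for one iteration of the allocator thread's loop; only the queue-draining logic mirrors the `executeHeartbeatCallbacks` code above. It assumes callbacks are one-shot: each runs after the next heartbeat and is then discarded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for RMCommunicator; this is not Hadoop code.
class HeartbeatCallbackDemo {
  private final Queue<Runnable> heartbeatCallbacks = new ConcurrentLinkedQueue<>();
  private volatile long lastHeartbeatTime;

  long getLastHeartbeatTime() {
    return lastHeartbeatTime;
  }

  // Queue a callback to run once, after the next heartbeat completes.
  void runOnNextHeartbeat(Runnable callback) {
    heartbeatCallbacks.add(callback);
  }

  // One iteration of the heartbeat loop (the RPC itself is omitted).
  void onHeartbeat(long now) {
    lastHeartbeatTime = now;
    Runnable callback;
    while ((callback = heartbeatCallbacks.poll()) != null) {
      callback.run();
    }
  }

  static List<String> demo() {
    List<String> log = new ArrayList<>();
    HeartbeatCallbackDemo handler = new HeartbeatCallbackDemo();
    handler.runOnNextHeartbeat(() -> log.add("first"));
    handler.runOnNextHeartbeat(() -> log.add("second"));
    handler.onHeartbeat(1000L); // runs both callbacks in FIFO order
    handler.onHeartbeat(2000L); // queue is already empty, nothing runs
    return log;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [first, second]
  }
}
```

Because `poll()` removes each callback as it runs, a callback that should fire on every heartbeat would have to re-register itself.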
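When RMCommunicator starts, it first registers with the RM, telling the RM its host and port, and then starts a thread (startAllocatorThread) that periodically calls the heartbeat method implemented in RMContainerAllocator (requesting resources from the RM, reporting status at regular intervals, and telling the RM that it is still alive).
When the AM initializes, RMCommunicator is initialized along with it: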
```java
protected void serviceStart() throws Exception {
  scheduler = createSchedulerProxy(); // obtain a proxy to the RM
  register(); // register with the RM
  startAllocatorThread(); // start the heartbeat thread
  ....
}
```
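The start sequence (register first, then start a thread that heartbeats on a fixed interval until stopped) can be sketched without Hadoop as follows. Everything here is an illustrative assumption: the class `HeartbeatThreadDemo`, the `intervalMs` parameter, and the `runFor` helper are hypothetical, and the heartbeat itself is reduced to incrementing a counter.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a register-then-heartbeat service; this is not Hadoop code.
class HeartbeatThreadDemo {
  private final AtomicBoolean stopped = new AtomicBoolean(false);
  private final AtomicInteger heartbeats = new AtomicInteger();
  private final long intervalMs;
  private Thread allocatorThread;

  HeartbeatThreadDemo(long intervalMs) {
    this.intervalMs = intervalMs;
  }

  void start(CountDownLatch latch) {
    // A real service would register with the RM here, before heartbeating.
    allocatorThread = new Thread(() -> {
      while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
        try {
          Thread.sleep(intervalMs);
          heartbeats.incrementAndGet(); // stands in for heartbeat()
          latch.countDown();
        } catch (InterruptedException e) {
          return; // interrupted while sleeping: exit the loop
        }
      }
    });
    allocatorThread.start();
  }

  void stop() throws InterruptedException {
    stopped.set(true);
    allocatorThread.interrupt();
    allocatorThread.join();
  }

  // Run until at least n heartbeats have been sent, then stop; returns the total count.
  static int runFor(int n) {
    CountDownLatch latch = new CountDownLatch(n);
    HeartbeatThreadDemo demo = new HeartbeatThreadDemo(10L);
    demo.start(latch);
    try {
      boolean reached = latch.await(5, TimeUnit.SECONDS);
      demo.stop();
      if (!reached) {
        throw new IllegalStateException("heartbeats did not arrive in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    return demo.heartbeats.get();
  }

  public static void main(String[] args) {
    System.out.println("heartbeats sent: " + runFor(3));
  }
}
```

Note that in this sketch a heartbeat that blocks (for example on a slow remote call) delays every later heartbeat, since a single thread does all the work; that is one way a long gap between heartbeats could arise.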
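The event-handling flow of the AM's ContainerAllocatorRouter is shown in the figure below:
Registration flow:
RMCommunicator makes a remote call to the registerApplicationMaster method of ApplicationMasterService, which sets up and maintains the responseId and then adds the AM to AMLivelinessMonitor. The monitor records the time in a map so it can detect whether the AM has timed out for lack of heartbeats. If the AM's heartbeat is not updated for a long time, the RM tells the NodeManager to remove the AM.
Heartbeat thread:
Sending a heartbeat is also how resources are obtained: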
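The timeout bookkeeping described here can be illustrated with a small, self-contained sketch. `SimpleLivelinessMonitor` and its methods are hypothetical and much simpler than Hadoop's AMLivelinessMonitor; the sketch only assumes what the text states, namely that a map stores the last heartbeat time per AM and that an AM whose entry is too old is treated as expired. The 10-minute interval is taken from the symptom described at the start of this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified monitor; this is not the Hadoop AMLivelinessMonitor.
class SimpleLivelinessMonitor {
  private final Map<String, Long> lastPing = new ConcurrentHashMap<>();
  private final long expireIntervalMs;

  SimpleLivelinessMonitor(long expireIntervalMs) {
    this.expireIntervalMs = expireIntervalMs;
  }

  // Called at registration: start tracking the attempt.
  void register(String attemptId, long now) {
    lastPing.put(attemptId, now);
  }

  // Called on every heartbeat: refresh the timestamp if the attempt is still tracked.
  void receivedPing(String attemptId, long now) {
    lastPing.computeIfPresent(attemptId, (id, old) -> now);
  }

  // Remove and return every attempt whose last ping is older than the interval.
  List<String> expire(long now) {
    List<String> expired = new ArrayList<>();
    for (Map.Entry<String, Long> e : lastPing.entrySet()) {
      if (now - e.getValue() > expireIntervalMs) {
        expired.add(e.getKey());
      }
    }
    for (String id : expired) {
      lastPing.remove(id);
    }
    return expired;
  }

  public static void main(String[] args) {
    long tenMinutes = 10 * 60 * 1000L;
    SimpleLivelinessMonitor monitor = new SimpleLivelinessMonitor(tenMinutes);
    monitor.register("attempt_1", 0L);
    monitor.receivedPing("attempt_1", 60_000L);
    System.out.println(monitor.expire(tenMinutes));               // prints []
    System.out.println(monitor.expire(60_000L + tenMinutes + 1)); // prints [attempt_1]
  }
}
```

The point of the sketch is that the monitor only sees timestamps: from its side, an AM that is alive but stuck before its next heartbeat looks the same as a dead one.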
```java
@Override
protected synchronized void heartbeat() throws Exception {
  scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
  List<Container> allocatedContainers = getResources(); // the key method
  if (allocatedContainers.size() > 0) {
    scheduledRequests.assign(allocatedContainers);
  }
  ......
}
```
The process of obtaining resources:
```java
private List<Container> getResources() throws Exception {
  ...
  response = makeRemoteRequest(); // talk to the RM
  ...
  // commands sent by the RM are handled first
  if (response.getAMCommand() != null) {
    switch(response.getAMCommand()) {
    case AM_RESYNC:
    case AM_SHUTDOWN:
      eventHandler.handle(new JobEvent(this.getJob().getID(),
                                       JobEventType.JOB_AM_REBOOT));
      throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
                                     this.getContext().getApplicationID());
    default:
      ....
    }
    // followed by a series of further processing steps
  }
}
```
Building the request:
```java
protected AllocateResponse makeRemoteRequest() throws IOException {
  AllocateRequest allocateRequest =
      AllocateRequest.newInstance(lastResponseID,
        super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
        new ArrayList<ContainerId>(release), blacklistRequest);
  AllocateResponse allocateResponse;
  allocateResponse = scheduler.allocate(allocateRequest); // RPC call to ApplicationMasterService.allocate
  .....
}
```
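Every heartbeat call refreshes the timestamp in AMLivelinessMonitor, which indicates that the AM is still alive.
The code also shows that the resource requests are packaged into ask, which is sent as an ArrayList of ResourceRequest objects. For example: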
```
priority:20 host:host9 capability:<memory:2048, vCores:1>
priority:20 host:host2 capability:<memory:2048, vCores:1>
priority:20 host:host10 capability:<memory:2048, vCores:1>
priority:20 host:/rack/rack3203 capability:<memory:2048, vCores:1>
priority:20 host:/rack/rack3202 capability:<memory:2048, vCores:1>
priority:20 host:* capability:<memory:2048, vCores:1>
```
So how is ask constructed?
The addMap, addReduce, and assign methods in RMContainerAllocator modify the contents of ask through the following call chain:
```
addContainerReq --> addResourceRequest --> addResourceRequestToAsk;
```
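Adding my own logging to the code shows that resources are requested at three locality levels: local (host), rack, and any.
These requests end up as a single ask list that is sent to the RM: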
```
ask Capability:<memory:2048, vCores:1> ResourceName:* NumContainers:384 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3201 NumContainers:227 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3202 NumContainers:231 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3203 NumContainers:152 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3204 NumContainers:158 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:host1 NumContainers:46 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:host5 NumContainers:52 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:host6 NumContainers:38 Priority:20 RelaxLocality:true
```
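A rough sketch of how per-task requests could aggregate into an ask table like the one above is shown here. It is an illustration only: `AskTableDemo` is a hypothetical class, it tracks nothing but a container count per resource name, and it assumes that each task request adds one container for each preferred host, one for each rack, and one for `*`. The method names are borrowed from the call chain above purely for readability.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of locality expansion; this is not the Hadoop implementation.
class AskTableDemo {
  // resource name (host, rack, or "*") -> number of containers requested
  private final Map<String, Integer> ask = new TreeMap<>();

  // One task request adds one container at each locality level.
  void addContainerReq(String[] hosts, String[] racks) {
    for (String host : hosts) {
      addResourceRequest(host);
    }
    for (String rack : racks) {
      addResourceRequest(rack);
    }
    addResourceRequest("*"); // the "any" level
  }

  private void addResourceRequest(String resourceName) {
    ask.merge(resourceName, 1, Integer::sum);
  }

  Map<String, Integer> getAsk() {
    return ask;
  }

  public static void main(String[] args) {
    AskTableDemo table = new AskTableDemo();
    table.addContainerReq(new String[] {"host1", "host5"},
                          new String[] {"/rack/rack3201"});
    table.addContainerReq(new String[] {"host1", "host6"},
                          new String[] {"/rack/rack3201", "/rack/rack3202"});
    table.getAsk().forEach((name, n) ->
        System.out.println("ResourceName:" + name + " NumContainers:" + n));
  }
}
```

With the two sample requests, `*` ends up with 2 containers while the host and rack entries together add up to more than 2, because a single task is counted once at every level where it could run. Under that assumption, it is expected that the `*` count in the log above is smaller than the sum of the rack counts.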
The corresponding log line looks like this:
```
getResources() for application_1438330253091_0004: ask=29 release= 0 newContainers=0 finishedContainers=0 resourcelimit=<memory:0, vCores:0> knownNMs=24
```
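Summary:
Besides understanding the heartbeat, I also learned a lot about how map and reduce tasks are allocated, so the investigation was well worth it.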