[YARN] An analysis of YARN's HA mechanism based on ZKRMStateStore

2016-04-29

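An earlier post, "A brief look at the NameNode's ZKFC mechanism", covered how HA works in HDFS, so this post picks up from there. HA failover in YARN is controlled by the EmbeddedElectorService class. Like ElectorCallbacks in ZKFailoverController, it implements the ActiveStandbyElectorCallback interface. The two differ in how they implement the fenceOldActive method, which the elector invokes as follows: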

private Stat fenceOldActive() throws InterruptedException, KeeperException {
  .......
  if (Arrays.equals(data, appData)) {
    LOG.info("But old node has our own data, so don't need to fence it.");
  } else {
    appClient.fenceOldActive(data);
  }
  return stat;
}

In ZKFailoverController, the callback first attempts a graceful fence and, if that does not work, performs a real fence:

private synchronized void fenceOldActive(byte[] data) {
  HAServiceTarget target = dataToTarget(data);

  try {
    doFence(target);
  } catch (Throwable t) {
    recordActiveAttempt(new ActiveAttemptRecord(false,
        "Unable to fence old active: " + StringUtils.stringifyException(t)));
    Throwables.propagate(t);
  }
}

The YARN implementation, by contrast, simply ignores the request:

@Override
public void fenceOldActive(byte[] oldActiveData) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Request to fence old active being ignored, " +
        "as embedded leader election doesn't support fencing");
  }
}
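The difference between the two callbacks can be tried out in isolation. The following is a minimal sketch, not Hadoop code: `FencingCallback`, `ElectorSketch`, `HDFS_STYLE`, and `YARN_STYLE` are hypothetical names, and the callbacks return a description of what they would do instead of actually fencing anything.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical, simplified version of the elector callback. */
interface FencingCallback {
  String fenceOldActive(byte[] oldActiveData);
}

class ElectorSketch {
  /** HDFS-style callback: resolves the old active and fences it. */
  static final FencingCallback HDFS_STYLE =
      data -> "fence " + new String(data, StandardCharsets.UTF_8);

  /** YARN-style callback: the request is ignored. */
  static final FencingCallback YARN_STYLE =
      data -> "ignored: embedded election does not fence";

  /** Mirrors the elector logic: skip fencing if the old data is our own. */
  static String fenceIfNeeded(byte[] oldData, byte[] myData, FencingCallback cb) {
    if (Arrays.equals(oldData, myData)) {
      return "skip: old node has our own data";
    }
    return cb.fenceOldActive(oldData);
  }

  public static void main(String[] args) {
    byte[] me = "rm1".getBytes(StandardCharsets.UTF_8);
    byte[] old = "rm2".getBytes(StandardCharsets.UTF_8);
    System.out.println(fenceIfNeeded(me, me, HDFS_STYLE));
    System.out.println(fenceIfNeeded(old, me, HDFS_STYLE));
    System.out.println(fenceIfNeeded(old, me, YARN_STYLE));
  }
}
```

With the YARN-style callback nothing is done to the old active at election time, so protection against split-brain has to come from somewhere else.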

The NameNode prevents split-brain through an RPC call or an ssh kill. How, then, does ZKRMStateStore prevent split-brain?

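In ZKRMStateStore, most operations create the RM_ZK_FENCING_LOCK znode before the actual operation and delete it once the operation is done. These steps are transactional, which means that only one client can write to the rmstore directory at a time. When two RMs try to write at the same time, creating RM_ZK_FENCING_LOCK throws an exception for one of them; that RM catches the exception and transitions itself to standby.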

private synchronized void doDeleteMultiWithRetries(
    final List<Op> opList) throws Exception {
  final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
  execOpList.add(createFencingNodePathOp);
  execOpList.addAll(opList);
  execOpList.add(deleteFencingNodePathOp);
  new ZKAction<Void>() {
    @Override
    public Void run() throws KeeperException, InterruptedException {
      setHasDeleteNodeOp(true);
      zkClient.multi(execOpList);
      return null;
    }
  }.runWithRetries();
}
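The fencing pattern can be illustrated without a ZooKeeper cluster. The following is a minimal in-memory sketch, not the real ZKRMStateStore code: `FakeZk`, `FencedException`, `FencingDemo`, and the string-based ops are hypothetical stand-ins. It assumes that only the RM currently holding create/delete rights on the root may create the fencing lock, and that a multi applies either every op or none.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-in for the exception a fenced RM receives. */
class FencedException extends Exception {
  FencedException(String msg) { super(msg); }
}

/** Hypothetical in-memory model of the store root; not the ZooKeeper API. */
class FakeZk {
  static final String LOCK = "RM_ZK_FENCING_LOCK";
  private final Set<String> nodes = new TreeSet<>();
  private String owner; // the RM allowed to create/delete under the root

  /** Called when an RM becomes active: it claims create/delete rights. */
  synchronized void claim(String rmId) { owner = rmId; }

  /** Applies all ops atomically, or none if any op fails. */
  synchronized void multi(String rmId, List<String> ops) throws FencedException {
    Set<String> scratch = new TreeSet<>(nodes); // work on a copy
    for (String op : ops) {
      if (!rmId.equals(owner)) {
        throw new FencedException(rmId + " may not run '" + op + "'");
      }
      String[] parts = op.split(" ", 2); // "create <node>" or "delete <node>"
      if (parts[0].equals("create")) {
        scratch.add(parts[1]);
      } else {
        scratch.remove(parts[1]);
      }
    }
    nodes.clear();
    nodes.addAll(scratch); // commit only after every op succeeded
  }

  synchronized Set<String> snapshot() { return new TreeSet<>(nodes); }
}

class FencingDemo {
  /** Wraps the real ops between creating and deleting the fencing lock. */
  static List<String> withFencing(List<String> ops) {
    List<String> exec = new ArrayList<>(ops.size() + 2);
    exec.add("create " + FakeZk.LOCK);
    exec.addAll(ops);
    exec.add("delete " + FakeZk.LOCK);
    return exec;
  }

  /** Returns the state the RM ends up in after attempting the write. */
  static String tryStore(FakeZk zk, String rmId, String node) {
    try {
      zk.multi(rmId, withFencing(Collections.singletonList("create " + node)));
      return "active";
    } catch (FencedException e) {
      return "standby"; // a fenced RM gives up the active role
    }
  }

  public static void main(String[] args) {
    FakeZk zk = new FakeZk();
    zk.claim("rm1");
    System.out.println("rm1: " + tryStore(zk, "rm1", "app_1"));
    zk.claim("rm2"); // failover: rm2 takes over the permissions
    System.out.println("rm1: " + tryStore(zk, "rm1", "app_2"));
    System.out.println("rm2: " + tryStore(zk, "rm2", "app_3"));
    System.out.println(zk.snapshot());
  }
}
```

In this model the stale rm1 fails on the very first op of its multi, so none of its writes reach the store, and the lock node never outlives a successful transaction.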

As an example, the exception ends up being handled by store.notifyStoreOperationFailed(e):

public void transition(RMStateStore store, RMStateStoreEvent event) {
  ......
  try {
    LOG.info("Storing RMDelegationToken and SequenceNumber");
    store.storeRMDelegationTokenState(
        dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate());
  } catch (Exception e) {
    LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
        e);
    store.notifyStoreOperationFailed(e);
  }
}

This is where the context-related shutdown happens and the RM transitions to standby:

/**
 * This method is called to notify the ResourceManager that the store
 * operation has failed.
 * @param failureCause the exception due to which the operation failed
 */
protected void notifyStoreOperationFailed(Exception failureCause) {
  if (failureCause instanceof StoreFencedException) {
    updateFencedState();
    Thread standByTransitionThread =
        new Thread(new StandByTransitionThread());
    standByTransitionThread.setName("StandByTransitionThread Handler");
    standByTransitionThread.start();
  } else {
    rmDispatcher.getEventHandler().handle(
        new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED, failureCause));
  }
}
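The two branches of notifyStoreOperationFailed can be exercised on their own. This is a simplified sketch under assumptions: `StoreFencedException` here is a local stand-in that only shares its name with the Hadoop class, and `FailureRouter` and its `actions` list are hypothetical. The real method starts a StandByTransitionThread and dispatches an RMFatalEvent rather than recording strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Local stand-in; it only shares its name with the Hadoop class. */
class StoreFencedException extends Exception {
  StoreFencedException() { super("store has been fenced"); }
}

/** Hypothetical router that mirrors the two branches of the method above. */
class FailureRouter {
  final List<String> actions = new ArrayList<>();

  void notifyStoreOperationFailed(Exception failureCause) {
    if (failureCause instanceof StoreFencedException) {
      // Fenced: another RM is active, so step down on a separate thread.
      Thread t = new Thread(() -> record("transitionToStandby"));
      t.setName("StandByTransitionThread Handler");
      t.start();
      try {
        t.join(); // joined only to keep this demo deterministic
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } else {
      // Any other store failure is treated as fatal for the RM.
      record("fatal: " + failureCause.getMessage());
    }
  }

  private synchronized void record(String action) {
    actions.add(action);
  }

  public static void main(String[] args) {
    FailureRouter router = new FailureRouter();
    router.notifyStoreOperationFailed(new StoreFencedException());
    router.notifyStoreOperationFailed(new java.io.IOException("disk full"));
    System.out.println(router.actions);
  }
}
```

Running the transition on its own thread keeps the caller, which is handling a store event, from blocking on the shutdown of the active services.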

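Besides guarding against concurrent writes, ZKRMStateStore also sets ACLs on its root storage znode at failover time: create and delete permissions are removed from every other entry and granted only to the current RM, while other users keep the remaining permissions (read, write, and admin in the output below). Because RM_ZK_FENCING_LOCK has to be created under this znode, an RM that has lost these permissions can no longer complete a write. The ACLs can be inspected with the zk command line: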

[zk: localhost:2181(CONNECTED) 27] ls /yarn-test/rmstore/ZKRMStateRoot  
[AMRMTokenSecretManagerRoot, RMAppRoot, EpochNode, RMVersionNode, RMDTSecretManagerRoot]

[zk: localhost:2181(CONNECTED) 28] getAcl /yarn-test/rmstore/ZKRMStateRoot
'world,'anyone
: rwa
'digest,'xxx-xxxx-xxx15.hadoop.xxx.com:0vfG9l2cyt85oF5/H01oip5KEGU=
: cd
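The ACL listing above is consistent with stripping create and delete from every configured entry and granting them to the active RM alone. A minimal sketch of that bit arithmetic follows. The permission bit values match ZooKeeper's `ZooDefs.Perms` constants, but `RootAclSketch`, `Acl`, `rootAcls`, and `permString` are hypothetical helpers, the configured ACL of world:anyone with all permissions is an assumption, and the digest id is a placeholder.

```java
import java.util.ArrayList;
import java.util.List;

class RootAclSketch {
  // Same bit values as ZooKeeper's ZooDefs.Perms.
  static final int READ = 1, WRITE = 2, CREATE = 4, DELETE = 8, ADMIN = 16;

  /** Hypothetical minimal ACL entry: scheme, id, permission bits. */
  static final class Acl {
    final String scheme;
    final String id;
    final int perms;

    Acl(String scheme, String id, int perms) {
      this.scheme = scheme;
      this.id = id;
      this.perms = perms;
    }
  }

  /** Removes create/delete from every entry, then grants them to this RM only. */
  static List<Acl> rootAcls(List<Acl> configured, String rmDigestId) {
    List<Acl> result = new ArrayList<>();
    for (Acl acl : configured) {
      result.add(new Acl(acl.scheme, acl.id, acl.perms & ~(CREATE | DELETE)));
    }
    result.add(new Acl("digest", rmDigestId, CREATE | DELETE));
    return result;
  }

  /** Renders permission bits in the order c, d, r, w, a. */
  static String permString(int perms) {
    StringBuilder sb = new StringBuilder();
    if ((perms & CREATE) != 0) sb.append('c');
    if ((perms & DELETE) != 0) sb.append('d');
    if ((perms & READ) != 0) sb.append('r');
    if ((perms & WRITE) != 0) sb.append('w');
    if ((perms & ADMIN) != 0) sb.append('a');
    return sb.toString();
  }

  public static void main(String[] args) {
    List<Acl> configured = new ArrayList<>();
    configured.add(new Acl("world", "anyone", READ | WRITE | CREATE | DELETE | ADMIN));
    for (Acl acl : rootAcls(configured, "rm-host:placeholder-digest")) {
      System.out.println("'" + acl.scheme + ",'" + acl.id);
      System.out.println(": " + permString(acl.perms));
    }
  }
}
```

Under these assumptions the sketch prints `rwa` for world:anyone and `cd` for the RM's digest entry, the same split seen in the getAcl output.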

References

[RM HA3] The use of ZooKeeper in RM HA