private Stat fenceOldActive() throws InterruptedException, KeeperException { ....... if (Arrays.equals(data, appData)) { LOG.info("But old node has our own data, so don't need to fence it."); } else { appClient.fenceOldActive(data); } return stat; }
@Override public void fenceOldActive(byte[] oldActiveData) { if (LOG.isDebugEnabled()) { LOG.debug("Request to fence old active being ignored, " + "as embedded leader election doesn't support fencing"); } }
private synchronized void doDeleteMultiWithRetries( final List<Op> opList) throws Exception { final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2); execOpList.add(createFencingNodePathOp); execOpList.addAll(opList); execOpList.add(deleteFencingNodePathOp); new ZKAction<Void>() { @Override public Void run() throws KeeperException, InterruptedException { setHasDeleteNodeOp(true); zkClient.multi(execOpList); return null; } }.runWithRetries(); }
举一个例子,异常会被store.notifyStoreOperationFailed(e)处理
1 2 3 4 5 6 7 8 9 10 11 12
public void transition(RMStateStore store, RMStateStoreEvent event) { ...... try { LOG.info("Storing RMDelegationToken and SequenceNumber"); store.storeRMDelegationTokenState( dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate()); } catch (Exception e) { LOG.error("Error While Storing RMDelegationToken and SequenceNumber ", e); store.notifyStoreOperationFailed(e); } }
这里就进行context相关的关闭,转化为standby的状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/** * This method is called to notify the ResourceManager that the store * operation has failed. * @param failureCause the exception due to which the operation failed */ protected void notifyStoreOperationFailed(Exception failureCause) { if (failureCause instanceof StoreFencedException) { updateFencedState(); Thread standByTransitionThread = new Thread(new StandByTransitionThread()); standByTransitionThread.setName("StandByTransitionThread Handler"); standByTransitionThread.start(); } else { rmDispatcher.getEventHandler().handle( new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED, failureCause)); } }