`

Hadoop中Yarnrunner里面submit Job以及AM生成 至Job处理过程源码解析 (中)

阅读更多
继续上一篇文章, 那时候AM Allocation已经生成, 就等着NM 的心跳来找到有资源的NM, 再去启动, 那么假设一个NM 心跳, 然后走的就是RMNodeImpl的状态机的RMNodeEventType.STATUS_UPDATE事件, 看一下事件定义:
  private static final StateMachineFactory<RMNodeImpl,
                                           NodeState,
                                           RMNodeEventType,
                                           RMNodeEvent> stateMachineFactory 
                 = new StateMachineFactory<RMNodeImpl,
                                           NodeState,
                                           RMNodeEventType,
                                           RMNodeEvent>(NodeState.NEW)
  
     //Transitions from NEW state
     .addTransition(NodeState.NEW, NodeState.RUNNING, 
         RMNodeEventType.STARTED, new AddNodeTransition())
     .addTransition(NodeState.NEW, NodeState.NEW,
         RMNodeEventType.RESOURCE_UPDATE, 
         new UpdateNodeResourceWhenUnusableTransition())

     //Transitions from RUNNING state
     .addTransition(NodeState.RUNNING,
         EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
         RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
		 
		 //调用了StatusUpdateWhenHealthyTransition的方法


那么我们要继续看到StatusUpdateWhenHealthyTransition去, 看注释:
/**
 * Handles a node status event (cast to RMNodeStatusEvent) and returns the
 * node's next state: NodeState.UNHEALTHY when the reported health status is
 * not healthy, NodeState.RUNNING otherwise.
 */
public static class StatusUpdateWhenHealthyTransition implements
      MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> {
    @Override
    public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {

      RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;

      // Record the basic heartbeat information on the node.
      // Switch the last heartbeatresponse.
      rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();

      NodeHealthStatus remoteNodeHealthStatus = 
          statusEvent.getNodeHealthStatus();
      rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
      rmNode.setLastHealthReportTime(
          remoteNodeHealthStatus.getLastHealthReportTime());

      // Unhealthy path (the article notes this branch is not normally taken):
      // drop queued updates, tell the scheduler and the nodes-list manager,
      // update metrics and leave the RUNNING state.
      if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
        LOG.info("Node " + rmNode.nodeId + " reported UNHEALTHY with details: "
            + remoteNodeHealthStatus.getHealthReport());
        rmNode.nodeUpdateQueue.clear();
        // Inform the scheduler
        rmNode.context.getDispatcher().getEventHandler().handle(
            new NodeRemovedSchedulerEvent(rmNode));
        rmNode.context.getDispatcher().getEventHandler().handle(
            new NodesListManagerEvent(
                NodesListManagerEventType.NODE_UNUSABLE, rmNode));
        // Update metrics
        rmNode.updateMetricsForDeactivatedNode(rmNode.getState(),
            NodeState.UNHEALTHY);
        return NodeState.UNHEALTHY;
      }

      rmNode.handleContainerStatus(statusEvent.getContainers());

      // The part of the heartbeat that drives scheduling: a
      // NodeUpdateSchedulerEvent is sent through the dispatcher (per the
      // article this is SchedulerEventType.NODE_UPDATE, handled by the
      // scheduler, e.g. FifoScheduler). The nextHeartBeat flag is cleared
      // before dispatching so the event is sent once per flag set.
      if(rmNode.nextHeartBeat) {
        rmNode.nextHeartBeat = false;
        rmNode.context.getDispatcher().getEventHandler().handle(
            new NodeUpdateSchedulerEvent(rmNode));
      }

      // Update DTRenewer in secure mode to keep these apps alive. Today this is
      // needed for log-aggregation to finish long after the apps are gone.
      if (UserGroupInformation.isSecurityEnabled()) {
        rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
          statusEvent.getKeepAliveAppIds());
      }

      return NodeState.RUNNING;
    }
  }


接下来就要去FifoScheduler的Node Update事件了, 看一下这个事件的处理:
    case NODE_UPDATE:
    {
      NodeUpdateSchedulerEvent nodeUpdatedEvent = 
      (NodeUpdateSchedulerEvent)event;
      nodeUpdate(nodeUpdatedEvent.getRMNode());
    }


调用nodeUpdate方法:
  /**
   * Processes one node update: applies the container status changes pulled
   * from the node, then, if the node still has at least minimumAllocation
   * available, tries to assign containers on it. Finishes by refreshing the
   * available-resources metrics.
   *
   * @param rmNode the node whose update is being processed
   */
  private synchronized void nodeUpdate(RMNode rmNode) {
    FiCaSchedulerNode node = getNode(rmNode.getNodeID());
    
    // Pull the pending container updates from the node and flatten them into
    // two lists: newly launched containers and completed containers.
    List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
    for(UpdatedContainerInfo containerInfo : containerInfoList) {
      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
      completedContainers.addAll(containerInfo.getCompletedContainers());
    }

    // Bookkeeping for containers the scheduler already knows about.
    // Processing the newly launched containers
    for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
    }

    // Bookkeeping for containers the scheduler already knows about.
    // Process completed containers
    for (ContainerStatus completedContainer : completedContainers) {
      ContainerId containerId = completedContainer.getContainerId();
      LOG.debug("Container FINISHED: " + containerId);
      completedContainer(getRMContainer(containerId), 
          completedContainer, RMContainerEventType.FINISHED);
    }


    // With work-preserving recovery enabled, skip allocation until the
    // scheduler reports it is ready to allocate containers.
    if (rmContext.isWorkPreservingRecoveryEnabled()
        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
      return;
    }

    // Only attempt allocation when the node's available resource is at least
    // minimumAllocation. Per the article, the pending AM request is waiting in
    // the scheduler and is served here once a node with enough room reports.
    if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
            node.getAvailableResource(),minimumAllocation)) {
      LOG.debug("Node heartbeat " + rmNode.getNodeID() + 
          " available resource = " + node.getAvailableResource());

      // Assign containers on this node.
      assignContainers(node);

      LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
          + node.getAvailableResource());
    }

    updateAvailableResourcesMetrics();
  }



可以看到如果碰到资源足够的NM, 那么就会执行assignContainers这个方法, 很明显了, 就是要给这个Node起一个AM的container, 继续看这个方法:
/**
 * Offers the given node to each application in turn and assigns containers
 * until the node's available resource drops below minimumAllocation, then
 * refreshes the headroom of every application that has a current attempt.
 *
 * @param node the scheduler node on which containers may be placed
 */
private void assignContainers(FiCaSchedulerNode node) {
    LOG.debug("assignContainers:" +
        " node=" + node.getRMNode().getNodeAddress() + 
        " #applications=" + applications.size());

    // Try to assign containers to applications in fifo order
    for (Map.Entry<ApplicationId, SchedulerApplication<FiCaSchedulerApp>> e : applications
        .entrySet()) {

      // Take the application's current attempt; applications without one are
      // skipped.
      FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt();
      if (application == null) {
        continue;
      }

      LOG.debug("pre-assignContainers");
      application.showRequests();
      synchronized (application) {
        // Check if this resource is on the blacklist
        if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
          continue;
        }
        
        // Walk the priorities in the order getPriorities() returns them.
        for (Priority priority : application.getPriorities()) {
          int maxContainers = 
            getMaxAllocatableContainers(application, priority, node, 
                NodeType.OFF_SWITCH); 
          // Ensure the application needs containers of this priority

          // There is something to assign at this priority.
          if (maxContainers > 0) {

            // Place containers on this node; see assignContainersOnNode.
            int assignedContainers = 
              assignContainersOnNode(node, application, priority);
            // Do not assign out of order w.r.t priorities
            if (assignedContainers == 0) {
              break;
            }
          }
        }
      }
      
      LOG.debug("post-assignContainers");
      application.showRequests();

      // Done
      if (Resources.lessThan(resourceCalculator, clusterResource,
              node.getAvailableResource(), minimumAllocation)) {
        break;
      }
    }

    // Update the applications' headroom to correctly take into
    // account the containers assigned in this update.
    for (SchedulerApplication<FiCaSchedulerApp> application : applications.values()) {
      FiCaSchedulerApp attempt =
          (FiCaSchedulerApp) application.getCurrentAppAttempt();
      if (attempt == null) {
        continue;
      }
      updateAppHeadRoom(attempt);
    }
  }


那么继续看一下assignContainersOnNode方法:

 /**
  * Tries to place containers for one application at one priority on the
  * given node, attempting node-local, then rack-local, then off-switch
  * placement, and returns the total number of containers assigned.
  *
  * @param node        the node being offered
  * @param application the application attempt requesting containers
  * @param priority    the request priority being served
  * @return the sum of containers assigned across the three locality levels
  */
 private int assignContainersOnNode(FiCaSchedulerNode node,
      FiCaSchedulerApp application, Priority priority) {
    // Each locality level has its own helper. Per the article, all of them
    // end up in an allocate call that fires RMContainerEventType.START (not
    // shown in this excerpt).
    int nodeLocal = assignNodeLocalContainers(node, application, priority);
    int rackLocal = assignRackLocalContainers(node, application, priority);
    int offSwitch = assignOffSwitchContainers(node, application, priority);

    int totalAssigned = nodeLocal + rackLocal + offSwitch;

    LOG.debug("assignContainersOnNode:"
        + " node=" + node.getRMNode().getNodeAddress()
        + " application=" + application.getApplicationId().getId()
        + " priority=" + priority.getPriority()
        + " #assigned=" + totalAssigned);

    return totalAssigned;
  }


当触发RMContainerEventType.START事件后, 会去RMContainerImpl的状态机去执行对应的方法, 状态机:
 stateMachineFactory = new StateMachineFactory<RMContainerImpl, 
       RMContainerState, RMContainerEventType, RMContainerEvent>(
      RMContainerState.NEW)

    // Transitions from NEW state
    .addTransition(RMContainerState.NEW, RMContainerState.ALLOCATED,
        RMContainerEventType.START, new ContainerStartedTransition())
    .addTransition(RMContainerState.NEW, RMContainerState.KILLED,
        RMContainerEventType.KILL)


会去执行ContainerStartedTransition的transition方法:
  /**
   * Transition that notifies the owning application attempt that a container
   * has been allocated for it.
   */
  private static final class ContainerStartedTransition extends
      BaseTransition {

    @Override
    public void transition(RMContainerImpl container, RMContainerEvent event) {
      // Nothing is launched here: the transition only emits an event for the
      // attempt that owns the container (per the article this is
      // RMAppAttemptEventType.CONTAINER_ALLOCATED).
      RMAppAttemptContainerAllocatedEvent allocatedEvent =
          new RMAppAttemptContainerAllocatedEvent(container.appAttemptId);
      container.eventHandler.handle(allocatedEvent);
    }
  }


回到RMAppAttemptImpl看一下  RMAppAttemptEventType.CONTAINER_ALLOCATED 触发的是什么:
.addTransition(RMAppAttemptState.SCHEDULED,
          EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING,
            RMAppAttemptState.SCHEDULED),
          RMAppAttemptEventType.CONTAINER_ALLOCATED,
          new AMContainerAllocatedTransition())


执行的是AMContainerAllocatedTransition方法:
  /**
   * Fetches the AM container from the scheduler for an application attempt.
   * Returns SCHEDULED (after asking for a retry) when no container could be
   * fetched; otherwise records the master container, starts storing the
   * attempt and returns ALLOCATED_SAVING.
   */
  private static final class AMContainerAllocatedTransition
      implements
      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
    @Override
    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {
      // Acquire the AM container from the scheduler.

      // Ask the scheduler for the AM allocation, passing empty request and
      // release lists.
      Allocation amContainerAllocation =
          appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
            EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
            null);
      // There must be at least one container allocated, because a
      // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed,
      // and is put in SchedulerApplication#newlyAllocatedContainers.

      // Note that YarnScheduler#allocate is not guaranteed to be able to
      // fetch it since container may not be fetchable for some reason like
      // DNS unavailable causing container token not generated. As such, we
      // return to the previous state and keep retry until am container is
      // fetched.
      if (amContainerAllocation.getContainers().size() == 0) {
        appAttempt.retryFetchingAMContainer(appAttempt);
        return RMAppAttemptState.SCHEDULED;
      }

      // Populate the attempt's AM-related fields.
      // Set the masterContainer
      appAttempt.setMasterContainer(amContainerAllocation.getContainers()
          .get(0));

      // Look up the RMContainer for the master container and flag it as the
      // AM container.
      RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler
          .getRMContainer(appAttempt.getMasterContainer().getId());
      rmMasterContainer.setAMContainer(true);
      // The node set in NMTokenSecrentManager is used for marking whether the
      // NMToken has been issued for this node to the AM.
      // When AM container was allocated to RM itself, the node which allocates
      // this AM container was marked as the NMToken already sent. Thus,
      // clear this node set so that the following allocate requests from AM are
      // able to retrieve the corresponding NMToken.
      appAttempt.rmContext.getNMTokenSecretManager()
        .clearNodeSetForAttempt(appAttempt.applicationAttemptId);
      appAttempt.getSubmissionContext().setResource(
        appAttempt.getMasterContainer().getResource());

      // Persist the attempt via storeAttempt().
      appAttempt.storeAttempt();
      return RMAppAttemptState.ALLOCATED_SAVING;
    }
  }


那么就到appAttempt.storeAttempt()去了:
  /**
   * Logs the attempt being stored and hands this attempt to the state store
   * obtained from rmContext.
   */
  private void storeAttempt() {
    // store attempt data in a non-blocking manner to prevent dispatcher
    // thread starvation and wait for state to be saved
    StringBuilder message = new StringBuilder("Storing attempt: AppId: ");
    message.append(getAppAttemptId().getApplicationId());
    message.append(" AttemptId: ").append(getAppAttemptId());
    message.append(" MasterContainer: ").append(masterContainer);
    LOG.info(message.toString());

    // Save the attempt into the ResourceManager's state store.
    rmContext.getStateStore().storeNewApplicationAttempt(this);
  }


看一下storeNewApplicationAttempt:
 /**
  * Builds an ApplicationAttemptState snapshot of the given attempt (id,
  * master container, credentials, start time and aggregate resource usage)
  * and dispatches it wrapped in an RMStateStoreAppAttemptEvent.
  *
  * @param appAttempt the attempt to snapshot
  */
 public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) {
    Credentials attemptCredentials = getCredentialsFromAppAttempt(appAttempt);
    AggregateAppResourceUsage usage =
        appAttempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();

    ApplicationAttemptState snapshot = new ApplicationAttemptState(
        appAttempt.getAppAttemptId(),
        appAttempt.getMasterContainer(),
        attemptCredentials,
        appAttempt.getStartTime(),
        usage.getMemorySeconds(),
        usage.getVcoreSeconds());

    // Per the article, this event corresponds to
    // RMStateStoreEventType.STORE_APP_ATTEMPT and is processed by the
    // RMStateStore state machine.
    RMStateStoreAppAttemptEvent storeEvent =
        new RMStateStoreAppAttemptEvent(snapshot);
    dispatcher.getEventHandler().handle(storeEvent);
  }


那么就要去statestore看一下这个事件触发了什么动作:
// State machine definition for RMStateStore. Every transition listed here
// goes from DEFAULT back to DEFAULT; the event type only selects which
// transition object runs.
private static final StateMachineFactory<RMStateStore,
                                           RMStateStoreState,
                                           RMStateStoreEventType, 
                                           RMStateStoreEvent>
      stateMachineFactory = new StateMachineFactory<RMStateStore,
                                                    RMStateStoreState,
                                                    RMStateStoreEventType,
                                                    RMStateStoreEvent>(
      RMStateStoreState.DEFAULT)
      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
          RMStateStoreEventType.STORE_APP, new StoreAppTransition())
      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
          RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
          RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())

      // This is the entry of interest: STORE_APP_ATTEMPT runs
      // StoreAppAttemptTransition.
      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
          RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition())
      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
          RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition());


那么接下来就去看StoreAppAttemptTransition方法:
  private static class StoreAppAttemptTransition implements
      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
    @Override
    public void transition(RMStateStore store, RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreAppAttemptEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return;
      }
	  
	  //attempt state
      ApplicationAttemptState attemptState =
          ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
      try {
	  
	  //拿attemptdata
        ApplicationAttemptStateData attemptStateData = 
            ApplicationAttemptStateData.newInstance(attemptState);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
        }
		//保存
        store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
            attemptStateData);
			
			//通知事件处理器去处理RMAppAttemptEventType.ATTEMPT_NEW_SAVED事件
        store.notifyApplicationAttempt(new RMAppAttemptEvent
               (attemptState.getAttemptId(),
               RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
      } catch (Exception e) {
        LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
        store.notifyStoreOperationFailed(e);
      }
    };
  }


RMAppAttemptEventType.ATTEMPT_NEW_SAVED事件的定义:
 .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
          RMAppAttemptState.ALLOCATED,
          RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())


将状态改为RMAppAttemptState.ALLOCATED, 执行AttemptStoredTransition():

  /**
   * Transition whose only action is to call launchAttempt() on the
   * application attempt.
   */
  private static final class AttemptStoredTransition extends BaseTransition {
    @Override
    public void transition(RMAppAttemptImpl appAttempt,
                                                    RMAppAttemptEvent event) {

      // The attempt has been stored, so the next step is to launch it (per
      // the article, this is the attempt that starts the AM).
      appAttempt.launchAttempt();
    }
  }


再去看一下launchAttempt: 它会向事件处理器发送AMLauncherEventType.LAUNCH事件, 该事件由ApplicationMasterLauncher的handle方法处理:
  /**
   * Dispatches an AM launcher event to the matching action: LAUNCH calls
   * launch(application), CLEANUP calls cleanup(application); any other
   * event type is ignored.
   *
   * @param appEvent the event carrying the event type and the attempt
   */
  public synchronized void  handle(AMLauncherEvent appEvent) {
    AMLauncherEventType event = appEvent.getType();
    RMAppAttempt application = appEvent.getAppAttempt();
    switch (event) {
    // Per the article, the LAUNCH event is the one picked up by
    // ApplicationMasterLauncher for starting the AM.
    case LAUNCH:
      launch(application);
      break;
    case CLEANUP:
      cleanup(application);
      // Explicit break: the original fell through into default, which only
      // worked because default does nothing.
      break;
    default:
      break;
    }
  }


然后在launch里面执行了:
  /**
   * Queues a launcher for the given attempt: a Runnable is created for the
   * LAUNCH event type and added to masterEvents.
   *
   * @param application the attempt whose AM should be launched
   */
  private void launch(RMAppAttempt application) {
    masterEvents.add(
        createRunnableLauncher(application, AMLauncherEventType.LAUNCH));
  }
  
    
  /**
   * Creates the Runnable that performs the given launcher event for an
   * attempt; this implementation returns a new AMLauncher.
   *
   * @param application the attempt the launcher acts on
   * @param event       the launcher event type to perform
   * @return a new AMLauncher built from the context, attempt, event and config
   */
  protected Runnable createRunnableLauncher(RMAppAttempt application,
      AMLauncherEventType event) {
    return new AMLauncher(context, application, event, getConfig());
  }


在ApplicationMasterLauncher启动的时候会同时启动一个thread:
this.launcherHandlingThread = new LauncherThread();

这个LauncherThread中会从masterEvents将放进去的Runnable一个一个取出来, 然后执行run方法:
/**
 * Thread named "ApplicationMaster Launcher" that repeatedly takes the next
 * Runnable from masterEvents and submits it to launcherPool. It stops when
 * the thread is interrupted.
 */
private class LauncherThread extends Thread {

  public LauncherThread() {
    super("ApplicationMaster Launcher");
  }

  @Override
  public void run() {
    try {
      // Keep draining the queue until interruption is observed, either via
      // the interrupt flag or an InterruptedException from take().
      while (!isInterrupted()) {
        Runnable next = masterEvents.take();
        launcherPool.execute(next);
      }
    } catch (InterruptedException e) {
      LOG.warn(this.getClass().getName() + " interrupted. Returning.");
    }
  }
}


那么就要去看一下AMLauncher的run方法了:
/**
 * Performs the action selected by eventType.
 *
 * LAUNCH calls launch() and then reports RMAppAttemptEventType.LAUNCHED;
 * any exception is turned into an RMAppAttemptLaunchFailedEvent carrying
 * the stringified exception. CLEANUP calls cleanup(); failures are logged,
 * except a YarnException whose message says the container is not handled
 * by this NodeManager, which is ignored. Any other event type is logged
 * and ignored.
 */
public void run() {
    switch (eventType) {

    // In the flow followed by the article, the event type passed in is LAUNCH.
    case LAUNCH:
      try {
        // Trailing space added so the attempt id is not glued to "master",
        // matching the "Cleaning master " message below.
        LOG.info("Launching master " + application.getAppAttemptId());
        launch();
        handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
            RMAppAttemptEventType.LAUNCHED));
      } catch(Exception ie) {
        String message = "Error launching " + application.getAppAttemptId()
            + ". Got exception: " + StringUtils.stringifyException(ie);
        LOG.info(message);
        handler.handle(new RMAppAttemptLaunchFailedEvent(application
            .getAppAttemptId(), message));
      }
      break;
    case CLEANUP:
      try {
        LOG.info("Cleaning master " + application.getAppAttemptId());
        cleanup();
      } catch(IOException ie) {
        LOG.info("Error cleaning master ", ie);
      } catch (YarnException e) {
        StringBuilder sb = new StringBuilder("Container ");
        sb.append(masterContainer.getId().toString());
        sb.append(" is not handled by this NodeManager");
        if (!e.getMessage().contains(sb.toString())) {
          // Ignoring if container is already killed by Node Manager.
          LOG.info("Error cleaning master ", e);          
        }
      }
      break;
    default:
      LOG.warn("Received unknown event-type " + eventType + ". Ignoring.");
      break;
    }
  }


在LAUNCH分支里面主要执行了launch(), 然后通知事件处理器处理RMAppAttemptEventType.LAUNCHED事件

其实从字面意思来理解 这个launch方法必然是去启动AM的container的, 来看一下怎么启动的:
  /**
   * Starts the AM container: connects, builds the container launch context
   * from the application's submission context, wraps it in a single-element
   * start request and sends it through containerMgrProxy. If the response
   * lists the master container as failed, the deserialized cause is passed
   * to parseAndThrowException.
   *
   * @throws IOException   declared by this method; raised by the calls it makes
   * @throws YarnException declared by this method; raised by the calls it makes
   */
  private void launch() throws IOException, YarnException {
    connect();
    ContainerId amContainerId = masterContainer.getId();

    // Per the article, this submission context is where the launch command
    // assembled at submit time was stored.
    ApplicationSubmissionContext submissionContext =
        application.getSubmissionContext();
    LOG.info("Setting up container " + masterContainer
        + " for AM " + application.getAppAttemptId());

    // Build the launch context for the AM container.
    ContainerLaunchContext amLaunchContext =
        createAMContainerLaunchContext(submissionContext, amContainerId);

    // Wrap the launch context and the container token in a start request.
    StartContainerRequest startRequest = StartContainerRequest.newInstance(
        amLaunchContext, masterContainer.getContainerToken());
    List<StartContainerRequest> startRequests =
        new ArrayList<StartContainerRequest>();
    startRequests.add(startRequest);
    StartContainersRequest batch =
        StartContainersRequest.newInstance(startRequests);

    // Ask the container manager to start the AM container.
    StartContainersResponse response = containerMgrProxy.startContainers(batch);

    boolean amStartFailed = response.getFailedRequests() != null
        && response.getFailedRequests().containsKey(amContainerId);
    if (!amStartFailed) {
      LOG.info("Done launching container " + masterContainer + " for AM "
          + application.getAppAttemptId());
      return;
    }

    Throwable cause =
        response.getFailedRequests().get(amContainerId).deSerialize();
    parseAndThrowException(cause);
  }


containerMgrProxy.startContainers(allRequests)最终会由ContainerManagerImpl的startContainers来处理传入的request, 看一下:
  /**
   * Excerpt (parts elided with "..."): tries to start every container in the
   * request and returns a response listing the containers that succeeded
   * and the ones that failed.
   */
  public StartContainersResponse
      startContainers(StartContainersRequest requests) throws YarnException,
          IOException {
    
	...
		
    // Walk the request list and try to start each container in it.
    for (StartContainerRequest request : requests.getStartContainerRequests()) {
      ContainerId containerId = null;
      try {
        ContainerTokenIdentifier containerTokenIdentifier =
            BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
        verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
          containerTokenIdentifier);
        containerId = containerTokenIdentifier.getContainerID();

        // Start this container.
        startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
          request);
        succeededContainers.add(containerId);
      } catch (YarnException e) {
        // NOTE(review): containerId is still null here if the exception was
        // thrown before it was assigned above, so the failure would be
        // recorded under a null key - confirm this is intended.
        failedContainers.put(containerId, SerializedException.newInstance(e));
      } catch (InvalidToken ie) {
        // Recorded as failed and then rethrown, which ends the loop.
        failedContainers.put(containerId, SerializedException.newInstance(ie));
        throw ie;
      } catch (IOException e) {
        throw RPCUtil.getRemoteException(e);
      }
    }

    return StartContainersResponse.newInstance(getAuxServiceMetaData(),
      succeededContainers, failedContainers);
  }


会通过startContainerInternal来启动container:
 /**
  * Excerpt (parts elided with "..."): builds a ContainerImpl from the
  * request's launch context and dispatches an ApplicationContainerInitEvent
  * for it.
  */
 private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
      ContainerTokenIdentifier containerTokenIdentifier,
      StartContainerRequest request) throws YarnException, IOException {
	  
	  ...
	  // Fetch the launch context from the request (per the article, it
	  // carries the launch script set up earlier).
	  ContainerLaunchContext launchContext = request.getContainerLaunchContext();
	  
	  ...
	  
	  // Create the container object.
	  Container container =
        new ContainerImpl(getConfig(), this.dispatcher,
            context.getNMStateStore(), launchContext,
          credentials, metrics, containerTokenIdentifier);
	  	  
	  ...
	  
	  // Hand the container over for initialization. Per the article, this is
	  // handled in ApplicationImpl and eventually fires
	  // ContainerEventType.INIT_CONTAINER on ContainerImpl's state machine.
	  dispatcher.getEventHandler().handle(
          new ApplicationContainerInitEvent(container));
	  
	  
	  ...
	  
	    
	  }


看一下ContainerEventType.INIT_CONTAINER事件的状态机:
  private static StateMachineFactory
           <ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>
        stateMachineFactory =
      new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
    // From NEW State
    .addTransition(ContainerState.NEW,
        EnumSet.of(ContainerState.LOCALIZING,
            ContainerState.LOCALIZED,
            ContainerState.LOCALIZATION_FAILED,
            ContainerState.DONE),
        ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())


会去执行RequestResourcesTransition, 顾名思义 去要资源了, 看一下这个方法:
// Excerpt (parts elided with "..."): the portion shown reads the container's
// launch context, records init metrics and sends the launch event.
static class RequestResourcesTransition implements
      MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
	  
	  ...
	  
	  // Fetch the container's launch context (per the article, it carries the
	  // launch script).
	   final ContainerLaunchContext ctxt = container.launchContext;
      container.metrics.initingContainer();

	  ...
	  
	  // Send the launch event, then mark the end of initialization in metrics.
	  container.sendLaunchEvent();
        container.metrics.endInitingContainer();
	  
	  }

看到, 最后就是去启动脚本了:
  /**
   * Dispatches a ContainersLauncherEvent for this container. The event type
   * is RECOVER_CONTAINER when recoveredStatus is LAUNCHED (a container that
   * was previously launched), otherwise LAUNCH_CONTAINER.
   */
  private void sendLaunchEvent() {
    // try to recover a container that was previously launched
    ContainersLauncherEventType eventType =
        (recoveredStatus == RecoveredContainerStatus.LAUNCHED)
            ? ContainersLauncherEventType.RECOVER_CONTAINER
            : ContainersLauncherEventType.LAUNCH_CONTAINER;

    // Per the article, this event is processed by ContainersLauncher.handle.
    dispatcher.getEventHandler().handle(
        new ContainersLauncherEvent(this, eventType));
  }


那么就接下来看一下ContainersLauncher的handle方法:
// Excerpt (other cases elided with "..."): for LAUNCH_CONTAINER, builds a
// ContainerLaunch for the event's container, submits it to containerLauncher
// and records it in the running map under the container id.
public void handle(ContainersLauncherEvent event) {
    // TODO: ContainersLauncher launches containers one by one!!
    Container container = event.getContainer();
    ContainerId containerId = container.getContainerId();
    switch (event.getType()) {
      case LAUNCH_CONTAINER:
        Application app =
          context.getApplications().get(
              containerId.getApplicationAttemptId().getApplicationId());

        ContainerLaunch launch =
            new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
              event.getContainer(), dirsHandler, containerManager);

        // Submit the launch task directly. Per the article, containerLauncher
        // is an ExecutorService, so the submitted task's call() method is what
        // runs next.
        containerLauncher.submit(launch);
        running.put(containerId, launch);
        break;
		
     ...
	 
    }
  }


ContainerLaunch 的call方法, 这个方法很长, 那么就只抓其中几个比较重点的方法来看一下了:

// Excerpt of ContainerLaunch.call() (parts elided with "..."): expands the
// launch commands, announces CONTAINER_LAUNCHED, and then either launches the
// container through the executor or reports TERMINATED when cleanup was
// already requested.
public Integer call() {

	// The launch context of this container (per the article, it carries the
	// launch script).
	final ContainerLaunchContext launchContext = container.getLaunchContext();

	...
	// Read the launch commands.
	final List<String> command = launchContext.getCommands();
	
	...
	// Expand each command against the container log directory and write the
	// expanded list back into the launch context.
	List<String> newCmds = new ArrayList<String>(command.size());
	
	for (String str : command) {
        // TODO: Should we instead work via symlinks without this grammar?
        newCmds.add(expandEnvironment(str, containerLogDir));
      }
      // Terminating semicolon restored; it was missing in the excerpt.
      launchContext.setCommands(newCmds);
	
	...
	
	// Announce the container as launched and persist that fact. Note that
	// this happens before exec.launchContainer is called below.
	dispatcher.getEventHandler().handle(new ContainerEvent(
            containerID,
            ContainerEventType.CONTAINER_LAUNCHED));
      context.getNMStateStore().storeContainerLaunched(containerID);

      // Check if the container is signalled to be killed.
      if (!shouldLaunchContainer.compareAndSet(false, true)) {
        LOG.info("Container " + containerIdStr + " not launched as "
            + "cleanup already called");
        ret = ExitCode.TERMINATED.getExitCode();
      }
      else {
        exec.activateContainer(containerID, pidFilePath);
        // Launch the container through the executor.
        ret = exec.launchContainer(container, nmPrivateContainerScriptPath,
                nmPrivateTokensPath, user, appIdStr, containerWorkDir,
                localDirs, logDirs);
      }
	
	...
	
}

exec.launchContainer会根据不一样的环境去执行不一样的类, 比如说LinuxContainerExecutor,

这个类会执行:
shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd
            container.getLaunchContext().getEnvironment());

shExec.execute();


在execute里面就去执行我们刚刚开始就设置的command, 也就是MRAppMaster的main方法。

至此 AM container启动完毕, 后面就看MRAppMaster里面的main是怎么做的了
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics