`

Hadoop中Yarnrunner里面submit Job以及AM生成 至Job处理过程源码解析(下)

阅读更多
中间隔了国庆, 好不容易才看明白了MRAppMaster如何启动其他container以及如何在NodeManager上面运行Task的。

上回写到了AM启动到最后其实是运行的MRAppMaster的main方法, 那么我们就从这里开始看他是如何启动其他container的, 首先看一下main方法:
 /**
  * Entry point of the MapReduce application master process.
  *
  * Reads the container/node identity from the launch environment, builds the
  * {@code MRAppMaster}, registers its shutdown hook, loads the job
  * configuration and finally hands over to {@code initAndStartAppMaster}.
  * Any failure is logged as fatal and terminates the JVM with exit code 1.
  *
  * @param args command line arguments (not read by this method)
  */
 public static void main(String[] args) {
    try {
      Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());

      // Each launch parameter is read from the environment and checked
      // right away; the checks run in the same order as before.
      String containerIdStr = System.getenv(Environment.CONTAINER_ID.name());
      String nodeHostString = System.getenv(Environment.NM_HOST.name());
      String nodePortString = System.getenv(Environment.NM_PORT.name());
      String nodeHttpPortString = System.getenv(Environment.NM_HTTP_PORT.name());
      String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);

      validateInputParam(containerIdStr, Environment.CONTAINER_ID.name());
      validateInputParam(nodeHostString, Environment.NM_HOST.name());
      validateInputParam(nodePortString, Environment.NM_PORT.name());
      validateInputParam(nodeHttpPortString, Environment.NM_HTTP_PORT.name());
      validateInputParam(appSubmitTimeStr, ApplicationConstants.APP_SUBMIT_TIME_ENV);

      // Turn the raw strings into typed values.
      ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
      ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
      long appSubmitTime = Long.parseLong(appSubmitTimeStr);
      int nmPort = Integer.parseInt(nodePortString);
      int nmHttpPort = Integer.parseInt(nodeHttpPortString);

      // Build the application master from the values gathered above.
      MRAppMaster appMaster = new MRAppMaster(applicationAttemptId, containerId,
          nodeHostString, nmPort, nmHttpPort, appSubmitTime);
      ShutdownHookManager.get().addShutdownHook(
          new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);

      JobConf conf = new JobConf(new YarnConfiguration());
      conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
      MRWebAppUtil.initialize(conf);

      String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
      conf.set(MRJobConfig.USER_NAME, jobUserName);

      // Do not automatically close FileSystem objects so that in case of
      // SIGTERM there is still a chance to write out the job history; the
      // objects are closed explicitly elsewhere.
      conf.setBoolean("fs.automatic.close", false);

      // The essential step: init and start the application master.
      initAndStartAppMaster(appMaster, conf, jobUserName);
    } catch (Throwable failure) {
      LOG.fatal("Error starting MRAppMaster", failure);
      ExitUtil.terminate(1, failure);
    }
  }



看一下initAndStartAppMaster:
/**
 * Initializes and starts the given application master (abridged excerpt).
 *
 * The elided part is expected to set up {@code appMasterUgi}; it is not
 * visible here.
 *
 * @param appMaster   the application master to init and start
 * @param conf        job configuration passed to {@code appMaster.init}
 * @param jobUserName user name of the job (used in the elided part, presumably)
 * @throws IOException          if the app master reports it was asked to shut down
 * @throws InterruptedException declared by the signature; source not visible here
 */
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
      final JobConf conf, String jobUserName) throws IOException,
      InterruptedException {
	  ...
	  
	  // Run MRAppMaster.init and start under appMasterUgi; per the article
	  // author these end up in serviceInit and serviceStart.
	  appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        appMaster.init(conf);
        appMaster.start();
        // Surface an error-triggered shutdown as an exception.
        if(appMaster.errorHappenedShutDown) {
          throw new IOException("Was asked to shut down.");
        }
        return null;
      }
    });
	}



serviceInit主要是初始化一堆对象, 这里就直接看一下serviceStart了:

/**
 * Starts the application master's services and then the job (abridged excerpt).
 *
 * @throws Exception propagated from the start-up steps
 */
protected void serviceStart() throws Exception {

...

// Create the job.
job = createJob(getConfig(), forcedState, shutDownMessage);

...

// Starts all registered services. Author's notes (not visible in this excerpt):
// the important ones include containerAllocator and containerLauncher;
// RMContainerAllocator implements the abstract AbstractService, whose init and
// start end up in serviceInit and serviceStart, and its serviceStart launches
// eventHandlingThread and allocatorThread.
// eventHandlingThread processes events; allocatorThread runs the heartbeat that
// reports state to the RM and performs container operations.
super.serviceStart();


  // finally set the job classloader
    MRApps.setClassLoader(jobClassLoader, getConfig());

    if (initFailed) {
      JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
      jobEventDispatcher.handle(initFailedEvent);
    } else {
      // Everything has been started; now start the job.
      startJobs();
    }
	
	
	}


那么既然所有的service都启动完成后, 就去看startJobs里面做了什么了:
  /**
   * Triggers job execution by dispatching a job-start event.
   *
   * The event carries the job id and the recovered job start time. Per the
   * article author it drives the JOB_START transition in JobImpl.
   */
  protected void startJobs() {
    // Build the job-start event that gets the ball rolling.
    JobEvent jobStart = new JobStartEvent(job.getID(), recoveredJobStartTime);

    // Hand it to the dispatcher; this is what triggers the job execution.
    dispatcher.getEventHandler().handle(jobStart);
  }


那么我们就要去看看JobImpl的状态机的定义了:
// JobImpl state machine definition (abridged excerpt; the initial state is NEW).
protected static final
    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> 
       stateMachineFactory
     = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
              (JobStateInternal.NEW)

          // Transitions from NEW state
          .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
              JobEventType.JOB_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          
		  ...
			  
			  
			  // JOB_START moves the job from INITED to SETUP and runs StartTransition.
          .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
              JobEventType.JOB_START,
              new StartTransition())


那么接下来看一下StartTransition在做什么 (目前所有的事情都是在AM这个container里面做的, 还没有涉及到执行RM相关的操作)
  /**
   * Transition run when a job receives its start event: records the start
   * time, emits job-history events, updates metrics and asks the committer
   * to set the job up.
   */
  public static class StartTransition
  implements SingleArcTransition<JobImpl, JobEvent> {
    /**
     * This transition executes in the event-dispatcher thread, though it's
     * triggered in MRAppMaster's startJobs() method.
     */
    @Override
    public void transition(JobImpl job, JobEvent event) {
      JobStartEvent startEvent = (JobStartEvent) event;

      // Prefer the recovered start time when one was supplied (non-zero),
      // otherwise stamp the job with the current clock time.
      job.startTime = (startEvent.getRecoveredJobStartTime() != 0)
          ? startEvent.getRecoveredJobStartTime()
          : job.clock.getTime();

      // Job-history event: the job has been inited.
      JobInitedEvent initedEvent = new JobInitedEvent(job.oldJobId,
          job.startTime, job.numMapTasks, job.numReduceTasks,
          job.getState().toString(), job.isUber());
      job.eventHandler.handle(new JobHistoryEvent(job.jobId, initedEvent));

      // Job-history event: submit/start times changed.
      JobInfoChangeEvent infoChangeEvent = new JobInfoChangeEvent(
          job.oldJobId, job.appSubmitTime, job.startTime);
      job.eventHandler.handle(new JobHistoryEvent(job.jobId, infoChangeEvent));

      // Record the job as running in the metrics.
      job.metrics.runningJob(job);

      // Ask the committer to set the job up (JOB_SETUP in
      // CommitterEventHandler, per the article author).
      job.eventHandler.handle(
          new CommitterJobSetupEvent(job.jobId, job.jobContext));
    }
  }


那么看一下CommitterEventHandler里面JOB_SETUP都做了什么:
    /**
     * Processes one committer event by dispatching on its type.
     *
     * @throws YarnRuntimeException if the event type is not one of the
     *         handled cases
     */
    public void run() {
      LOG.info("Processing the event " + event.toString());
      switch (event.getType()) {
      case JOB_SETUP:
	  // JOB_SETUP is handled by handleJobSetup.
        handleJobSetup((CommitterJobSetupEvent) event);
        break;
      case JOB_COMMIT:
        handleJobCommit((CommitterJobCommitEvent) event);
        break;
      case JOB_ABORT:
        handleJobAbort((CommitterJobAbortEvent) event);
        break;
      case TASK_ABORT:
        handleTaskAbort((CommitterTaskAbortEvent) event);
        break;
      default:
        throw new YarnRuntimeException("Unexpected committer event "
            + event.toString());
      }
    }
	
	/**
	 * Runs the output committer's job setup and reports the outcome.
	 *
	 * On success a {@code JobSetupCompletedEvent} is dispatched; if setup
	 * throws, the failure is logged and a {@code JobSetupFailedEvent}
	 * carrying the stringified exception is dispatched instead.
	 *
	 * @param event the job-setup request; supplies the job context and job id
	 */
	protected void handleJobSetup(CommitterJobSetupEvent event) {
      try {
        // Per the article author, FileOutputCommitter's setupJob creates the
        // job's temporary working directory; the implementation is not
        // visible here.
        committer.setupJob(event.getJobContext());

        // Setup succeeded: tell the job (JOB_SETUP_COMPLETED in JobImpl).
        context.getEventHandler().handle(
            new JobSetupCompletedEvent(event.getJobID()));
      } catch (Exception setupFailure) {
        LOG.warn("Job setup failed", setupFailure);
        context.getEventHandler().handle(
            new JobSetupFailedEvent(event.getJobID(),
                StringUtils.stringifyException(setupFailure)));
      }
    }


那么我们又要回到JobImpl的状态机定义了:
// JOB_SETUP_COMPLETED runs SetupCompletedTransition and moves the job from SETUP to RUNNING.
.addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,
              JobEventType.JOB_SETUP_COMPLETED,
              new SetupCompletedTransition())


那么我们就去看一下SetupCompletedTransition方法:
  /**
   * Transition run once job setup has completed: marks setup progress as
   * done, schedules the map and reduce tasks and, when the job has no tasks
   * at all, completes it immediately.
   */
  private static class SetupCompletedTransition
      implements SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      job.setupProgress = 1.0f;

      // Map tasks first; their output is flagged for recovery only when the
      // job has no reduce phase.
      job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);

      // Then the reduce tasks, always flagged for output recovery.
      job.scheduleTasks(job.reduceTasks, true);

      // If we have no tasks, just transition to job completed.
      boolean nothingToRun = job.numMapTasks == 0 && job.numReduceTasks == 0;
      if (nothingToRun) {
        JobEvent completed = new JobEvent(job.jobId, JobEventType.JOB_COMPLETED);
        job.eventHandler.handle(completed);
      }
    }
  }


我们要去看一下scheduleTasks这个方法到底做了什么:
  /**
   * Dispatches a start-up event for every task id in the given set.
   *
   * A task with an entry in {@code completedTasksFromPreviousRun} gets a
   * {@code TaskRecoverEvent} (the entry is removed); every other task gets a
   * {@code T_SCHEDULE} event.
   *
   * @param taskIDs           ids of the tasks to schedule
   * @param recoverTaskOutput passed through to the recover event
   */
  protected void scheduleTasks(Set<TaskId> taskIDs,
      boolean recoverTaskOutput) {
    for (TaskId id : taskIDs) {
      TaskInfo previousRun = completedTasksFromPreviousRun.remove(id);

      if (previousRun == null) {
        // Fresh task: ask TaskImpl to schedule it.
        eventHandler.handle(new TaskEvent(id, TaskEventType.T_SCHEDULE));
        continue;
      }

      // The task finished in a previous run: recover it instead.
      eventHandler.handle(new TaskRecoverEvent(id, previousRun,
          committer, recoverTaskOutput));
    }
  }


一样, 我们需要去看一下TaskImpl里面是怎么定义状态机, 怎么处理T_SCHEDULE事件的:
  // TaskImpl state machine definition (excerpt; the initial state is NEW).
  private static final StateMachineFactory
               <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent> 
            stateMachineFactory 
           = new StateMachineFactory<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
               (TaskStateInternal.NEW)

    // define the state machine of Task

    // Transitions from NEW state
	// T_SCHEDULE runs InitialScheduleTransition and moves the task to SCHEDULED.
    .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED, 
        TaskEventType.T_SCHEDULE, new InitialScheduleTransition())


那么看一下InitialScheduleTransition这个方法:
 
  /**
   * First scheduling of a task: creates and schedules an initial attempt,
   * stamps the scheduled time and emits the task-started event.
   */
  private static class InitialScheduleTransition
    implements SingleArcTransition<TaskImpl, TaskEvent> {

    @Override
    public void transition(TaskImpl target, TaskEvent scheduleEvent) {
      // Start the first (VIRGIN) attempt for this task.
      target.addAndScheduleAttempt(Avataar.VIRGIN);

      // Record when the task was scheduled, then announce that it started.
      target.scheduledTime = target.clock.getTime();
      target.sendTaskStartedEvent();
    }
  }
  
    /**
     * Creates a new attempt for this task, tracks it as in progress and
     * dispatches the event that schedules it.
     *
     * @param avataar passed through to {@code addAttempt}
     */
    private void addAndScheduleAttempt(Avataar avataar) {
    TaskAttempt created = addAttempt(avataar);
    inProgressAttempts.add(created.getID());

    // With earlier failed attempts this is a reschedule; otherwise it is a
    // first-time schedule (TA_SCHEDULE).
    TaskAttemptEventType kind = (failedAttempts.size() > 0)
        ? TaskAttemptEventType.TA_RESCHEDULE
        : TaskAttemptEventType.TA_SCHEDULE;
    eventHandler.handle(new TaskAttemptEvent(created.getID(), kind));
  }


那么我们就要去TaskAttemptImpl里面去看一下相关的状态机了:
 // TaskAttemptImpl state machine definition (excerpt; the initial state is NEW).
 private static final StateMachineFactory
        <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
        stateMachineFactory
    = new StateMachineFactory
             <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
           (TaskAttemptStateInternal.NEW)

     // Transitions from the NEW state.
	 // TA_SCHEDULE runs RequestContainerTransition(false) and moves the attempt to UNASSIGNED.
     .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
         TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false))


看一下RequestContainerTransition的方法:
  /**
   * Transition that asks for a container on behalf of a task attempt.
   *
   * A rescheduled attempt requests a container via the "failed container"
   * factory; a fresh attempt requests one with its data-local hosts and
   * racks as placement hints.
   */
  static class RequestContainerTransition implements
      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
    /** Whether this transition serves a rescheduled attempt. */
    private final boolean rescheduled;

    public RequestContainerTransition(boolean rescheduled) {
      this.rescheduled = rescheduled;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void transition(TaskAttemptImpl taskAttempt,
        TaskAttemptEvent event) {
      // Announce (+1) that this task is about to request a container.
      taskAttempt.eventHandler.handle(
          new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1));

      // Rescheduled attempts use the dedicated factory and are done.
      if (rescheduled) {
        taskAttempt.eventHandler.handle(
            ContainerRequestEvent.createContainerRequestEventForFailedContainer(
                taskAttempt.attemptId,
                taskAttempt.resourceCapability));
        return;
      }

      // Fresh attempt: request a container with locality hints. Per the
      // article author this event is handled by RMContainerAllocator as
      // CONTAINER_REQ.
      String[] preferredHosts = taskAttempt.dataLocalHosts.toArray(
          new String[taskAttempt.dataLocalHosts.size()]);
      String[] preferredRacks = taskAttempt.dataLocalRacks.toArray(
          new String[taskAttempt.dataLocalRacks.size()]);
      taskAttempt.eventHandler.handle(new ContainerRequestEvent(
          taskAttempt.attemptId, taskAttempt.resourceCapability,
          preferredHosts, preferredRacks));
    }
  }


去RMContainerAllocator看一下:
/**
 * Handles one container-allocator event (abridged excerpt).
 *
 * Only the CONTAINER_REQ path for map tasks is shown; {@code reqEvent} is
 * defined in the elided part.
 *
 * @param event the allocator event to process
 */
protected synchronized void handleEvent(ContainerAllocatorEvent event) {
    recalculateReduceSchedule = true;
    if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
	
	...
	
	// This branch runs for MAP tasks.
	if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
		// Author's note on the elided code: it checks whether the cluster can provide
		// the resources the map asks for, and kills the task if it cannot.
		// addMap registers the map request as scheduled.
		scheduledRequests.addMap(reqEvent);
	}
	
	}
	
	...
	
	}


那么就得去看一下addMap里面都做了什么了:
    /**
     * Registers a container request for a map attempt.
     *
     * A fresh attempt is indexed under each of its preferred hosts and
     * racks and requested at {@code PRIORITY_MAP}. An attempt whose earlier
     * try failed is recorded in {@code earlierFailedMaps} and requested at
     * {@code PRIORITY_FAST_FAIL_MAP}.
     *
     * @param event the container request for a map task attempt
     */
    void addMap(ContainerRequestEvent event) {
      final ContainerRequest request;

      if (!event.getEarlierAttemptFailed()) {
        // Index the attempt by every host it would like to run on.
        for (String host : event.getHosts()) {
          LinkedList<TaskAttemptId> hostQueue = mapsHostMapping.get(host);
          if (hostQueue == null) {
            hostQueue = new LinkedList<TaskAttemptId>();
            mapsHostMapping.put(host, hostQueue);
          }
          hostQueue.add(event.getAttemptID());
          if (LOG.isDebugEnabled()) {
            LOG.debug("Added attempt req to host " + host);
          }
        }

        // ... and by every rack it would like to run on.
        for (String rack : event.getRacks()) {
          LinkedList<TaskAttemptId> rackQueue = mapsRackMapping.get(rack);
          if (rackQueue == null) {
            rackQueue = new LinkedList<TaskAttemptId>();
            mapsRackMapping.put(rack, rackQueue);
          }
          rackQueue.add(event.getAttemptID());
          if (LOG.isDebugEnabled()) {
            LOG.debug("Added attempt req to rack " + rack);
          }
        }
        request = new ContainerRequest(event, PRIORITY_MAP);
      } else {
        // An earlier attempt of this map failed: use the fast-fail priority.
        earlierFailedMaps.add(event.getAttemptID());
        request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
        LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
      }

      maps.put(event.getAttemptID(), request);

      // Turn the request into resource requests.
      addContainerReq(request);
    }
    


看一下addContainerReq这个方法了:
  /**
   * Expands one container request into resource requests: one per
   * non-blacklisted host, one per rack, and one for
   * {@code ResourceRequest.ANY}.
   *
   * @param req the container request to expand
   */
  protected void addContainerReq(ContainerRequest req) {
    // Data-local: one request per host, skipping blacklisted nodes.
    for (String host : req.hosts) {
      if (isNodeBlacklisted(host)) {
        continue;
      }
      addResourceRequest(req.priority, host, req.capability);
    }

    // Rack-level requests (no blacklist check applies here).
    for (String rack : req.racks) {
      addResourceRequest(req.priority, rack, req.capability);
    }

    // Off-switch: anywhere.
    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
  }


看一下addResourceRequest里面:
 /**
  * Looks up or creates the ResourceRequest for a resource name and capability
  * and adds it to the pending ask (abridged excerpt).
  *
  * @param priority     priority set on a newly created request
  * @param resourceName host, rack or ANY the request targets
  * @param capability   resource capability being requested
  */
 private void addResourceRequest(Priority priority, String resourceName,
      Resource capability) {
    
	// Author's summary of the full method: it builds a remoteRequest and puts it into
	// the ask set, to be sent to the RM on the next heartbeat.
	
	..
	
    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
    if (reqMap == null) {
      reqMap = new HashMap<Resource, ResourceRequest>();
      remoteRequests.put(resourceName, reqMap);
    }
	
	 ResourceRequest remoteRequest = reqMap.get(capability);
    if (remoteRequest == null) {
      // First request for this capability: create it with zero containers.
      remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class);
      remoteRequest.setPriority(priority);
      remoteRequest.setResourceName(resourceName);
      remoteRequest.setCapability(capability);
      remoteRequest.setNumContainers(0);
      reqMap.put(capability, remoteRequest);
    }
	
	// Add it to the ask, to be sent when the AM next talks to the RM.
	addResourceRequestToAsk(remoteRequest);
	
	}


好了, 刚开始的时候我们就说到heartbeat是在RMContainerAllocator这个类的heartbeat()方法里做的, 那么我们看一下这个方法是怎么要container的:
/**
 * One allocator heartbeat: fetches newly allocated containers, assigns them
 * to scheduled requests and, when needed, recomputes the reduce schedule.
 *
 * @throws Exception propagated from {@code getResources}
 */
protected synchronized void heartbeat() throws Exception {
    scheduleStats.updateAndLogIfChanged("Before Scheduling: ");

    // getResources is the entry point for talking to the RM.
    List<Container> granted = getResources();
    if (granted != null && !granted.isEmpty()) {
      // Containers were handed out: match them to scheduled requests.
      scheduledRequests.assign(granted);
    }

    int completedMaps = getJob().getCompletedMaps();
    int completedTasks = completedMaps + getJob().getCompletedReduces();

    // Recompute the reduce schedule when the completed-task count moved or
    // maps are still waiting to be scheduled.
    boolean progressed = lastCompletedTasks != completedTasks;
    if (progressed || scheduledRequests.maps.size() > 0) {
      lastCompletedTasks = completedTasks;
      recalculateReduceSchedule = true;
    }

    if (recalculateReduceSchedule) {
      preemptReducesIfNeeded();
      scheduleReduces(
          getJob().getTotalMaps(), completedMaps,
          scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
          assignedRequests.maps.size(), assignedRequests.reduces.size(),
          mapResourceRequest, reduceResourceRequest,
          pendingReduces.size(),
          maxReduceRampupLimit, reduceSlowStart);
      recalculateReduceSchedule = false;
    }

    scheduleStats.updateAndLogIfChanged("After Scheduling: ");
  }


我们先看一下getResources这个方法:
 /**
  * Contacts the RM and returns the newly allocated containers (abridged
  * excerpt; the error handling after the try block is elided).
  *
  * @return the containers listed as allocated in the RM's response
  * @throws Exception propagated from the remote request
  */
 private List<Container> getResources() throws Exception {
    // will be null the first time
    Resource headRoom =
        getAvailableResources() == null ? Resources.none() :
            Resources.clone(getAvailableResources());
    AllocateResponse response;
    /*
     * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
     * milliseconds before aborting. During this interval, AM will still try
     * to contact the RM.
     */
    try {
	// This is where containers are obtained from the RM.
      response = makeRemoteRequest();
      // Reset retry count if no exception occurred.
      retrystartTime = System.currentTimeMillis();
    }
	
	...
	
	// Take the allocated containers out of the RM response.
	List<Container> newContainers = response.getAllocatedContainers();
	
	...
	
	// Returned to heartbeat(), which assigns them.
	return newContainers;
	
	
	}
	
	
	
	/**
	 * Builds an allocate request from the current ask, release and blacklist
	 * state and sends it through {@code scheduler} (abridged excerpt).
	 *
	 * @return the response returned by {@code scheduler.allocate}
	 * @throws YarnException declared by the signature; source not visible here
	 * @throws IOException   declared by the signature; source not visible here
	 */
	protected AllocateResponse makeRemoteRequest() throws YarnException,
      IOException {
    ResourceBlacklistRequest blacklistRequest =
        ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
            new ArrayList<String>(blacklistRemovals));
    AllocateRequest allocateRequest =
        AllocateRequest.newInstance(lastResponseID,
          super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
          new ArrayList<ContainerId>(release), blacklistRequest);
		  
		  // Author's notes (not visible in this excerpt): scheduler is a proxy to the RM,
		  // created as
		  //   scheduler = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
		  // and the call ends up in the RM-side scheduler (e.g. FifoScheduler), which
		  // allocates containers and returns them. This is where the AM receives the
		  // containers assigned to it.
    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
	
	
	...
	
	// Return the RM's response.
	return allocateResponse;
	
	}


接下来就要去看一下assign这个动作是怎么做的了:
 /**
  * Assigns newly allocated containers to scheduled requests (abridged excerpt).
  *
  * @param allocatedContainers containers returned by the RM
  */
 private void assign(List<Container> allocatedContainers) {
 Iterator<Container> it = allocatedContainers.iterator();
 ...
 
 while (it.hasNext()) {
 // Author's note: a series of checks on whether each container can be assigned; omitted.
 }
 
 ...
 
 // Start assigning the containers.
 assignContainers(allocatedContainers);
 
 ...
 
 }


来看一下assignContainers这个方法:
    /**
     * Assigns the given containers in two phases: first those that
     * {@code assignWithoutLocality} can match, which are removed from the
     * list, then the remainder via {@code assignMapsWithLocality}.
     *
     * @param allocatedContainers containers to assign; matched entries are
     *        removed from this list
     */
    private void assignContainers(List<Container> allocatedContainers) {
      for (Iterator<Container> cursor = allocatedContainers.iterator();
           cursor.hasNext();) {
        Container candidate = cursor.next();

        // Per the article author this only matches PRIORITY_FAST_FAIL_MAP
        // and PRIORITY_REDUCE requests; plain PRIORITY_MAP containers fall
        // through to the locality-aware phase below.
        ContainerRequest match = assignWithoutLocality(candidate);
        if (match == null) {
          continue;
        }
        containerAssigned(candidate, match);
        cursor.remove();
      }

      // Whatever is left is matched with locality taken into account.
      assignMapsWithLocality(allocatedContainers);
    }
	
	
	 /**
	  * Matches allocated containers to pending map requests in three passes:
	  * by host, then by rack, then to any remaining map request. Each matched
	  * container is removed from the list and a job counter update is emitted.
	  *
	  * @param allocatedContainers containers to assign; matched entries are
	  *        removed from this list
	  */
	 private void assignMapsWithLocality(List<Container> allocatedContainers) {
      // try to assign to all nodes first to match node local
	  
	  // Pass 1 looks for a pending map that asked for the container's host; pass 2 for
	  // one that asked for the container's rack; pass 3 hands the container to any
	  // pending map.
      Iterator<Container> it = allocatedContainers.iterator();
      while(it.hasNext() && maps.size() > 0){
        Container allocated = it.next();        
        Priority priority = allocated.getPriority();
        assert PRIORITY_MAP.equals(priority);
        // "if (maps.containsKey(tId))" below should be almost always true.
        // hence this while loop would almost always have O(1) complexity
        String host = allocated.getNodeId().getHost();
        LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
        while (list != null && list.size() > 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Host matched to the request list " + host);
          }
          TaskAttemptId tId = list.removeFirst();
          if (maps.containsKey(tId)) {
            ContainerRequest assigned = maps.remove(tId);
			
			// The actual assignment is done in containerAssigned.
            containerAssigned(allocated, assigned);
            it.remove();
            JobCounterUpdateEvent jce =
              new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
            jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
            eventHandler.handle(jce);
            hostLocalAssigned++;
            if (LOG.isDebugEnabled()) {
              LOG.debug("Assigned based on host match " + host);
            }
            break;
          }
        }
      }
      
      // try to match all rack local
      it = allocatedContainers.iterator();
      while(it.hasNext() && maps.size() > 0){
        Container allocated = it.next();
        Priority priority = allocated.getPriority();
        assert PRIORITY_MAP.equals(priority);
        // "if (maps.containsKey(tId))" below should be almost always true.
        // hence this while loop would almost always have O(1) complexity
        String host = allocated.getNodeId().getHost();
        String rack = RackResolver.resolve(host).getNetworkLocation();
        LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
        while (list != null && list.size() > 0) {
          TaskAttemptId tId = list.removeFirst();
          if (maps.containsKey(tId)) {
            ContainerRequest assigned = maps.remove(tId);
            containerAssigned(allocated, assigned);
            it.remove();
            JobCounterUpdateEvent jce =
              new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
            jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
            eventHandler.handle(jce);
            rackLocalAssigned++;
            if (LOG.isDebugEnabled()) {
              LOG.debug("Assigned based on rack match " + rack);
            }
            break;
          }
        }
      }
      
      // assign remaining
      it = allocatedContainers.iterator();
      while(it.hasNext() && maps.size() > 0){
        Container allocated = it.next();
        Priority priority = allocated.getPriority();
        assert PRIORITY_MAP.equals(priority);
        TaskAttemptId tId = maps.keySet().iterator().next();
        ContainerRequest assigned = maps.remove(tId);
        containerAssigned(allocated, assigned);
        it.remove();
        JobCounterUpdateEvent jce =
          new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
        jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
        eventHandler.handle(jce);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Assigned based on * match");
        }
      }
    }
  }


其实就是先根据host判断是不是local, 是的话就直接分配; 不是的话再看rack是不是local; 如果还不是, 才把container分配给其他任意的task。由此可以看出hadoop yarn是怎么分配container给task的。那么我们就要看一下具体assign container的方法containerAssigned了:

    /**
     * Binds an allocated container to the request it satisfies.
     *
     * Decrements the outstanding resource request, notifies the task attempt
     * through a {@code TaskAttemptContainerAssignedEvent} (TA_ASSIGNED per
     * the article author) and records the pairing in
     * {@code assignedRequests}.
     *
     * @param allocated the container handed out by the RM
     * @param assigned  the request (and task attempt) receiving the container
     */
    private void containerAssigned(Container allocated, 
                                    ContainerRequest assigned) {
      // Update resource requests
      decContainerReq(assigned);

      // send the container-assigned event to task attempt
      eventHandler.handle(new TaskAttemptContainerAssignedEvent(
          assigned.attemptID, allocated, applicationACLs));

      assignedRequests.add(allocated, assigned.attemptID);

      // The guard checks the debug level, so the message is logged at debug
      // too (it used to be emitted at info level from inside the guard).
      if (LOG.isDebugEnabled()) {
        LOG.debug("Assigned container (" + allocated + ") "
            + " to task " + assigned.attemptID + " on node "
            + allocated.getNodeId().toString());
      }
    }


那么又要回到TaskAttemptImpl里面状态机的定义了:
     .addTransition(TaskAttemptStateInternal.UNASSIGNED,
         TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
         new ContainerAssignedTransition())

会去执行ContainerAssignedTransition方法, 并将TaskAttempt转为ASSIGNED状态, 看一下ContainerAssignedTransition方法怎么做的:

  /**
   * Transition run when a container has been assigned to a task attempt:
   * records the container, creates the remote task and its JVM id,
   * registers the pending task, builds the launch context and requests the
   * remote launch.
   */
  private static class ContainerAssignedTransition implements
      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
    @SuppressWarnings({ "unchecked" })
    @Override
    public void transition(final TaskAttemptImpl taskAttempt,
        TaskAttemptEvent event) {
      final TaskAttemptContainerAssignedEvent assignment =
          (TaskAttemptContainerAssignedEvent) event;

      // Take the container carried by the event and remember it.
      Container granted = assignment.getContainer();
      taskAttempt.container = granted;

      // this is a _real_ Task (classic Hadoop mapred flavor):
      taskAttempt.remoteTask = taskAttempt.createRemoteTask();
      taskAttempt.jvmID = new WrappedJvmID(
          taskAttempt.remoteTask.getTaskID().getJobID(),
          taskAttempt.remoteTask.isMapTask(),
          taskAttempt.container.getId().getContainerId());
      taskAttempt.taskAttemptListener.registerPendingTask(
          taskAttempt.remoteTask, taskAttempt.jvmID);

      taskAttempt.computeRackAndLocality();

      // Create the launch context for this task attempt's container.
      ContainerLaunchContext ctx = createContainerLaunchContext(
          assignment.getApplicationACLs(), taskAttempt.conf,
          taskAttempt.jobToken, taskAttempt.remoteTask,
          taskAttempt.oldJobId, taskAttempt.jvmID,
          taskAttempt.taskAttemptListener, taskAttempt.credentials);

      // Request the remote launch (handled by ContainerLauncherImpl as
      // CONTAINER_REMOTE_LAUNCH, per the article author).
      taskAttempt.eventHandler.handle(
          new ContainerRemoteLaunchEvent(taskAttempt.attemptId, ctx,
              granted, taskAttempt.remoteTask));

      // send event to speculator that our container needs are satisfied
      taskAttempt.eventHandler.handle(
          new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
    }
  }
  
  
   /**
    * Runnable that processes a single container-launcher event: launches or
    * kills the container the event refers to, then calls
    * removeContainerIfDone for that container id.
    */
   class EventProcessor implements Runnable {
    private ContainerLauncherEvent event;

    EventProcessor(ContainerLauncherEvent event) {
      this.event = event;
    }

    @Override
    public void run() {
      LOG.info("Processing the event " + event.toString());

      // Load ContainerManager tokens before creating a connection.
      // TODO: Do it only once per NodeManager.
      ContainerId containerID = event.getContainerID();

      Container c = getContainer(event);
      switch(event.getType()) {

	  // The remote launch is requested here.
      case CONTAINER_REMOTE_LAUNCH:
        ContainerRemoteLaunchEvent launchEvent
            = (ContainerRemoteLaunchEvent) event;
			
			// Launch the container remotely. Author's note: the RM allocates containers,
			// while the AM is the one that actually controls them.
        c.launch(launchEvent);
        break;

      case CONTAINER_REMOTE_CLEANUP:
        c.kill();
        break;
      }
      removeContainerIfDone(containerID);
    }
  }


那么就要具体看一下launch这个动作是怎么启动container的:
/**
 * Starts this container on its node through the container-management proxy
 * (abridged excerpt; containerMgrAddress, containerID and
 * containerLaunchContext come from the elided part).
 *
 * @param event the remote-launch event; supplies the container token
 */
public synchronized void launch(ContainerRemoteLaunchEvent event) {

...

// Proxy object used for the remote call.
ContainerManagementProtocolProxyData proxy = null;

proxy = getCMProxy(containerMgrAddress, containerID);

// Build the start request for the container.
StartContainerRequest startRequest =
            StartContainerRequest.newInstance(containerLaunchContext,
              event.getContainerToken());
        List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
        list.add(startRequest);
        StartContainersRequest requestList = StartContainersRequest.newInstance(list);
		
		// Start the container remotely. Author's note: the call is served by
// ContainerManagerImpl, the container manager on the target NM.
        StartContainersResponse response =
            proxy.getContainerManagementProtocol().startContainers(requestList);

...

}


那么就要去ContainerManagerImpl里看一下了, 这个startContainers里面到底做的是什么:
  /**
   * Starts every container listed in the request and reports which ones
   * succeeded and which failed.
   *
   * A {@code YarnException} for one container is recorded in the failure
   * map and processing continues. An {@code InvalidToken} is recorded and
   * rethrown. Any other {@code IOException} is rethrown as a remote
   * exception.
   *
   * @param requests the batch of start-container requests
   * @return a response carrying the aux-service metadata, the started
   *         container ids and the per-container failures
   * @throws YarnException if new container requests are currently blocked
   * @throws IOException   on an invalid token or other I/O failure
   */
  public StartContainersResponse
      startContainers(StartContainersRequest requests) throws YarnException,
          IOException {
    if (blockNewContainerRequests.get()) {
      throw new NMNotYetReadyException(
        "Rejecting new containers as NodeManager has not"
            + " yet connected with ResourceManager");
    }

    // Identify the caller and authorize it with its NM token identifier.
    UserGroupInformation callerUgi = getRemoteUgi();
    NMTokenIdentifier nmTokenId = selectNMTokenIdentifier(callerUgi);
    authorizeUser(callerUgi, nmTokenId);

    List<ContainerId> started = new ArrayList<ContainerId>();
    Map<ContainerId, SerializedException> failures =
        new HashMap<ContainerId, SerializedException>();

    for (StartContainerRequest startRequest : requests.getStartContainerRequests()) {
      ContainerId containerId = null;
      try {
        // Decode and verify the container token before starting anything.
        ContainerTokenIdentifier tokenId =
            BuilderUtils.newContainerTokenIdentifier(startRequest.getContainerToken());
        verifyAndGetContainerTokenIdentifier(startRequest.getContainerToken(),
            tokenId);
        containerId = tokenId.getContainerID();

        // Start the container on this NodeManager.
        startContainerInternal(nmTokenId, tokenId, startRequest);
        started.add(containerId);
      } catch (YarnException yarnFailure) {
        failures.put(containerId, SerializedException.newInstance(yarnFailure));
      } catch (InvalidToken badToken) {
        failures.put(containerId, SerializedException.newInstance(badToken));
        throw badToken;
      } catch (IOException ioFailure) {
        throw RPCUtil.getRemoteException(ioFailure);
      }
    }

    return StartContainersResponse.newInstance(getAuxServiceMetaData(),
        started, failures);
  }


在当前NM上面启动container调用的是startContainerInternal方法, 其实这个方法之前也用到过, 就是去启动container的, 这里就不详细展开了。具体来说, 它会先触发application级别的事件, application init完变成running状态后, 会去触发AppInitDoneTransition方法, 然后触发ContainerEventType.INIT_CONTAINER事件:
  /**
   * Transition run once application initialization is done: sends a
   * {@code ContainerInitEvent} for every container registered with the
   * application.
   */
  static class AppInitDoneTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {
      // Start all the containers waiting for ApplicationInit
      for (Container waiting : app.containers.values()) {
        ContainerInitEvent init =
            new ContainerInitEvent(waiting.getContainerId());
        app.dispatcher.getEventHandler().handle(init);
      }
    }
  }


这里就是开始初始化Container了, 看一下INIT_CONTAINER的状态机会做什么:
  // ContainerImpl state machine definition (excerpt; the initial state is NEW).
  private static StateMachineFactory
           <ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>
        stateMachineFactory =
      new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
    // From NEW State
    // INIT_CONTAINER runs RequestResourcesTransition, which picks one of the listed target states.
    .addTransition(ContainerState.NEW,
        EnumSet.of(ContainerState.LOCALIZING,
            ContainerState.LOCALIZED,
            ContainerState.LOCALIZATION_FAILED,
            ContainerState.DONE),
        ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())


看到了, 会去触发RequestResourcesTransition方法, 视情况会改变container状态, 再来继续看一下RequestResourcesTransition:
  /**
   * Transition run on INIT_CONTAINER (abridged excerpt): requests
   * localization of the container's local resources, or launches the
   * container directly when it has none.
   */
  static class RequestResourcesTransition implements
      MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
    @Override
    public ContainerState transition(ContainerImpl container,
        ContainerEvent event) { 
		
		...
		
		// Fetch the container's launch context.
		final ContainerLaunchContext ctxt = container.launchContext;
		
		...
		
		// Author's note: at this point the resource map is not expected to be empty.
		 Map<String,LocalResource> cntrRsrc = ctxt.getLocalResources();
      if (!cntrRsrc.isEmpty()) {
		
		// From here the local resources are requested step by step.
		
		...
		
		// Author's notes: further events follow until ContainerEventType.RESOURCE_LOCALIZED,
		// which triggers LocalizedTransition once all resources are available and the
		// container can be started.
		container.dispatcher.getEventHandler().handle(
              new ContainerLocalizationRequestEvent(container, req));
		
		} else {
		
		// Nothing to localize: launch right away.
		container.sendLaunchEvent();
        container.metrics.endInitingContainer();
        return ContainerState.LOCALIZED;
		
		}
		
		}
		
		}



来看一下LocalizedTransition这个方法吧:
/**
 * Transition applied when a resource of the container has been localized.
 *
 * <p>Yields {@code LOCALIZING} while resources are still pending (or when the
 * reported resource had no pending entry), and {@code LOCALIZED} once the last
 * pending resource has arrived, at which point the launch event is sent.
 */
static class LocalizedTransition implements
      MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
    @Override
    public ContainerState transition(ContainerImpl container,
        ContainerEvent event) {
      ContainerResourceLocalizedEvent localizedEvent =
          (ContainerResourceLocalizedEvent) event;

      // Take the resource off the pending set; null means it had no pending entry.
      List<String> links =
          container.pendingResources.remove(localizedEvent.getResource());
      if (links == null) {
        LOG.warn("Localized unknown resource " + localizedEvent.getResource() +
                 " for container " + container.containerId);
        assert false;
        // fail container?
        return ContainerState.LOCALIZING;
      }
      container.localizedResources.put(localizedEvent.getLocation(), links);

      if (container.pendingResources.isEmpty()) {
        // Last resource arrived: announce it, then launch the container.
        container.dispatcher.getEventHandler().handle(
            new ContainerLocalizationEvent(
                LocalizationEventType.CONTAINER_RESOURCES_LOCALIZED,
                container));
        container.sendLaunchEvent();
        container.metrics.endInitingContainer();
        return ContainerState.LOCALIZED;
      }

      // Still waiting on at least one resource.
      return ContainerState.LOCALIZING;
    }
  }


那么就要去看sendLaunchEvent了:
  /**
   * Dispatches a {@link ContainersLauncherEvent} for this container.
   *
   * <p>The event type is {@code RECOVER_CONTAINER} when the recovered status is
   * {@code LAUNCHED} (the container was launched previously), and
   * {@code LAUNCH_CONTAINER} otherwise.
   */
  private void sendLaunchEvent() {
    final boolean previouslyLaunched =
        recoveredStatus == RecoveredContainerStatus.LAUNCHED;
    final ContainersLauncherEventType eventType = previouslyLaunched
        ? ContainersLauncherEventType.RECOVER_CONTAINER
        : ContainersLauncherEventType.LAUNCH_CONTAINER;

    // The event is consumed by ContainersLauncher.handle(), which switches on
    // the event type.
    dispatcher.getEventHandler().handle(
        new ContainersLauncherEvent(this, eventType));
  }


去到ContainersLauncher里面看handle里面的LAUNCH_CONTAINER定义:
           case LAUNCH_CONTAINER:
        // Look up the Application this container belongs to.
        Application app =
          context.getApplications().get(
              containerId.getApplicationAttemptId().getApplicationId());

			  // Create a ContainerLaunch object. Per the article, its call() method runs the
			  // launch script that was supplied earlier, which in turn runs the main method.
        ContainerLaunch launch =
            new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
              event.getContainer(), dirsHandler, containerManager);
			  
			  // containerLauncher is an ExecutorService; once submitted, call() on the given object is invoked automatically.
			  // At this point the container has been started and the map task begins to run.
        containerLauncher.submit(launch);
        // Record the launch under its container id.
        running.put(containerId, launch);
        break;


既然开始运行了, 那么运行的那个class是哪个呢, 其实是一个YarnChild.class, 这个类是在createContainerLaunchContext方法的时候被传入到脚本里面的, 具体的方法是在
List<String> commands = MapReduceChildJVM.getVMCommand(
        taskAttemptListener.getAddress(), remoteTask, jvmID);


在getVMCommand方法里面, 把YarnChild.class传入到命令里, 在启动container的时候就执行这个命令, 从而执行YarnChild的main方法。

在YarnChild里面的main方法里面有这么一句:
taskFinal.run(job, umbilical)

可以看出这里开始才是真正run了task。

目前为止我们知道了AM是怎么向RM要container的, 然后AM再远程启动container, 再执行脚本开始run task。

后面有空的话会去看一下map后的shuffle过程是怎么做的, reduce这边就不看了, 整个过程应该和map task差不多: AM去RM申请container然后启动, 再执行task。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics