Study/Spring Batch

스프링 배치 실행 -SimpleJob 아키텍처

공29 2024. 4. 15. 23:22

1) SimpleJob 흐름도

 

2) 클래스 상속 관계도

 

3) SimpleJob 흐름도 따라가보기

3-1) SimpleJobLauncher.java

JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);

 

마지막에 수행됐던 JobExecution 객체를 가져온다.

3-2) SimpleJobRepository.java

public JobExecution getLastJobExecution(String jobName, JobParameters jobParameters) {
    JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
    if (jobInstance == null) {
        return null;
    }
    JobExecution jobExecution = jobExecutionDao.getLastJobExecution(jobInstance);

    if (jobExecution != null) {
        jobExecution.setExecutionContext(ecDao.getExecutionContext(jobExecution));
        stepExecutionDao.addStepExecutions(jobExecution);
    }
    return jobExecution;
}

jobInstance가 없으면 null 반환한다.

3-3) SimpleJobLauncher.java

job.getJobParametersValidator().validate(jobParameters);

JobParameter 검증 과정을 거친다.

jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

JobExecution을 생성한다.

3-4) SimpleJobRepository.java

@Override
public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
        throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
    // ... 코드 생략

    JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
    ExecutionContext executionContext;

    // existing job instance found
    if (jobInstance != null) {

        List<JobExecution> executions = jobExecutionDao.findJobExecutions(jobInstance);

        if (executions.isEmpty()) {
            throw new IllegalStateException("Cannot find any job execution for job instance: " + jobInstance);
        }

        // check for running executions and find the last started
        for (JobExecution execution : executions) {
            // ... 코드 생략
        }
        executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance));
    }
    else {
        // no job found, create one
        jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters);
        executionContext = new ExecutionContext();
    }

    JobExecution jobExecution = new JobExecution(jobInstance, jobParameters, null);
    jobExecution.setExecutionContext(executionContext);
    jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));

    // Save the JobExecution so that it picks up an ID (useful for clients
    // monitoring asynchronous executions):
    jobExecutionDao.saveJobExecution(jobExecution);
    ecDao.saveExecutionContext(jobExecution);

    return jobExecution;

}

JobInstance가 없으면 새로운 JobInstance 객체를 생성하고, JobExecution 안에 포함되어질 ExecutionContext 객체도 생성한 후 저장한다.

3-5) SimpleJobLauncher.java

taskExecutor.execute(new Runnable() { // ... 코드 생략

Job을 실행시킨다.

3-7) Abstract.java

@Override
public final void execute(JobExecution execution) {

    // 코드 생략...
    try {

        jobParametersValidator.validate(execution.getJobParameters());

        if (execution.getStatus() != BatchStatus.STOPPING) {

            execution.setStartTime(new Date());
            updateStatus(execution, BatchStatus.STARTED);
            // 배치 상태를 STARTED 상태로 변경한다.

            listener.beforeJob(execution);
            // 등록된 리스너가 있다면 실행(doExecution)하기 전에 beforeJob 메서드를 호출한다.

            try {
                doExecute(execution);
                if (logger.isDebugEnabled()) {
                    logger.debug("Job execution complete: " + execution);
                }
            } catch (RepeatException e) {
                throw e.getCause();
            }
        } else {

            // The job was already stopped before we even got this far. Deal
            // with it in the same way as any other interruption.
            execution.setStatus(BatchStatus.STOPPED);
            execution.setExitStatus(ExitStatus.COMPLETED);
            if (logger.isDebugEnabled()) {
                logger.debug("Job execution was stopped: " + execution);
            }

        }

    } catch (JobInterruptedException e) {
		// 코드 생략...
    }

}

검증 과정을 한 번 더 거친 후 배치 상태를 STARTED로 변경한다. 등록된 리스너가 있다면 실행(doExecution)하기 전에 beforeJob 메서드를 호출하고, doExecute 메서드를 호출한다.

3-8) SimpleJob.java

@Override
protected void doExecute(JobExecution execution) throws JobInterruptedException, JobRestartException,
StartLimitExceededException {

    StepExecution stepExecution = null;
    for (Step step : steps) {
        stepExecution = handleStep(step, execution);
        if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
            // BatchStatus가 COMPLETED가 아니면 성공적인 종료가 아니기때문에 빠져나가서 다음 스텝은 수행되지 않도록 한다.
            //
            // Terminate the job if a step fails
            //
            break;
        }
    }

    //
    // Update the job status to be the same as the last step
    //
    if (stepExecution != null) {
        if (logger.isDebugEnabled()) {
            logger.debug("Upgrading JobExecution status: " + stepExecution);
        }
        execution.upgradeStatus(stepExecution.getStatus());
        execution.setExitStatus(stepExecution.getExitStatus());
        // 작업 상태를 마지막 Step 단계와 동일하게 업데이트한다.
    }
}

 

스텝을 순차적으로 실행하고, BatchStatus가 COMPLETED가 아니면 성공적인 종료가 아니기때문에 빠져나가서 다음 스텝은 수행되지 않도록 한다.

3-9) AbstractJob.java

@Override
public final void execute(JobExecution execution) {

    // ... 코드 생략
    try {
        // ... 코드 생략
            try {
                doExecute(execution);
                if (logger.isDebugEnabled()) {
                    logger.debug("Job execution complete: " + execution);
                }
            } catch (RepeatException e) {
                throw e.getCause();
            }
        } else {
            // ... 코드 생략
        }

    } catch (JobInterruptedException e) {
        // ... 코드 생략
    } catch (Throwable t) {
        // ... 코드 생략
    } finally {
        try {
            // ... 코드 생략

            try {
                listener.afterJob(execution);
                // finally 안에 있기 때문에 Job에서 예외가 발생해도 실행된다.
            } catch (Exception e) {
                logger.error("Exception encountered in afterJob callback", e);
            }

            jobRepository.update(execution);
        } finally {
            JobSynchronizationManager.release();
        }

    }

}

afterJob을 호출한다.