Study/Spring Batch
스프링 배치 실행 -SimpleJob 아키텍처
공29
2024. 4. 15. 23:22
1) SimpleJob 흐름도
2) 클래스 상속 관계도
3) SimpleJob 흐름도 따라가보기
3-1) SimpleJobLauncher.java
JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
마지막에 수행됐던 JobExecution 객체를 가져온다.
3-2) SimpleJobRepository.java
public JobExecution getLastJobExecution(String jobName, JobParameters jobParameters) {
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
if (jobInstance == null) {
return null;
}
JobExecution jobExecution = jobExecutionDao.getLastJobExecution(jobInstance);
if (jobExecution != null) {
jobExecution.setExecutionContext(ecDao.getExecutionContext(jobExecution));
stepExecutionDao.addStepExecutions(jobExecution);
}
return jobExecution;
}
jobInstance가 없으면 null 반환한다.
3-3) SimpleJobLauncher.java
job.getJobParametersValidator().validate(jobParameters);
JobParameter 검증 과정을 거친다.
jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
JobExecution을 생성한다.
3-4) SimpleJobRepository.java
@Override
public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
// ... 코드 생략
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
ExecutionContext executionContext;
// existing job instance found
if (jobInstance != null) {
List<JobExecution> executions = jobExecutionDao.findJobExecutions(jobInstance);
if (executions.isEmpty()) {
throw new IllegalStateException("Cannot find any job execution for job instance: " + jobInstance);
}
// check for running executions and find the last started
for (JobExecution execution : executions) {
// ... 코드 생략
}
executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance));
}
else {
// no job found, create one
jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters);
executionContext = new ExecutionContext();
}
JobExecution jobExecution = new JobExecution(jobInstance, jobParameters, null);
jobExecution.setExecutionContext(executionContext);
jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));
// Save the JobExecution so that it picks up an ID (useful for clients
// monitoring asynchronous executions):
jobExecutionDao.saveJobExecution(jobExecution);
ecDao.saveExecutionContext(jobExecution);
return jobExecution;
}
JobInstance가 없으면 새로운 JobInstance 객체를 생성하고, JobExecution 안에 포함되어질 ExecutionContext 객체도 생성한 후 저장한다.
3-5) SimpleJobLauncher.java
taskExecutor.execute(new Runnable() { // ... 코드 생략
Job을 실행시킨다.
3-7) Abstract.java
@Override
public final void execute(JobExecution execution) {
// 코드 생략...
try {
jobParametersValidator.validate(execution.getJobParameters());
if (execution.getStatus() != BatchStatus.STOPPING) {
execution.setStartTime(new Date());
updateStatus(execution, BatchStatus.STARTED);
// 배치 상태를 STARTED 상태로 변경한다.
listener.beforeJob(execution);
// 등록된 리스너가 있다면 실행(doExecution)하기 전에 beforeJob 메서드를 호출한다.
try {
doExecute(execution);
if (logger.isDebugEnabled()) {
logger.debug("Job execution complete: " + execution);
}
} catch (RepeatException e) {
throw e.getCause();
}
} else {
// The job was already stopped before we even got this far. Deal
// with it in the same way as any other interruption.
execution.setStatus(BatchStatus.STOPPED);
execution.setExitStatus(ExitStatus.COMPLETED);
if (logger.isDebugEnabled()) {
logger.debug("Job execution was stopped: " + execution);
}
}
} catch (JobInterruptedException e) {
// 코드 생략...
}
}
검증 과정을 한 번 더 거친 후 배치 상태를 STARTED로 변경한다. 등록된 리스너가 있다면 실행(doExecution)하기 전에 beforeJob 메서드를 호출하고, doExecute 메서드를 호출한다.
3-8) SimpleJob.java
@Override
protected void doExecute(JobExecution execution) throws JobInterruptedException, JobRestartException,
StartLimitExceededException {
StepExecution stepExecution = null;
for (Step step : steps) {
stepExecution = handleStep(step, execution);
if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
// BatchStatus가 COMPLETED가 아니면 성공적인 종료가 아니기때문에 빠져나가서 다음 스텝은 수행되지 않도록 한다.
//
// Terminate the job if a step fails
//
break;
}
}
//
// Update the job status to be the same as the last step
//
if (stepExecution != null) {
if (logger.isDebugEnabled()) {
logger.debug("Upgrading JobExecution status: " + stepExecution);
}
execution.upgradeStatus(stepExecution.getStatus());
execution.setExitStatus(stepExecution.getExitStatus());
// 작업 상태를 마지막 Step 단계와 동일하게 업데이트한다.
}
}
스텝을 순차적으로 실행하고, BatchStatus가 COMPLETED가 아니면 성공적인 종료가 아니기때문에 빠져나가서 다음 스텝은 수행되지 않도록 한다.
3-9) AbstractJob.java
@Override
public final void execute(JobExecution execution) {
// ... 코드 생략
try {
// ... 코드 생략
try {
doExecute(execution);
if (logger.isDebugEnabled()) {
logger.debug("Job execution complete: " + execution);
}
} catch (RepeatException e) {
throw e.getCause();
}
} else {
// ... 코드 생략
}
} catch (JobInterruptedException e) {
// ... 코드 생략
} catch (Throwable t) {
// ... 코드 생략
} finally {
try {
// ... 코드 생략
try {
listener.afterJob(execution);
// finally 안에 있기 때문에 Job에서 예외가 발생해도 실행된다.
} catch (Exception e) {
logger.error("Exception encountered in afterJob callback", e);
}
jobRepository.update(execution);
} finally {
JobSynchronizationManager.release();
}
}
}
afterJob을 호출한다.