  • Understanding Spring Batch Domain Objects - StepContribution
    Study/Spring Batch 2024. 2. 15. 08:31

    StepContribution

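    1) Basic concepts

    • A domain object that buffers the changes made while a chunk is processed and is then used to update the state of StepExecution.
    • Just before the chunk is committed, the apply method of StepExecution is called with the contribution to update the step's state.
    • In addition to the default exit codes of ExitStatus, a user-defined exit code can be created and applied.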
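
    The buffering idea above can be sketched in plain Java, without Spring on the classpath. StepExecutionModel, ContributionModel and runChunks are hypothetical, simplified stand-ins invented for this illustration; they are not the real Spring Batch classes and only track a few counters.

```java
import java.util.List;

public class ContributionSketch {

    /** Hypothetical stand-in for StepExecution: holds the accumulated step totals. */
    public static final class StepExecutionModel {
        private int readCount;
        private int writeCount;
        private int commitCount;

        /** Every chunk gets its own fresh buffer. */
        public ContributionModel createContribution() {
            return new ContributionModel();
        }

        /** Merge one chunk's buffered counts into the step totals. */
        public synchronized void apply(ContributionModel contribution) {
            readCount += contribution.readCount;
            writeCount += contribution.writeCount;
        }

        public void incrementCommitCount() { commitCount++; }
        public int getReadCount() { return readCount; }
        public int getWriteCount() { return writeCount; }
        public int getCommitCount() { return commitCount; }
    }

    /** Hypothetical stand-in for StepContribution: per-chunk counters only. */
    public static final class ContributionModel {
        private int readCount;
        private int writeCount;

        public void incrementReadCount() { readCount++; }
        public void incrementWriteCount(int count) { writeCount += count; }
    }

    /** Process each chunk into its own buffer, then apply the buffer just before the "commit". */
    public static StepExecutionModel runChunks(List<List<String>> chunks) {
        StepExecutionModel stepExecution = new StepExecutionModel();
        for (List<String> chunk : chunks) {
            ContributionModel contribution = stepExecution.createContribution();
            for (String item : chunk) {
                contribution.incrementReadCount(); // buffered; step totals are not touched yet
            }
            contribution.incrementWriteCount(chunk.size());
            stepExecution.apply(contribution);     // totals change only here
            stepExecution.incrementCommitCount();
        }
        return stepExecution;
    }

    public static void main(String[] args) {
        StepExecutionModel result = runChunks(List.of(List.of("a", "b", "c"), List.of("d", "e")));
        System.out.println("read=" + result.getReadCount()
                + ", write=" + result.getWriteCount()
                + ", commits=" + result.getCommitCount());
    }
}
```

    With two chunks of three and two items, the totals only move when apply is called, once per chunk, which mirrors the order seen in the TaskletStep excerpt in this post.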

     

    2) Structure

     

    3) Diagram

     

    Hands-on

    Exercise 1) Set a breakpoint in the doInTransaction method of the TaskletStep class (it lives in the inner ChunkTransactionCallback class) and step through it

    @Override
    public RepeatStatus doInTransaction(TransactionStatus status) {
        TransactionSynchronizationManager.registerSynchronization(this);
    
        RepeatStatus result = RepeatStatus.CONTINUABLE;
    
        StepContribution contribution = stepExecution.createStepContribution(); // create the StepContribution
        // set a breakpoint here to inspect contribution
    
        // code omitted...
    
        try {
    
            try {
                try {
                    result = tasklet.execute(contribution, chunkContext);
                    if (result == null) {
                        result = RepeatStatus.FINISHED; // a null result is treated as RepeatStatus.FINISHED
                    }
                }
                catch (Exception e) {
                    if (transactionAttribute.rollbackOn(e)) {
                        chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e);
                        throw e;
                    }
                }
            }
            finally {
                // code omitted...
                stepExecution.apply(contribution);
            }
    
            stepExecutionUpdated = true;
    
            stream.update(stepExecution.getExecutionContext());
    
            try {
                // Going to attempt a commit. If it fails this flag will
                // stay false and we can use that later.
                getJobRepository().updateExecutionContext(stepExecution); // updates the BATCH_STEP_EXECUTION_CONTEXT table
                stepExecution.incrementCommitCount(); // increment the commit count
                if (logger.isDebugEnabled()) {
                    logger.debug("Saving step execution before commit: " + stepExecution);
                }
                getJobRepository().update(stepExecution); // updates the BATCH_STEP_EXECUTION table
            }
            catch (Exception e) {
                // If we get to here there was a problem saving the step
                // execution and we have to fail.
                String msg = "JobRepository failure forcing rollback";
                logger.error(msg, e);
                throw new FatalStepExecutionException(msg, e);
            }
        }
        
        // code omitted (catch blocks of the outer try)...
    
        return result;
    
    }

    - contribution

    - the apply method is called just before the commit

    public synchronized void apply(StepContribution contribution) {
        readSkipCount += contribution.getReadSkipCount();
        writeSkipCount += contribution.getWriteSkipCount();
        processSkipCount += contribution.getProcessSkipCount();
        filterCount += contribution.getFilterCount();
        readCount += contribution.getReadCount();
        writeCount += contribution.getWriteCount();
        exitStatus = exitStatus.and(contribution.getExitStatus());
    }
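
    The last line of apply merges exit statuses with and. As far as I recall from the ExitStatus Javadoc, the more severe exit code wins, and a code that is not one of the built-in ones ranks as most severe, which is why a user-defined code survives the merge. The sketch below models only that rule in plain Java; ExitCodeModel is a hypothetical stand-in, and its severity ranking is an assumption based on that recollection rather than a copy of the library code.

```java
public class ExitStatusSketch {

    /** Hypothetical stand-in for ExitStatus: exit code only, no description. */
    public static final class ExitCodeModel {
        // Assumed order of the built-in codes, from least to most severe.
        private static final String[] BUILT_IN =
                {"EXECUTING", "COMPLETED", "NOOP", "STOPPED", "FAILED", "UNKNOWN"};

        private final String code;

        public ExitCodeModel(String code) { this.code = code; }

        public String getCode() { return code; }

        /** Built-in codes rank 1..6; any other (user-defined) code ranks 7. */
        private static int severity(String code) {
            for (int i = 0; i < BUILT_IN.length; i++) {
                if (code.startsWith(BUILT_IN[i])) {
                    return i + 1;
                }
            }
            return BUILT_IN.length + 1;
        }

        /** Keep whichever code is more severe; ties go to the alphabetically greater code. */
        public ExitCodeModel and(ExitCodeModel other) {
            if (other == null) {
                return this;
            }
            int mine = severity(code);
            int theirs = severity(other.code);
            if (theirs > mine || (theirs == mine && other.code.compareTo(code) > 0)) {
                return other;
            }
            return this;
        }
    }

    public static void main(String[] args) {
        ExitCodeModel stepStatus = new ExitCodeModel("EXECUTING");
        // A contribution carrying a user-defined code replaces the step's current code.
        ExitCodeModel merged = stepStatus.and(new ExitCodeModel("CUSTOM_EXIT"));
        System.out.println(merged.getCode());
    }
}
```

    In a real Tasklet the corresponding call would be contribution.setExitStatus(new ExitStatus("CUSTOM_EXIT")), where CUSTOM_EXIT is a made-up code used only for this example.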

     

    Exercise 2) The job instance can be reached through the contribution object

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step1 has executed");
                        System.out.println(">>>" + contribution.getStepExecution().getJobExecution().getJobInstance().getJobName());
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }
