본문 바로가기
Web Frameworks/Spring

Spring Batch

by IamBeau 2021. 9. 27.

1.  배치 어플리케이션 정의 

일괄 처리(batch processing)란 최종 사용자의 개입 없이 또는 (자원이 허가한다면) 실행을 스케줄링할 수 있는 작업(job)의 실행을 의미한다.[1]

- 위키백과 (https://ko.wikipedia.org/wiki/%EC%9D%BC%EA%B4%84_%EC%B2%98%EB%A6%AC)

 

1-1. 배치 어플리케이션이 필요한 이유?  

- 사용자들에게 보여주기 위해 매일매일 집계되어야 하는 데이터를 운영중인 WAS 에서 수행하게 된다면 해당 서버는 CPU, I/O 등의 자원을 모두 사용하게 되어버려 이용중인 사용자들의 요청들을 처리할 수 없게 되어버려 운영 중의 이슈발생

- 10만개의 데이터를 처리하는 중 3만번째에서 실패하게 된다면, 3만1번째에서 이어서 실행 가능

- 이미 해당일자(= jobParameter)의 배치 함수가 실행 된 경우라면 중복실행 건 실패처리 

 

1-2. 배치 어플리케이션 조건 

1. 대용량 데이터 - 배치 어플리케이션은 대량의 데이터를 가져오거나, 전달하거나, 계산하는 등의 처리를 할 수 ​​있어야 합니다.
2. 자동화 - 배치 어플리케이션은 심각한 문제 해결을 제외하고는 사용자 개입 없이 실행되어야 합니다.
3. 견고성 - 배치 어플리케이션은 잘못된 데이터를 충돌/중단 없이 처리할 수 있어야 합니다.
4. 신뢰성 - 배치 어플리케이션은 무엇이 잘못되었는지를 추적할 수 있어야 합니다. (로깅, 알림)
5. 성능 - 배치 어플리케이션은 지정한 시간 안에 처리를 완료하거나 동시에 실행되는 다른 어플리케이션을 방해하지 않도록 수행되어야합니다.

- 이동욱 개발자님 "기억보단 기록을" https://jojoldu.tistory.com/324

           

2.  Spring Batch + MyBatis +  Spring Scheduler

2-1.  Spring Batch + MyBatis

2-1-1.  pom.xml 

<dependency>
	<groupId>org.springframework.batch</groupId>
	<artifactId>spring-batch-core</artifactId>
</dependency>

 

2-1-2.  Application.java 

- BatchApplication.java 에 @EnableBatchProcessing 해당 어노테이션 붙여서 Spring Batch 기능 활성화

@EnableBatchProcessing //배치기능 활성화 !!
@SpringBootApplication
public class BatchApplication {
	public static void main(String[] args) {
		SpringApplication.run(BatchApplication.class, args);
	}
}

 

2-1-3.  BatchJobConfig.java

@Slf4j
@RequiredArgsConstructor
@Configuration
public class batchJobConfig {

  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;
  private final SqlSessionFactory sqlSessionFactory;

  private static final int CHUNK_SIZE = 1000;

  @Bean
  public Job batchJob() {
    return jobBuilderFactory.get("batchJob")
        .start(batchStep(null))
        .build();
  }

  @Bean
  @JobScope
  public Step batchStep(@Value("#{jobParameters[date]}") String date) {
    log.debug("batch job execution date : {}", date); // Batch Job 실행일시
    return stepBuilderFactory.get("step")
        .<MyDto, String>chunk(CHUNK_SIZE)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .build();
  }

  @Bean
  public ItemReader<MyDto> reader() {
    return new MyBatisCursorItemReaderBuilder<MyDto>()
            .sqlSessionFactory(sqlSessionFactory)
            .queryId("com.batch.test.findData")
            .build();
  }

  @Bean
  public ItemProcessor<MyDto, String> processor() {
    return myDto -> {
      return myDto.getStringData();
    };
  }

  @Bean
  public ItemWriter<String> writer() {
    return new MyBatisBatchItemWriterBuilder<String>()
            .sqlSessionFactory(sqlSessionFactory)
            .statementId("com.batch.test.insertData")
            .build();
  }

- CHUNK_SIZE : Spring Batch 는 Chunk 기반 처리 (각 commit 사이에 처리되는 row 수)

- @Value("#{jobParameters[date]}" : @JobScope(Job 또는 Step에 선언), @StepScope(ItemReader, ItemWriter 등 에 선언) 를 사용하여 파라미터를 받을 수 있음

- jobBuilderFactory.get("batchJob") : Builder 를 통해 batchJob 이라는 이름의 Batch Job 생성

- stepBuilderFactory.get("simpleStep1") : Builder 를 통해 step 이란 이름의 Batch Step 생성

- reader : ItemReader 의 read() 를 통해 아이템을 하나씩 읽음

- processor : reader로 부터 읽어온 데이터들을 ItemProcessor 에서 가공 또는 필터링

- writer : reader, processor를 거친 데이터의 갯수가 commit interval에 도달하면, 모든 chunk는 ItemWriter로 한꺼번에 전달되고 트랜잭션은 커밋

 

2-1-4.  schema-postgre.sql 

- Spring Batch 관련된 meta data 저장하는 테이블 필요

- 아래는 PostgreSQL 에 해당하는 batch meta table schema 정보 (환경에 맞는 DB Schema 찾아서 적용)

-- Autogenerated: do not edit this file

CREATE TABLE BATCH_JOB_INSTANCE  (
	JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY ,
	VERSION BIGINT ,
	JOB_NAME VARCHAR(100) NOT NULL,
	JOB_KEY VARCHAR(32) NOT NULL,
	constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;

CREATE TABLE BATCH_JOB_EXECUTION  (
	JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
	VERSION BIGINT  ,
	JOB_INSTANCE_ID BIGINT NOT NULL,
	CREATE_TIME TIMESTAMP NOT NULL,
	START_TIME TIMESTAMP DEFAULT NULL ,
	END_TIME TIMESTAMP DEFAULT NULL ,
	STATUS VARCHAR(10) ,
	EXIT_CODE VARCHAR(2500) ,
	EXIT_MESSAGE VARCHAR(2500) ,
	LAST_UPDATED TIMESTAMP,
	JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
	constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
	references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
	JOB_EXECUTION_ID BIGINT NOT NULL ,
	TYPE_CD VARCHAR(6) NOT NULL ,
	KEY_NAME VARCHAR(100) NOT NULL ,
	STRING_VAL VARCHAR(250) ,
	DATE_VAL TIMESTAMP DEFAULT NULL ,
	LONG_VAL BIGINT ,
	DOUBLE_VAL DOUBLE PRECISION ,
	IDENTIFYING CHAR(1) NOT NULL ,
	constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE TABLE BATCH_STEP_EXECUTION  (
	STEP_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
	VERSION BIGINT NOT NULL,
	STEP_NAME VARCHAR(100) NOT NULL,
	JOB_EXECUTION_ID BIGINT NOT NULL,
	START_TIME TIMESTAMP NOT NULL ,
	END_TIME TIMESTAMP DEFAULT NULL ,
	STATUS VARCHAR(10) ,
	COMMIT_COUNT BIGINT ,
	READ_COUNT BIGINT ,
	FILTER_COUNT BIGINT ,
	WRITE_COUNT BIGINT ,
	READ_SKIP_COUNT BIGINT ,
	WRITE_SKIP_COUNT BIGINT ,
	PROCESS_SKIP_COUNT BIGINT ,
	ROLLBACK_COUNT BIGINT ,
	EXIT_CODE VARCHAR(2500) ,
	EXIT_MESSAGE VARCHAR(2500) ,
	LAST_UPDATED TIMESTAMP,
	constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
	STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
	SERIALIZED_CONTEXT TEXT ,
	constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
	references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
	JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
	SERIALIZED_CONTEXT TEXT ,
	constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE;

 

2-1-5.  ContextRefreshedEventListener.java 

@Configuration
@Slf4j
@RequiredArgsConstructor
public class ContextRefreshedEventListener implements ApplicationListener<ContextRefreshedEvent> {
  private final JobExplorer jobExplorer;
  private final JobRepository jobRepository;

  @Override
  public void onApplicationEvent(ContextRefreshedEvent event) {
    log.info("Stop running jobs.");

    for (String jobName : jobExplorer.getJobNames()) {
      Set<JobExecution> runningJobExecutions = jobExplorer.findRunningJobExecutions(jobName);

      for (JobExecution jobExecution : runningJobExecutions) {
        jobExecution.setStatus(BatchStatus.STOPPED);
        jobExecution.setEndTime(new Date());

        for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
          if (stepExecution.getStatus().isRunning()) {
            stepExecution.setStatus(BatchStatus.STOPPED);
            stepExecution.setEndTime(new Date());
            jobRepository.update(stepExecution);
          }
        }

        jobRepository.update(jobExecution);
        log.info("Updated job execution status: {}", jobExecution.getJobId());
      }

    }

    log.info("Stopped running jobs.");
  }

}

 - Spring Batch Job 실행 중 비정상적인 Job 에 대해서 status = stopped 로 바꿔주는 처리

 

2-2.  Spring Batch + Spring Scheduler

2-2-1.  Application.java 

- BatchApplication.java 에 @EnableScheduling 해당 어노테이션 붙여서 Spring Schedule 기능 활성화

@EnableScheduling // Spring Schedule 활성화 !!
@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {
	public static void main(String[] args) {
		SpringApplication.run(BatchApplication.class, args);
	}
}

 

 

2-2-2.  BatchJobScheduler.java 

@Slf4j
@RequiredArgsConstructor
@Component
public class BatchJobScheduler {

    private final JobLauncher jobLauncher;
    private final BatchJobConfig batchJobConfig;

    @Scheduled(cron = " 0 01 00 * * *")
    public void executeSchedule() {
        Map<String, JobParameter> confMap = new HashMap<>();
        confMap.put("date", new JobParameter(LocalDate.now().toString()));
        JobParameters jobParameters = new JobParameters(confMap);
        
        try {
            jobLauncher.run(batchJobConfig.batchJob(), jobParameters);
        } catch (JobExecutionAlreadyRunningException
                | JobInstanceAlreadyCompleteException
                | JobParametersInvalidException
                | org.springframework.batch.core.repository.JobRestartException e) {
            e.printStackTrace();
        }
    }

}

- @Scheduled(cron = " 0 01 00 * * *") : Cron 식으로 Scheduler 시간 설정 (위 예제는 매일 자정 00:01 분으로 설정) 

- jobLauncher.run(batchJobConfig.batchJob(), jobParameters) : 실행을 원하는 Batch Job을 넣어주고, 실행에 필요한 jobParameter 전달

댓글