1. 배치 어플리케이션 정의
일괄 처리(batch processing)란 최종 사용자의 개입 없이 또는 (자원이 허가한다면) 실행을 스케줄링할 수 있는 작업(job)의 실행을 의미한다.[1]
- 위키백과 (https://ko.wikipedia.org/wiki/%EC%9D%BC%EA%B4%84_%EC%B2%98%EB%A6%AC)
1-1. 배치 어플리케이션이 필요한 이유?
- 사용자들에게 보여주기 위해 매일매일 집계되어야 하는 데이터를 운영중인 WAS 에서 수행하게 된다면 해당 서버는 CPU, I/O 등의 자원을 모두 사용하게 되어버려 이용중인 사용자들의 요청들을 처리할 수 없게 되어버려 운영 중의 이슈발생
- 10만개의 데이터를 처리하는 중 3만번째에서 실패하게 된다면, 3만1번째에서 이어서 실행 가능
- 이미 해당일자(= jobParameter)의 배치 함수가 실행 된 경우라면 중복실행 건 실패처리
1-2. 배치 어플리케이션 조건
1. 대용량 데이터 - 배치 어플리케이션은 대량의 데이터를 가져오거나, 전달하거나, 계산하는 등의 처리를 할 수 있어야 합니다.
2. 자동화 - 배치 어플리케이션은 심각한 문제 해결을 제외하고는 사용자 개입 없이 실행되어야 합니다.
3. 견고성 - 배치 어플리케이션은 잘못된 데이터를 충돌/중단 없이 처리할 수 있어야 합니다.
4. 신뢰성 - 배치 어플리케이션은 무엇이 잘못되었는지를 추적할 수 있어야 합니다. (로깅, 알림)
5. 성능 - 배치 어플리케이션은 지정한 시간 안에 처리를 완료하거나 동시에 실행되는 다른 어플리케이션을 방해하지 않도록 수행되어야합니다.
- 이동욱 개발자님 "기억보단 기록을" https://jojoldu.tistory.com/324
2. Spring Batch + MyBatis + Spring Scheduler
2-1. Spring Batch + MyBatis
2-1-1. pom.xml
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
</dependency>
2-1-2. Application.java
- BatchApplication.java 에 @EnableBatchProcessing 해당 어노테이션 붙여서 Spring Batch 기능 활성화
@EnableBatchProcessing //배치기능 활성화 !!
@SpringBootApplication
public class BatchApplication {
public static void main(String[] args) {
SpringApplication.run(BatchApplication.class, args);
}
}
2-1-3. BatchJobConfig.java
@Slf4j
@RequiredArgsConstructor
@Configuration
public class batchJobConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final SqlSessionFactory sqlSessionFactory;
private static final int CHUNK_SIZE = 1000;
@Bean
public Job batchJob() {
return jobBuilderFactory.get("batchJob")
.start(batchStep(null))
.build();
}
@Bean
@JobScope
public Step batchStep(@Value("#{jobParameters[date]}") String date) {
log.debug("batch job execution date : {}", date); // Batch Job 실행일시
return stepBuilderFactory.get("step")
.<MyDto, String>chunk(CHUNK_SIZE)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
@Bean
public ItemReader<MyDto> reader() {
return new MyBatisCursorItemReaderBuilder<MyDto>()
.sqlSessionFactory(sqlSessionFactory)
.queryId("com.batch.test.findData")
.build();
}
@Bean
public ItemProcessor<MyDto, String> processor() {
return myDto -> {
return myDto.getStringData();
};
}
@Bean
public ItemWriter<String> writer() {
return new MyBatisBatchItemWriterBuilder<String>()
.sqlSessionFactory(sqlSessionFactory)
.statementId("com.batch.test.insertData")
.build();
}
- CHUNK_SIZE : Spring Batch 는 Chunk 기반 처리 (각 commit 사이에 처리되는 row 수)
- @Value("#{jobParameters[date]}" : @JobScope(Job 또는 Step에 선언), @StepScope(ItemReader, ItemWriter 등 에 선언) 를 사용하여 파라미터를 받을 수 있음
- jobBuilderFactory.get("batchJob") : Builder 를 통해 batchJob 이라는 이름의 Batch Job 생성
- stepBuilderFactory.get("simpleStep1") : Builder 를 통해 step 이란 이름의 Batch Step 생성
- reader : ItemReader 의 read() 를 통해 아이템을 하나씩 읽음
- processor : reader로 부터 읽어온 데이터들을 ItemProcessor 에서 가공 또는 필터링
- writer : reader, processor를 거친 데이터의 갯수가 commit interval에 도달하면, 모든 chunk는 ItemWriter로 한꺼번에 전달되고 트랜잭션은 커밋
2-1-4. schema-postgre.sql
- Spring Batch 관련된 meta data 저장하는 테이블 필요
- 아래는 PostgreSQL 에 해당하는 batch meta table schema 정보 (환경에 맞는 DB Schema 찾아서 적용)
-- Autogenerated: do not edit this file
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP NOT NULL,
START_TIME TIMESTAMP DEFAULT NULL ,
END_TIME TIMESTAMP DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP,
JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
TYPE_CD VARCHAR(6) NOT NULL ,
KEY_NAME VARCHAR(100) NOT NULL ,
STRING_VAL VARCHAR(250) ,
DATE_VAL TIMESTAMP DEFAULT NULL ,
LONG_VAL BIGINT ,
DOUBLE_VAL DOUBLE PRECISION ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
START_TIME TIMESTAMP NOT NULL ,
END_TIME TIMESTAMP DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP,
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
2-1-5. ContextRefreshedEventListener.java
@Configuration
@Slf4j
@RequiredArgsConstructor
public class ContextRefreshedEventListener implements ApplicationListener<ContextRefreshedEvent> {
private final JobExplorer jobExplorer;
private final JobRepository jobRepository;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
log.info("Stop running jobs.");
for (String jobName : jobExplorer.getJobNames()) {
Set<JobExecution> runningJobExecutions = jobExplorer.findRunningJobExecutions(jobName);
for (JobExecution jobExecution : runningJobExecutions) {
jobExecution.setStatus(BatchStatus.STOPPED);
jobExecution.setEndTime(new Date());
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
if (stepExecution.getStatus().isRunning()) {
stepExecution.setStatus(BatchStatus.STOPPED);
stepExecution.setEndTime(new Date());
jobRepository.update(stepExecution);
}
}
jobRepository.update(jobExecution);
log.info("Updated job execution status: {}", jobExecution.getJobId());
}
}
log.info("Stopped running jobs.");
}
}
- Spring Batch Job 실행 중 비정상적인 Job 에 대해서 status = stopped 로 바꿔주는 처리
2-2. Spring Batch + Spring Scheduler
2-2-1. Application.java
- BatchApplication.java 에 @EnableScheduling 해당 어노테이션 붙여서 Spring Schedule 기능 활성화
@EnableScheduling // Spring Schedule 활성화 !!
@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {
public static void main(String[] args) {
SpringApplication.run(BatchApplication.class, args);
}
}
2-2-2. BatchJobScheduler.java
@Slf4j
@RequiredArgsConstructor
@Component
public class BatchJobScheduler {
private final JobLauncher jobLauncher;
private final BatchJobConfig batchJobConfig;
@Scheduled(cron = " 0 01 00 * * *")
public void executeSchedule() {
Map<String, JobParameter> confMap = new HashMap<>();
confMap.put("date", new JobParameter(LocalDate.now().toString()));
JobParameters jobParameters = new JobParameters(confMap);
try {
jobLauncher.run(batchJobConfig.batchJob(), jobParameters);
} catch (JobExecutionAlreadyRunningException
| JobInstanceAlreadyCompleteException
| JobParametersInvalidException
| org.springframework.batch.core.repository.JobRestartException e) {
e.printStackTrace();
}
}
}
- @Scheduled(cron = " 0 01 00 * * *") : Cron 식으로 Scheduler 시간 설정 (위 예제는 매일 자정 00:01 분으로 설정)
- jobLauncher.run(batchJobConfig.batchJob(), jobParameters) : 실행을 원하는 Batch Job을 넣어주고, 실행에 필요한 jobParameter 전달
댓글