본문 바로가기
Back-End/Server

[Kong's Blog] 프로젝트 회고와 리팩토링 (3) - 배치 사용해보기

by kong_tae 2025. 1. 15.

들어가며

이전에 구현하였던 크롤링 작업을 통해 가져온 게시글 (제목, 내용)을 Post 엔티티로 생성하여 저장하는 기능을 사용하는 Spring Batch . 를 구현하고자 합니다. 단순히 Scheduler 를 통해 해당 작업 지정된 시간마다 수행할 수 있지만, 왜 대용량 데이터를 처리하는데 Spring Batch 를 사용하는지에 대해 궁금함이 생겨 이를 직접 느껴보고자 도입하게 되었습니다!

Batch 는 뭘까?

Spring Batch provides reusable functions that are essential in processing large volumes of records, including logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management. It also provides more advanced technical services and features that will enable extremely high-volume and high performance batch jobs through optimization and partitioning techniques. Simple as well as complex, high-volume batch jobs can leverage the framework in a highly scalable manner to process significant volumes of information.

출처 : https://spring.io/projects/spring-batch

'로깅/추적, 트랜잭션 관리, 작업 처리 통계, 작업 재시작, 건너뛰기 등' 대량의 레코드를 처리하는 데 필수적인 재사용 가능한 기능을 제공한다고합니다. 저의 소규모 데이터 크롤링 후, DB Insert 작업에서도 Spring Batch 가 제공해주는 '트랜잭션 관리/ 재시작 및 오류 처리 기능' 정도는 간접적으로라도 장점으로 느낄 . 수 있을 것 같다고 생각이 듭니다.

사용해보고자 하는 것 (목표)

1. 크롤링 로깅 처리.

2. 크롤링한 데이터를 DB에 Insert 시, Transaction 처리.

이러한 기능에 초점을 맞춰서 Spring Batch 를 통한 작업을 구현해보겠습니다!

구현 해보기.

위의 spring-batch 사이트의 예제를 보니, 아래와 같은 구조로 하나의 Job 을 구성하는 것 같습니다.

1. Reader : File, DB, 외부 데이터로부터 필요한 정보를 읽는 역할.

2. Processor : 읽어온 데이터를 가공하는 역할.

3. Writer : 가공한 데이터를 저장하는 역할.

저의 상황에서는 Reader 는 크롤러, Processor 는 읽어온 크롤링 데이터를 Post 엔터티로 변환, Writer 는 마지막으로 변환된 엔터티를 DB에 저장하는 역할로 구현하면 될 것 같습니다!

Reader

@RequiredArgsConstructor
public class PostCrawlingReader implements ItemReader<PostSearchDto> {

    private final TistoryCrawlerService tistoryCrawlerService;
    private List<PostSearchDto> posts;
    private Iterator<PostSearchDto> iterator;

    @Override
    public PostSearchDto read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        if (posts == null) {
            List<Long> postIds = tistoryCrawlerService.getPostIdList();
            postIds.stream().forEach(id -> {
                System.out.println("id : " + id);
            });
            posts = tistoryCrawlerService.getPosts(postIds);
            iterator = posts.iterator();
        }

        return iterator.hasNext() ? iterator.next() : null;
    }
}

한번에 데이터를 읽고, 바로 Processor 로 넘겨주는 것이 일반적이라고 생각하였는데 그게 아니었습니다. 그럼 왜 이런식으로 구현을 했을 까?(예제에서는) 생각해봤을 때, 예외를 컨트롤하기 더 쉬울 것 같다는 생각이 들었습니다. 왜 그렇게 생각하였냐면, 만약 Read -> Proceesor 단계중에서는 예외가 발생한다면 다시 데이터를 읽어와야할텐데 만약 전체를 전부 불러오게 된다면 예외처리 단위가 커지기 때문입니다.

Ex: 크롤링한 게시글 중 1개만 문제가 있는데, read 단계에서 매번 글을 읽어서 가져온다면 다시 모든 게시글을 다 가져오게 될 것이기 때문!

이라고 생각이됩니다.. (추후, Batch를 본격적으로 쓰게 된다면 업데이트하겠습니다)

Processor

@RequiredArgsConstructor
public class PostCrawlingProcessor implements ItemProcessor<PostSearchDto, Post> {

    @Override
    public Post process(PostSearchDto item) throws Exception {
        System.out.println("item : " + item.title());
        return BatchMapper.toPost(item);
    }
}

Writer

@RequiredArgsConstructor
public class PostCrawlingWriter implements ItemWriter<Post> {

    private final PostRepository postRepository;

    @Override
    public void write(Chunk<? extends Post> chunk) throws Exception {

        chunk.getItems().stream().forEach(post -> {
            System.out.println("제목 : " + post.getTitle());
        });

//        if (chunk.getItems().stream().allMatch(item -> item instanceof Post)) {
//            throw new RuntimeException("타입이 일치하지 않습니다.");
//        }

        postRepository.saveAll((List<Post>) chunk.getItems());
    }
}

Config

@Configuration
@RequiredArgsConstructor
public class PostBatchConfig {
    private final JobRepository jobRepository;
    private final JobLauncher jobLauncher;
    private final PlatformTransactionManager transactionManager;
    private final PostRepository postRepository;
    private final TistoryCrawlerService tistoryCrawlerService;


    @Bean
    public PostCrawlingWriter postCrawlingWriter() {
        return new PostCrawlingWriter(postRepository);
    }

    @Bean
    public PostCrawlingProcessor postCrawlingProcessor() {
        return new PostCrawlingProcessor();
    }

    @Bean
    public PostCrawlingReader postCrawlingReader() {
        return new PostCrawlingReader(tistoryCrawlerService);
    }


    @Bean
    public Job postCrwalingJob() {
        return new JobBuilder("postCrawlingJob", jobRepository)
                .start(postCrwalingStep())
                .build();
    }

    @Bean
    public Step postCrwalingStep() {
        return new StepBuilder("postCrwalingStep", jobRepository)
                .<PostSearchDto, Post>chunk(10, transactionManager)
                .reader(postCrawlingReader())
                .processor(postCrawlingProcessor())
                .writer(postCrawlingWriter())
                .build();
    }

    public void runJob() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("time", Long.toString(System.currentTimeMillis()))
                .toJobParameters();

        // 배치 작업 실행
        jobLauncher.run(postCrwalingJob(), jobParameters);
    }
}

특이한점은 Reader는 Processor에게 1개씩 전달을 하는데, Writer는 또 정의한 chunk 단위만큼을 한번에 처리를하고 있습니다.

위에서는 '예외처리의 용이성때문에 1개씩 전달하는 것이 아닐까?'라는 생각을 하였었는데 Writer에서 그럼 예외가 발생하면 어떻게 해야할까라는 생각이들기도 합니다. 일단은 Writer 는 단순 저장의 역할을 하고 있으니, 예외가 발생하지 않을 것이라는 낙관적 생각을 하고 구현하였습니다 . . .!

테스트 결과

runjob() 함수를 통해 한번 배치작업을 실행시켜보겠습니다!

Reader 에서는 읽어온 게시글의 id를 출력하도록 하였고, Writer 와 Processor 에서는 제목을 출력하도록 하였습니다. (코드 참고!)

가설로는 Reader -> Processor 가 chunk 단위만큼 반복후, 한번에 작성하는 것으로 이해하였는데 출력 순서를 보면 Reader를 통해 데이터를 chunk 단위만큼 읽어온 후, Processor에게 순서대로 하나씩 전달하였습니다. 그리고 마지막으로 Writer 에서 해당 데이터를 한번에 처리하고 있었습니다.

DB에도 잘 저장이 되었습니다!

Batch 자체는 주기적으로 실행하는 기능이 없기 때문에 Scheduler 를 함께 자주 사용하는 것 같습니다. 저 또한 추후 Scheduler를 함께 사용하여, 정기적으로 저의 Tistory 블로그 글을 크롤링하여 제 EC2 블로그로 가져올 수 있도록 코드를 수정해야겠습니다!

댓글