
Spring Batch의 동작 코드 #Job 생성과 실행 본문


Spring Batch의 동작 코드 #Job 생성과 실행

woniper1 2018. 5. 4. 12:30

누구나 아는 Spring Batch 기본 개념에서 Spring Batch Domain에 대해 간단히 알아보았다. 이번 글에선 기본 개념에 정리한 여러 클래스가 서로 어떤 의존성을 가지며, 어떻게 동작하는지 코드를 살펴볼 것이다. 모든 코드를 다 볼 수 없기에 이번 글은 Job이 어떻게 생성되고 실행되는지 살펴본다.


public class SimpleConfiguration {

    private JobBuilderFactory jobBuilderFactory;

    private StepBuilderFactory stepBuilderFactory;

    public Job job() {
        return jobBuilderFactory.get("simple-job")

    public Step step() {
        return stepBuilderFactory.get("simple-step")
                .<String, StringWrapper>chunk(10)

    private ItemReader<String> itemReader() {
        List<String> list = new ArrayList<>();

        for (int i = 0; i < 100; i++) {
            list.add("test" + i);

        return new ListItemReader(list);

    private ItemProcessor<String, StringWrapper> itemProcess() {
        return StringWrapper::new;

    private ItemWriter<StringWrapper> itemWriter() {
        return System.out::println;

    private class StringWrapper {
        private String value;

        StringWrapper(String value) {
            this.value = value;

        public String getValue() {
            return value;

        public String toString() {
            return String.format("i'm %s", getValue());

위 예제는 아주 단순한 batch다.

  • Job은 하나의 Step을 갖고 있으며,
  • Step의 ItemReader는 ArrayList에 100개의 String value를 담고 있다. (읽기)
  • ItemProcessor는 ItemReader에서 반환된 String List를 StringWrapper 클래스로 wrapping 한다. (가공)
  • ItemWriter는 ItemProcessor를 통해 StringWrapper로 반환된 List를 System.out.println으로 로그를 찍는다. (쓰기)


public class JobRunnerConfiguration {

    public JobLauncherTestUtils utils() throws Exception {
        return new JobLauncherTestUtils();


Batch를 실행하기 위한 JobLauncherTestUtils를 Bean으로 등록

@ContextConfiguration(classes = { SimpleConfiguration.class, JobRunnerConfiguration.class})
public class SimpleConfigurationTests {

    private JobLauncherTestUtils jobLauncherTestUtils;

    public void testLaunchJob() throws Exception {
  • SimpleConfiguration, JobRunnerConfiguration@Contextconfiguration을 이용해 테스트에 필요한 Config Bean으로 등록
  • Bean으로 등록된 JobLauncherTestUtils를 주입(@Autowired) 받아 Batch Job(SimpleConfiguration)을 실행
  • 이때 JobLauncherTestUtils은 SimpleJobLauncher를 이용해 Batch Job을 실행
  • SimpleConfiguration 예제의 JobBuilderFactory, StepBuilderFactory의 자세한 설명은 생략한다. Job과 Step을 생성하는 객체라고 생각하면 된다.


최대한 간단하게 diagram을 그리려 노력했다. 위 예제를 기준으로 Spring Batch가 내부적으로 어떻게 동작하는지 살펴보자.

일단 예제에서 JobLauncherTestUtils가 SimpleJobLauncher를 통해 Job을 실행한다고 설명했다. 실제로 어떻게 실행하는지 코드를 보자.

public JobExecution run(final Job job, final JobParameters jobParameters)
        throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
        JobParametersInvalidException {

    Assert.notNull(job, "The Job must not be null.");
    Assert.notNull(jobParameters, "The JobParameters must not be null.");

    final JobExecution jobExecution;
    JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
    if (lastExecution != null) {
        // 재실행 가능한 Job 인지 체크
        if (!job.isRestartable()) {
            throw new JobRestartException("JobInstance already exists and is not restartable");
         * validate here if it has stepExecutions that are UNKNOWN, STARTING, STARTED and STOPPING
         * retrieve the previous execution and check
        for (StepExecution execution : lastExecution.getStepExecutions()) {
            BatchStatus status = execution.getStatus();
            if (status.isRunning() || status == BatchStatus.STOPPING) {
                throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
                        + lastExecution);
            } else if (status == BatchStatus.UNKNOWN) {
                throw new JobRestartException(
                        "Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
                            + "The last execution ended with a failure that could not be rolled back, "
                            + "so it may be dangerous to proceed. Manual intervention is probably necessary.");


    jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

    try {
        taskExecutor.execute(new Runnable() {

            public void run() {
                try {
                    logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
                            + "]");
                    logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
                            + "] and the following status: [" + jobExecution.getStatus() + "]");
                catch (Throwable t) {
                    logger.info("Job: [" + job
                            + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
                            + "]", t);

            private void rethrow(Throwable t) {
                if (t instanceof RuntimeException) {
                    throw (RuntimeException) t;
                else if (t instanceof Error) {
                    throw (Error) t;
                throw new IllegalStateException(t);
    catch (TaskRejectedException e) {
        if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {

    return jobExecution;
  • JobExecution 반환
  • JobRepository로 JobExecution 조회 및 생성

run 메소드는 Job 객체와 JobParamter 객체를 받아 JobRepository를 이용해 JobExecution을 조회(getLastJobExecution) 및 생성(createJobExecution)한다.


public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
        throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {

    Assert.notNull(jobName, "Job name must not be null.");
    Assert.notNull(jobParameters, "JobParameters must not be null.");

    JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
    ExecutionContext executionContext;

    if (jobInstance != null) {

        List<JobExecution> executions = jobExecutionDao.findJobExecutions(jobInstance);

        for (JobExecution execution : executions) {
            if (execution.isRunning() || execution.isStopping()) {
                throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
                        + jobInstance);
            BatchStatus status = execution.getStatus();
            if (status == BatchStatus.UNKNOWN) {
                throw new JobRestartException("Cannot restart job from UNKNOWN status. "
                        + "The last execution ended with a failure that could not be rolled back, "
                        + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
            if (execution.getJobParameters().getParameters().size() > 0 && (status == BatchStatus.COMPLETED || status == BatchStatus.ABANDONED)) {
                throw new JobInstanceAlreadyCompleteException(
                        "A job instance already exists and is complete for parameters=" + jobParameters
                        + ".  If you want to run this job again, change the parameters.");
        executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance));
    else {
        jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters);
        executionContext = new ExecutionContext();

    JobExecution jobExecution = new JobExecution(jobInstance, jobParameters, null);
    jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));


    return jobExecution;


SimpleJobRepository는 JobRepository interface의 구현체.

  • JobInstance를 조회
  • JobInstance가 null이 아니라면, 실행 가능한 Job인지 체크 후 JobExecution을 조회(ecDao.getExecutionContext)
  • JobInstance가 null이라면, JobInstance와 ExecutionContext생성
  • 마지막으로 JobExecution을 저장

이 코드에서 몇가지 객체가 눈에 들어온다.


앞 글에서 JobExecution에 대해 설명했다. Job이 한번 실행될 때 생성되는 객체다. 이 객체는 Job이 실행되는 데 위해 필요한 아래와 같은 객체를 담고 있다.

  • JobParamter : Job을 실행하기 위해 필요한 paramter
  • JobInstance : JobExecution을 조회하기 위한 id, name
  • Collection : Job이 포함하고 있는 실행 가능한 StepExecution List
  • Job 실행 생성, 시작, 종료, 수정 시간
  • 그 외 여러 객체
private final JobParameters jobParameters;
private JobInstance jobInstance;
private volatile Collection<StepExecution> stepExecutions = Collections.synchronizedSet(new LinkedHashSet<>());
private volatile BatchStatus status = BatchStatus.STARTING;
private volatile Date startTime = null;
private volatile Date createTime = new Date(System.currentTimeMillis());
private volatile Date endTime = null;
private volatile Date lastUpdated = null;
private volatile ExitStatus exitStatus = ExitStatus.UNKNOWN;
private volatile ExecutionContext executionContext = new ExecutionContext();
private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<>();
private final String jobConfigurationName;

JobExecution에 멤버 변수로 선언된 객체들.


ExecutionContext 객체는 Job이 실행되는 동안 필요한 데이터를 메모리(Map)에 저장하고 관리하는 객체다. 실제로 이 객체를 살펴보면 Map을 통해 데이터를 저장, 조회한다.

ExecutionContext의 생명 주기는 Job이 실행되는 동안 사용된다.


다시 SimpleJobLauncher 코드로 돌아가 보자. job.execute 메소드가 바로 Job을 실행하는 부분이다. JobRepository를 통해 생성된 JobExecution을 argument로 넘긴다. 즉, JobExecution은 Job을 실행하는 데 필요한 객체다.


public final void execute(JobExecution execution) {

    if (logger.isDebugEnabled()) {
        logger.debug("Job execution starting: " + execution);

    // 1. ThreadLocal에 현재 실행될 Job 등록

    try {

        // 2. 실행 가능한 Job인지 JobParameter 검증

        if (execution.getStatus() != BatchStatus.STOPPING) {

            // 3. 시작 시간 등록
            execution.setStartTime(new Date());
            // 4. Batch 상태를 시작으로 변경
            updateStatus(execution, BatchStatus.STARTED);

            // 5. JobExecutionListener.beforeJob 실행 (전 처리)

            try {
                // 6. job 구현체 실행
                if (logger.isDebugEnabled()) {
                    logger.debug("Job execution complete: " + execution);
            } catch (RepeatException e) {
                throw e.getCause();
        } else {

            // The job was already stopped before we even got this far. Deal
            // with it in the same way as any other interruption.
            if (logger.isDebugEnabled()) {
                logger.debug("Job execution was stopped: " + execution);


    } catch (JobInterruptedException e) {
        logger.info("Encountered interruption executing job: "
                + e.getMessage());
        if (logger.isDebugEnabled()) {
            logger.debug("Full exception", e);
        execution.setExitStatus(getDefaultExitStatusForFailure(e, execution));
        execution.setStatus(BatchStatus.max(BatchStatus.STOPPED, e.getStatus()));
    } catch (Throwable t) {
        logger.error("Encountered fatal error executing job", t);
        execution.setExitStatus(getDefaultExitStatusForFailure(t, execution));
    } finally {
        try {
            if (execution.getStatus().isLessThanOrEqualTo(BatchStatus.STOPPED)
                    && execution.getStepExecutions().isEmpty()) {
                ExitStatus exitStatus = execution.getExitStatus();
                ExitStatus newExitStatus =
                        ExitStatus.NOOP.addExitDescription("All steps already completed or no steps configured for this job.");

            execution.setEndTime(new Date());

            try {
                // 7. JobExecutionListener.afterJob 실행 (후 처리)
            } catch (Exception e) {
                logger.error("Exception encountered in afterStep callback", e);

        } finally {



코드가 길고 복잡하지만, 주석을 보면 그렇게 복잡한 로직은 아니다.

5. JobExecutionListener.beforeJob 실행

Job이 실행되기 전처리, 후처리 가능한 JobExecutionListener가 있다. 이 정도만 알고 넘어가자.

6. job 구현체 실행

abstract protected void doExecute(JobExecution execution) throws JobExecutionException;

AbstractJob은 이름과 같이 추상 클래스다. AbstractJob.doExecute 메소드는 추상 메소드다. 이를 상속받아 구현된 Job 객체가 doExecution을 구현하고 있을 것이다. 그럼 이 예제에서 AbstractJob을 구현한 구현 객체는 무엇일까?


protected void doExecute(JobExecution execution) throws JobInterruptedException, JobRestartException,
StartLimitExceededException {

    StepExecution stepExecution = null;
    for (Step step : steps) {
        stepExecution = handleStep(step, execution);
        if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
            // Terminate the job if a step fails

    // Update the job status to be the same as the last step
    if (stepExecution != null) {
        if (logger.isDebugEnabled()) {
            logger.debug("Upgrading JobExecution status: " + stepExecution);

SimpleJob이 바로 AbstractJob을 구현한 구현체다. doExecute 메소드는 Step List를 실행한다.

Step이 실행되는 과정은 다음 포스팅에.
