2015년 6월 24일 수요일

Spring Boot + Spring Batch (Spring Batch Integration : Remote Partitioning) + Mysql + JPA +Gradle


Spring Boot + Spring Batch (Spring Batch Integration : Remote Partitioning) + Mysql + JPA +Gradle

  • batch 설정 javaConfig, integration 설정 xml
  • Job : resources 인 person.txt의 내용을 DB(mysql)에 저장시킨다.
  • Batch : Job > Step > tasklet > Chunk (read-process-write) 의 포함관계
  • partitioning이란 Job이 step1을 실행시키고, step1이 partitionHandler를 이용해 partition한 후 step2를 실행하는 구조다. 실질적인 일은 step2(reader, processor, writer)가 한다. (아래 코드에서는 여기서 말하는 step1이 partitionStep, step2가 step1이라는 이름으로 정의되어 있다.)
  • remote partitioning은 job과 step이 서로 다른 서버에 있을 때, master 서버와 slave 서버를 분리시킨 후 Spring Integration을 이용해 통신하는 방식이다.
  • Master 서버에서는 job 등록 -> step1 수행 -> partitioner 실행까지 담당한다. 이후 메시지 큐를 이용해 파티션 요청을 Slave로 보낸다.
  • Slave 서버에서는 master에서 보낸 메시지 큐에서 partition한 step2를 실질적으로 수행.(read, process, write)
  • 그림

    참고 [http://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html]

Master 코드

패키지 구조
  • src/main/java
    • kr.co.kware.batch.test
      • MasterApplication.java
    • kr.co.kware.batch.test.model
      • Person.java
    • kr.co.kware.batch.test.partition
      • MasterPartitioner.java
    • kr.co.kware.batch.test.tasklet
      • MasterTasklet.java
    • kr.co.kware.batch.test.config
      • MasterBatchConfiguration.java
  • src/main/resources
    • application.properties
    • integration-master.xml
    • PersonData.txt (first_name, last_name, year 데이터만 있음)
build.gradle
buildscript {
    ext {
        springBootVersion = '1.2.3.RELEASE'
    }

    repositories {
        jcenter()
        // JCenter has been sunset by JFrog; Maven Central is listed as a fallback
        // so the Spring Boot Gradle plugin can still be resolved.
        mavenCentral()
    }

    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'eclipse'
apply plugin: 'spring-boot'

sourceCompatibility = 1.7
targetCompatibility = 1.7

jar {
    baseName = 'batch-master-mysql'
    version = '0.0.1'
}

repositories {
    jcenter()

    maven { url 'https://repo.spring.io/release' }
    maven { url 'https://repo.spring.io/snapshot' }
    maven { url 'https://repo.spring.io/milestone' }

    maven { url 'https://repository.jboss.org/nexus/content/repositories/releases' }
    maven { url 'https://oss.sonatype.org/content/repositories/releases' }
    maven { url 'https://oss.sonatype.org/content/repositories/snapshots' }
    // Maven Central only serves HTTPS; plain-HTTP requests are rejected.
    maven { url 'https://repo.maven.apache.org/maven2' }
}

dependencies {

    compile("org.springframework.boot:spring-boot-autoconfigure")
    compile("org.springframework.boot:spring-boot-starter-data-jpa")
    compile("org.springframework.boot:spring-boot-starter-actuator")
    compile("org.springframework.boot:spring-boot-starter-logging")
    compile("org.springframework.boot:spring-boot-starter-aop")

    // Spring Batch core + remote partitioning support
    compile("org.springframework.boot:spring-boot-starter-batch")
    compile("org.springframework.batch:spring-batch-infrastructure")
    compile("org.springframework.batch:spring-batch-integration")

    compile("mysql:mysql-connector-java")

    // Spring Integration and the JMS transport used between master and slave
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.integration:spring-integration-jms")
    compile("org.springframework.integration:spring-integration-amqp")

    compile("org.apache.activemq:activemq-broker:5.11.1")

    testCompile("org.springframework.boot:spring-boot-starter-test")
}
application.properties
# --- DataSource ---
# NOTE(review): the URL names no schema and the credentials are empty; presumably
# placeholders to fill in per environment — confirm.
spring.datasource.driver-class-name= com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/
spring.datasource.username=
spring.datasource.password=
spring.datasource.platform=mysql
spring.datasource.initialize=false

# --- JPA / Hibernate ---
# Each key is set exactly once: in a .properties file a repeated key silently
# overrides the earlier occurrence.
# NOTE(review): create-drop creates the schema at startup and drops it at shutdown;
# if master and slave share one database, each node's lifecycle affects the other's
# tables — confirm this is intended.
spring.jpa.database=MYSQL
spring.jpa.open-in-view=false
spring.jpa.show-sql=true
spring.jpa.generate-ddl=false
spring.jpa.hibernate.ddl-auto=create-drop
spring.jpa.hibernate.naming-strategy=org.hibernate.cfg.ImprovedNamingStrategy
spring.jpa.properties.hibernate.dialect= org.hibernate.dialect.MySQL5Dialect

# --- ActiveMQ (JMS broker carrying the partition request/reply queues) ---
spring.activemq.broker-url=
spring.activemq.user=
spring.activemq.password=

# --- Batch ---
# Batch metadata tables are not created on startup; they must already exist.
spring.batch.initializer.enabled=false
# Number of partitions the master asks the partitioner for.
spring.batch.grid.size=5
# Properties files have no trailing comments: text after '#' on a value line would
# become part of the value, so the note lives on its own line.
# /META-INF/spring/batch/bootstrap/integration/configuration-context.xml
spring.batch.admin.job.configuration.file.dir=target/config
spring.batch.admin.job.service.reaper.interval= 60000

batch.job.configuration.file.dir= target/config
batch.job.service.reaper.interval= 60000

# --- Logging ---
logging.file=./logs/master.log
# Spring Boot reads levels from logging.level.<logger-name>; a bare "logging.level"
# key is not bound to any logger.
logging.level.root=INFO
integration-master.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Master-side Spring Integration wiring for remote partitioning.
  Partition requests leave through the JMS queue "requestsQueue"; slave replies come back
  through the JMS queue "replyQueue". These queue names must match integration-slave.xml.
  NOTE(review): "jmsConnectionFactory" is not declared in this file; presumably it is
  supplied by Spring Boot's ActiveMQ auto-configuration — confirm.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/integration/jms
         http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
         http://www.springframework.org/schema/integration
         http://www.springframework.org/schema/integration/spring-integration.xsd">

       <!-- Outbound requests: MasterBatchConfiguration's MessagingTemplate sends to this channel. -->
       <int:channel id="requestsChannel" />

       <!-- Publishes every message on requestsChannel to the JMS queue "requestsQueue". -->
       <int-jms:outbound-channel-adapter connection-factory="jmsConnectionFactory"
                                         channel="requestsChannel"
                                         destination-name="requestsQueue" />

       <!-- Inbound replies from the slaves, before aggregation. -->
       <int:channel id="replyChannel"/>

       <!-- Listens on the JMS queue "replyQueue" and forwards each reply to replyChannel. -->
       <int-jms:message-driven-channel-adapter connection-factory="jmsConnectionFactory"
                                               channel="replyChannel"
                                               destination-name="replyQueue"/>

       <!-- Pollable (queue) channel: injected as PollableChannel and set as the partition
            handler's reply channel in MasterBatchConfiguration. -->
       <int:channel id="aggregatedReplyChannel">
           <int:queue/>
       </int:channel>

       <!-- Combines the per-partition replies using the remotePartitionHandler bean and
            posts the result to aggregatedReplyChannel. send-timeout is in milliseconds
            (3600000 ms = 1 hour). -->
       <int:aggregator ref="remotePartitionHandler"
                       input-channel="replyChannel"
                       output-channel="aggregatedReplyChannel"
                       send-timeout="3600000"/>

</beans>
Person.java
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

/**
 * JPA entity for one row of the {@code person} table, populated from the
 * first_name / last_name / year values in PersonData.txt.
 */
@Entity(name="person")
public class Person {

    /**
     * Database-generated primary key (MySQL auto-increment via {@code IDENTITY}).
     *
     * <p>Deliberately left at the JVM default of 0 instead of being initialised. When no
     * unsaved-value is configured, Hibernate treats the id of a freshly constructed
     * instance as the "not yet saved" marker. The previous initializer of 1 therefore made
     * a persisted row whose real id is 1 look transient, and merging it would insert a
     * duplicate.
     */
    @Id
    @GeneratedValue(strategy=GenerationType.IDENTITY)
    private int id;

    /** Given name, mapped to column {@code first_name}. */
    @Column(name="first_name")
    private String firstName;

    /** Family name, mapped to column {@code last_name}. */
    @Column(name="last_name")
    private String lastName;

    /** Value of the third field of each input line, mapped to column {@code year}. */
    @Column(name="year")
    private int year;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }
}
MasterApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ImportResource;

/**
 * Boot entry point of the master node.
 *
 * <p>Starts the Spring context, pulls in the JMS/Integration wiring from
 * {@code integration-master.xml}, and prints each registered bean definition name to
 * stdout as a startup diagnostic.
 */
@SpringBootApplication
@ImportResource("classpath:integration-master.xml")
public class MasterApplication {
    public static void main(String[] args) {
        ApplicationContext ctx = SpringApplication.run(MasterApplication.class, args);

        String[] beanNames = ctx.getBeanDefinitionNames();
        for (int i = 0; i < beanNames.length; i++) {
            System.out.println("BEAN NAME : " + beanNames[i]);
        }
    }
}
MasterBatchConfiguration.java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.integration.partition.MessageChannelPartitionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.orm.jpa.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import spring.boot.batch.mysql.test.model.Person;
import spring.boot.batch.mysql.test.partition.MasterPartitioner;
import spring.boot.batch.mysql.test.step.MasterTasklet;

/**
 * Master-side Spring Batch configuration for remote partitioning.
 *
 * <p>Defines the {@code importPerson} job. Its only step, {@code partitionStep}, splits the
 * work with {@link MasterPartitioner} and dispatches each partition of {@code step1}
 * through a {@link MessageChannelPartitionHandler}. Requests leave via
 * {@code requestsChannel*} and replies are read back from {@code aggregatedReplyChannel};
 * both channels are declared in integration-master.xml.
 */
@SpringBootApplication
@EnableBatchProcessing
@EntityScan(basePackageClasses = Person.class)
@PropertySource("classpath:application.properties")
public class MasterBatchConfiguration {

    /** Number of partitions to request ({@code spring.batch.grid.size}). */
    @Value("${spring.batch.grid.size}")
    private int gridSize;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // The two channel field names below matter. Several MessageChannel beans exist, so
    // autowiring by type falls back to the field name to choose the right one.
    @Autowired
    private PollableChannel aggregatedReplyChannel;

    @Autowired
    private MessageChannel requestsChannel;

    /**
     * The {@code importPerson} job: a run-id incrementer plus the partitioning step.
     *
     * @param step1 injected worker step. NOTE(review): not used; the job starts from
     *              {@link #partitionStep()}.
     */
    @Bean
    public Job importPerson(Step step1) {
        Step firstStep = partitionStep();
        return jobBuilderFactory.get("importPerson")
                .incrementer(new RunIdIncrementer())
                .start(firstStep)
                .build();
    }

    /**
     * Partitioning step: {@link MasterPartitioner} builds the partition contexts for
     * {@code step1}, and {@link #remotePartitionHandler()} sends them to the slaves.
     */
    @Bean
    public Step partitionStep() {
        MasterPartitioner partitioner =
                new MasterPartitioner(MasterPartitioner.DEFAULT_TEST_VALUE_SIZE);
        return stepBuilderFactory.get("partitionStep")
                .partitioner("step1", partitioner)
                .step(step1())
                .partitionHandler(remotePartitionHandler())
                .build();
    }

    /** Local placeholder for {@code step1}; it only runs a no-op {@link MasterTasklet}. */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1").tasklet(new MasterTasklet()).build();
    }

    /**
     * Partition handler that sends one request per partition of {@code step1} through a
     * MessagingTemplate bound to {@code requestsChannel}, and reads replies from
     * {@code aggregatedReplyChannel}.
     */
    @Bean
    public PartitionHandler remotePartitionHandler() {
        MessagingTemplate requestTemplate = new MessagingTemplate(this.requestsChannel);

        MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
        handler.setStepName("step1");
        handler.setGridSize(this.gridSize);
        handler.setMessagingOperations(requestTemplate);
        handler.setReplyChannel(this.aggregatedReplyChannel);
        return handler;
    }
}
MasterTasklet.java
  • partitioner에서 job을 전달받은 step2는 slave에서 처리되어야 하기 때문에, 여기서는 비어 있는 tasklet을 넣어준다. 이것이 없으면 step2(코드상 이름은 step1)를 찾을 수 없다는 에러가 난다..ㅠㅠ
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

/**
 * No-op tasklet behind the master's local {@code step1} bean.
 *
 * <p>The actual read/process/write work of {@code step1} is carried out on the slave. The
 * master only needs a step with that name to exist, so this tasklet finishes at once.
 */
public class MasterTasklet implements Tasklet {

    /** Reports completion immediately without touching the contribution or chunk context. */
    @Override
    public RepeatStatus execute(final StepContribution contribution, final ChunkContext chunkContext)
            throws Exception {
        final RepeatStatus done = RepeatStatus.FINISHED;
        return done;
    }
}
MasterPartitioner.java : 어떻게 파티션할 것인가.
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Value;

import java.util.HashMap;
import java.util.Map;

/**
 * Splits the item index range {@code [0, testValueSize)} into {@code gridSize} contiguous
 * partitions.
 *
 * <p>Each partition's {@link ExecutionContext} is stored under the key
 * {@code "partition<i>"}. It carries the inclusive, 0-based bounds {@code minValue} and
 * {@code maxValue}. Every index is assigned to exactly one partition; when
 * {@code testValueSize} is not a multiple of {@code gridSize}, the last partition also
 * takes the remainder.
 */
public class MasterPartitioner implements Partitioner {
    /** Default total number of items to split. */
    public static final long DEFAULT_TEST_VALUE_SIZE = 10L;

    private final long testValueSize;

    /**
     * @param testValueSize total number of items to distribute; must not be negative
     * @throws IllegalArgumentException if {@code testValueSize} is negative
     */
    public MasterPartitioner(long testValueSize) {
        if (testValueSize < 0) {
            throw new IllegalArgumentException("testValueSize must not be negative: " + testValueSize);
        }
        this.testValueSize = testValueSize;
    }

    /**
     * @param gridSize number of partitions to create; must be positive
     * @return one execution context per partition, keyed {@code partition0..partition<gridSize-1>}
     * @throws IllegalArgumentException if {@code gridSize} is not positive
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        if (gridSize < 1) {
            throw new IllegalArgumentException("gridSize must be positive: " + gridSize);
        }
        long targetSize = testValueSize / gridSize;

        Map<String, ExecutionContext> result = new HashMap<>();
        for (int i = 0; i < gridSize; i++) {
            long minValue = i * targetSize;
            // Integer division drops the remainder. Extend the last partition to the final
            // index so no item is left unassigned.
            long maxValue = (i == gridSize - 1) ? testValueSize - 1 : (i + 1) * targetSize - 1;

            ExecutionContext value = new ExecutionContext();
            value.putLong("minValue", minValue);
            value.putLong("maxValue", maxValue);
            result.put("partition" + i, value);
        }

        return result;
    }
}

Slave 코드

패키지 구조
  • src/main/java
    • kr.co.kware.batch.test.slave
      • SlaveApplication.java (마스터와 상동, xml 파일명만 integration-slave.xml로 변경)
    • kr.co.kware.batch.test.slave.model
      • Person.java (마스터와 상동)
    • kr.co.kware.batch.test.slave.config
      • SlaveBatchConfiguration.java
    • kr.co.kware.batch.test.slave.processor
      • PersonItemProcessor.java
    • kr.co.kware.batch.test.slave.tasklet
      • SlaveTasklet.java
  • src/main/resources
    • application.properties (마스터와 상동)
    • PersonData.txt
    • integration-slave.xml
integration-slave.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Slave-side Spring Integration wiring for remote partitioning.
  Partition requests arrive on the JMS queue "requestsQueue" and are executed by
  remoteStepExecutionRequestHandler; the results go back on the JMS queue "replyQueue".
  The queue names must match integration-master.xml. The channel ids here are local to
  this context (for example "requestChannel" vs the master's "requestsChannel").
  NOTE(review): "jmsConnectionFactory" is not declared in this file; presumably it is
  supplied by Spring Boot's ActiveMQ auto-configuration — confirm.
-->

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/integration/jms
         http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
         http://www.springframework.org/schema/integration
         http://www.springframework.org/schema/integration/spring-integration.xsd">

    <!-- Incoming partition requests from the master. -->
    <int:channel id="requestChannel"/>

    <!-- Listens on the JMS queue "requestsQueue" and forwards each message to requestChannel. -->
    <int-jms:message-driven-channel-adapter connection-factory="jmsConnectionFactory"
                                            destination-name="requestsQueue"
                                            channel="requestChannel"/>

    <!-- Results to be returned to the master. -->
    <int:channel id="replyChannel"/>

    <!-- Publishes every message on replyChannel to the JMS queue "replyQueue". -->
    <int-jms:outbound-channel-adapter connection-factory="jmsConnectionFactory"
                                      destination-name="replyQueue"
                                      channel="replyChannel"/>

    <!-- Hands each request to the remoteStepExecutionRequestHandler bean (defined in
         SlaveBatchConfiguration) and sends its return value to replyChannel. -->
    <int:service-activator input-channel="requestChannel"
                           output-channel="replyChannel"
                           ref="remoteStepExecutionRequestHandler"/>
</beans>
SlaveBatchConfiguration.java

import javax.sql.DataSource;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
import org.springframework.batch.core.scope.context.StepContext;
import org.springframework.batch.core.scope.context.StepSynchronizationManager;
import org.springframework.batch.integration.partition.BeanFactoryStepLocator;
import org.springframework.batch.integration.partition.StepExecutionRequestHandler;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.orm.jpa.JpaVendorAdapter;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;

import srping.boot.batch.mysql.test.model.Person;
import srping.boot.batch.mysql.test.processor.PersonItemProcessor;

/**
 * Slave-side Spring Batch configuration for remote partitioning.
 *
 * <p>Defines the worker step {@code step1}: read PersonData.txt, pass each {@link Person}
 * through {@link PersonItemProcessor}, and write it with JPA. Also defines
 * {@code remoteStepExecutionRequestHandler}, which integration-slave.xml invokes for every
 * partition request received from the master.
 */
@Configuration
@EnableBatchProcessing
public class SlaveBatchConfiguration {

    /** Execution-context keys the master's MasterPartitioner writes for each partition. */
    private static final String MIN_VALUE_KEY = "minValue";
    private static final String MAX_VALUE_KEY = "maxValue";

    @Autowired
    private DataSource dataSource;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** Used by the step locator to resolve the step name carried in each request. */
    @Autowired
    private BeanFactory beanFactory;

    @Autowired
    LocalContainerEntityManagerFactoryBean entityManageFactory;

    /**
     * Step-scoped reader for PersonData.txt on the classpath.
     *
     * <p>Each line is split by a {@link DelimitedLineTokenizer} (comma by default) into
     * firstName, lastName and year, which are bound onto a new {@link Person}. Because the
     * bean is step scoped, it is built inside each partition's step execution. That lets
     * {@link #restrictToPartition} limit it to the partition's own line range; without that,
     * every partition re-read the whole file and each row was written once per partition.
     */
    @Bean
    @StepScope
    public FlatFileItemReader<Person> reader(){
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();

        reader.setResource(new ClassPathResource("PersonData.txt"));
        reader.setLineMapper(new DefaultLineMapper<Person>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[] {"firstName","lastName","year"});
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person.class);
            }});
        }});
        restrictToPartition(reader);
        System.out.println(">>> reader" );
        return reader;
    }

    /**
     * Limits {@code reader} to the inclusive, 0-based item range
     * [{@code minValue}, {@code maxValue}] found in the current step execution context.
     * Does nothing when there is no active step context or the keys are absent, for
     * example when the step is run unpartitioned.
     */
    private static void restrictToPartition(FlatFileItemReader<Person> reader) {
        StepContext stepContext = StepSynchronizationManager.getContext();
        if (stepContext == null) {
            return;
        }
        ExecutionContext partition = stepContext.getStepExecution().getExecutionContext();
        if (!partition.containsKey(MIN_VALUE_KEY) || !partition.containsKey(MAX_VALUE_KEY)) {
            return;
        }
        long first = partition.getLong(MIN_VALUE_KEY);
        long last = partition.getLong(MAX_VALUE_KEY);
        // Skip the first `first` items on open, and stop once item index `last` has been
        // read. Ranges are assumed to fit in an int, the type the reader's counters use.
        reader.setCurrentItemCount((int) first);
        reader.setMaxItemCount((int) (last + 1));
    }

    /** Processor applied to every item read; see {@link PersonItemProcessor}. */
    @Bean
    public ItemProcessor<Person, Person> processor() {
        System.out.println(">>> processor");
        return new PersonItemProcessor();
    }

    /** Writes each chunk of {@link Person} entities through JPA. */
    @Bean
    public ItemWriter<Person> writer() {
        JpaItemWriter<Person> itemWriter = new JpaItemWriter<Person>();
        itemWriter.setEntityManagerFactory(this.entityManageFactory.getObject());
        return itemWriter;
    }

    /**
     * Worker step executed for each partition: chunk-oriented read/process/write with a
     * commit interval of 10 items. The name must match the step name set on the master's
     * partition handler.
     */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                                 .<Person, Person> chunk(10)
                                 .reader(reader())
                                 .processor(processor())
                                 .writer(writer())
                                 .build();
    }

    /**
     * Executes incoming partition requests.
     *
     * <p>The step is resolved by name from this bean factory, and the partition's step
     * execution is loaded through a JobExplorer backed by the shared datasource.
     *
     * @throws IllegalStateException if the JobExplorer cannot be created. The previous code
     *         logged the error and continued with a null explorer, which only failed later
     *         when the first request arrived.
     */
    @Bean
    public StepExecutionRequestHandler remoteStepExecutionRequestHandler() {
        BeanFactoryStepLocator stepLocator = new BeanFactoryStepLocator();
        stepLocator.setBeanFactory(this.beanFactory);

        StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
        stepExecutionRequestHandler.setStepLocator(stepLocator);
        stepExecutionRequestHandler.setJobExplorer(createJobExplorer());
        return stepExecutionRequestHandler;
    }

    /** Builds a JobExplorer over {@link #dataSource}, failing fast with the original cause. */
    private JobExplorer createJobExplorer() {
        JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
        jobExplorerFactoryBean.setDataSource(this.dataSource);
        try {
            jobExplorerFactoryBean.afterPropertiesSet();
            return jobExplorerFactoryBean.getObject();
        } catch (Exception e) {
            throw new IllegalStateException("Could not create JobExplorer for remote step execution", e);
        }
    }
}
PersonItemProcessor.java

import org.springframework.batch.item.ItemProcessor;

import srping.boot.batch.mysql.test.model.Person;

/**
 * Pass-through processor: each {@link Person} read is handed to the writer unchanged.
 *
 * <p>In Spring Batch, a processor that returns null filters the item out. This one
 * returns its input as-is, so every item read reaches the writer.
 */
public class PersonItemProcessor implements ItemProcessor<Person, Person> {

    @Override
    public Person process(final Person item) throws Exception {
        // No transformation is applied yet.
        final Person unchanged = item;
        return unchanged;
    }
}
SlaveTasklet.java (클래스명: TestSlaveTasklet)

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

/**
 * Debug tasklet that logs the current step execution context at INFO and then finishes.
 *
 * <p>NOTE(review): not wired into any step in the slave configuration shown; presumably
 * kept for manually checking the partition contexts sent by the master.
 */
public class TestSlaveTasklet implements Tasklet {
    private static final Logger LOG = LoggerFactory.getLogger(TestSlaveTasklet.class);

    /**
     * Logs the step execution context and reports completion.
     *
     * @return {@link RepeatStatus#FINISHED} always
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Parameterized logging: the context is only rendered to a string when INFO is enabled.
        LOG.info("step execution context : {}", chunkContext.getStepContext().getStepExecutionContext());

        return RepeatStatus.FINISHED;
    }
}