Spring Boot + Spring Batch (Spring Batch Integration : Remote Partitioning) + Mysql + JPA +Gradle
- batch 설정 javaConfig, integration 설정 xml
- Job : resources 에 있는 PersonData.txt의 내용을 DB(MySQL)에 저장한다.
- Batch : Job > Step > tasklet > Chunk (read-process-write) 의 포함관계
- partitioning 이란 Job이 step1 을 실행시키고, step1 이 partitionHandler를 이용해 partition 한 후 step2 를 실행하는 것이다. 실질적인 일은 step2 (reader, processor, writer)가 한다.
- remote partitioning은 서로 다른 서버에 job과 step이 있을 때, master 서버와 slave 서버를 분리시킨 후 integration을 이용해 통신하는 방식이다.
- Master 서버에서는 job 등록 -> step1 수행 -> partitioner 까지. 후에 메세지 큐를 이용해 Slave로 보냄
- Slave 서버에서는 master에서 보낸 메시지 큐에서 partition한 step2를 실질적으로 수행.(read, process, write)
- 그림
참고 [http://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html]
Master 코드
패키지 구조
- src/main/java
- kr.co.kware.batch.test
- kr.co.kware.batch.test.model
- kr.co.kware.batch.test.partition
- kr.co.kware.batch.test.tasklet
- kr.co.kware.batch.test.config
- MasterBatchConfiguration.java
- src/main/resources
- application.properties
- integration-master.xml
- PersonData.txt (first_name, last_name, year 데이터만 있음)
build.gradle
// Build-script classpath: makes the Spring Boot Gradle plugin available to this build.
buildscript {
    ext {
        // Version of the Spring Boot Gradle plugin resolved below.
        springBootVersion = '1.2.3.RELEASE'
    }
    repositories {
        jcenter()
    }
    dependencies {
        classpath "org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}"
    }
}

// Plugins applied to the project (Java build, IDE metadata, Spring Boot packaging).
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'eclipse'
apply plugin: 'spring-boot'

// Source and bytecode level: Java 7.
sourceCompatibility = 1.7
targetCompatibility = 1.7

// Base name and version of the jar produced by this build.
jar {
    baseName = 'batch-master-mysql'
    version = '0.0.1'
}
// Repositories used to resolve project dependencies.
// Every URL uses HTTPS: artifacts fetched over plain HTTP can be tampered with in
// transit, and Maven Central no longer serves requests made over http://.
repositories {
    jcenter()
    maven { url 'https://repo.spring.io/release' }
    maven { url 'https://repo.spring.io/snapshot' }
    maven { url 'https://repo.spring.io/milestone' }
    maven { url 'https://repository.jboss.org/nexus/content/repositories/releases' }
    maven { url 'https://oss.sonatype.org/content/repositories/releases' }
    maven { url 'https://oss.sonatype.org/content/repositories/snapshots' }
    maven { url 'https://repo.maven.apache.org/maven2' }
}
// Project dependencies. Only activemq-broker declares an explicit version here.
dependencies {
    // Spring Boot starters
    compile 'org.springframework.boot:spring-boot-autoconfigure'
    compile 'org.springframework.boot:spring-boot-starter-data-jpa'
    compile 'org.springframework.boot:spring-boot-starter-actuator'
    compile 'org.springframework.boot:spring-boot-starter-logging'
    compile 'org.springframework.boot:spring-boot-starter-aop'
    compile 'org.springframework.boot:spring-boot-starter-batch'

    // Spring Batch infrastructure and the Spring Batch Integration module
    compile 'org.springframework.batch:spring-batch-infrastructure'
    compile 'org.springframework.batch:spring-batch-integration'

    // MySQL JDBC driver
    compile 'mysql:mysql-connector-java'

    // Spring Integration with JMS/AMQP adapters, plus the ActiveMQ broker
    compile 'org.springframework.boot:spring-boot-starter-integration'
    compile 'org.springframework.integration:spring-integration-jms'
    compile 'org.springframework.integration:spring-integration-amqp'
    compile 'org.apache.activemq:activemq-broker:5.11.1'

    testCompile 'org.springframework.boot:spring-boot-starter-test'
}
application.properties
# --- DataSource (MySQL): fill in the JDBC URL and credentials ---
spring.datasource.driver-class-name= com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql:
spring.datasource.username=
spring.datasource.password=
spring.datasource.platform=mysql
spring.datasource.initialize=false
# --- JPA / Hibernate ---
# Each key is defined exactly once: when a key is repeated in a .properties file
# only its last occurrence takes effect, which hides the earlier value.
spring.jpa.database=MYSQL
spring.jpa.open-in-view=false
spring.jpa.show-sql=true
spring.jpa.generate-ddl=false
spring.jpa.hibernate.ddl-auto=create-drop
spring.jpa.hibernate.naming-strategy=org.hibernate.cfg.ImprovedNamingStrategy
spring.jpa.properties.hibernate.dialect= org.hibernate.dialect.MySQL5Dialect
# --- ActiveMQ broker: fill in the broker URL and credentials ---
spring.activemq.broker-url=
spring.activemq.user=
spring.activemq.password=
# --- Spring Batch ---
spring.batch.initializer.enabled=false
spring.batch.grid.size=5
# Comments must be on their own line: anything after the value, including '#',
# is read as part of the value.
# /META-INF/spring/batch/bootstrap/integration/configuration-context.xml
spring.batch.admin.job.configuration.file.dir=target/config
spring.batch.admin.job.service.reaper.interval= 60000
batch.job.configuration.file.dir= target/config
batch.job.service.reaper.interval= 60000
# --- Logging ---
logging.file=./logs/master.log
logging.level=INFO
integration-master.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Master-side Spring Integration wiring: outbound requests and inbound replies over JMS. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration/jms
           http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd">

    <!-- Outbound: messages sent to requestsChannel are forwarded to the JMS destination requestsQueue. -->
    <int:channel id="requestsChannel" />
    <int-jms:outbound-channel-adapter connection-factory="jmsConnectionFactory"
                                      channel="requestsChannel"
                                      destination-name="requestsQueue" />

    <!-- Inbound: messages arriving on the JMS destination replyQueue are delivered to replyChannel. -->
    <int:channel id="replyChannel"/>
    <int-jms:message-driven-channel-adapter connection-factory="jmsConnectionFactory"
                                            channel="replyChannel"
                                            destination-name="replyQueue"/>

    <!-- Queue-backed channel that receives the aggregator's output. -->
    <int:channel id="aggregatedReplyChannel">
        <int:queue/>
    </int:channel>

    <!-- Aggregation of replies is delegated to the remotePartitionHandler bean. -->
    <int:aggregator ref="remotePartitionHandler"
                    input-channel="replyChannel"
                    output-channel="aggregatedReplyChannel"
                    send-timeout="3600000"/>
</beans>
Person.java
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
/**
 * JPA entity holding one person record: first name, last name and year.
 *
 * <p>The primary key is generated by the database ({@link GenerationType#IDENTITY}).
 */
@Entity(name="person")
public class Person {

    /**
     * Database-generated primary key.
     *
     * <p>Deliberately left at the Java default ({@code 0}) for new instances. Pre-setting
     * a generated key to a real value such as {@code 1} makes a brand-new object
     * indistinguishable from the stored row that owns that key.
     */
    @Id
    @GeneratedValue(strategy=GenerationType.IDENTITY)
    private int id;

    @Column(name="first_name")
    private String firstName;

    @Column(name="last_name")
    private String lastName;

    @Column(name="year")
    private int year;

    /** @return the primary key, or {@code 0} if none has been assigned yet */
    public int getId() {
        return id;
    }

    /** @param id the primary key to set */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the first name (column {@code first_name}) */
    public String getFirstName() {
        return firstName;
    }

    /** @param firstName the first name to set */
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    /** @return the last name (column {@code last_name}) */
    public String getLastName() {
        return lastName;
    }

    /** @param lastName the last name to set */
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    /** @return the year (column {@code year}) */
    public int getYear() {
        return year;
    }

    /** @param year the year to set */
    public void setYear(int year) {
        this.year = year;
    }
}
MasterApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ImportResource;
/**
 * Entry point of the master application.
 *
 * <p>Imports the Spring Integration wiring from {@code integration-master.xml} on the classpath.
 */
@SpringBootApplication
@ImportResource("classpath:integration-master.xml")
public class MasterApplication {

    /**
     * Starts the Spring application and prints the name of every bean definition
     * in the resulting context to standard output.
     *
     * @param args command-line arguments handed to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        ApplicationContext applicationContext = SpringApplication.run(MasterApplication.class, args);
        String[] beanNames = applicationContext.getBeanDefinitionNames();
        for (int i = 0; i < beanNames.length; i++) {
            System.out.println("BEAN NAME : " + beanNames[i]);
        }
    }
}
MasterBatchConfiguration.java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.integration.partition.MessageChannelPartitionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.orm.jpa.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import spring.boot.batch.mysql.test.model.Person;
import spring.boot.batch.mysql.test.partition.MasterPartitioner;
import spring.boot.batch.mysql.test.step.MasterTasklet;
/**
 * Master-side batch configuration.
 *
 * <p>Declares the {@code importPerson} job, the partitioned step it starts with, a
 * placeholder {@code step1}, and the message-channel based partition handler that
 * sends requests through {@code requestsChannel} and reads replies from
 * {@code aggregatedReplyChannel}.
 */
// NOTE(review): MasterApplication is already annotated with @SpringBootApplication;
// confirm whether this class needs more than a plain configuration annotation.
@SpringBootApplication
@EnableBatchProcessing
@EntityScan(basePackageClasses = Person.class)
@PropertySource("classpath:application.properties")
public class MasterBatchConfiguration {

    /** Grid size passed to the partition handler; bound to {@code spring.batch.grid.size}. */
    @Value("${spring.batch.grid.size}")
    private int gridSize;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** Channel the partition handler reads replies from. */
    @Autowired
    private PollableChannel aggregatedReplyChannel;

    /** Default destination of the messaging template used by the partition handler. */
    @Autowired
    private MessageChannel requestsChannel;

    /**
     * Defines the {@code importPerson} job, which starts with {@link #partitionStep()}.
     *
     * @param step1 injected by the container; not referenced in the method body
     * @return the job definition
     */
    @Bean
    public Job importPerson(Step step1) {
        return jobBuilderFactory
                .get("importPerson")
                .incrementer(new RunIdIncrementer())
                .start(partitionStep())
                .build();
    }

    /**
     * Defines the partitioned step: partitions are produced by a {@link MasterPartitioner}
     * and handed to {@link #remotePartitionHandler()}.
     *
     * @return the partition step
     */
    @Bean
    public Step partitionStep() {
        return stepBuilderFactory
                .get("partitionStep")
                .partitioner(step1())
                .partitioner("step1", new MasterPartitioner(MasterPartitioner.DEFAULT_TEST_VALUE_SIZE))
                .partitionHandler(remotePartitionHandler())
                .build();
    }

    /**
     * Defines {@code step1} on the master, backed by {@link MasterTasklet}.
     *
     * @return the step
     */
    @Bean
    public Step step1() {
        return stepBuilderFactory
                .get("step1")
                .tasklet(new MasterTasklet())
                .build();
    }

    /**
     * Builds the partition handler for step {@code step1}.
     *
     * <p>The bean name {@code remotePartitionHandler} is also what the aggregator in
     * {@code integration-master.xml} refers to.
     *
     * @return the configured partition handler
     */
    @Bean
    public PartitionHandler remotePartitionHandler() {
        MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
        handler.setMessagingOperations(new MessagingTemplate(requestsChannel));
        handler.setReplyChannel(aggregatedReplyChannel);
        handler.setGridSize(gridSize);
        handler.setStepName("step1");
        return handler;
    }
}
MasterTasklet.java
- partitioner에서 job을 전달받은 step2(아래 코드에서는 step 이름이 step1)는 slave에서 처리되어야 하기 때문에, 여기서는 비어있는 tasklet을 넣어준다. 이게 없으면 step을 찾을 수 없다는 에러가 발생한다.
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
/**
 * Tasklet that performs no work and reports completion immediately.
 */
public class MasterTasklet implements Tasklet {

    /**
     * Does nothing and signals that the tasklet is finished.
     *
     * @param contribution not used
     * @param chunkContext not used
     * @return always {@link RepeatStatus#FINISHED}
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        return RepeatStatus.FINISHED;
    }
}
MasterPartitioner : 어떻게 파티션할것인가.
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Value;
import java.util.HashMap;
import java.util.Map;
/**
 * Splits the value range {@code [0, testValueSize)} into {@code gridSize} contiguous
 * sub-ranges, one per partition.
 *
 * <p>Each partition's {@link ExecutionContext} carries an inclusive {@code minValue}
 * and {@code maxValue}. Together the partitions cover every value exactly once, even
 * when {@code testValueSize} is not a multiple of {@code gridSize}.
 */
public class MasterPartitioner implements Partitioner {

    /** Range size used when the caller has no specific size to pass. */
    public static final long DEFAULT_TEST_VALUE_SIZE = 10L;

    private final long testValueSize;

    /**
     * @param testValueSize number of values to distribute over the partitions
     */
    public MasterPartitioner(long testValueSize) {
        this.testValueSize = testValueSize;
    }

    /**
     * Builds one execution context per partition, keyed {@code partition0},
     * {@code partition1}, and so on.
     *
     * @param gridSize number of partitions to create; must be positive
     * @return map from partition name to its execution context
     * @throws IllegalArgumentException if {@code gridSize} is not positive
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        if (gridSize <= 0) {
            throw new IllegalArgumentException("gridSize must be positive, got " + gridSize);
        }
        Map<String, ExecutionContext> result = new HashMap<>();
        for (int i = 0; i < gridSize; i++) {
            // Proportional boundaries: partition i starts at floor(i * size / gridSize).
            // This equals i * (size / gridSize) when the size divides evenly, and
            // spreads the remainder across partitions instead of dropping it otherwise.
            long minValue = i * testValueSize / gridSize;
            long maxValue = (i + 1) * testValueSize / gridSize - 1;

            ExecutionContext value = new ExecutionContext();
            value.putLong("minValue", minValue);
            value.putLong("maxValue", maxValue);
            result.put("partition" + i, value);
        }
        return result;
    }
}
Slave 코드
패키지 구조
- src/main/java
- kr.co.kware.batch.test.slave
- SlaveApplication.java (마스터와 상동, xml 파일명을 integration-slave.xml 변경)
- kr.co.kware.batch.test.slave.model
- kr.co.kware.batch.test.slave.config
- SlaveBatchConfiguration.java
- kr.co.kware.batch.test.slave.processor
- kr.co.kware.batch.test.slave.tasklet
- src/main/resources
- application.properties (마스터와 상동)
- PersonData.txt
- integration-slave.xml
integration-slave.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Slave-side Spring Integration wiring: inbound requests and outbound replies over JMS. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration/jms
           http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd">

    <!-- Inbound: messages arriving on the JMS destination requestsQueue are delivered to requestChannel. -->
    <int:channel id="requestChannel"/>
    <int-jms:message-driven-channel-adapter connection-factory="jmsConnectionFactory"
                                            destination-name="requestsQueue"
                                            channel="requestChannel"/>

    <!-- Outbound: messages sent to replyChannel are forwarded to the JMS destination replyQueue. -->
    <int:channel id="replyChannel"/>
    <int-jms:outbound-channel-adapter connection-factory="jmsConnectionFactory"
                                      destination-name="replyQueue"
                                      channel="replyChannel"/>

    <!-- Each request is handled by the remoteStepExecutionRequestHandler bean; its result goes to replyChannel. -->
    <int:service-activator input-channel="requestChannel"
                           output-channel="replyChannel"
                           ref="remoteStepExecutionRequestHandler"/>
</beans>
# SlaveBatchConfiguration
import javax.sql.DataSource;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
import org.springframework.batch.integration.partition.BeanFactoryStepLocator;
import org.springframework.batch.integration.partition.StepExecutionRequestHandler;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.orm.jpa.JpaVendorAdapter;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import srping.boot.batch.mysql.test.model.Person;
import srping.boot.batch.mysql.test.processor.PersonItemProcessor;
/**
 * Slave-side batch configuration.
 *
 * <p>Declares the chunk-oriented {@code step1} (flat-file reader, pass-through
 * processor, JPA writer) and the {@code remoteStepExecutionRequestHandler} bean that
 * {@code integration-slave.xml} uses as its service activator.
 */
@Configuration
@EnableBatchProcessing
public class SlaveBatchConfiguration {

    @Autowired
    private DataSource dataSource;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private BeanFactory beanFactory;

    @Autowired
    LocalContainerEntityManagerFactoryBean entityManageFactory;

    /**
     * Step-scoped reader that maps each delimited line of {@code PersonData.txt}
     * to a {@link Person} using the columns {@code firstName}, {@code lastName}
     * and {@code year}.
     *
     * @return the configured reader
     */
    // NOTE(review): this reader never consults the step execution context, so the
    // minValue/maxValue produced by the master's partitioner are not applied here.
    // Presumably every partition reads the whole file - confirm the intended behaviour.
    @Bean
    @StepScope
    public FlatFileItemReader<Person> reader(){
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] {"firstName","lastName","year"});

        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<Person>();
        fieldSetMapper.setTargetType(Person.class);

        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<Person>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);

        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        reader.setResource(new ClassPathResource("PersonData.txt"));
        reader.setLineMapper(lineMapper);
        System.out.println(">>> reader" );
        return reader;
    }

    /**
     * @return a {@link PersonItemProcessor}
     */
    @Bean
    public ItemProcessor<Person, Person> processor() {
        System.out.println(">>> processor");
        return new PersonItemProcessor();
    }

    /**
     * @return a JPA writer bound to the injected entity manager factory
     */
    @Bean
    public ItemWriter<Person> writer() {
        JpaItemWriter<Person> itemWriter = new JpaItemWriter<Person>();
        itemWriter.setEntityManagerFactory(this.entityManageFactory.getObject());
        return itemWriter;
    }

    /**
     * Defines {@code step1}: read, process and write {@link Person} items in chunks of 10.
     *
     * @return the step
     */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Person, Person> chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    /**
     * Builds the handler for incoming step execution requests. Steps are looked up
     * in the bean factory; job metadata is read through a {@link JobExplorer} backed
     * by the injected data source.
     *
     * @return the configured request handler
     * @throws IllegalStateException if the {@link JobExplorer} cannot be created
     */
    @Bean
    public StepExecutionRequestHandler remoteStepExecutionRequestHandler() {
        BeanFactoryStepLocator stepLocator = new BeanFactoryStepLocator();
        stepLocator.setBeanFactory(this.beanFactory);

        StepExecutionRequestHandler handler = new StepExecutionRequestHandler();
        handler.setStepLocator(stepLocator);
        handler.setJobExplorer(createJobExplorer());
        return handler;
    }

    /**
     * Creates a {@link JobExplorer} on top of the injected data source.
     * Fails fast instead of returning {@code null}, so a broken setup surfaces at
     * startup rather than as a later {@code NullPointerException}.
     */
    private JobExplorer createJobExplorer() {
        try {
            JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
            factoryBean.setDataSource(this.dataSource);
            factoryBean.afterPropertiesSet();
            return factoryBean.getObject();
        } catch (Exception e) {
            throw new IllegalStateException("Could not create JobExplorer for the slave step handler", e);
        }
    }
}
PersonItemProcessor
import org.springframework.batch.item.ItemProcessor;
import srping.boot.batch.mysql.test.model.Person;
/**
 * Pass-through processor: hands every {@link Person} to the writer unchanged.
 */
public class PersonItemProcessor implements ItemProcessor<Person, Person> {

    /**
     * @param item the person that was read
     * @return the same {@code item} instance, unmodified
     */
    @Override
    public Person process(final Person item) throws Exception {
        return item;
    }
}
TestSlaveTasklet.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
/**
 * Tasklet that logs the step execution context at INFO level and then finishes.
 */
public class TestSlaveTasklet implements Tasklet {

    private static final Logger LOG = LoggerFactory.getLogger(TestSlaveTasklet.class);

    /**
     * Logs the current step execution context.
     *
     * @param contribution not used
     * @param chunkContext source of the step execution context that is logged
     * @return always {@link RepeatStatus#FINISHED}
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Parameterized form: the message is only assembled when INFO is enabled.
        LOG.info("step execution context : {}", chunkContext.getStepContext().getStepExecutionContext());
        return RepeatStatus.FINISHED;
    }
}
Hi Sooyoung Park!
답글삭제It would be kind of you if you make this project on github project.
So we can see your project.
Thanks.