大前端

前端学习之家-大前端

配置DB Job Repository 项目举例(三)

一、项目创建步骤

1.项目结构

BatchMain.java:

package com.xj.demo2;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * @Author : xjfu
 * @Date : 2021/11/04 20:01
 * @Description :demo2的启动主类
 */
public class BatchMain {
    public static void main(String[] args) {

        ApplicationContext context = new ClassPathXmlApplicationContext("demo2/job/demo2-job.xml");
        //Spring Batch的作业启动器,
        JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
        //在batch.xml中配置的一个作业
        Job job  = (Job)context.getBean("billJob");

        try{
            //开始执行这个作业,获得处理结果(要运行的job,job参数对象)
            JobExecution result = launcher.run(job, new JobParameters());
            System.out.println(result.toString());
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

CreditBill.java:

package com.xj.demo2;

/**
 * @Author : xjfu
 * @Date : 2021/11/04 19:27
 * @Description :demo2的CreditBill
 */
public class CreditBill {
    //银行卡账户ID
    private String accountID = "";
    //持卡人姓名
    private String name = "";
    //消费金额
    private double amount = 0;
    //消费日期
    private String date = "";
    //消费场所
    private String address = "";

    public String getAccountID() {
        return accountID;
    }

    public void setAccountID(String accountID) {
        this.accountID = accountID;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public double getAmount() {
        return amount;
    }

    public void setAmount(double amount) {
        this.amount = amount;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return this.accountID + "," + this.name + "," + this.amount + "," + this.date + "," + this.address;
    }
}

CreditBillProcessor:

package com.xj.demo2;

import org.springframework.batch.item.ItemProcessor;

/**
 * @Author : xjfu
 * @Date : 2021/11/04 19:29
 * @Description :demo2的处理类
 */
public class CreditBillProcessor implements ItemProcessor<CreditBill, CreditBill> {
    @Override
    public CreditBill process(CreditBill bill) throws Exception {

        System.out.println(bill.toString());
        //做一些简单的处理
        bill.setAccountID(bill.getAccountID() + "1");
        bill.setName(bill.getName() + "2");
        bill.setAmount(bill.getAmount() + 3);
        bill.setDate(bill.getDate() + "4");
        bill.setAddress(bill.getAddress() + 5);

        return bill;
    }
}

demo2-inputFile.csv:

4101231234656,tom,100.00,2013-12-31 12:00:08,Lu lit
4101236543210,tom,120.00,2013-12-31 12:00:08,Lu Zui

demo2-job.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">

    <!--导入文件-->
    <import resource="classpath:demo2/job/demo2-jobContext.xml"/>

    <!--定义名字为billJob的作业-->
    <batch:job id="billJob">
        <!--定义名字为billStep的作业步-->
        <batch:step id="billStep">
            <batch:tasklet transaction-manager="transactionManager">
                <!--定义读、处理、写操作,规定每处理两条数据,进行一次写入操作,这样可以提高写的效率-->
                <batch:chunk reader="csvItemReader" processor="creditBillProcessor" writer="csvItemWriter"   commit-interval="2">
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>
</beans>

demo2-jobContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">

    <!--
         data-source:定义数据源,默认dataSource
         transaction-manager:定义事务管理器
         isolation-level-for-create:定义创建Job Execution时候的事务隔离级别,避免多个Job Execution执行一个Job Instance,默认SERIALIZABLE
         table_prefix:定义使用的数据库表的前缀为BATCH_,默认BATCH_
         max-varchar-length:定义varchar的最大长度为1000,默认值为2500
    -->
    <batch:job-repository
            id="jobRepository"
            data-source="dataSource"
            transaction-manager="transactionManager2"
            isolation-level-for-create="SERIALIZABLE"
            table-prefix="BATCH_"
            max-varchar-length="1000"/>

    <!--数据库的事务管理器-->
    <bean id="transactionManager2" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <!--数据源-->
    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName">
            <value>com.mysql.jdbc.Driver</value>
        </property>
        <property name="url">
            <value>jdbc:mysql://127.0.0.1:3306/spring_batch_demo2</value>
        </property>
        <property name="username" value="root"></property>
        <property name="password" value="12345"></property>
    </bean>

    <!--定义作业调度器,用来启动job-->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <!--注入jobRepository-->
        <property name="jobRepository" ref="jobRepository"/>
    </bean>

    <!--定义事务管理器,用于Spring Batch框架中对数据操作提供事务能力-->
    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>

    <!--读取信用卡账单文件,CSV 格式-->
    <!--使用FlatFileItemReader读文本文件-->
    <bean id="csvItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <!--指定读取的资源文件-->
        <property name="resource" value="classpath:demo2/data/demo2-inputFile.csv"/>
        <!--通过lineMapper把文本中的一行转换为领域对象creditBill-->
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <!--lineTokenizer定义文本中每行的分隔符号-->
                <property name="lineTokenizer" ref="lineTokenizer"/>
                <!--fieldSetMapper定义了转换结果映射,即具体映射到哪个Java类对象-->
                <property name="fieldSetMapper">
                    <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                        <property name="prototypeBeanName" value="creditBill"/>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>

    <!--lineTokenizer-->
    <bean id="lineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
        <!--按","符号对行进行切割-->
        <property name="delimiter" value=","/>
        <!--属性名称列表,将切割后的行按顺序投入-->
        <property name="names">
            <list>
                <value>accountID</value>
                <value>name</value>
                <value>amount</value>
                <value>date</value>
                <value>address</value>
            </list>
        </property>
    </bean>

    <!--注入实体类-->
    <bean id="creditBill" class="com.xj.demo2.CreditBill" scope="prototype"></bean>

    <!--数据处理类-->
    <bean id="creditBillProcessor" class="com.xj.demo2.CreditBillProcessor" scope="step"></bean>

    <!--写信用卡账单文件,CSV格式-->
    <bean id="csvItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
        <!--要写入的文件位置,因为[classpath:]不是一个具体的目录,这里应当用[file:](从项目根目录开始)指明输出位置-->
        <property name="resource" value="file:src/main/resources/demo2/data/demo2-outputFile.csv"/>
        <!--[lineAggregator成员]指明行聚合器,用来将对象输出到文件时构造文件中的每行的格式-->
        <property name="lineAggregator">
            <!--这里使用Spring Batch自带的DelimitedLineAggregator来作为行聚合器(可以拼接一个个属性形成行)-->
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <!--使用","拼接-->
                <property name="delimiter" value=","/>
                <!--fieldExtractor成员用来将Java类的属性组成的数组拼接成行字符串-->
                <property name="fieldExtractor">
                    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                        <property name="names" value="accountID,name,amount,date,address">
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

二、配套数据库建立

1.寻找对应的数据库脚本语句

使用数据库的仓库时,需要首先根据Spring Batch框架提供的数据脚本完成数据库的初始化,数据库脚本的位置放在:

spring-batch-core\3.0.7.RELEASE\spring-batch-core-3.0.7.RELEASE.jar\org\springframework\batch\core\

Spring Batch框架JobRepository支持如下的数据库:DB2、Derby、H2、HSQLDB、MySQL、Oracle、PostgreSQL、SQLServer、Sybase。

因为我用的是MySQL数据库,所以我选择:schema-mysql.sql

 schema-mysql.sql:

-- Autogenerated: do not edit this file

CREATE TABLE BATCH_JOB_INSTANCE  (
	JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY ,
	VERSION BIGINT ,
	JOB_NAME VARCHAR(100) NOT NULL,
	JOB_KEY VARCHAR(32) NOT NULL,
	constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;

CREATE TABLE BATCH_JOB_EXECUTION  (
	JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
	VERSION BIGINT  ,
	JOB_INSTANCE_ID BIGINT NOT NULL,
	CREATE_TIME DATETIME NOT NULL,
	START_TIME DATETIME DEFAULT NULL ,
	END_TIME DATETIME DEFAULT NULL ,
	STATUS VARCHAR(10) ,
	EXIT_CODE VARCHAR(2500) ,
	EXIT_MESSAGE VARCHAR(2500) ,
	LAST_UPDATED DATETIME,
	JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
	constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
	references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
	JOB_EXECUTION_ID BIGINT NOT NULL ,
	TYPE_CD VARCHAR(6) NOT NULL ,
	KEY_NAME VARCHAR(100) NOT NULL ,
	STRING_VAL VARCHAR(250) ,
	DATE_VAL DATETIME DEFAULT NULL ,
	LONG_VAL BIGINT ,
	DOUBLE_VAL DOUBLE PRECISION ,
	IDENTIFYING CHAR(1) NOT NULL ,
	constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION  (
	STEP_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
	VERSION BIGINT NOT NULL,
	STEP_NAME VARCHAR(100) NOT NULL,
	JOB_EXECUTION_ID BIGINT NOT NULL,
	START_TIME DATETIME NOT NULL ,
	END_TIME DATETIME DEFAULT NULL ,
	STATUS VARCHAR(10) ,
	COMMIT_COUNT BIGINT ,
	READ_COUNT BIGINT ,
	FILTER_COUNT BIGINT ,
	WRITE_COUNT BIGINT ,
	READ_SKIP_COUNT BIGINT ,
	WRITE_SKIP_COUNT BIGINT ,
	PROCESS_SKIP_COUNT BIGINT ,
	ROLLBACK_COUNT BIGINT ,
	EXIT_CODE VARCHAR(2500) ,
	EXIT_MESSAGE VARCHAR(2500) ,
	LAST_UPDATED DATETIME,
	constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
	STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
	SERIALIZED_CONTEXT TEXT ,
	constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
	references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
	JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
	SERIALIZED_CONTEXT TEXT ,
	constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
	ID BIGINT NOT NULL,
	UNIQUE_KEY CHAR(1) NOT NULL,
	constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
	ID BIGINT NOT NULL,
	UNIQUE_KEY CHAR(1) NOT NULL,
	constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_SEQ (
	ID BIGINT NOT NULL,
	UNIQUE_KEY CHAR(1) NOT NULL,
	constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);

2.创建数据库,运行schema-mysql.sql中的脚本

 

3.各个表的关系

三、配套数据库建立

1.运行结果:

控制台:

demo2-outputFile.csv:

batch_job_instance:

batch_job_execution:

batch_step_execution:

 

 

四、一些问题总结

1.第二次调用无法写入数据的原因和解决办法?

2.各个数据表之间的关系总结?

3.删除数据库的脚本备份?

 

发表评论:

Copyright Your WebSite.Some Rights Reserved.