博客
关于我
ElasicJob分布式定时任务
阅读量:305 次
发布时间:2019-03-03

本文共 6125 字,大约阅读时间需要 20 分钟。

Elastic Job 定时任务配置与实现

一、配置Zookeeper注册中心

首先,需要配置Zookeeper作为分布式协调服务注册中心。下载相应的Zookeeper包并解压后,运行bin目录下的start命令即可启动Zookeeper服务。完成后,可通过ZooKeeper客户端工具(如zooinspector)连接到注册中心,完成后续配置。

二、添加Maven依赖

在项目的POM文件中添加以下依赖,确保能够正确引用Elastic Job Lite相关组件:

com.dangdang
elastic-job-lite-core
2.1.5

三、编写定时任务业务逻辑

创建定时任务处理类ArchivieJob,实现业务逻辑处理。以下是核心代码逻辑:

import com.dangdang.ddframe.job.api.ShardingContext;import com.dangdang.ddframe.job.api.simple.SimpleJob;import java.util.List;import java.util.Map;public class ArchivieJob implements SimpleJob {    @Override    public void execute(ShardingContext shardingContext) {        int shardingItem = shardingContext.getShardingItem();        String shardingParameter = shardingContext.getShardingParameter();        // 从resume表中查询未归档记录        List
> maps = JdbcUtil.executeQuery( "SELECT * FROM resume WHERE state = '未归档' AND education = '" + shardingParameter + "' LIMIT 1;" ); if (maps == null) { System.out.println("数据处理完毕!!!!"); return; } // 更新记录状态为已归档 Map
stringObjectMap = maps.get(0); Object id = stringObjectMap.get("id"); Object name = stringObjectMap.get("name"); Object education = stringObjectMap.get("education"); System.out.println("=====>>>>>" + id + "," + name + "," + education); // 插入到resume_bak表中 JdbcUtil.executeUpdate( "UPDATE resume SET state = '已归档' WHERE id = ?", id ); JdbcUtil.executeUpdate( "INSERT INTO resume_bak SELECT * FROM resume WHERE id = ?", id ); }}

四、启动定时任务调度

ElasicJobMain类中配置任务调度:

import com.dangdang.ddframe.job.config.JobCoreConfiguration;import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;import com.dangdang.ddframe.job.lite.api.JobScheduler;import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;public class ElasicJobMain {    public static void main(String[] args) {        // 配置Zookeeper注册中心        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("192.168.1.6:2181", "data-archive-job");        ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);        zookeeperRegistryCenter.init();        // 配置任务        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("archive-job", "*/2 * * * * ?", 3)            .shardingItemParameters("0=doctor,1=bachelor,2=master")            .build();        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, ArchivieJob.class.getName());        JobScheduler jobScheduler = new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build());        jobScheduler.init();    }}

五、JDBC辅助工具类

提供数据库操作功能,包含连接数据库、执行查询和更新操作等功能:

import java.sql.*;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;public class JdbcUtil {    private static String url = "jdbc:mysql://localhost:3306/bank?characterEncoding=utf8&useSSL=false";    private static String user = "root";    private static String password = "123456";    private static String driver = "com.mysql.jdbc.Driver";    static {        try {            Class.forName(driver);        } catch (ClassNotFoundException e) {            e.printStackTrace();        }    }    public static Connection getConnection() {        try {            return DriverManager.getConnection(url, user, password);        } catch (SQLException e) {            e.printStackTrace();        }        return null;    }    public static void close(ResultSet rs, PreparedStatement ps, Connection con) {        if (rs != null) {            try {                rs.close();            } catch (SQLException e) {                e.printStackTrace();            } finally {                if (ps != null) {                    try {                        ps.close();                    } catch (SQLException e) {                        e.printStackTrace();                    } finally {                        if (con != null) {                            try {                                con.close();                            } catch (SQLException e) {                                e.printStackTrace();                            }                        }                    }                }            }        }    }    public static void executeUpdate(String sql, Object... obj) {        Connection con = getConnection();        PreparedStatement ps = null;        try {            ps = con.prepareStatement(sql);            for (int i = 0; i < obj.length; i++) {                ps.setObject(i + 1, obj[i]);            }            ps.executeUpdate();        } catch (SQLException e) {            e.printStackTrace();        } finally {            close(null, ps, con);        }    }    public static List
> executeQuery(String sql, Object... obj) { Connection con = getConnection(); ResultSet rs = null; PreparedStatement ps = null; try { ps = con.prepareStatement(sql); for (int i = 0; i < obj.length; i++) { ps.setObject(i + 1, obj[i]); } rs = ps.executeQuery(); List
> list = new ArrayList<>(); int count = rs.getMetaData().getColumnCount(); while (rs.next()) { Map
map = new HashMap<>(); for (int i = 0; i < count; i++) { Object ob = rs.getObject(i + 1); String key = rs.getMetaData().getColumnName(i + 1); map.put(key, ob); } list.add(map); } return list; } catch (SQLException e) { e.printStackTrace(); } finally { close(rs, ps, con); } return null; }}

转载地址:http://tduq.baihongyu.com/

你可能感兴趣的文章
oracle script
查看>>
Oracle select表要带双引号的原因
查看>>
Oracle SOA Suit Adapter
查看>>
Oracle Spatial GeoRaster 金字塔栅格存储
查看>>
Oracle spatial 周边查询SQL
查看>>
Oracle Spatial空间数据库建立
查看>>
UML— 活动图
查看>>
oracle sqlplus已停止工作,安装完成客户端后sqlplus报“段错误”
查看>>
oracle SQLserver 函数
查看>>
oracle sql分组(group,根据多个内容分组)在select之后from之前 再进行select查询,复杂子查询的使用
查看>>
Oracle Statspack分析报告详解(一)
查看>>
oracle tirger_在Oracle中,临时表和全局临时表有什么区别?
查看>>
Oracle Validated Configurations 安装使用 说明
查看>>
oracle where 条件的执行顺序分析1
查看>>
oracle 中的 CONCAT,substring ,MINUS 用法
查看>>
Oracle 中的 decode
查看>>
oracle 中表一对多取多方的最新的一条数据
查看>>
oracle 使用 PL/SQL Developer创建表并插入单条、多条数据
查看>>
oracle 使用leading, use_nl, rownum调优
查看>>
oracle 修改字段类型方法
查看>>