博客
关于我
ElasicJob分布式定时任务
阅读量:305 次
发布时间:2019-03-03

本文共 6125 字,大约阅读时间需要 20 分钟。

Elastic Job 定时任务配置与实现

一、配置Zookeeper注册中心

首先,需要配置Zookeeper作为分布式协调服务注册中心。下载相应的Zookeeper包并解压后,运行bin目录下的start命令即可启动Zookeeper服务。完成后,可通过ZooKeeper客户端工具(如zooinspector)连接到注册中心,完成后续配置。

二、添加Maven依赖

在项目的POM文件中添加以下依赖,确保能够正确引用Elastic Job Lite相关组件:

com.dangdang
elastic-job-lite-core
2.1.5

三、编写定时任务业务逻辑

创建定时任务处理类ArchivieJob,实现业务逻辑处理。以下是核心代码逻辑:

import com.dangdang.ddframe.job.api.ShardingContext;import com.dangdang.ddframe.job.api.simple.SimpleJob;import java.util.List;import java.util.Map;public class ArchivieJob implements SimpleJob {    @Override    public void execute(ShardingContext shardingContext) {        int shardingItem = shardingContext.getShardingItem();        String shardingParameter = shardingContext.getShardingParameter();        // 从resume表中查询未归档记录        List
> maps = JdbcUtil.executeQuery( "SELECT * FROM resume WHERE state = '未归档' AND education = '" + shardingParameter + "' LIMIT 1;" ); if (maps == null) { System.out.println("数据处理完毕!!!!"); return; } // 更新记录状态为已归档 Map
stringObjectMap = maps.get(0); Object id = stringObjectMap.get("id"); Object name = stringObjectMap.get("name"); Object education = stringObjectMap.get("education"); System.out.println("=====>>>>>" + id + "," + name + "," + education); // 插入到resume_bak表中 JdbcUtil.executeUpdate( "UPDATE resume SET state = '已归档' WHERE id = ?", id ); JdbcUtil.executeUpdate( "INSERT INTO resume_bak SELECT * FROM resume WHERE id = ?", id ); }}

四、启动定时任务调度

ElasicJobMain类中配置任务调度:

import com.dangdang.ddframe.job.config.JobCoreConfiguration;import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;import com.dangdang.ddframe.job.lite.api.JobScheduler;import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;public class ElasicJobMain {    public static void main(String[] args) {        // 配置Zookeeper注册中心        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("192.168.1.6:2181", "data-archive-job");        ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);        zookeeperRegistryCenter.init();        // 配置任务        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("archive-job", "*/2 * * * * ?", 3)            .shardingItemParameters("0=doctor,1=bachelor,2=master")            .build();        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, ArchivieJob.class.getName());        JobScheduler jobScheduler = new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build());        jobScheduler.init();    }}

五、JDBC辅助工具类

提供数据库操作功能,包含连接数据库、执行查询和更新操作等功能:

import java.sql.*;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;public class JdbcUtil {    private static String url = "jdbc:mysql://localhost:3306/bank?characterEncoding=utf8&useSSL=false";    private static String user = "root";    private static String password = "123456";    private static String driver = "com.mysql.jdbc.Driver";    static {        try {            Class.forName(driver);        } catch (ClassNotFoundException e) {            e.printStackTrace();        }    }    public static Connection getConnection() {        try {            return DriverManager.getConnection(url, user, password);        } catch (SQLException e) {            e.printStackTrace();        }        return null;    }    public static void close(ResultSet rs, PreparedStatement ps, Connection con) {        if (rs != null) {            try {                rs.close();            } catch (SQLException e) {                e.printStackTrace();            } finally {                if (ps != null) {                    try {                        ps.close();                    } catch (SQLException e) {                        e.printStackTrace();                    } finally {                        if (con != null) {                            try {                                con.close();                            } catch (SQLException e) {                                e.printStackTrace();                            }                        }                    }                }            }        }    }    public static void executeUpdate(String sql, Object... obj) {        Connection con = getConnection();        PreparedStatement ps = null;        try {            ps = con.prepareStatement(sql);            for (int i = 0; i < obj.length; i++) {                ps.setObject(i + 1, obj[i]);            }            ps.executeUpdate();        } catch (SQLException e) {            e.printStackTrace();        } finally {            close(null, ps, con);        }    }    public static List
> executeQuery(String sql, Object... obj) { Connection con = getConnection(); ResultSet rs = null; PreparedStatement ps = null; try { ps = con.prepareStatement(sql); for (int i = 0; i < obj.length; i++) { ps.setObject(i + 1, obj[i]); } rs = ps.executeQuery(); List
> list = new ArrayList<>(); int count = rs.getMetaData().getColumnCount(); while (rs.next()) { Map
map = new HashMap<>(); for (int i = 0; i < count; i++) { Object ob = rs.getObject(i + 1); String key = rs.getMetaData().getColumnName(i + 1); map.put(key, ob); } list.add(map); } return list; } catch (SQLException e) { e.printStackTrace(); } finally { close(rs, ps, con); } return null; }}

转载地址:http://tduq.baihongyu.com/

你可能感兴趣的文章
Netpas:不一样的SD-WAN+ 保障网络通讯品质
查看>>
netty底层源码探究:启动流程;EventLoop中的selector、线程、任务队列;监听处理accept、read事件流程;
查看>>
Netty核心模块组件
查看>>
Netty源码—4.客户端接入流程一
查看>>
Netty源码—7.ByteBuf原理四
查看>>
Nginx 学习总结(17)—— 8 个免费开源 Nginx 管理系统,轻松管理 Nginx 站点配置
查看>>
Objective-C实现BellmanFord贝尔曼-福特算法(附完整源码)
查看>>
Objective-C实现binary exponentiation二进制幂运算算法(附完整源码)
查看>>
Objective-C实现hardy ramanujana定理算法(附完整源码)
查看>>
Objective-C实现insertion sort插入排序算法(附完整源码)
查看>>
Objective-C实现Interpolation search插值查找算法(附完整源码)
查看>>
Objective-C实现k-nearest算法(附完整源码)
查看>>
Objective-C实现KPCA(附完整源码)
查看>>
Objective-C实现max subarray sum最大子数组和算法(附完整源码)
查看>>
Objective-C实现md5算法(附完整源码)
查看>>
Objective-C实现miller rabin米勒-拉宾素性检验算法(附完整源码)
查看>>
Objective-C实现not gate非门算法(附完整源码)
查看>>
Objective-C实现perfect cube完全立方数算法(附完整源码)
查看>>
Objective-C实现segment tree段树算法(附完整源码)
查看>>
Objective-C实现SinglyLinkedList单链表算法(附完整源码)
查看>>