博客
关于我
ElasicJob分布式定时任务
阅读量:305 次
发布时间:2019-03-03

本文共 6270 字,大约阅读时间需要 20 分钟。

Elastic Job 定时任务配置与实现

一、配置Zookeeper注册中心

首先,需要配置Zookeeper作为分布式协调服务注册中心。下载相应的Zookeeper包并解压后,运行bin目录下的start命令即可启动Zookeeper服务。完成后,可通过ZooKeeper客户端工具(如zooinspector)连接到注册中心,完成后续配置。

二、添加Maven依赖

在项目的POM文件中添加以下依赖,确保能够正确引用Elastic Job Lite相关组件:

com.dangdang
elastic-job-lite-core
2.1.5

三、编写定时任务业务逻辑

创建定时任务处理类ArchivieJob,实现业务逻辑处理。以下是核心代码逻辑:

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import java.util.List;
import java.util.Map;
public class ArchivieJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
int shardingItem = shardingContext.getShardingItem();
String shardingParameter = shardingContext.getShardingParameter();
// 从resume表中查询未归档记录
List
> maps = JdbcUtil.executeQuery(
"SELECT * FROM resume WHERE state = '未归档' AND education = '" + shardingParameter + "' LIMIT 1;"
);
if (maps == null) {
System.out.println("数据处理完毕!!!!");
return;
}
// 更新记录状态为已归档
Map
stringObjectMap = maps.get(0);
Object id = stringObjectMap.get("id");
Object name = stringObjectMap.get("name");
Object education = stringObjectMap.get("education");
System.out.println("=====>>>>>" + id + "," + name + "," + education);
// 插入到resume_bak表中
JdbcUtil.executeUpdate(
"UPDATE resume SET state = '已归档' WHERE id = ?",
id
);
JdbcUtil.executeUpdate(
"INSERT INTO resume_bak SELECT * FROM resume WHERE id = ?",
id
);
}
}

四、启动定时任务调度

ElasicJobMain类中配置任务调度:

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
public class ElasicJobMain {
public static void main(String[] args) {
// 配置Zookeeper注册中心
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("192.168.1.6:2181", "data-archive-job");
ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
zookeeperRegistryCenter.init();
// 配置任务
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("archive-job", "*/2 * * * * ?", 3)
.shardingItemParameters("0=doctor,1=bachelor,2=master")
.build();
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, ArchivieJob.class.getName());
JobScheduler jobScheduler = new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build());
jobScheduler.init();
}
}

五、JDBC辅助工具类

提供数据库操作功能,包含连接数据库、执行查询和更新操作等功能:

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class JdbcUtil {
private static String url = "jdbc:mysql://localhost:3306/bank?characterEncoding=utf8&useSSL=false";
private static String user = "root";
private static String password = "123456";
private static String driver = "com.mysql.jdbc.Driver";
static {
try {
Class.forName(driver);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public static Connection getConnection() {
try {
return DriverManager.getConnection(url, user, password);
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
public static void close(ResultSet rs, PreparedStatement ps, Connection con) {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
e.printStackTrace();
} finally {
if (ps != null) {
try {
ps.close();
} catch (SQLException e) {
e.printStackTrace();
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
}
}
public static void executeUpdate(String sql, Object... obj) {
Connection con = getConnection();
PreparedStatement ps = null;
try {
ps = con.prepareStatement(sql);
for (int i = 0; i < obj.length; i++) {
ps.setObject(i + 1, obj[i]);
}
ps.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
} finally {
close(null, ps, con);
}
}
public static List
> executeQuery(String sql, Object... obj) {
Connection con = getConnection();
ResultSet rs = null;
PreparedStatement ps = null;
try {
ps = con.prepareStatement(sql);
for (int i = 0; i < obj.length; i++) {
ps.setObject(i + 1, obj[i]);
}
rs = ps.executeQuery();
List
> list = new ArrayList<>();
int count = rs.getMetaData().getColumnCount();
while (rs.next()) {
Map
map = new HashMap<>();
for (int i = 0; i < count; i++) {
Object ob = rs.getObject(i + 1);
String key = rs.getMetaData().getColumnName(i + 1);
map.put(key, ob);
}
list.add(map);
}
return list;
} catch (SQLException e) {
e.printStackTrace();
} finally {
close(rs, ps, con);
}
return null;
}
}

转载地址:http://tduq.baihongyu.com/

你可能感兴趣的文章
NIO Selector实现原理
查看>>
nio 中channel和buffer的基本使用
查看>>
NIO基于UDP协议的网络编程
查看>>
NISP一级,NISP二级报考说明,零基础入门到精通,收藏这篇就够了
查看>>
Nitrux 3.8 发布!性能全面提升,带来非凡体验
查看>>
NI笔试——大数加法
查看>>
NLog 自定义字段 写入 oracle
查看>>
NLP 基于kashgari和BERT实现中文命名实体识别(NER)
查看>>
NLP 项目:维基百科文章爬虫和分类【01】 - 语料库阅读器
查看>>
NLP_什么是统计语言模型_条件概率的链式法则_n元统计语言模型_马尔科夫链_数据稀疏(出现了词库中没有的词)_统计语言模型的平滑策略---人工智能工作笔记0035
查看>>
NLP学习笔记:使用 Python 进行NLTK
查看>>
NLP问答系统:使用 Deepset SQUAD 和 SQuAD v2 度量评估
查看>>
NLP:使用 SciKit Learn 的文本矢量化方法
查看>>
Nmap扫描教程之Nmap基础知识
查看>>
Nmap端口扫描工具Windows安装和命令大全(非常详细)零基础入门到精通,收藏这篇就够了
查看>>
NMAP网络扫描工具的安装与使用
查看>>
NMF(非负矩阵分解)
查看>>
NN&DL4.1 Deep L-layer neural network简介
查看>>
NN&DL4.3 Getting your matrix dimensions right
查看>>
NN&DL4.8 What does this have to do with the brain?
查看>>