详细介绍:MyData 基于 Web API 的数据集成平台 v0.7.0
部署文档:[用 Docker 部署 MyData v0.7.1]( https://www.mydata.work/docs#/./docker/用 Docker 部署 MyData)
使用手册:MyData 使用手册 v0.7.1
交流 Q 群:430089673
MyData 的后端由 3 个子服务组成,分别是管理服务
、任务服务
、业务数据服务
;
依赖的组件:
下图从数据流角度 展示 3 个子服务的关联: 注:开源版本采用单体 SpringBoot ;
任务主要包括:项目环境、数据标准、应用 API 、任务类型、字段映射、任务周期;
提供数据
表示从应用 API 读取业务员数据、消费数据
表示向应用 API 发送业务数据;数据集成的任务执行流程如下图:
任务服务启动时(即 MyData 系统启动),查询所有运行状态的任务;
public class JobExecutor implements ApplicationRunner {
...
@Override
public void run(ApplicationArguments args) {
// 移除已有缓存
jobCache.removeAll();
// 查询已启动的任务
List<Task> tasks = taskService.listRunningTasks();
log.info("tasks.size() = " + tasks.size());
if (CollUtil.isNotEmpty(tasks)) {
tasks.forEach(this::startTask);
}
}
...
}
根据任务的 cron 表达式,计算任务的下次执行时间;
/**
* 根据 任务的上次执行时间 和 设定间隔规则,计算任务的 下次执行时间
*
* @param taskInfo 定时任务
*/
private void calculateNextRunTime(TaskInfo taskInfo) {
Assert.notNull(taskInfo);
Assert.notEmpty(taskInfo.getTaskPeriod());
Date date = taskInfo.getStartTime();
if (taskInfo.getFailCount() > 0) {
date = taskInfo.getNextRunTime();
}
CronExpression cronExpression = new CronExpression(taskInfo.getTaskPeriod());
Date nextRunTime = cronExpression.getNextValidTimeAfter(date);
taskInfo.setNextRunTime(nextRunTime);
}
计算任务的下次执行时间 与 当前时间的时间差,以时间差作为缓存失效期 将任务存入 redis 缓存;
/**
* 缓存任务
*
* @param taskInfo 任务对象
* @throws IllegalArgumentException 缓存时长无效
*/
public void cacheJob(TaskInfo taskInfo) throws IllegalArgumentException {
// 计算任务缓存有效时长
long expire = DateUtil.between(taskInfo.getStartTime(), taskInfo.getNextRunTime(), DateUnit.SECOND);
if (expire <= 0) {
throw new IllegalArgumentException(StrUtil.format("expire <= 0, startTime = {}, nextRunTime = {}"
, DateUtil.format(taskInfo.getStartTime(), DatePattern.NORM_DATETIME_MS_PATTERN)
, DateUtil.format(taskInfo.getNextRunTime(), DatePattern.NORM_DATETIME_MS_PATTERN)));
}
redisUtil.set(CACHE_TASK + taskInfo.getId(), taskInfo);
redisUtil.set(CACHE_JOB + taskInfo.getId(), taskInfo.getId(), expire);
taskInfo.appendLog("任务存入 redis ,缓存时长 {} 秒", expire);
}
通过监听 redis 的 key 失效事件,获得待执行的任务;
public class RedisKeyExpiredListener implements MessageListener {
private final JobExecutor jobExecutor;
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
if (StrUtil.startWith(expiredKey, JobCache.CACHE_JOB)) {
String taskId = StrUtil.subSuf(expiredKey, JobCache.CACHE_JOB.length());
jobExecutor.notify(taskId);
}
}
}
将任务加入待执行的线程池,随后即可执行
/**
* 任务存入执行队列
*
* @param taskInfo 任务
*/
private void executeJob(TaskInfo taskInfo) {
taskInfo.appendLog("任务存入执行队列");
Runnable runnable = new JobThread(taskInfo);
getThreadPoolExecutor().execute(runnable);
}
根据任务类型分别执行提供数据
和消费数据
流程;
提供数据
case MdConstant.DATA_PRODUCER:
// 调用 api 获取 json
String json = ApiUtil.read(taskInfo);
// 将 json 按字段映射 解析为业务数据
jobDataService.parseData(taskInfo, json);
// 根据条件过滤数据
jobDataFilterService.doFilter(taskInfo);
// 保存业务数据
jobDataService.saveTaskData(taskInfo);
// 更新环境变量
jobVarService.saveVarValue(taskInfo, json);
break;
消费数据
case MdConstant.DATA_CONSUMER:
List<BizDataFilter> filters = taskInfo.getDataFilters();
if (CollUtil.isNotEmpty(filters)) {
// 解析过滤条件值中的 自定义字符串
parseFilterValue(filters);
// 排除值为 null 的条件
filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());
}
// 根据过滤条件 查询数据
String dataCode = taskInfo.getDataCode();
if (StrUtil.isNotEmpty(dataCode)) {
List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters);
taskInfo.setConsumeDataList(dataList);
// 根据字段映射转换为 api 参数
jobDataService.convertData(taskInfo);
}
// 调用 api 传输数据
ApiUtil.write(taskInfo);
break;
保存任务执行日志;