UpdateDataSyncJob.java
6.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package com.idss.vulsync.schedule;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil;
import com.idss.vulsync.constants.CommonConstant;
import com.idss.vulsync.entity.AssetReportVO;
import com.idss.vulsync.mvc.mapper.ConsultVulmanageSyncEntityMapper;
import com.idss.vulsync.mvc.pojo.db.ConsultVulmanageSyncEntity;
import com.idss.vulsync.mvc.service.CommonService;
import com.idss.vulsync.mvc.service.ConsultVulmanageSyncService;
import com.idss.vulsync.mvc.service.EsService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.index.query.QueryBuilders;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.stream.Collectors;
/**
* @Author: zc
* @Date: 2024/8/16 16:58
* 漏管增量数据同步
*/
@Service
@Slf4j
public class UpdateDataSyncJob implements Job {
@Autowired(required = false)
@Qualifier("QuartzScheduler")
public Scheduler scheduler;
@Autowired
EsService esService;
@Autowired
ConsultVulmanageSyncEntityMapper consultVulmanageSyncEntityMapper;
@Autowired
ConsultVulmanageSyncService consultVulmanageSyncService;
@Autowired
CommonService commonService;
@Autowired
private RedisTemplate redisTemplate;
@Override
public void execute(JobExecutionContext jobExecutionContext) {
log.info("quartz启动" + this.getClass().getSimpleName());
this.run(jobExecutionContext);
}
public void run(JobExecutionContext jobExecutionContext) {
log.info("开始同步增量数据--------");
Date now = new Date();
//初始同步任务同步部侧全量指令数据DataGxb为初始漏管数据latestEsId至漏管
//每次上报部侧数据后更新部侧数据DataGxb ???是否只处理部侧全量指令数据的情况
//增量同步任务比对DataGxb与latestEsId,然后更新DataVul并同步至漏管
//获取最近一次上报部侧的全量数据文件 作为DataGxb
List<AssetReportVO> gxbDataList = consultVulmanageSyncService.getLatestDataGxb();
ConsultVulmanageSyncEntity syncEntity = consultVulmanageSyncEntityMapper.selectByPrimaryKey(CommonConstant.CONSULT_UNIQUE_ID);
String latestEsId = syncEntity.getLatestEsId();
List<AssetReportVO> vulDataList = esService.selectList(latestEsId, QueryBuilders.boolQuery());
log.info("增量同步任务,漏管快照数据{}条,上报部侧数据{}条", vulDataList.size(), gxbDataList.size());
List<AssetReportVO> removeList = new ArrayList<>();
//漏管DataVul为存量数据,最近一次部侧全量数据为本次需要比对的数据
Set<String> vulDataIds = vulDataList.stream().map(AssetReportVO::getAssetId).collect(Collectors.toSet());
Set<String> gxbDataIds = gxbDataList.stream().map(AssetReportVO::getAssetId).collect(Collectors.toSet());
Set<String> removeIds = new HashSet<>();
for (String vulDataId : vulDataIds) {
if (!gxbDataIds.contains(vulDataId)) {
removeIds.add(vulDataId);
AssetReportVO remove = new AssetReportVO();
remove.setAssetId(vulDataId);
remove.setState("3");
removeList.add(remove);
}
}
//部侧数据中废弃资产 认为0下线 2停用 3删除均为废弃资产, state=1为新增和变更资产
Set<AssetReportVO> gxbRemoveIds = gxbDataList.stream().filter(s -> !"1".equals(s.getState())).collect(Collectors.toSet());
for (AssetReportVO reportVO : gxbRemoveIds) {
AssetReportVO remove = new AssetReportVO();
remove.setAssetId(reportVO.getAssetId());
remove.setState(reportVO.getState());
removeList.add(remove);
}
log.info("本次资产废弃或删除{}条", removeList.size());
//1资产新增+属性变更
List<AssetReportVO> addList = gxbDataList.stream().filter(s -> "1".equals(s.getState())).collect(Collectors.toList());
log.info("本次资产新增和属性变更共{}条", addList.size());
//待更新数据
List<AssetReportVO> syncDataList = new ArrayList<>();
syncDataList.addAll(addList);
syncDataList.addAll(removeList);
try {
//latestEsId删除旧数据,插入新增资产和变更资产, *废弃资产不入库
esService.delete(latestEsId);
if (ObjectUtil.isNotEmpty(addList)) {
esService.addBatch(addList, latestEsId);
log.error("增量更新插入es{}条", addList.size());
}
} catch (Exception e) {
log.error("增量更新插入es异常", e);
}
commonService.sync(syncDataList, syncEntity, now, "UPD");
try {
JobKey jobKey=new JobKey(UpdateDataSyncJob.class.getSimpleName(), UpdateDataSyncJob.class.getSimpleName());
scheduler.deleteJob(jobKey);
}catch (Exception e){
e.printStackTrace();
}
try {
//新增增量更新同步任务
String updCycle = syncEntity.getUpdCycle();
String numStr = updCycle.substring(0, 2);
String unit = updCycle.substring(2, updCycle.length());
Integer num = numStr.startsWith("0") ? Integer.parseInt(numStr.substring(1, numStr.length())) : Integer.parseInt(numStr);
JobDetail jobDetail = JobBuilder.newJob(UpdateDataSyncJob.class).
withIdentity(UpdateDataSyncJob.class.getSimpleName(), UpdateDataSyncJob.class.getSimpleName())
.build();
DateTime executionTime = new DateTime();
if (unit.equals("天")) {
executionTime = DateUtil.offsetDay(now, num);
} else if (unit.equals("时")) {
executionTime = DateUtil.offsetHour(now, num);
} else {
executionTime = new DateTime(now);
}
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(UpdateDataSyncJob.class.getSimpleName(), UpdateDataSyncJob.class.getSimpleName())
.startAt(executionTime).build();
scheduler.scheduleJob(jobDetail, trigger);
log.error("新增增量更新同步任务,下次执行时间{}", DateUtil.formatDateTime(executionTime));
} catch (Exception e) {
log.error("同步增量数据异常", e);
}
log.info("结束同步增量数据--------");
}
}