UpdateDataSyncJob.java 6.62 KB
package com.idss.vulsync.schedule;

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil;
import com.idss.vulsync.constants.CommonConstant;
import com.idss.vulsync.entity.AssetReportVO;
import com.idss.vulsync.mvc.mapper.ConsultVulmanageSyncEntityMapper;
import com.idss.vulsync.mvc.pojo.db.ConsultVulmanageSyncEntity;
import com.idss.vulsync.mvc.service.CommonService;
import com.idss.vulsync.mvc.service.ConsultVulmanageSyncService;
import com.idss.vulsync.mvc.service.EsService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.index.query.QueryBuilders;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.stream.Collectors;

/**
 * @Author: zc
 * @Date: 2024/8/16 16:58
 * 漏管增量数据同步
 */
@Service
@Slf4j
public class UpdateDataSyncJob implements Job {
    @Autowired(required = false)
    @Qualifier("QuartzScheduler")
    public Scheduler scheduler;
    @Autowired
    EsService esService;
    @Autowired
    ConsultVulmanageSyncEntityMapper consultVulmanageSyncEntityMapper;
    @Autowired
    ConsultVulmanageSyncService consultVulmanageSyncService;
    @Autowired
    CommonService commonService;
    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public void execute(JobExecutionContext jobExecutionContext) {
        log.info("quartz启动" + this.getClass().getSimpleName());
        this.run(jobExecutionContext);
    }


    public void run(JobExecutionContext jobExecutionContext) {
        log.info("开始同步增量数据--------");
        Date now = new Date();
        //初始同步任务同步部侧全量指令数据DataGxb为初始漏管数据latestEsId至漏管
        //每次上报部侧数据后更新部侧数据DataGxb ???是否只处理部侧全量指令数据的情况
        //增量同步任务比对DataGxb与latestEsId,然后更新DataVul并同步至漏管

        //获取最近一次上报部侧的全量数据文件 作为DataGxb
        List<AssetReportVO> gxbDataList = consultVulmanageSyncService.getLatestDataGxb();


        ConsultVulmanageSyncEntity syncEntity = consultVulmanageSyncEntityMapper.selectByPrimaryKey(CommonConstant.CONSULT_UNIQUE_ID);
        String latestEsId = syncEntity.getLatestEsId();
        List<AssetReportVO> vulDataList = esService.selectList(latestEsId, QueryBuilders.boolQuery());

        log.info("增量同步任务,漏管快照数据{}条,上报部侧数据{}条", vulDataList.size(), gxbDataList.size());

        List<AssetReportVO> removeList = new ArrayList<>();
        //漏管DataVul为存量数据,最近一次部侧全量数据为本次需要比对的数据
        Set<String> vulDataIds = vulDataList.stream().map(AssetReportVO::getAssetId).collect(Collectors.toSet());
        Set<String> gxbDataIds = gxbDataList.stream().map(AssetReportVO::getAssetId).collect(Collectors.toSet());

        Set<String> removeIds = new HashSet<>();
        for (String vulDataId : vulDataIds) {
            if (!gxbDataIds.contains(vulDataId)) {
                removeIds.add(vulDataId);
                AssetReportVO remove = new AssetReportVO();
                remove.setAssetId(vulDataId);
                remove.setState("3");
                removeList.add(remove);
            }
        }
        //部侧数据中废弃资产 认为0下线 2停用 3删除均为废弃资产, state=1为新增和变更资产
        Set<AssetReportVO> gxbRemoveIds = gxbDataList.stream().filter(s -> !"1".equals(s.getState())).collect(Collectors.toSet());
        for (AssetReportVO reportVO : gxbRemoveIds) {
            AssetReportVO remove = new AssetReportVO();
            remove.setAssetId(reportVO.getAssetId());
            remove.setState(reportVO.getState());
            removeList.add(remove);
        }
        log.info("本次资产废弃或删除{}条", removeList.size());

        //1资产新增+属性变更
        List<AssetReportVO> addList = gxbDataList.stream().filter(s -> "1".equals(s.getState())).collect(Collectors.toList());
        log.info("本次资产新增和属性变更共{}条", addList.size());

        //待更新数据
        List<AssetReportVO> syncDataList = new ArrayList<>();
        syncDataList.addAll(addList);
        syncDataList.addAll(removeList);

        try {
            //latestEsId删除旧数据,插入新增资产和变更资产, *废弃资产不入库
            esService.delete(latestEsId);
            if (ObjectUtil.isNotEmpty(addList)) {
                esService.addBatch(addList, latestEsId);
                log.error("增量更新插入es{}条", addList.size());
            }
        } catch (Exception e) {
            log.error("增量更新插入es异常", e);
        }


        commonService.sync(syncDataList, syncEntity, now, "UPD");

        try {
            JobKey jobKey=new JobKey(UpdateDataSyncJob.class.getSimpleName(), UpdateDataSyncJob.class.getSimpleName());
            scheduler.deleteJob(jobKey);
        }catch (Exception e){
            e.printStackTrace();
        }


        try {
            //新增增量更新同步任务
            String updCycle = syncEntity.getUpdCycle();
            String numStr = updCycle.substring(0, 2);
            String unit = updCycle.substring(2, updCycle.length());
            Integer num = numStr.startsWith("0") ? Integer.parseInt(numStr.substring(1, numStr.length())) : Integer.parseInt(numStr);


            JobDetail jobDetail = JobBuilder.newJob(UpdateDataSyncJob.class).
                    withIdentity(UpdateDataSyncJob.class.getSimpleName(), UpdateDataSyncJob.class.getSimpleName())
                    .build();

            DateTime executionTime = new DateTime();

            if (unit.equals("天")) {
                executionTime = DateUtil.offsetDay(now, num);
            } else if (unit.equals("时")) {
                executionTime = DateUtil.offsetHour(now, num);
            } else {
                executionTime = new DateTime(now);
            }

            Trigger trigger = TriggerBuilder.newTrigger().withIdentity(UpdateDataSyncJob.class.getSimpleName(), UpdateDataSyncJob.class.getSimpleName())
                    .startAt(executionTime).build();
            scheduler.scheduleJob(jobDetail, trigger);
            log.error("新增增量更新同步任务,下次执行时间{}", DateUtil.formatDateTime(executionTime));


        } catch (Exception e) {
            log.error("同步增量数据异常", e);
        }


        log.info("结束同步增量数据--------");
    }

}