EsService.java 10.3 KB
package com.idss.vulsync.mvc.service;

import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.idss.vulsync.entity.AssetReportVO;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @Author: zc
 * @Date: 2021/11/19 14:35
 */
@Component
@Slf4j
public class EsService {


    @Autowired
    ObjectMapper objectMapper;
    @Autowired
    RestHighLevelClient client;



    public int delete(String indexName) {
        try {
            GetIndexRequest indexRequest = new GetIndexRequest(indexName);
            boolean inExists = client.indices().exists(indexRequest, RequestOptions.DEFAULT);

            if (inExists) {
                DeleteIndexRequest request = new DeleteIndexRequest(indexName);
                AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
                if (!response.isAcknowledged()) {
                    // 2:索引存在删除失败
                    log.info("删除索引失败{}",indexName);
                    return 2;
                }
            } else {
                // 1:索引不存在
                log.info("删除索引不存在{}",indexName);
                return 1;
            }
        } catch (IllegalArgumentException e) {
            throw e;
        } catch (Exception e) {
            log.error("删除索引异常{}",indexName,e);
        }
        // 0:索引存在并删除成功
        return 0;
    }



    /**
     * 批量插入
     */
    public void addBatch(List<AssetReportVO> models, String index) {
        if (!indexIsExist(index)) {
            createIndex(index);
        }
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                log.info("1. 【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,
                                  BulkResponse response) {
//                    if (!response.hasFailures()) {
//                        log.info("2. 【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());
//                    } else {
//                        BulkItemResponse[] items = response.getItems();
//                        for (BulkItemResponse item : items) {
//                            if (item.isFailed()) {
//                                log.info("2. 【afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage());
//                                break;
//                            }
//                        }
//                    }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,
                                  Throwable failure) {
                log.error("3. 【afterBulk-failure失败】es执行bluk失败", failure);
//                    List<DocWriteRequest<?>> requests = request.requests();
//                    List<String> esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList());
            }
        };
        BulkProcessor bulkProcessor = BulkProcessor.builder(
                (request, bulkListener) ->
                        client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                listener)
                .setBulkActions(10000) // Execute the bulk every 10,000 requests
                .setBulkSize(new ByteSizeValue(100, ByteSizeUnit.MB)) // Execute the bulk every 5MB
                .build();

        try {
            for (AssetReportVO entity : models) {
                LinkedHashMap<String,Object> map = objectMapper.readValue(objectMapper.writeValueAsString(entity), TypeFactory.defaultInstance().constructMapType(LinkedHashMap.class, String.class, Object.class));
                IndexRequest indexRequest = new IndexRequest(index).id(entity.getAssetId())
                        .type(index)
                        .source(map);
                bulkProcessor.add(indexRequest);
            }
            bulkProcessor.flush();
            bulkProcessor.awaitClose(3, TimeUnit.MINUTES);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public boolean createIndex(String indexName) {
        if (indexIsExist(indexName)) {
            return true;
        }
        CreateIndexResponse createIndexResponse = null;
        try {
            CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
            createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        if (createIndexResponse.isAcknowledged()) {
            log.info("Index created");
            return true;
        } else {
            log.info("Index creation failed");
            return false;
        }
    }


    public boolean indexIsExist(String indexName) {
        Assert.notNull(indexName, "exist index name assert null!");
        GetIndexRequest getIndexRequest = new GetIndexRequest("fisher");
        boolean exists = false;
        try {
            exists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.info("indexIsExist found exception:" + e);
        }
        return exists;
    }


//    public boolean addBatch(List<Object> models, String index) {
//        for (Object model : models) {
//            String id = idGenerator.randomUUID();
//            IndexRequest indexRequest = new IndexRequest(index).id(id).source(JSON.toJSONString(model), XContentType.JSON);
//            try {
//                client.index(indexRequest, COMMON_OPTIONS);
//            } catch (IOException e) {
//                log.warn("inserts to es is error, errorMsg " + id, e);
//            }
//        }
//        return true;
//    }

    private IndicesOptions setIndicesOptionsParam() {
        return IndicesOptions.fromOptions(true, true, true, false, IndicesOptions.strictExpandOpenAndForbidClosed());
    }

    /**
     * @param index
     * @param queryBuilder
     * @return
     */
    public List<AssetReportVO> selectList(String index, QueryBuilder queryBuilder) {
        try {
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(queryBuilder);
            searchSourceBuilder.size(10000);

            Scroll scroll = new Scroll(new TimeValue(600000));
            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices(index);
            searchRequest.scroll(scroll);
            searchRequest.source(searchSourceBuilder);

            SearchResponse scrollResp = client.search(searchRequest, RequestOptions.DEFAULT);
            List<AssetReportVO> resultList = new ArrayList<>();
            AssetReportVO entity = null;


            //记录滚动id。
            String scrollId = scrollResp.getScrollId();
            //滚动查询部分,将从第10001条数据开始取。
            SearchHit[] scrollHits = scrollResp.getHits().getHits();

            for (SearchHit searchHit : scrollHits) {
                entity = JSON.parseObject(searchHit.getSourceAsString(), AssetReportVO.class);
                resultList.add(entity);
            }
            log.info("resultList size " + resultList.size());


            while (ObjectUtil.isNotNull(scrollHits) && scrollHits.length > 0) {
                //构造滚动查询条件
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                searchScrollRequest.scroll(scroll);
                //响应必须是上面的响应对象,需要对上一层进行覆盖。
                scrollResp = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
                scrollId = scrollResp.getScrollId();
                scrollHits = scrollResp.getHits().getHits();

                for (SearchHit searchHit : scrollHits) {
                    entity = JSON.parseObject(searchHit.getSourceAsString(), AssetReportVO.class);
                    resultList.add(entity);
                }
                log.info("resultList size " + resultList.size());

            }
            log.info("resultList final size " + resultList.size());
            return resultList;
        } catch (Exception e) {
            log.error("selectList timeindex is error,errorMsg ", e);
        }
        return new ArrayList<>();
    }


}