1. 封装实体类,并对应excel表中的列
/**
 * Row model used when reading the Excel sheets.
 *
 * <p>{@code age} and {@code name} are bound to the sheet columns whose headers are
 * {@code "age"} and {@code "username"} respectively. {@code id} is assigned in code
 * by the import routine rather than being read from the sheet.
 *
 * <p>NOTE(review): {@code md5} and {@code id} carry no {@code @ExcelProperty}.
 * EasyExcel may still try to map unannotated fields to columns; confirm whether
 * they should be explicitly excluded (e.g. with {@code @ExcelIgnore}).
 */
@Data
public class User {

    /** Not bound to a named column. */
    private String md5;

    /** Document identifier; not bound to a named column. */
    private String id;

    /** Value of the column headed {@code "age"}. */
    @ExcelProperty(value = "age")
    private String age;

    /** Value of the column headed {@code "username"}. */
    @ExcelProperty(value = "username")
    private String name;
}
2. 批量入库
private void insertBatchToES(List dataList, String indexName) { try { BulkProcessor bulkProcessor = BulkProcessor.builder( (request, bulkListener) -> elasticsearchClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request) { // 准备执行前的操作 } @Override public void afterBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request, org.elasticsearch.action.bulk.BulkResponse response) { if (response != null) { int insertedCount = request.numberOfActions(); // 获取请求中操作的数量,即插入的条数 log.info("批量插入 " + insertedCount + " 条数据成功"); } } @Override public void afterBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request, Throwable failure) { log.info("批量插入 error"); } }) // 设置每1000个请求执行一次批处理 .setBulkActions(500) .build(); for(User user : dataList) { String jsonString = convertToJson(user); IndexRequest indexRequest = new IndexRequest(indexName) .id(user.getId()) .source(jsonString, XContentType.JSON); bulkProcessor.add(indexRequest); } bulkProcessor.awaitClose(10, TimeUnit.MINUTES); bulkProcessor.close(); } catch (InterruptedException | JsonProcessingException e) { e.printStackTrace(); } }
将对象转json工具类:
public String convertToJson(user) throws JsonProcessingException { String objStr = JSON.toJSONString(user, SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty, SerializerFeature.NotWriteDefaultValue); return objStr; }
3. 读取指定的 Excel 文件，封装为 List
public void importExcelToES(String excelFilePath, String indexName) { try { EasyExcel.read(excelFilePath, User.class, new AnalysisEventListener() { private List dataList = new ArrayList<>(); @Override public void invoke(UserFansExcel data, AnalysisContext analysisContext) { long id = generator.nextId(); data.setId(String.valueOf(id)); if (dataList.size() >= 500) { insertBatchToES(filteredList, indexName); dataList.clear(); } } @Override public void doAfterAllAnalysed(AnalysisContext analysisContext) { if (!dataList.isEmpty()) { insertBatchToES(dataList, indexName); } } }).sheet().doRead(); } catch (Exception e) { e.printStackTrace(); }
3.1 读取指定目录下的所有 Excel 文件（这些文件的格式相同）
public void readExcelFilesFromDirectory(String directoryPath) throws IOException { List dataList = new ArrayList<>(); File dir = new File(directoryPath); File[] files = dir.listFiles((d, name) -> name.endsWith(".xlsx")); if (files != null) { for (File file : files) { System.out.println(file.getName()); try { String primaryUserId = file.getName().replace(".xlsx", ""); try (FileInputStream fis = new FileInputStream(file)) { EasyExcel.read(fis, User.class, new AnalysisEventListener() { @Override public void invoke(User data, AnalysisContext context) { data.setName(primaryUserId); dataList.add(data); } @Override public void doAfterAllAnalysed(AnalysisContext analysisContext) { } }).sheet().doRead(); } } catch (Exception e) { e.printStackTrace(); } if(dataList.size() >0){ //这里可以插入数据库 dataList.clear(); } } } }