From 52a2527f6b570dc7b8836e91d3b9cf9ba04677ba Mon Sep 17 00:00:00 2001 From: jiang Date: Mon, 2 Dec 2024 02:34:56 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Impl/DataSetBasicFileServiceImpl.java | 2 +- .../java/com/bonus/ai/utils/MinioUtil.java | 63 ++++++++++++++++--- 2 files changed, 57 insertions(+), 8 deletions(-) diff --git a/bonus-modules/bonus-ai/src/main/java/com/bonus/ai/service/Impl/DataSetBasicFileServiceImpl.java b/bonus-modules/bonus-ai/src/main/java/com/bonus/ai/service/Impl/DataSetBasicFileServiceImpl.java index 557363a..ce74229 100644 --- a/bonus-modules/bonus-ai/src/main/java/com/bonus/ai/service/Impl/DataSetBasicFileServiceImpl.java +++ b/bonus-modules/bonus-ai/src/main/java/com/bonus/ai/service/Impl/DataSetBasicFileServiceImpl.java @@ -259,7 +259,7 @@ public class DataSetBasicFileServiceImpl implements DataSetBasicFileService { minioUtil.uploadFile(file, objectName); // 检查是否是最后一个分片,若是则合并 if (chunk == totalChunks) { - DataSetBasicFileEntity entity = minioUtil.mergeChunks(filename, totalChunks, minioUtil.joinPath(SecurityUtils.getUserId().toString(), fileUrl)); + DataSetBasicFileEntity entity = minioUtil.mergeChunksWithMultithreading(filename, totalChunks, minioUtil.joinPath(SecurityUtils.getUserId().toString(), fileUrl)); entity.setCreateBy(SecurityUtils.getUserId().toString()); entity.setCreateTime(DateUtils.getNowDate()); entity.setIsDirectory("0"); diff --git a/bonus-modules/bonus-ai/src/main/java/com/bonus/ai/utils/MinioUtil.java b/bonus-modules/bonus-ai/src/main/java/com/bonus/ai/utils/MinioUtil.java index ccb6210..3cf4320 100644 --- a/bonus-modules/bonus-ai/src/main/java/com/bonus/ai/utils/MinioUtil.java +++ b/bonus-modules/bonus-ai/src/main/java/com/bonus/ai/utils/MinioUtil.java @@ -21,6 +21,7 @@ import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; /** * MinioUtil 工具类 @@ -112,22 +113,59 @@ public class MinioUtil { * @param folderPath 文件夹路径 * @return 合并后的文件 */ - public DataSetBasicFileEntity mergeChunks(String filename, int totalChunks ,String folderPath) { + public DataSetBasicFileEntity mergeChunksWithMultithreading(String filename, int totalChunks, String folderPath) { + // 创建线程池 + ExecutorService threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); try { + // 分片信息 List partNames = getPartNames(filename, totalChunks); List sources = new ArrayList<>(); for (String partName : partNames) { - sources.add(ComposeSource.builder().bucket(minioConfig.getBucketName()).object(partName).build()); + sources.add(ComposeSource.builder() + .bucket(minioConfig.getBucketName()) + .object(partName) + .build()); } + String finalPath = joinPath(folderPath, filename); + + // 分批次提交合并任务 + int batchSize = 40; // 每批次的最大分片数 + List> futures = new ArrayList<>(); + for (int i = 0; i < sources.size(); i += batchSize) { + int end = Math.min(i + batchSize, sources.size()); + List batch = sources.subList(i, end); + String tempPath = finalPath + "_temp_" + (i / batchSize); + futures.add(threadPool.submit(() -> mergeBatch(tempPath, batch))); + } + + // 等待所有批次合并完成,并再次合并最终结果 + List mergedSources = new ArrayList<>(); + for (Future future : futures) { + String tempPath = future.get(); + mergedSources.add(ComposeSource.builder() + .bucket(minioConfig.getBucketName()) + .object(tempPath) + .build()); + } + + // 合并最终文件 minioClient.composeObject(ComposeObjectArgs.builder() .bucket(minioConfig.getBucketName()) .object(finalPath) - .sources(sources) + .sources(mergedSources) .build()); + + // 删除临时文件和分片文件 removeParts(partNames); - LOGGER.info("Merged and uploaded file: {}", finalPath); - // 获取合并后文件的详细信息 + for (ComposeSource source : mergedSources) { + minioClient.removeObject(RemoveObjectArgs.builder() + .bucket(minioConfig.getBucketName()) + .object(source.object()) + .build()); + } + + LOGGER.info("Merged final file: {}", finalPath); StatObjectResponse stat = minioClient.statObject(StatObjectArgs.builder() .bucket(minioConfig.getBucketName()) .object(finalPath) @@ -138,11 +176,22 @@ public class MinioUtil { entity.setFileSize(stat.size()); return entity; } catch (Exception e) { - LOGGER.error("Error merging chunks: {}", e.getMessage(), e); - return null; + LOGGER.error("Error in multithreaded merge: {}", e.getMessage(), e); + return null; + } finally { + threadPool.shutdown(); } } + private String mergeBatch(String tempPath, List batch) throws Exception { + minioClient.composeObject(ComposeObjectArgs.builder() + .bucket(minioConfig.getBucketName()) + .object(tempPath) + .sources(batch) + .build()); + LOGGER.info("Merged batch file: {}", tempPath); + return tempPath; + } /** * 下载指定文件,以 InputStream 形式返回