提交代码

This commit is contained in:
jiang 2024-12-02 02:34:56 +08:00
parent 8d043dd271
commit 52a2527f6b
2 changed files with 57 additions and 8 deletions

View File

@ -259,7 +259,7 @@ public class DataSetBasicFileServiceImpl implements DataSetBasicFileService {
minioUtil.uploadFile(file, objectName);
// 检查是否是最后一个分片若是则合并
if (chunk == totalChunks) {
DataSetBasicFileEntity entity = minioUtil.mergeChunks(filename, totalChunks, minioUtil.joinPath(SecurityUtils.getUserId().toString(), fileUrl));
DataSetBasicFileEntity entity = minioUtil.mergeChunksWithMultithreading(filename, totalChunks, minioUtil.joinPath(SecurityUtils.getUserId().toString(), fileUrl));
entity.setCreateBy(SecurityUtils.getUserId().toString());
entity.setCreateTime(DateUtils.getNowDate());
entity.setIsDirectory("0");

View File

@ -21,6 +21,7 @@ import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* MinioUtil 工具类
@ -112,22 +113,59 @@ public class MinioUtil {
* @param folderPath 文件夹路径
* @return 合并后的文件
*/
public DataSetBasicFileEntity mergeChunks(String filename, int totalChunks ,String folderPath) {
public DataSetBasicFileEntity mergeChunksWithMultithreading(String filename, int totalChunks, String folderPath) {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
try {
// 分片信息
List<String> partNames = getPartNames(filename, totalChunks);
List<ComposeSource> sources = new ArrayList<>();
for (String partName : partNames) {
sources.add(ComposeSource.builder().bucket(minioConfig.getBucketName()).object(partName).build());
sources.add(ComposeSource.builder()
.bucket(minioConfig.getBucketName())
.object(partName)
.build());
}
String finalPath = joinPath(folderPath, filename);
// 分批次提交合并任务
int batchSize = 40; // 每批次的最大分片数
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < sources.size(); i += batchSize) {
int end = Math.min(i + batchSize, sources.size());
List<ComposeSource> batch = sources.subList(i, end);
String tempPath = finalPath + "_temp_" + (i / batchSize);
futures.add(threadPool.submit(() -> mergeBatch(tempPath, batch)));
}
// 等待所有批次合并完成并再次合并最终结果
List<ComposeSource> mergedSources = new ArrayList<>();
for (Future<String> future : futures) {
String tempPath = future.get();
mergedSources.add(ComposeSource.builder()
.bucket(minioConfig.getBucketName())
.object(tempPath)
.build());
}
// 合并最终文件
minioClient.composeObject(ComposeObjectArgs.builder()
.bucket(minioConfig.getBucketName())
.object(finalPath)
.sources(sources)
.sources(mergedSources)
.build());
// 删除临时文件和分片文件
removeParts(partNames);
LOGGER.info("Merged and uploaded file: {}", finalPath);
// 获取合并后文件的详细信息
for (ComposeSource source : mergedSources) {
minioClient.removeObject(RemoveObjectArgs.builder()
.bucket(minioConfig.getBucketName())
.object(source.object())
.build());
}
LOGGER.info("Merged final file: {}", finalPath);
StatObjectResponse stat = minioClient.statObject(StatObjectArgs.builder()
.bucket(minioConfig.getBucketName())
.object(finalPath)
@ -138,11 +176,22 @@ public class MinioUtil {
entity.setFileSize(stat.size());
return entity;
} catch (Exception e) {
LOGGER.error("Error merging chunks: {}", e.getMessage(), e);
return null;
LOGGER.error("Error in multithreaded merge: {}", e.getMessage(), e);
return null;
} finally {
threadPool.shutdown();
}
}
private String mergeBatch(String tempPath, List<ComposeSource> batch) throws Exception {
minioClient.composeObject(ComposeObjectArgs.builder()
.bucket(minioConfig.getBucketName())
.object(tempPath)
.sources(batch)
.build());
LOGGER.info("Merged batch file: {}", tempPath);
return tempPath;
}
/**
* 下载指定文件 InputStream 形式返回