C++ 新人,写个去重练练手。
- 结果:2.50 GB 文本( 900 万行,336 字/行),1GB 内存限制,6 秒保持顺序地去重完毕。
- 硬件:七年前 i5-8250U 轻薄本,读写在内存盘中(读 8G/s ,写 3G/s ,1000 元 2TB 固态都能有的速度,不过分吧?)
- 预计:4 小时能去重完毕 6.20TB ?
新人刚学会写,可能还有诸多不足之处。
写的过程中,还有很多优化点没写。比如:
1. 排序时,子范围太小,转为其他排序方式。
2. 读写文件,用的默认缓冲区大小( 4K ? 16K ?不知道多大,估计很小。。)
3. 分块时,可以去除重复行,减少稍后读写数据量。
继续改进点:
- 转用 hash 去重,大幅减少硬盘读写数据量。
- 只是要承担极小概率重复风险。但 Git 也在用这种方式。。
- 实在不行,发现重复 hash 时,再去读原文件完整比较。
## 截图
https://i.imgur.com/N29c3EY.png## 代码
```c++
// V 站吞空格,缩进改为全角空格了
#include <queue>
#include <vector>
#include <thread>
#include <cstring>
#include <sstream>
#include <fstream>
#include <iostream>
#include <algorithm>
#include <stdexcept>
#include <filesystem>
#include <string_view>
#include <fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/stat.h>
using std::ios;
using std::vector, std::string_view;
using std::to_string, std::ofstream;
namespace fs = std::filesystem;
int max_thread = 8;
size_t max_memory = 1ull << 30;
const auto tmpDir = fs::temp_directory_path();
struct Meta {
ptrdiff_t offset;
size_t length;
friend ofstream& operator<< (ofstream& ofs, const Meta& self) {
ofs.write(reinterpret_cast<const char*>(&self), sizeof self);
return ofs;
}
};
struct Line {
int chunkIdx{};
ptrdiff_t offset{};
string_view str{};
auto operator> (const Line& other) {
return std::tie(str, chunkIdx, offset)
> std::tie(other.str, other.chunkIdx, other.offset);
}
};
template <class T = char>
class MappedFile {
int fd = -1;
const T* ptr{};
public:
const T* data{};
size_t size{};
explicit MappedFile(const fs::path& file) {
struct stat64 fs{};
fd = open64(file.c_str(), O_RDONLY);
if (fd != -1 && fstat64(fd, &fs) != -1) {
size = static_cast<size_t>(fs.st_size) / sizeof(T);
data = ptr = static_cast<T*>(mmap64(nullptr, fs.st_size, PROT_READ, MAP_SHARED, fd, 0));
}
}
MappedFile(const MappedFile& other) = delete;
MappedFile(MappedFile&& old) noexcept:
fd(old.fd), ptr(old.ptr), data(old.data), size(old.size) {
old.fd = -1;
old.ptr = old.data = nullptr;
}
~MappedFile() {
if (data) munmap(const_cast<T*>(data), size * sizeof(T));
if (fd != -1) close(fd);
}
auto end() const {
return data + size;
}
operator const T*&() {
return ptr;
}
};
template <class Iter>
void mergeSort(Iter* src, Iter* dst, size_t len, int max_thread = 1, int id = 1) {
if (id == 1)
std::copy_n(src, len, dst);
if (len > 1) {
std::thread t;
size_t half = len / 2;
if (id < max_thread) // 只在左子树开启新线程
t = std::thread(mergeSort<Iter>, dst, src, half, max_thread, id * 2);
else
mergeSort(dst, src, half, max_thread, id * 2);
mergeSort(dst + half, src + half, len - half, max_thread, id * 2 + 1);
if (t.joinable())
t.join();
std::merge(src, src + half, src + half, src + len, dst);
}
}
// 步骤 1:分块,返回块数
int step1_SplitChunks(const fs::path& inFile) {
// 映射源文件
MappedFile text {inFile};
if (!text) throw std::runtime_error("无法打开输入文件");
// 分块,直到源文件结束
int chunkCount = 0;
for (auto chunkBegin = +text; (chunkBegin = text) < text.end();) {
// 不断记录行,直到(此次遍历过的源文件大小 + 行数据数组大小 * 2 )到达内存限制
vector<string_view> lines, sortedLines;
while (text < text.end() && (text - chunkBegin + sizeof(string_view) * lines.size() * 2) < max_memory) {
auto lineEnd = (char*) std::memchr(text, '\n', text.end() - text);
auto lineLen = (lineEnd ? lineEnd : text.end()) - text;
lines.emplace_back(text, lineLen);
text += lineLen + 1;
}
// 准备写入(排序后)分块、行数据。
ofstream chunkFile (tmpDir / (to_string(chunkCount) + ".txt"), ios::binary | ios::trunc);
ofstream metaFile (tmpDir / (to_string(chunkCount) + ".meta"), ios::binary | ios::trunc);
chunkCount++;
// 多线程排序行数组
sortedLines.resize(lines.size());
mergeSort(lines.data(), sortedLines.data(), lines.size(), max_thread);
// 保存(排序后)每行文本、偏移、长度
for (auto line: sortedLines) {
chunkFile << line;
metaFile << Meta{line.data() - chunkBegin, line.size()};
}
// 检查
if (!chunkFile || !metaFile) {
std::stringstream buf;
buf << "写入第 " << chunkCount << " 分块时出错!";
throw std::runtime_error(buf.str());
}
}
return chunkCount;
}
// 步骤 2:查找重复行
void step2_FindDupLines(int chunkCount) {
vector<ofstream> chunkDups;
vector<MappedFile<>> chunkText;
vector<MappedFile<Meta>> chunkMeta;
std::priority_queue<Line, vector<Line>, std::greater<>> lines;
// 映射所有分块的文本、行数据文件,
// 也准备好记录各分块重复行数据的文件
for (int idx = 0; idx < chunkCount; idx++) {
chunkText.emplace_back(tmpDir / (to_string(idx) + ".txt"));
chunkMeta.emplace_back(tmpDir / (to_string(idx) + ".meta"));
chunkDups.emplace_back(tmpDir / (to_string(idx) + ".dups"), ios::binary | ios::trunc);
lines.push({idx});
}
// 利用小根堆,按(行内容,分块号,偏移量)顺序,流式多路归并
string_view last{};
while (!lines.empty()) {
// 与上一行相同,则将偏移量写入,对应分块待删除行名单内
auto line =
lines.top(); lines.pop();
if (last == line.str && !last.empty())
chunkDups[line.chunkIdx].write((char*)&line.offset, sizeof line.offset);
last = line.str;
// 该分块行数据未遍历完,则继续将下一行添加进小根堆中
auto& text = chunkText[line.chunkIdx];
auto& meta = chunkMeta[line.chunkIdx];
if (meta < meta.end()) {
lines.push({line.chunkIdx, (*meta).offset, {text, (*meta).length}});
text += (*meta).length;
meta++;
}
}
// 检查
for (auto&& file: chunkDups) {
if (!file) {
std::stringstream buf;
buf << "保存第 " << chunkCount << " 分块删除名单时出错!";
throw std::runtime_error(buf.str());
}
}
}
// 步骤 3:合并分块
void step3_MergeChunks(int chunkCount, const fs::path& outFile) {
ofstream textOut {outFile, ios::binary | ios::trunc};
if (!textOut) throw std::runtime_error("无法打开输出文件");
for (int idx = 0; idx < chunkCount; idx++) {
// 映射分块(排序后)文本、行数据、删除名单
MappedFile<> text {tmpDir / (to_string(idx) + ".txt")};
MappedFile<Meta> meta {tmpDir / (to_string(idx) + ".meta")};
MappedFile<decltype(Meta::offset)> dups {tmpDir / (to_string(idx) + ".dups")};
// 剔除删除名单中的行
vector<Line> lines; lines.reserve(meta.size);
for (; meta < meta.end(); text += (*meta++).length) {
if (dups < dups.end() && *dups == (*meta).offset)
dups++;
else
lines.push_back({idx, (*meta).offset, {text, (*meta).length}});
}
// 再按偏移量顺序排序好
std::sort(lines.begin(), lines.end(), [](auto&& a, auto&& b) {
return a.offset < b.offset;
});
// 逐行输出
for (auto&& line: lines)
textOut << line.str << '\n';
}
// 检查
if (!textOut)
throw std::runtime_error("写入输出文件时出错!");
}
int main(int argc, const char* argv[]) {
if (argc < 3) {
std::stringstream buf;
buf << "大文本去重并保持顺序工具\n\n"
<< "用法:" << argv[0] << " 输入文件 输出文件 "
<< "[内存限制 MB = " << (max_memory >> 20) << "] "
<< "[线程限制 = " << max_thread << "]";
std::cerr << buf.str() << std::endl;
return -1;
}
auto inFile = argv[1];
auto outFile = argv[2];
if (argc > 3) max_memory = (std::max)(std::stoull(argv[3]), 1ull) << 20ull;
if (argc > 4) max_thread = (std::max)((std::min)(std::stoi(argv[4]), 256), 1);
auto chunkCount = step1_SplitChunks(inFile);
step2_FindDupLines(chunkCount);
step3_MergeChunks(chunkCount, outFile);
// 清空临时文件
for (int i = 0; i < chunkCount; i++)
for (auto&& suffix: {".txt", ".meta", ".dups"})
fs::remove(tmpDir / (to_string(i) + suffix));
}
```