wxf666 最近的时间轴更新
wxf666

wxf666

V2EX 第 280897 号会员,加入于 2018-01-08 18:22:24 +08:00
今日活跃度排名 6794
wxf666 最近回复了
@Keuin #20 写入量还是太大了。我手撸,只写一遍,都觉得大。。

换成算 SHA-1 ,最差情况,只需要写 203e8 * (20 + 6 该行偏移量) / 2 ^ 30 = 492 GB 即可。

当然,自己写肯定不如久经考验的工具成熟稳定,第一次花的时间精力也多。。



@heguangyu5 #21 C/C++ 新人,看了下排行前三的 HashTable ,感觉每行只需 11 字节即可?

6 字节原始文件偏移量 + 5 字节与原始位置的距离( unordered_dense )或下一节点数组下标( emhash )?

狠一点儿,ceil(log2(6.20 * 2^40)) - 1 + ceil(log2(203e8)) = 77 bits / 行?总共需 182 GB 即可?

剩下几字节,你用来存啥了呢。。
@Keuin #18 你是说,类似这样吗?

awk '{print NR" "$0}' in.txt | sort -u -k2 | sort -n | cut -d' ' -f 2-

感觉写入量翻倍。。而且很难扩展到(可能跨多行的) csv ?


@heguangyu5 #13

1. 平均下来,每行花费 14.4 字节内存,肯定没存原字符串。

hash 冲突时,你要回源文件,具体比较两行吗?那随机读会不会很多。。

换句话说,随机生成 1E 行,又把这 1E 行打乱,追加到原文件。dedup-use-more-mem 会随机读 1E 次吗?

2. dedup-use-less-mem 需要额外写多大文件呢?有多少次随机读写呢?这个支持流式读源文件吗?
@Keuin #11 原帖还要求,保持文本原有顺序诶。。

分块归并排序确实好用,我在原帖也手撸了一个,i5-8250U 每分钟能排序 25 GB 。但读写量还是太大了。。

换成只写入 MD5/SHA-1 值的话,读写量能减少 95%。代价就是有极小概率会哈希冲突。。

但也能通过回原文件比较两行解决。代价就是可能会有不少的随机读,和重复行数量成正比。。
有点感兴趣,问一下楼主:

1. 楼主硬盘读写速度多少?

2. 可以指定限制多少内存完成吗?

3. 有不同的两行,恰好 hash 相同,会出问题吗?

4. 除顺序读一次原文件外,还需要额外读写多少文件吗?

5. 能轻而易举改造成,针对 CSV 文件(可能有字符串跨多行),且现有成绩影响不大,是吗?
@lsk569937453
@drymonfidelia #4

我在原贴 99 楼回复了,用的分块排序、归并去重方法,

6 秒(保持顺序地)去重 2.50 GB 文本( 900 万行,336 字/行),
照此速度,预计 4 小时即能去重完毕 6.20 TB 文本?

这是在七年前 i5-8250U 轻薄本上,限制 1 GB 内存,四线程跑出来的。(截图在原贴)
当然,文本存在内存盘里,读 8G/s ,写 3G/s ,1000 元 2TB 的固态都有的水平,应该不算过分。


@tool2dx #3

精确去重,不应该是每行两两比较吗?

但如果能承受极小概率冲突,用哈希应该会快很多。


@jsjscool #18

大佬是用什么写的,只需要几行就行呢?

我也是用的 merge sort + 多线程,但写了两三百行了。。


@securityCoding #23

能自己手撸一个跑吗?感觉也不算慢呀。。


@noparking188 #25

手撸了一个,不知对你有帮助吗?
C++ 新人,写个去重练练手。

- 结果:2.50 GB 文本( 900 万行,336 字/行),1GB 内存限制,6 秒保持顺序地去重完毕。
- 硬件:七年前 i5-8250U 轻薄本,读写在内存盘中(读 8G/s ,写 3G/s ,1000 元 2TB 固态都能有的速度,不过分吧?)
- 预计:4 小时能去重完毕 6.20TB ?

新人刚学会写,可能还有诸多不足之处。
写的过程中,还有很多优化点没写。比如:

1. 排序时,子范围太小,转为其他排序方式。
2. 读写文件,用的默认缓冲区大小( 4K ? 16K ?不知道多大,估计很小。。)
3. 分块时,可以去除重复行,减少稍后读写数据量。

继续改进点:

- 转用 hash 去重,大幅减少硬盘读写数据量。
- 只是要承担极小概率重复风险。但 Git 也在用这种方式。。
- 实在不行,发现重复 hash 时,再去读原文件完整比较。

## 截图


## 代码

```c++
// V 站吞空格,缩进改为全角空格了

#include <queue>
#include <vector>
#include <thread>
#include <cstring>
#include <sstream>
#include <fstream>
#include <iostream>
#include <algorithm>
#include <stdexcept>
#include <filesystem>
#include <string_view>
#include <fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/stat.h>

using std::ios;
using std::vector, std::string_view;
using std::to_string, std::ofstream;
namespace fs = std::filesystem;

int max_thread = 8;
size_t max_memory = 1ull << 30;
const auto tmpDir = fs::temp_directory_path();

struct Meta {
   ptrdiff_t offset;
   size_t length;
   friend ofstream& operator<< (ofstream& ofs, const Meta& self) {
     ofs.write(reinterpret_cast<const char*>(&self), sizeof self);
     return ofs;
  }
};

struct Line {
   int chunkIdx{};
   ptrdiff_t offset{};
   string_view str{};
   auto operator> (const Line& other) {
     return std::tie(str, chunkIdx, offset)
      > std::tie(other.str, other.chunkIdx, other.offset);
  }
};

template <class T = char>
class MappedFile {
   int fd = -1;
   const T* ptr{};
public:
   const T* data{};
   size_t size{};

   explicit MappedFile(const fs::path& file) {
     struct stat64 fs{};
     fd = open64(file.c_str(), O_RDONLY);
     if (fd != -1 && fstat64(fd, &fs) != -1) {
       size = static_cast<size_t>(fs.st_size) / sizeof(T);
       data = ptr = static_cast<T*>(mmap64(nullptr, fs.st_size, PROT_READ, MAP_SHARED, fd, 0));
    }
  }

   MappedFile(const MappedFile& other) = delete;

   MappedFile(MappedFile&& old) noexcept:
     fd(old.fd), ptr(old.ptr), data(old.data), size(old.size) {
     old.fd = -1;
     old.ptr = old.data = nullptr;
  }

  ~MappedFile() {
     if (data) munmap(const_cast<T*>(data), size * sizeof(T));
     if (fd != -1) close(fd);
  }

   auto end() const {
     return data + size;
  }

   operator const T*&() {
     return ptr;
  }
};

template <class Iter>
void mergeSort(Iter* src, Iter* dst, size_t len, int max_thread = 1, int id = 1) {
   if (id == 1)
     std::copy_n(src, len, dst);
   if (len > 1) {
     std::thread t;
     size_t half = len / 2;
     if (id < max_thread) // 只在左子树开启新线程
       t = std::thread(mergeSort<Iter>, dst, src, half, max_thread, id * 2);
     else
       mergeSort(dst, src, half, max_thread, id * 2);
     mergeSort(dst + half, src + half, len - half, max_thread, id * 2 + 1);
     if (t.joinable())
       t.join();
     std::merge(src, src + half, src + half, src + len, dst);
  }
}

// 步骤 1:分块,返回块数
int step1_SplitChunks(const fs::path& inFile) {

  // 映射源文件
   MappedFile text {inFile};
   if (!text) throw std::runtime_error("无法打开输入文件");

  // 分块,直到源文件结束
   int chunkCount = 0;
   for (auto chunkBegin = +text; (chunkBegin = text) < text.end();) {

    // 不断记录行,直到(此次遍历过的源文件大小 + 行数据数组大小 * 2 )到达内存限制
     vector<string_view> lines, sortedLines;
     while (text < text.end() && (text - chunkBegin + sizeof(string_view) * lines.size() * 2) < max_memory) {
       auto lineEnd = (char*) std::memchr(text, '\n', text.end() - text);
       auto lineLen = (lineEnd ? lineEnd : text.end()) - text;
       lines.emplace_back(text, lineLen);
       text += lineLen + 1;
    }

    // 准备写入(排序后)分块、行数据。
     ofstream chunkFile (tmpDir / (to_string(chunkCount) + ".txt"), ios::binary | ios::trunc);
     ofstream metaFile (tmpDir / (to_string(chunkCount) + ".meta"), ios::binary | ios::trunc);
     chunkCount++;

    // 多线程排序行数组
     sortedLines.resize(lines.size());
     mergeSort(lines.data(), sortedLines.data(), lines.size(), max_thread);

    // 保存(排序后)每行文本、偏移、长度
     for (auto line: sortedLines) {
       chunkFile << line;
       metaFile << Meta{line.data() - chunkBegin, line.size()};
    }

    // 检查
     if (!chunkFile || !metaFile) {
       std::stringstream buf;
       buf << "写入第 " << chunkCount << " 分块时出错!";
       throw std::runtime_error(buf.str());
    }
  }

   return chunkCount;
}

// 步骤 2:查找重复行
void step2_FindDupLines(int chunkCount) {

   vector<ofstream> chunkDups;
   vector<MappedFile<>> chunkText;
   vector<MappedFile<Meta>> chunkMeta;
   std::priority_queue<Line, vector<Line>, std::greater<>> lines;

  // 映射所有分块的文本、行数据文件,
  // 也准备好记录各分块重复行数据的文件
   for (int idx = 0; idx < chunkCount; idx++) {
     chunkText.emplace_back(tmpDir / (to_string(idx) + ".txt"));
     chunkMeta.emplace_back(tmpDir / (to_string(idx) + ".meta"));
     chunkDups.emplace_back(tmpDir / (to_string(idx) + ".dups"), ios::binary | ios::trunc);
     lines.push({idx});
  }

  // 利用小根堆,按(行内容,分块号,偏移量)顺序,流式多路归并
   string_view last{};
   while (!lines.empty()) {

    // 与上一行相同,则将偏移量写入,对应分块待删除行名单内
     auto line = lines.top(); lines.pop();
     if (last == line.str && !last.empty())
       chunkDups[line.chunkIdx].write((char*)&line.offset, sizeof line.offset);
     last = line.str;

    // 该分块行数据未遍历完,则继续将下一行添加进小根堆中
     auto& text = chunkText[line.chunkIdx];
     auto& meta = chunkMeta[line.chunkIdx];
     if (meta < meta.end()) {
       lines.push({line.chunkIdx, (*meta).offset, {text, (*meta).length}});
       text += (*meta).length;
       meta++;
    }
  }

  // 检查
   for (auto&& file: chunkDups) {
     if (!file) {
       std::stringstream buf;
       buf << "保存第 " << chunkCount << " 分块删除名单时出错!";
       throw std::runtime_error(buf.str());
    }
  }
}

// 步骤 3:合并分块
void step3_MergeChunks(int chunkCount, const fs::path& outFile) {

   ofstream textOut {outFile, ios::binary | ios::trunc};
   if (!textOut) throw std::runtime_error("无法打开输出文件");

   for (int idx = 0; idx < chunkCount; idx++) {

    // 映射分块(排序后)文本、行数据、删除名单
     MappedFile<> text {tmpDir / (to_string(idx) + ".txt")};
     MappedFile<Meta> meta {tmpDir / (to_string(idx) + ".meta")};
     MappedFile<decltype(Meta::offset)> dups {tmpDir / (to_string(idx) + ".dups")};

    // 剔除删除名单中的行
     vector<Line> lines; lines.reserve(meta.size);
     for (; meta < meta.end(); text += (*meta++).length) {
       if (dups < dups.end() && *dups == (*meta).offset)
         dups++;
       else
         lines.push_back({idx, (*meta).offset, {text, (*meta).length}});
    }

    // 再按偏移量顺序排序好
     std::sort(lines.begin(), lines.end(), [](auto&& a, auto&& b) {
       return a.offset < b.offset;
    });

    // 逐行输出
     for (auto&& line: lines)
       textOut << line.str << '\n';
  }

  // 检查
   if (!textOut)
     throw std::runtime_error("写入输出文件时出错!");
}

int main(int argc, const char* argv[]) {

   if (argc < 3) {
     std::stringstream buf;
     buf << "大文本去重并保持顺序工具\n\n"
      << "用法:" << argv[0] << " 输入文件 输出文件 "
      << "[内存限制 MB = " << (max_memory >> 20) << "] "
      << "[线程限制 = " << max_thread << "]";
     std::cerr << buf.str() << std::endl;
     return -1;
  }

   auto inFile = argv[1];
   auto outFile = argv[2];
   if (argc > 3) max_memory = (std::max)(std::stoull(argv[3]), 1ull) << 20ull;
   if (argc > 4) max_thread = (std::max)((std::min)(std::stoi(argv[4]), 256), 1);

   auto chunkCount = step1_SplitChunks(inFile);
   step2_FindDupLines(chunkCount);
   step3_MergeChunks(chunkCount, outFile);

  // 清空临时文件
   for (int i = 0; i < chunkCount; i++)
     for (auto&& suffix: {".txt", ".meta", ".dups"})
       fs::remove(tmpDir / (to_string(i) + suffix));
}
```
@phrack #15

突然想起来,zram 可能行不通。。

sha1 的结果,是不是相当于随机数据?随机数据,压缩不了啥吧。。

即,256 GB 内存,当不了 512 GB 用?全都用来存压缩比 100% 后的数据了?


@cabbage #57

需要保留 27 行原始 string ,在小根堆里。

第二步目的:检查出各块中,偏移量非最小的重复行,记录进删除名单中。

第二步时,已经有 26.5 个 240GB 的、排序好的块。

参考多路归并,可以流式构造出有序的 (string, offset, chunk_index)。

当 string 与上一个不同时,说明碰到偏移量最小的新行了(即,全文首次出现)。

当 string 与上一个相同时,说明重复行了,此时往 "to_del_${chunk_index}.txt" 里记录 offset 。

(可以攒多点再写,反正只用了 27 个字符串 + 27 * 1GB 缓冲区,还剩 200+GB 内存呢。。)

以前写过类似的,10+ MB 内存去重 13 GB 文件,里面也有用到多路归并:/t/1031275#reply15
36 37 楼,好像没 @ 成功。。再试一下。。


@opengps #3
@msg7086 #32

如果数据库,每秒写入 10W 条,总计要 203e8 / 1e5 / 3600 = 56 小时?


@hbcolorful #17
@NotLongNil #30

用布隆过滤,几十 GB 好像不够。
在线算了下,50 GiB + 15 函数,都会有 1 / 26000 概率出错,大约丢失 80W 行数据?
250 GiB + 11 函数,算完 203 亿行,才能有 83.8% 的概率,不丢失任何数据,也不保留任何重复行?


@dingwen07 #38 是的。37 楼有提及到,用多少哈希函数。


@dode #42

《写入到对应分区下面》这个是缓存尽可能多的文本(如 1GB ),再写入,是吗?
《检索特定行文本,是否在对应分区内存在》这个是如何做到,顺序读的呢?


@tonywangcn #44

平均每十亿条,就误认为一次,某行是重复行,导致丢失该行?
那你要问 @drymonfidelia 愿不愿意丢失几十行数据了。。
@cndenis #14 ,@hbcolorful #17 ,@NotLongNil #30:

用布隆过滤,几十 GB 好像不够。
在线算了下,50 GB + 15 函数,都会有 1 / 25000 概率出错。。
250 GB + 11 函数,算完 203 亿行,才能有 83.8% 的概率,一个不出错?


@phrack #15:

压缩内存,来存 hash ?好像真的可行。。
平均而言,写入 (372 << 30) / 4096 = 1 亿次,就会占满 372 GB 内存页。即,几乎一开始就会启用 zram ?
我在别处看了看,lz4 每秒能有 200W 次 IO ,算下来要 2.8 小时即可?
话说,这个和 Bloom Filter 相比,哪个出错概率小呢?


关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3120 人在线   最高记录 6679   ·     Select Language
创意工作者们的社区
World is powered by solitude
VERSION: 3.9.8.5 · 19ms · UTC 11:48 · PVG 19:48 · LAX 04:48 · JFK 07:48
Developed with CodeLauncher
♥ Do have faith in what you're doing.