cs144-lab1:
lab1 开始有那么点麻烦了…需要阅读理解封装好的模块。
配置环境:
记得做前将代码与之前 lab0 合并一下即可,至于 debug ,本地的所有测试都在 build 中,挨着测试即可。
由于握手环节需要 cmu 的内部网络,所以跳过, ip-raw.cc 我就没有去实现。
以下是 lab1 的实践内容:
作为实验作业的一部分,你正在实现一个 TCP 接收器:该模块接收数据报并将其转换为可靠的字节流,以便应用程序可以从套接字中读取——就像你的 webget 程序在检查点 0 中从 web 服务器读取字节流一样。
TCP 发送器将其字节流划分为短段(每个段不超过约 1,460 字节)以便它们可以放入数据报中。但是,网络可能会重新排序这些数据报,或者丢弃它们,或者多次投递它们。接收器必须将这些段重新组装成它们最初开始的连续字节流。
在这个实验室中,您将编写负责此重组的数据结构:一个重组器。它将接收子字符串,由字节字符串组成,以及该字符串中第一个字节的索引,该索引位于更大的流中。流中的每个字节都有自己的唯一索引,从零开始向上计数。一旦重组器知道流中的下一个字节,它就会将其写入 ByteStream 的写入端——与您在检查点 0 中实现的相同的 ByteStream。重组器的“客户”可以从同一 ByteStream 的读取端读取。
其实我们就是在这里是在实现 tcp 的重组部分,需要联系上一个 lab0 中的 byte_stream 内容。在此之前,或许回顾一下 tcp 的报文结构有助于我们实现 lab1。

而这里面与我们要实现的 Reassembler 相关联最大的两个字段就是
序列号 (Sequence Number)
- 位置:TCP 头部的前 32 位。
- 作用:序列号标识了数据段在整个数据流中的位置。发送端为每个数据段分配一个序列号,接收端根据序列号对数据进行排序和重组。如果数据包乱序到达,接收端会利用序列号重新排列它们。
- 重组相关性:这是重组的核心字段,确保数据按顺序拼接。
确认号 (Acknowledgment Number)
- 位置:TCP 头部的第 33-64 位。
- 作用:确认号表示接收端期望接收的下一个字节的序列号。它用于确认已经成功接收的数据,帮助发送端知道哪些数据需要重传。
- 重组相关性:通过确认号,接收端可以间接通知发送端是否存在丢失的数据包,从而辅助重组过程。
如果对于 tcp 不够熟悉,可以阅读这一篇 blog,或者看一看黑书的 tcp 部分也不错。
而我们则需要根据序列号与确认号来构建 Reassembler。
Reassembler:
因为我们的数据包有着对应的编号,也就是模拟我们之前提到的序列号,我们可以使用有序键值对类似 map 或者 set 来实现,我采用的是 map。除此之外,我们需要维护一个 next-index(这里的 next _index 即可理解为对应的确认号)来作为重叠以及是否丢包等的判断依据。
构造之前一定先阅读 byte_stream_helper ,明白 byte_stream 是如何调用内部派生的 writer 和 reader 接口的。理解 output_.writer() 和 output_.reader() 其实是转换出了对应的派生类。避免了多重继承复杂性,非常优雅,但是对于 c艹 掌握不好的人来说是一种痛苦…
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| #include "byte_stream.hh"
class Reassembler { public: explicit Reassembler( ByteStream&& output ) : output_( std::move( output ) ), pending_data(), next_index(0), eof_index(0), eof_rec(false){}
void insert( uint64_t first_index, std::string data, bool is_last_substring );
uint64_t count_bytes_pending() const;
Reader& reader() { return output_.reader(); } const Reader& reader() const { return output_.reader(); }
const Writer& writer() const { return output_.writer(); }
private: ByteStream output_; std::map<uint64_t, std::string> pending_data; uint64_t next_index = 0; uint64_t eof_index = 0; bool eof_rec = false; };
|
对于重组器的设置,我们需要考虑到以下因素。
- 乱序: 缓存并排序。
- 丢失: 请求重传。
- 重复: 丢弃冗余。
- 重叠: 裁剪多余。
- 大小异常: 截断或丢弃。
- 中断: 停止重组。
不过在 lab1 中我们不用考虑这么全面,只需要考虑其中的一部分。
而在这里我们需要考虑的是5个值构建出来的区域:
1 2 3 4 5
| uint64_t first_index uint64_t len = data.size() uint64_t next_index uint64_t count = count_bytes_pending() uint64_t available = output_.writer().available_capacity()
|
实现:
分析实验指导书上给出的示意图:

于是整个 lab1 中象征存储的部分其实有三个,因为我们在 Reassembler 中并没有涉及到 pop 相关的操作:
1 2 3 4 5 6 7
| total_capacity = output_.writer().available_capacity() + output_.reader().bytes_buffered();
available_space = output_.writer().available_capacity();
pending_data
|
所以对于我们需要满足以上的所有逻辑,比如对于 available_space ,其实模拟的是我们的接收窗口 rwnd:
1 2 3 4
| if (available_space == 0) { return; }
|
按照道理来说,我们应该可以存储在 pending_data 中,但实际上,我们需要模仿 tcp 的流量控制,所以不能存储。
TCP流量控制原理
在TCP协议中,接收窗口(receive window)是一个关键概念:
- 接收窗口大小表示接收方当前愿意接收的数据量
- 当接收窗口为0时,发送方必须停止发送数据
- 只有当接收方处理了一些数据并增加了窗口大小,发送方才能继续发送
同样的,我们也需要遵循存储限制:
1 2 3 4
| if (new_index > next_index && new_index - next_index + len > total_capacity) { if (new_index - next_index >= total_capacity) { return; }
|
细节:
整个流中第一个字节的索引是多少?Zero.
指导书上简化了我们对于重组的处理,由于第一个传来的是0,而我们默认一定会将其推送出去。比较明显的是,next_index 应该在我们能连续处理数据时才更新。
之后的动作均是检查存储空间是否足够,足够便推送,不够就丢弃的思维来写,需要仔细考虑的部分就是重叠该如何处理,如果有很多重叠数据的话可能会造成很多内存浪费。
完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
| #include "reassembler.hh" #include "debug.hh"
using namespace std;
void Reassembler::insert(uint64_t first_index, string data, bool is_last_substring) { debug("unimplemented insert({}, {}, {}) called", first_index, data, is_last_substring); if (data.empty()) { if (is_last_substring) { eof_rec = true; eof_index = first_index; if (next_index >= eof_index) { output_.writer().close(); } } return; }
uint64_t len = data.size(); if (is_last_substring) { eof_rec = true; eof_index = first_index + len; if (next_index >= eof_index) { output_.writer().close(); } }
uint64_t total_capacity = output_.writer().available_capacity() + output_.reader().bytes_buffered(); uint64_t available_space = output_.writer().available_capacity(); if (available_space == 0) { return; } uint64_t new_index = first_index; if (first_index < next_index) { if (first_index + len <= next_index) { return; } uint64_t offset = next_index - first_index; data = data.substr(offset); new_index = next_index; len = data.size(); } auto upper_bound = pending_data.upper_bound(new_index); if (upper_bound != pending_data.begin()) { auto prev = upper_bound; --prev; if (prev->first <= new_index && new_index < prev->first + prev->second.size()) { uint64_t overlap = prev->first + prev->second.size() - new_index; if (overlap >= len) { return; } data = data.substr(overlap); new_index += overlap; len = data.size(); } } auto it = pending_data.lower_bound(new_index); while (it != pending_data.end() && new_index + len > it->first) { if (it->first < new_index + len) { if (new_index + len >= it->first + it->second.size()) { pending_data.erase(it++); } else { len = it->first - new_index; data = data.substr(0, len); break; } } else { break; } } if (new_index > next_index && new_index - next_index + len > total_capacity) { if (new_index - next_index >= total_capacity) { return; } uint64_t available = total_capacity - (new_index - next_index); data = data.substr(0, available); len = data.size(); }
if (new_index == next_index) { if (available_space >= len) { output_.writer().push(data); next_index += len; } else { string write_data = data.substr(0, available_space); output_.writer().push(write_data); next_index += available_space; } bool found_next = true; while (!pending_data.empty() && found_next && output_.writer().available_capacity() > 0) { found_next = false; auto next_it = pending_data.find(next_index); if (next_it != pending_data.end()) { uint64_t avail = output_.writer().available_capacity(); string pending_data_str = next_it->second; if (avail >= pending_data_str.size()) { output_.writer().push(pending_data_str); next_index += pending_data_str.size(); pending_data.erase(next_it); found_next = true; } else if (avail > 0) { string write_part = pending_data_str.substr(0, avail); output_.writer().push(write_part); next_index += avail; pending_data.erase(next_it); break; } else { break; } } } } else if (!data.empty() && new_index > next_index) { if (new_index - next_index + len <= total_capacity && available_space > 0) { pending_data[new_index] = data; } } if (eof_rec && next_index >= eof_index) { output_.writer().close(); } return; }
uint64_t Reassembler::count_bytes_pending() const { debug("unimplemented count_bytes_pending() called"); uint64_t count = 0; for (const auto& pair : pending_data) { count += pair.second.size(); } return count; }
|
测试结果:
可以看到测试还算不错。不过由于没有做模块化处理,可能会显得有些赘余,俗称💩山。
