CS144-lab1

2025年03月31日
15 min read
cs144

cs144-lab1:

lab1 开始有那么点麻烦了...需要阅读理解封装好的模块。

配置环境:

记得做前将代码与之前 lab0 合并一下即可,至于 debug ,本地的所有测试都在 build 中,挨着测试即可。

由于握手环节需要 cmu 的内部网络,所以跳过, ip-raw.cc 我就没有去实现。

以下是 lab1 的实践内容:

作为实验作业的一部分,你正在实现一个 TCP 接收器:该模块接收数据报并将其转换为可靠的字节流,以便应用程序可以从套接字中读取——就像你的 webget 程序在检查点 0 中从 web 服务器读取字节流一样。

TCP 发送器将其字节流划分为短段(每个段不超过约 1,460 字节)以便它们可以放入数据报中。但是,网络可能会重新排序这些数据报,或者丢弃它们,或者多次投递它们。接收器必须将这些段重新组装成它们最初开始的连续字节流。

在这个实验室中,您将编写负责此重组的数据结构:一个重组器。它将接收子字符串,由字节字符串组成,以及该字符串中第一个字节的索引,该索引位于更大的流中。流中的每个字节都有自己的唯一索引,从零开始向上计数。一旦重组器知道流中的下一个字节,它就会将其写入 ByteStream 的写入端——与您在检查点 0 中实现的相同的 ByteStream。重组器的“客户”可以从同一 ByteStream 的读取端读取。

其实我们就是在这里是在实现 tcp 的重组部分,需要联系上一个 lab0 中的 byte_stream 内容。在此之前,或许回顾一下 tcp 的报文结构有助于我们实现 lab1。

tcp报文结构
tcp报文结构

而这里面与我们要实现的 Reassembler 相关联最大的两个字段就是

序列号 (Sequence Number)

  • 位置:TCP 头部的前 32 位。
  • 作用:序列号标识了数据段在整个数据流中的位置。发送端为每个数据段分配一个序列号,接收端根据序列号对数据进行排序和重组。如果数据包乱序到达,接收端会利用序列号重新排列它们。
  • 重组相关性:这是重组的核心字段,确保数据按顺序拼接。

确认号 (Acknowledgment Number)

  • 位置:TCP 头部的第 33-64 位。
  • 作用:确认号表示接收端期望接收的下一个字节的序列号。它用于确认已经成功接收的数据,帮助发送端知道哪些数据需要重传。
  • 重组相关性:通过确认号,接收端可以间接通知发送端是否存在丢失的数据包,从而辅助重组过程。

如果对于 tcp 不够熟悉,可以阅读这一篇 blog,或者看一看黑书的 tcp 部分也不错。

而我们则需要根据序列号与确认号来构建 Reassembler。


Reassembler:

因为我们的数据包有着对应的编号,也就是模拟我们之前提到的序列号,我们可以使用有序键值对类似 map 或者 set 来实现,我采用的是 map。除此之外,我们需要维护一个 next-index(这里的 next _index 即可理解为对应的确认号)来作为重叠以及是否丢包等的判断依据。(做完 lab2 发现这样理解似乎有点问题,但在做 lab1 本身时也能凑合...)

构造之前一定先阅读 byte_stream_helper ,明白 byte_stream 是如何调用内部派生的 writer 和 reader 接口的。理解 output_.writer() 和 output_.reader() 其实是转换出了对应的派生类。避免了多重继承复杂性,非常优雅,但是对于 c艹 掌握不好的人来说是一种痛苦...

#include "byte_stream.hh"

class Reassembler
{
public:
  // 构建Reassembler以写入给定的ByteStream。
  explicit Reassembler( ByteStream&& output ) : 
  output_( std::move( output ) ),
  pending_data(),
  next_index(0),
  eof_index(0),
  eof_rec(false){}

  /*
   * 插入一个新的子字符串以重新组装到ByteStream中。
   *   `first_index`: 子字符串的第一个字节的索引
   *   `data`: 子字符串本身
   *   `is_last_substring`: 此子字符串表示流的结束
   *   `output`: 对Writer的可变引用
   *
   * Reassembler的任务是将索引的子字符串(可能是乱序或重叠的)重新组装回原始的ByteStream。
   * 一旦Reassembler知道流中的下一个字节,它应该立即将其写入输出。
   *
   * 如果Reassembler了解到字节适合流的可用容量但还不能写入(因为前面的字节仍然未知),
   * 它应该将它们存储在内部,直到填补了空缺。
   *
   * Reassembler应该丢弃任何超出流可用容量的字节
   * (即即使前面的空缺被填补也无法写入的字节)。
   *
   * Reassembler在写入最后一个字节后应关闭流。
   */
  void insert( uint64_t first_index, std::string data, bool is_last_substring );

  // Reassembler本身存储了多少字节?
  // 此函数仅用于测试;不要为了支持它而添加额外的状态。
  uint64_t count_bytes_pending() const;

  // 访问输出流读取器
  Reader& reader() { return output_.reader(); }
  const Reader& reader() const { return output_.reader(); }

  // 访问输出流写入器,但仅为const(不能从外部写入)
  const Writer& writer() const { return output_.writer(); }

private:
  ByteStream output_;
  std::map<uint64_t, std::string>     pending_data;// 存储数据的映射
  uint64_t next_index = 0; // 下一个索引
  uint64_t eof_index = 0;
  bool eof_rec = false;
};

对于重组器的设置,我们需要考虑到以下因素。

  • 乱序: 缓存并排序。
  • 丢失: 请求重传。
  • 重复: 丢弃冗余。
  • 重叠: 裁剪多余。
  • 大小异常: 截断或丢弃。
  • 中断: 停止重组。

不过在 lab1 中我们不用考虑这么全面,只需要考虑其中的一部分。

而在这里我们需要考虑的是5个值构建出来的区域:

uint64_t first_index;
uint64_t len = data.size();
uint64_t next_index;
uint64_t count = count_bytes_pending();
uint64_t available = output_.writer().available_capacity();

实现:

分析实验指导书上给出的示意图:

存储结构
存储结构

于是整个 lab1 中象征存储的部分其实有三个,因为我们在 Reassembler 中并没有涉及到 pop 相关的操作:

//两个依据 bytestream 中定义构建出来的数据
total_capacity = output_.writer().available_capacity() + output_.reader().bytes_buffered();

available_space = output_.writer().available_capacity();

//以及我们定义的 pending_data 用来存储还没 push 到 writer 中的值
pending_data

所以对于我们需要满足以上的所有逻辑,比如对于 available_space ,其实模拟的是我们的接收窗口 rwnd:

 // 如果缓冲区已满,不存储任何未来数据,直接返回
  if (available_space == 0) {
    return; 
 }

按照道理来说,我们应该可以存储在 pending_data 中,但实际上,我们需要模仿 tcp 的流量控制,所以不能存储。

TCP流量控制原理

在TCP协议中,接收窗口(receive window)是一个关键概念:

  • 接收窗口大小表示接收方当前愿意接收的数据量
  • 当接收窗口为0时,发送方必须停止发送数据
  • 只有当接收方处理了一些数据并增加了窗口大小,发送方才能继续发送

同样的,我们也需要遵循存储限制:

if (new_index > next_index && new_index - next_index + len > total_capacity) {
    if (new_index - next_index >= total_capacity) {
      return;  // 确实超出范围才返回
}

细节:

整个流中第一个字节的索引是多少?Zero.

指导书上简化了我们对于重组的处理,由于第一个传来的是0,而我们默认一定会将其推送出去。比较明显的是,next_index 应该在我们能连续处理数据时才更新。

之后的动作均是检查存储空间是否足够,足够便推送,不够就丢弃的思维来写,需要仔细考虑的部分就是重叠该如何处理,如果有很多重叠数据的话可能会造成很多内存浪费。

完整代码如下:

#include "reassembler.hh"
#include "debug.hh"

using namespace std;

void Reassembler::insert(uint64_t first_index, string data, bool is_last_substring)
{
  debug("unimplemented insert({}, {}, {}) called", first_index, data, is_last_substring);
  // 先检查索引是否在合理范围内
  // 如果first_index大于当前索引加上可用容量,直接丢弃
  if (first_index > next_index + output_.writer().available_capacity()) {
    // 只处理is_last_substring标志
    if (is_last_substring) {
      eof_rec = true;
      eof_index = first_index + data.size();
      
      if (next_index >= eof_index) {
        output_.writer().close();
      }
    }
    return; // 直接返回,不存储数据
  }
  // note: 一定先判断发过来的字节流类型,以及是否是最后一串字节流
  if (data.empty()) {
    if (is_last_substring) {
      eof_rec = true;
      eof_index = first_index;
      
      if (next_index >= eof_index) {
        output_.writer().close();
      }
    }
    return;
  }

  uint64_t len = data.size();
  
  // 处理EOF标记
  if (is_last_substring) {
    eof_rec = true;
    eof_index = first_index + len;
    
    if (next_index >= eof_index) {
      output_.writer().close();
    }
  }

  // 获取可处理的容量限制
  uint64_t total_capacity = output_.writer().available_capacity() + output_.reader().bytes_buffered();
  uint64_t available_space = output_.writer().available_capacity();
  
  // 如果缓冲区已满,不存储任何未来数据,直接返回
  if (available_space == 0) {
    return; 
  }
  /* note:数据重叠处理是最复杂的部分,因为都避免重复状态,所以都细致思考*/
  // 处理数据重叠部分 - 先处理与前面数据的重叠
  uint64_t new_index = first_index;
  
  // 如果起始位置在已处理数据之前,调整起始位置
  if (first_index < next_index) {
    // 完全重叠,直接丢弃
    if (first_index + len <= next_index) {
      return;
    }
    // 部分重叠,截取未处理部分
    uint64_t offset = next_index - first_index;
    data = data.substr(offset);
    new_index = next_index;
    len = data.size();
  }
  
  // 检查重叠情况 - 与pending_data中已有数据比较
  auto upper_bound = pending_data.upper_bound(new_index);
  if (upper_bound != pending_data.begin()) {
    auto prev = upper_bound;
    --prev;
    
    // 检查与前一个片段的重叠
    if (prev->first <= new_index && new_index < prev->first + prev->second.size()) {
      // 有重叠,调整起始位置
      uint64_t overlap = prev->first + prev->second.size() - new_index;
      if (overlap >= len) {
        // 完全被包含,丢弃
        return;
      }
      data = data.substr(overlap);
      new_index += overlap;
      len = data.size();
    }
  }
  
  // 处理与后续片段的重叠
  auto it = pending_data.lower_bound(new_index);
  while (it != pending_data.end() && new_index + len > it->first) {
    if (it->first < new_index + len) {
      if (new_index + len >= it->first + it->second.size()) {
        // 当前数据完全覆盖了这个片段
        pending_data.erase(it++);
      } else {
        // 部分重叠,截断当前数据
        len = it->first - new_index;
        data = data.substr(0, len);
        break;
      }
    } else {
      break;
    }
  }
  
  // 检查是否超出总容量限制
  if (new_index > next_index && new_index - next_index + len > total_capacity) {
    if (new_index - next_index >= total_capacity) {
      return;  // 确实超出范围才返回
    }
    uint64_t available = total_capacity - (new_index - next_index);
    data = data.substr(0, available);
    len = data.size();
  }

  // 处理数据写入
  if (new_index == next_index) {
    // 检查可用容量
    if (available_space >= len) {
      // 有足够容量,直接写入
      output_.writer().push(data);
      next_index += len;
    } else {
      // 容量不足,只写入能写入的部分,剩余部分丢弃
      string write_data = data.substr(0, available_space);
      output_.writer().push(write_data);
      next_index += available_space;
      // 不存储剩余部分到pending_data
    }
    
    // 查找并处理后续连续片段
    bool found_next = true;
    while (!pending_data.empty() && found_next && output_.writer().available_capacity() > 0) {
      found_next = false;
      auto next_it = pending_data.find(next_index);
      if (next_it != pending_data.end()) {
        // 检查可用容量
        uint64_t avail = output_.writer().available_capacity();
        string pending_data_str = next_it->second;
        
        if (avail >= pending_data_str.size()) {
          // 能全部写入
          output_.writer().push(pending_data_str);
          next_index += pending_data_str.size();
          pending_data.erase(next_it);
          found_next = true;
        } else if (avail > 0) {
          // 只能部分写入,剩余部分丢弃
          string write_part = pending_data_str.substr(0, avail);
          output_.writer().push(write_part);
          next_index += avail;
          pending_data.erase(next_it);
          // 不存储剩余部分
          break; 
        } else {
          break;
        }
      }
    }
  } else if (!data.empty() && new_index > next_index) {
    // 确保new_index与next_index之间的距离加上数据长度不超过总容量
    if (new_index - next_index + len <= total_capacity && available_space > 0) {
      // 存入pending_data
      pending_data[new_index] = data;
    }
    // 否则丢弃数据
  }
  
  // 检查是否需要关闭流
  if (eof_rec && next_index >= eof_index) {
    output_.writer().close();
  }
  return;
}

uint64_t Reassembler::count_bytes_pending() const
{
  debug("unimplemented count_bytes_pending() called");
  uint64_t count = 0;
  for (const auto& pair : pending_data) {
    count += pair.second.size();
  }
  return count;
}

测试结果:

可以看到测试还算不错。不过由于没有做模块化处理,可能会显得有些赘余,俗称💩山。

测试结果
测试结果

留下你的想法 💬

已开启评论审核