教程:用golang从零开始手写一个bt下载客户端(7)

背景

我们现在有了下载一个torrent的所有工具:一个包含peers的列表,可以和其他peer进行tcp通信,初始化一个握手协议,发送和接收消息。但是我们还面临的最大的问题就是如何处理和peers交互的

并发问题,并且管理peers交互的状态问题,这也是比较典型的两个老大难的问题,通常使用队列解决此类问题。

管理并发

在go语言中,我们通过通信共享内存,而不是通过共享内存通信,我们可以考虑使用channel作为轻量级队列。

我们将设置两个队列来同步并发的任务,一个用来从peers下载分片内容,一个用来收集下载后的结果,最后把结果放入一个缓冲区,组装成一个完整文件。

<code>// 初始化队列
workQueue := make(chan *pieceWork, len(t.PieceHashs))
\tresults := make(chan *pieceResult, len(t.PieceHashs))

\tfor index, hash := range t.PieceHashs {
\t\tlength := t.calculatePieceSize(index)
\t\tworkQueue \t}
// 开启下载任务
\tfor _, peer := range t.Peers {
\t\tgo t.startDownloadWorker(peer, workQueue, results)
\t}

// 收集结果\t
buf := make([]byte, t.Length)

\tdonePieces := 0
\tfor donePieces < len(t.PieceHashs) {
\t\tres := \t\tbegin, end := t.calculateBoundsForPiece(res.index)
\t\tcopy(buf[begin:end], res.buf)
\t\tdonePieces++
\t\tpercent := float64(donePieces) / float64(len(t.PieceHashs)) * 100
\t\tnumWorkers := runtime.NumGoroutine() - 1

\t\tlog.Printf("(%0.2f%%) Downloaded pieces #%d from %d peers\\n", percent, res.index, numWorkers)
\t}
\tclose(workQueue)/<code>
<code>func (t *Torrent) startDownloadWorker(peer peers.Peer, workQueue chan *pieceWork, results chan *pieceResult) {
c, err := client.New(peer, t.PeerID, t.InfoHash)
if err != nil {
log.Printf("Could not handshake with %s. Disconnecting\\n", peer.IP)
return
}
defer c.Conn.Close()
log.Printf("Completed handshake with %s\\n", peer.IP)

c.SendUnchoke()
c.SendInterested()

for pw := range workQueue {
if !c.Bitfield.HasPiece(pw.index) {
workQueue continue
}

// 下载分片
buf, err := attemptDownloadPiece(c, pw)
if err != nil {
log.Println("Exiting", err)
workQueue return
}

err = checkIntegrity(pw, buf)
if err != nil {
log.Printf("Piece #%d failed integrity check\\n", pw.index)
workQueue continue
}

c.SendHave(pw.index)
results }
}/<code>

我们对每个peer建立了一个下载任务,任务从队列workQueue里面获取任务信息并执行,把执行结果放入results队列,下载之前先发送释放阻塞的请求,下载完对比分片hash是否相同,完成后把结果放入results队列中,并告诉peer我们已经有这个分片了。每当results队列中有结果的时候就把它取出来放入buffer。

状态管理

我们使用一个结构体来跟踪peer的状态,读取消息后改变结构体状态信息,它包括我们从peer下载了多少内容,请求了多少次,是否被阻塞了等。

<code>type pieceProgress struct {
index int
client *client.Client
buf []byte
downloaded int
requested int
backlog int
}

func (state *pieceProgress) readMessage() error {
msg, err := state.client.Read() // this call blocks
switch msg.ID {
case message.MsgUnchoke:
state.client.Choked = false
case message.MsgChoke:
state.client.Choked = true
case message.MsgHave:
index, err := message.ParseHave(msg)
state.client.Bitfield.SetPiece(index)
case message.MsgPiece:
n, err := message.ParsePiece(state.index, state.buf, msg)
state.downloaded += n
state.backlog--
}
return nil
}/<code>

发起请求

文件,分片,hash这些还不算完,我们需要把分片再分块,一个分块为分片内的索引index,offset,length,我们向peer请求数据实际上是请求块,每个块大小通常是16KB,一个256KB大小的分片实际上要请求16次peer。一个peer在收到超过16KB的数据时应该断开连接,但根据经验达到128KB也是完全能接受的,适当的增大块能提升速度,只要遵守规范。

流水线

网络往返代价是昂贵的,一个块接一个块顺序下载是会显著降低下载性能的,因此把请求流水线化是非常重要的,它能保持持续的请求压力,使得吞吐量数量级的提升。

典型地,BitTorrent客户端会使用队列化的5条流水线请求,我发现提高这个数量会加倍提速,可以选择合适的大小适应现代网络速度,这个需要不断调优才能优化性能。

<code>const MaxBlockSize = 16384

const MaxBacklog = 5

func attemptDownloadPiece(c *client.Client, pw *pieceWork) ([]byte, error) {
state := pieceProgress{
index: pw.index,
client: c,
buf: make([]byte, pw.length),
}

// 设置超时防止卡死,30秒足够下载256KB
c.Conn.SetDeadline(time.Now().Add(30 * time.Second))
defer c.Conn.SetDeadline(time.Time{}) // Disable the deadline

for state.downloaded < pw.length {
// 如果没有阻塞就发送请求
if !state.client.Choked {
for state.backlog < MaxBacklog && state.requested < pw.length {
blockSize := MaxBlockSize
// 最后一个块大小可能比较小
if pw.length-state.requested < blockSize {
blockSize = pw.length - state.requested
}

err := c.SendRequest(pw.index, state.requested, blockSize)
if err != nil {
return nil, err
}
state.backlog++
state.requested += blockSize
}
}

err := state.readMessage()
if err != nil {
return nil, err
}
}

return state.buf, nil
}/<code>

好,到目前为止,我们已经完成了整个客户端,只需要在main.go中把这些组织起来,然后运行它!

下一篇我们将分享源代码和一些参考文献,敬请关注。


分享到:


相關文章: