教程:用golang從零開始手寫一個bt下載客戶端(7)

背景

我們現在有了下載一個torrent的所有工具:一個包含peers的列表,可以和其他peer進行tcp通信,初始化一個握手協議,發送和接收消息。但是我們還面臨的最大的問題就是如何處理和peers交互的

併發問題,並且管理peers交互的狀態問題,這也是比較典型的兩個老大難的問題,通常使用隊列解決此類問題。

管理併發

在go語言中,我們通過通信共享內存,而不是通過共享內存通信,我們可以考慮使用channel作為輕量級隊列。

我們將設置兩個隊列來同步併發的任務,一個用來從peers下載分片內容,一個用來收集下載後的結果,最後把結果放入一個緩衝區,組裝成一個完整文件。

<code>// 初始化隊列
workQueue := make(chan *pieceWork, len(t.PieceHashs))
\tresults := make(chan *pieceResult, len(t.PieceHashs))

\tfor index, hash := range t.PieceHashs {
\t\tlength := t.calculatePieceSize(index)
\t\tworkQueue \t}
// 開啟下載任務
\tfor _, peer := range t.Peers {
\t\tgo t.startDownloadWorker(peer, workQueue, results)
\t}

// 收集結果\t
buf := make([]byte, t.Length)

\tdonePieces := 0
\tfor donePieces < len(t.PieceHashs) {
\t\tres := \t\tbegin, end := t.calculateBoundsForPiece(res.index)
\t\tcopy(buf[begin:end], res.buf)
\t\tdonePieces++
\t\tpercent := float64(donePieces) / float64(len(t.PieceHashs)) * 100
\t\tnumWorkers := runtime.NumGoroutine() - 1

\t\tlog.Printf("(%0.2f%%) Downloaded pieces #%d from %d peers\\n", percent, res.index, numWorkers)
\t}
\tclose(workQueue)/<code>
<code>func (t *Torrent) startDownloadWorker(peer peers.Peer, workQueue chan *pieceWork, results chan *pieceResult) {
c, err := client.New(peer, t.PeerID, t.InfoHash)
if err != nil {
log.Printf("Could not handshake with %s. Disconnecting\\n", peer.IP)
return
}
defer c.Conn.Close()
log.Printf("Completed handshake with %s\\n", peer.IP)

c.SendUnchoke()
c.SendInterested()

for pw := range workQueue {
if !c.Bitfield.HasPiece(pw.index) {
workQueue continue
}

// 下載分片
buf, err := attemptDownloadPiece(c, pw)
if err != nil {
log.Println("Exiting", err)
workQueue return
}

err = checkIntegrity(pw, buf)
if err != nil {
log.Printf("Piece #%d failed integrity check\\n", pw.index)
workQueue continue
}

c.SendHave(pw.index)
results }
}/<code>

我們對每個peer建立了一個下載任務,任務從隊列workQueue裡面獲取任務信息並執行,把執行結果放入results隊列,下載之前先發送釋放阻塞的請求,下載完對比分片hash是否相同,完成後把結果放入results隊列中,並告訴peer我們已經有這個分片了。每當results隊列中有結果的時候就把它取出來放入buffer。

狀態管理

我們使用一個結構體來跟蹤peer的狀態,讀取消息後改變結構體狀態信息,它包括我們從peer下載了多少內容,請求了多少次,是否被阻塞了等。

<code>type pieceProgress struct {
index int
client *client.Client
buf []byte
downloaded int
requested int
backlog int
}

func (state *pieceProgress) readMessage() error {
msg, err := state.client.Read() // this call blocks
switch msg.ID {
case message.MsgUnchoke:
state.client.Choked = false
case message.MsgChoke:
state.client.Choked = true
case message.MsgHave:
index, err := message.ParseHave(msg)
state.client.Bitfield.SetPiece(index)
case message.MsgPiece:
n, err := message.ParsePiece(state.index, state.buf, msg)
state.downloaded += n
state.backlog--
}
return nil
}/<code>

發起請求

文件,分片,hash這些還不算完,我們需要把分片再分塊,一個分塊為分片內的索引index,offset,length,我們向peer請求數據實際上是請求塊,每個塊大小通常是16KB,一個256KB大小的分片實際上要請求16次peer。一個peer在收到超過16KB的數據時應該斷開連接,但根據經驗達到128KB也是完全能接受的,適當的增大塊能提升速度,只要遵守規範。

流水線

網絡往返代價是昂貴的,一個塊接一個塊順序下載是會顯著降低下載性能的,因此把請求流水線化是非常重要的,它能保持持續的請求壓力,使得吞吐量數量級的提升。

典型地,BitTorrent客戶端會使用隊列化的5條流水線請求,我發現提高這個數量會加倍提速,可以選擇合適的大小適應現代網絡速度,這個需要不斷調優才能優化性能。

<code>const MaxBlockSize = 16384

const MaxBacklog = 5

func attemptDownloadPiece(c *client.Client, pw *pieceWork) ([]byte, error) {
state := pieceProgress{
index: pw.index,
client: c,
buf: make([]byte, pw.length),
}

// 設置超時防止卡死,30秒足夠下載256KB
c.Conn.SetDeadline(time.Now().Add(30 * time.Second))
defer c.Conn.SetDeadline(time.Time{}) // Disable the deadline

for state.downloaded < pw.length {
// 如果沒有阻塞就發送請求
if !state.client.Choked {
for state.backlog < MaxBacklog && state.requested < pw.length {
blockSize := MaxBlockSize
// 最後一個塊大小可能比較小
if pw.length-state.requested < blockSize {
blockSize = pw.length - state.requested
}

err := c.SendRequest(pw.index, state.requested, blockSize)
if err != nil {
return nil, err
}
state.backlog++
state.requested += blockSize
}
}

err := state.readMessage()
if err != nil {
return nil, err
}
}

return state.buf, nil
}/<code>

好,到目前為止,我們已經完成了整個客戶端,只需要在main.go中把這些組織起來,然後運行它!

下一篇我們將分享源代碼和一些參考文獻,敬請關注。


分享到:


相關文章: