用Rust實現一個多線程的web server

在本文之前,我們用Rust實現一個單線程的web server的例子,但是單線程的web server不夠高效,所以本篇文章就來實現一個多線程的例子。

單線程web server存在的問題

請求只能串行處理,也就是說當第一個連結處理完之前不會處理第二個連結。考慮如下例子:

<code>use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
use std::fs;
use std::{thread, time};

fn handle_client(mut stream: TcpStream) {
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\\r\\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\\r\\n\\r\\n", "main.html")
} else {
("HTTP/1.1 404 NOT FOUND\\r\\n\\r\\n", "404.html")
};

let contents = fs::read_to_string(filename).unwrap();
let response = format!("{}{}", status_line, contents);

stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();

let ten_millis = time::Duration::from_millis(10000);
thread::sleep(ten_millis);\t\t\t\t//睡眠一段時間,模擬處理時間很長
}

fn main() -> std::io::Result {
let listener = TcpListener::bind("127.0.0.1:8080")? ;

for stream in listener.incoming() {
handle_client(stream?);
}
Ok(())
}/<code>

在瀏覽器中打開兩個窗口,分別輸入127.0.0.1:8080,會發現在第一個處理完之前,第二個不會響應。

使用多線程來解決問題

  • 解決方式

修改main函數代碼:

<code>fn main() -> std::io::Result {
let listener = TcpListener::bind("127.0.0.1:8080")?;
let mut thread_vec: Vec<:joinhandle>> = Vec::new();

for stream in listener.incoming() {
// handle_client(stream?);
let stream = stream.unwrap();
let handle = thread::spawn(|| {
handle_client(stream);
});
thread_vec.push(handle);
}

for handle in thread_vec {
handle.join().unwrap();
}

Ok(())
}/<code>

從瀏覽器打開兩個標籤,進行測試,可以發現第一個沒有處理完之前,第二個請求已經開始處理。

  • 存在問題

當存在海量請求時,系統也會跟著創建海量的線程,最終造成系統崩潰。

使用線程池來解決問題

  • 線程池


用Rust實現一個多線程的web server

  • 知識點

多線程、管道。

從主線程將任務發送到管道,工作線程等待在管道的接收端,當收到任務時,進行處理。

線程池方式實現

1、初步設計

  • 定義ThreadPool結構
<code>use std::thread;
pub struct ThreadPool {
\t\t\t\tthread: Vec<:joinhandle>>,
}/<code>
  • 定義ThreadPool的方法
<code>impl ThreadPool {
\tpub fn new(size: usize) -> ThreadPool {
\t\t//--snip--
\t}
\t
\tpub fn execute()
\t//pub fn execute(&self, f: F)
// where
// F: FnOnce() + Send + 'static
{
\t\t//--snip--

}
}
/<code>
  • 下面我們考慮new函數,可能的實現是這樣
<code>pub fn new(size: usize) -> ThreadPool {
\tassert!(size > 0);
\tlet mut threads = Vec::with_capacity(size);
\tfor _ in 0..size {
\t\t//創建線程:
\t\t//問題來了,創建線程的時候需要傳入閉包,也就是具體做的動作,
\t\t//可是這個時候我們還沒有具體的任務,怎麼辦?
\t}
\t\t
\tThreadPool {
\t\tthreads
\t}
}/<code>
  • execute函數
<code>//設計execute的函數,可以參考thread::spawn
pub fn execute(&self, f: F)
where
F: FnOnce() + Send + 'static
{

}
/<code>

初步設計的問題總結:

主要是在創建線程池的new函數中,需要傳入具體的任務,可是此時還沒有具體的任務,如何解決?

2、解決線程創建的問題

  • 重新定義ThreadPool結構體
<code>pub struct ThreadPool {
workers: Vec<worker>,
}/<worker>/<code>
  • ThreadPool的new方法
<code>pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);

let mut workers = Vec::with_capacity(size);

for id in 0..size {
workers.push(Worker::new(id));
}

ThreadPool {
workers
}
}/<code>
  • 在worker中創建線程
<code>struct Worker {
id: usize,
thread: thread::JoinHandle,
}

impl Worker {
fn new(id: usize) -> Worker {

let thread = thread::spawn(|| {});

Worker {
id,
thread,
}
}
}/<code>

3、發送任務

  • 進一步將ThreadPool結構設計為
<code>use std::sync::mpsc;

pub struct ThreadPool {
workers: Vec<worker>,
sender: mpsc::Sender,
}

struct Job;
/<worker>/<code>
  • 完善new方法
<code>impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);

let (sender, receiver) = mpsc::channel();//add
let mut workers = Vec::with_capacity(size);

for id in 0..size {
//workers.push(Worker::new(id));
workers.push(Worker::new(id, receiver));
}

ThreadPool {
workers,
sender,//add
}

}
// --snip--
}

//--snip--

impl Worker {
fn new(id: usize, receiver: mpsc::Receiver) -> Worker {
let thread = thread::spawn(|| {
receiver;
});

Worker {
id,
thread,
}
}
}
/<code>

此段代碼錯誤,因為receiver要在線程間傳遞,但是是非線程安全的。因此應該使用Arc<mutex>>。重新撰寫new方法如下:/<mutex>

<code>use std::sync::Arc;
use std::sync::Mutex;
// --snip--

impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);

let (sender, receiver) = mpsc::channel();

let receiver = Arc::new(Mutex::new(receiver));//add

let mut workers = Vec::with_capacity(size);

for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}

ThreadPool {
workers,
sender,
}

}

// --snip--
}

impl Worker {
fn new(id: usize, receiver: Arc<mutex>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();

println!("Worker {} got a job; executing.", id);

job();
}
});

Worker {
id,
thread,
}
}
}/<mutex>/<code>
  • 實現execute方法
<code>type Job = Box;//修改Job為trait對象的類別名稱

impl ThreadPool {
// --snip--

pub fn execute(&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);

self.sender.send(job).unwrap();
}
}
/<code>

完整代碼

src/main.rs

<code>use std::fs;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::{thread, time};
use mylib::ThreadPool;

fn handle_client(mut stream: TcpStream) {
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\\r\\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\\r\\n\\r\\n", "main.html")
} else {
("HTTP/1.1 404 NOT FOUND\\r\\n\\r\\n", "404.html")
};

let contents = fs::read_to_string(filename).unwrap();
let response = format!("{}{}", status_line, contents);

stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();

let ten_millis = time::Duration::from_millis(10000);
thread::sleep(ten_millis);
}

fn main() -> std::io::Result {
let listener = TcpListener::bind("127.0.0.1:8080")?;
// let mut thread_vec: Vec<:joinhandle>> = Vec::new();

let pool = ThreadPool::new(4);


for stream in listener.incoming() {
// // handle_client(stream?);
let stream = stream.unwrap();
// let handle = thread::spawn(|| {
// handle_client(stream);
// });
// thread_vec.push(handle);

pool.execute(|| {
handle_client(stream);
});
}

// for handle in thread_vec {

// handle.join().unwrap();
// }

Ok(())
}/<code>

src/mylib/lib.rs

<code>use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;

struct Worker {
id: usize,
thread: thread::JoinHandle,
}

impl Worker {
// fn new(id: usize) -> Worker {
// let thread = thread::spawn(|| {});

// Worker {
// id,
// thread,
// }
// }

// fn new(id: usize, receiver: mpsc::Receiver) -> Worker {
// let thread = thread::spawn(|| {
// receiver;
// });

// Worker {
// id,
// thread,
// }
// }

fn new(id: usize, receiver: Arc<mutex>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();

println!("Worker {} got a job; executing.", id);

job();
}

});

Worker {
id,
thread,
}
}
}

pub struct ThreadPool {
workers: Vec<worker>,
sender: mpsc::Sender,
}

// struct Job;
type Job = Box;//修改Job為trait對象的類別名稱

impl ThreadPool {
\tpub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
// let mut threads = Vec::with_capacity(size);
// for _ in 0..size {
// //創建線程:
// //問題來了,創建線程的時候需要傳入閉包,也就是具體做的動作,
// //可是這個時候我們還沒有具體的任務,怎麼辦?
// }

// ThreadPool {
// threads
// }

let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);

for id in 0..size {
//workers.push(Worker::new(id));
//workers.push(Worker::new(id, receiver));
workers.push(Worker::new(id, Arc::clone(&receiver)));
}

ThreadPool {

workers,
sender,
}
\t}
\t
pub fn execute(&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);

self.sender.send(job).unwrap();
}
}
/<worker>/<mutex>
/<code>

在main的Cargo.toml添加如下依賴:

<code>[dependencies]
mylib = {path = "./mylib"}/<code>

當前版本存在的問題

線程池中的線程怎麼結束?

想知道如何解決這個問題,請關注令狐一衝,下回為您分解。


分享到:


相關文章: