您好,登錄后才能下訂單哦!
這篇“Rust多線程Web服務器怎么搭建”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Rust多線程Web服務器怎么搭建”文章吧。
在 socket 上監聽 TCP 連接
解析少量的 HTTP 請求
創建一個合適的 HTTP 響應
使用線程池改進服務器的吞吐量
優雅的停機和清理
注意:并不是最佳實踐
創建項目
~/rust ? cargo new hello Created binary (application) `hello` package ~/rust ?
main.rs 文件
use std::net::TcpListener; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); println!("Connection established!"); } }
修改一:
use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); println!("Request: {}", String::from_utf8_lossy(&buffer[..])); }
修改二:
use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); // 請求 // Method Request-URI HTTP-Version CRLF // headers CRLF // message-body // 響應 // HTTP-Version Status-Code Reason-Phrase CRLF // headers CRLF // message-body let response = "HTTP/1.1 200 OK\r\n\r\n"; stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); println!("Request: {}", String::from_utf8_lossy(&buffer[..])); }
修改三:
use std::fs; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); // 請求 // Method Request-URI HTTP-Version CRLF // headers CRLF // message-body // 響應 // HTTP-Version Status-Code Reason-Phrase CRLF // headers CRLF // message-body let contents = fs::read_to_string("hello.html").unwrap(); let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents); stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); }
修改四:
use std::fs; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; if buffer.starts_with(get) { let contents = fs::read_to_string("hello.html").unwrap(); let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents); stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); } else { let status_line = "HTTP/1.1 404 NOT FOUND\r\n\r\n"; let contents = fs::read_to_string("404.html").unwrap(); let response = format!("{}{}", status_line, contents); stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); } }
修改五:
use std::fs; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; let (status_line, filename) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") }; let contents = fs::read_to_string(filename).unwrap(); let response = format!("{}{}", status_line, contents); stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); }
hello.html 文件
<!DOCTYPE html> <html lang="en"> <head> <meta charset="utf-8"> <title>Hello</title> </head> <body> <h2>Hello</h2> <p>Hi from Rust</p> </body> </html>
404.html 文件
<!DOCTYPE html> <html lang="en"> <head> <meta charset="utf-8"> <title>Hello!</title> </head> <body> <h2>Oops!</h2> <p>Sorry, I don't know what you're asking for.</p> </body> </html>
單線程Web服務器
use std::fs; use std::thread; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; use std::time::Duration; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; let sleep = b"GET /sleep HTTP/1.1\r\n"; let (status_line, filename) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else if buffer.starts_with(sleep) { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") }; let contents = fs::read_to_string(filename).unwrap(); let response = format!("{}{}", status_line, contents); stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); }
開啟線程
use std::fs; use std::thread; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; use std::time::Duration; fn main() { let listener = TcpListener::bind("localhost:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; let sleep = b"GET /sleep HTTP/1.1\r\n"; let (status_line, filename) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else if buffer.starts_with(sleep) { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") }; let contents = fs::read_to_string(filename).unwrap(); let response = format!("{}{}", status_line, contents); stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); }
lib.rs 文件
use std::thread; pub struct ThreadPool { threads: Vec<thread::JoinHandle<()>>, } impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut threads = Vec::with_capacity(size); for _ in 0..size { // create some threads and store them in the vector } ThreadPool { threads } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } }
lib.rs 修改一
use std::thread; pub struct ThreadPool { workers: Vec<Worker>, } impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize) -> Worker { let thread = thread::spawn(|| {}); Worker { id, thread } } }
lib.rs 修改二
use std::thread; use std::sync::mpsc; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } struct Job; impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, receiver)); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker { let thread = thread::spawn(|| { receiver; }); Worker { id, thread } } }
lib.rs
修改三
use std::thread; use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } struct Job; impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(|| { receiver; }); Worker { id, thread } } }
lib.rs
修改四
use std::thread; use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } // struct Job; type Job = Box<FnOnce() + Send + 'static>; impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {} got a job; executing.", id); (*job)(); }); Worker { id, thread } } }
lib.rs
修改五
use std::thread; use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } // struct Job; // type Job = Box<FnOnce() + Send + 'static>; impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } trait FnBox { fn call_box(self: Box<Self>); } impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<F>) { (*self)() } } type Job = Box<dyn FnBox + Send + 'static>; impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {} got a job; executing.", id); // (*job)(); job.call_box(); }); Worker { id, thread } } }
lib.rs
修改六
use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; use std::thread; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } // struct Job; // type Job = Box<FnOnce() + Send + 'static>; impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } trait FnBox { fn call_box(self: Box<Self>); } impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<F>) { (*self)() } } type Job = Box<dyn FnBox + Send + 'static>; impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {} got a job; executing.", id); job.call_box(); } }); Worker { id, thread } } }
use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; use std::thread; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } // struct Job; // type Job = Box<FnOnce() + Send + 'static>; impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap() } } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } trait FnBox { fn call_box(self: Box<Self>); } impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<F>) { (*self)() } } type Job = Box<dyn FnBox + Send + 'static>; impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {} got a job; executing.", id); job.call_box(); } }); Worker { id, thread } } }
修改優化一:
use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; use std::thread; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } // struct Job; // type Job = Box<FnOnce() + Send + 'static>; impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } } struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, } trait FnBox { fn call_box(self: Box<Self>); } impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<F>) { (*self)() } } type Job = Box<dyn FnBox + Send + 'static>; impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {} got a job; executing.", id); job.call_box(); } }); Worker { id, thread: Some(thread), } } }
最終版 lib.rs 文件
use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; use std::thread; enum Message { NewJob(Job), Terminate, } pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Message>, } // struct Job; // type Job = Box<FnOnce() + Send + 'static>; impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(Message::NewJob(job)).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { println!("Sending terminate message to all workers."); for _ in &mut self.workers { self.sender.send(Message::Terminate).unwrap(); } println!("Shutting down all workers."); for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } } struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, } trait FnBox { fn call_box(self: Box<Self>); } impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<F>) { (*self)() } } type Job = Box<dyn FnBox + Send + 'static>; impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker { let thread = thread::spawn(move || loop { let message = receiver.lock().unwrap().recv().unwrap(); match message { Message::NewJob(job) => { println!("Worker {} got a job; executing.", id); job.call_box(); } Message::Terminate => { println!("Worker {} got a job; executing.", id); break; } } }); Worker { id, thread: Some(thread), } } }
最終版 main.rs 文件
use hello::ThreadPool; use std::fs; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; use std::thread; use std::time::Duration; fn main() { let listener = TcpListener::bind("localhost:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } println!("Shutting down."); } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; let sleep = b"GET /sleep HTTP/1.1\r\n"; let (status_line, filename) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else if buffer.starts_with(sleep) { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") }; let contents = fs::read_to_string(filename).unwrap(); let response = format!("{}{}", status_line, contents); stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); }
運行
hello on master [?] is ???? 0.1.0 via ???? 1.67.1 ? cargo run Compiling hello v0.1.0 (/Users/qiaopengjun/rust/hello) Finished dev [unoptimized + debuginfo] target(s) in 0.43s Running `target/debug/hello` Worker 0 got a job; executing. Shutting down. Sending terminate message to all workers. Shutting down all workers. Shutting down worker 0 Worker 1 got a job; executing. Worker 2 got a job; executing. Worker 3 got a job; executing. Worker 1 got a job; executing. Worker 0 got a job; executing. Shutting down worker 1 Shutting down worker 2 Shutting down worker 3 hello on master [?] is ???? 0.1.0 via ???? 1.67.1 took 21.9s ?
以上就是關于“Rust多線程Web服務器怎么搭建”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。