diff --git a/Cargo.lock b/Cargo.lock index fbdfb51..fad3ade 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -127,6 +127,7 @@ dependencies = [ name = "coffer-server" version = "0.2.0" dependencies = [ + "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "coffer-common 0.1.0", "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -137,7 +138,7 @@ dependencies = [ "serde_cbor 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)", "sodiumoxide 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", "structopt 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -780,7 +781,7 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1009,7 +1010,7 @@ dependencies = [ "checksum termcolor 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "96d6098003bde162e4277c70665bd87c326f5a0c3f3fbfb285787fa482d54e6e" "checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" "checksum thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c6b53e329000edc2b34dbe8545fd20e55a333362d0a321909685a19bd28c3f1b" -"checksum tokio 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "a9d5acfe1b1130d50ac2286a2f1f8cf49309680366ceb7609ce369b75c9058d4" +"checksum tokio 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)" = "ffa2fdcfa937b20cb3c822a635ceecd5fc1a27a6a474527e5516aa24b8c8820a" "checksum tokio-macros 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "50a61f268a3db2acee8dcab514efc813dc6dbe8a00e86076f935f94304b59a7a" "checksum toml 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "01d1404644c8b12b16bfcffa4322403a91a451584daaaa7c28d3152e6cbc98cf" "checksum unicode-segmentation 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e83e153d1053cbb5a118eeff7fd5be06ed99153f00dbcd8ae310c5fb2b22edc0" diff --git a/Design.org b/Design.org index 298b9aa..f26d61f 100644 --- a/Design.org +++ b/Design.org @@ -15,7 +15,8 @@ | 3 | Get | Coffer (sealed) | C -> S | OkGet | Retrieve a ~Coffer~ for the client | | 4 | OkPut | | S -> C | Put, Get | ~Coffer~ was successfully merged | | 5 | OkGet | Coffer (sealed) | S -> C | Put, Get | Return a sealed ~Coffer~ for a ~Get~ request | - | 128 | Error | | S -> C | | Generic server error | + | 63 | Bye | | C -> S | | Close connection | + | 127 | Error | | S -> C | | Generic server error | - Error can be returned at any stage - Communication can end at any stage. Communication ends when connection is closed by either side. diff --git a/coffer-common/src/coffer.rs b/coffer-common/src/coffer.rs index 1dedeb6..6605950 100644 --- a/coffer-common/src/coffer.rs +++ b/coffer-common/src/coffer.rs @@ -33,7 +33,7 @@ pub enum CofferValue { /// A path to a value #[derive(Clone, Eq, PartialEq, Hash, Debug)] -pub struct CofferPath(Vec); +pub struct CofferPath(pub Vec); /// Interface for interacting with a `Coffer` pub trait Coffer { diff --git a/coffer-server/Cargo.toml b/coffer-server/Cargo.toml index ed334d6..01fece2 100644 --- a/coffer-server/Cargo.toml +++ b/coffer-server/Cargo.toml @@ -15,9 +15,10 @@ lazy_static = "^1.4" # Key management/Cryptography sodiumoxide = "^0.2" # Communication -tokio = { version="^0.2.8", features = ["full"]} +tokio = { version="^0.2.9", features = ["full"]} serde = { version = "^1.0", features = ["derive"]} serde_cbor = "^0.10.2" futures = { version = "0.3.1", features = ["thread-pool"]} +bytes = "^0.5" coffer-common = { path = "../coffer-common" } \ No newline at end of file diff --git a/coffer-server/src/command_parser.rs b/coffer-server/src/command_parser.rs new file mode 100644 index 0000000..0bc663d --- /dev/null +++ b/coffer-server/src/command_parser.rs @@ -0,0 +1,36 @@ +#[allow(unused_imports)] +use log::{debug, error, info, trace, warn}; + +use tokio::io::AsyncRead; + +use quick_error::quick_error; + +quick_error! { + #[derive(Debug)] + pub enum CommandParserError { + Msg(err: &'static str) { + from(err) + display("{}", err) + } + Other(err: Box) { + cause(&**err) + } + } +} + +enum Command { + None +} + +struct CommandParser +where T: AsyncRead { + reader: T +} + +impl Stream for CommandParser { + type Item = Command; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + Poll::Ready(Some(Command::None)) + } +} diff --git a/coffer-server/src/main.rs b/coffer-server/src/main.rs index 3862793..18f9272 100644 --- a/coffer-server/src/main.rs +++ b/coffer-server/src/main.rs @@ -12,6 +12,7 @@ use coffer_common::keyring::Keyring; mod server; mod coffer_map; +mod protocol; use server::ServerBuilder; use coffer_map::CofferMap; diff --git a/coffer-server/src/protocol.rs b/coffer-server/src/protocol.rs new file mode 100644 index 0000000..ea91490 --- /dev/null +++ b/coffer-server/src/protocol.rs @@ -0,0 +1,225 @@ +#[allow(unused_imports)] +use log::{debug, error, info, trace, warn}; + +use std::sync::Arc; +use std::convert::TryInto; +use std::convert::TryFrom; +use std::net::Shutdown; + +use bytes::BytesMut; + +use tokio::prelude::*; +use tokio::net::TcpStream; +use tokio::sync::RwLock; + +use quick_error::quick_error; + +use coffer_common::coffer::Coffer; +use coffer_common::coffer::CofferValue; +use coffer_common::coffer::CofferPath; +use coffer_common::keyring::Keyring; + +quick_error! { + #[derive(Debug)] + pub enum ProtocolError { + Msg(err: &'static str) { + from(err) + display("{}", err) + } + Other(err: Box) { + cause(&**err) + } + } +} + +#[derive(Debug, PartialEq, Eq)] +enum State { + Start, + Link, + Error, + End +} + +#[derive(Debug)] +enum Request { + Hello, + Put(Vec), + Get(Vec), + Bye, + Error +} + +enum Response { + OkGet(Vec) +} + +pub struct Protocol +where C: Coffer +{ + stream: TcpStream, + coffer: Arc>, + keyring: Arc>, + state: State +} + +impl Protocol +where C: Coffer +{ + pub fn new( + stream: TcpStream, + coffer: Arc>, + keyring: Arc> + ) -> Protocol + { + + let state = State::Start; + Protocol {stream, coffer, keyring, state} + } + + pub async fn run(mut self) { + while self.state != State::End + { + debug!{"In state: {:?}", self.state} + let event = self.event().await; + self.transit(event).await; + } + + self.stream.shutdown(Shutdown::Both).unwrap(); + } + + async fn event(&mut self) -> Request { + let (mut reader, _writer) = self.stream.split(); + + // TODO restrict msg_size more, otherwise bad client could bring server + // to allocate vast amounts of memory + let (msg_size, msg_type) = Self::read_header(&mut reader).await + .unwrap(); + + // TODO only read message if message expected by message type + // currently relies on client sending good message + // (0x00 message size) + let message = Self::read_message(msg_size, &mut reader).await + .unwrap(); + + match msg_type { + 0x00 => Request::Hello, + 0x02 => Request::Put((*message).into()), + 0x03 => Request::Get((*message).into()), + 0x63 => Request::Bye, + 0xff => Request::Error, + _ => Request::Error + } + } + + async fn read_header(reader: &mut T) -> Option<(u64, u8)> + where T: AsyncRead + Unpin + { + let mut header: [u8; 9] = [0u8;9]; // header buffer + match reader.read_exact(&mut header).await + { + Ok(size) => debug!{"Read {} bytes for header", size}, + Err(err) => { + error!{"Error while reading header: {}", err} + return None; + } + } + + trace!{"Header buffer {:?}", header} + + let msg_size: u64 = u64::from_be_bytes( + header[0..8] + .try_into() + .unwrap()); + + let msg_type: u8 = u8::from_be_bytes( + header[8..9] + .try_into() + .unwrap()); + + debug!{"Message size: {}, Message type: {}", msg_size, msg_type} + Some((msg_size, msg_type)) + } + + async fn read_message(msg_size: u64, reader: &mut T) -> Option> + where T: AsyncRead + Unpin + { + // TODO: possible to use unallocated memory instead? + // -> https://doc.rust-lang.org/beta/std/mem/union.MaybeUninit.html + // TODO: 32 bit usize? Can't allocate a 64 bit length buffer anyway? + let mut message = Vec::with_capacity(msg_size.try_into().unwrap()); + // need to set the size, because otherwise it is assumed to be 0, since + // the vec is allocated but uninitialized at this point, we don't want to + // pre-allocate a potentially huge buffer with 0x00, so unsafe set size. + unsafe {message.set_len(msg_size.try_into().unwrap());} + + match reader.read_exact(&mut message).await + { + Ok(size) => debug!{"Read {} bytes for message", size}, + Err(err) => { + error!{"Error while reading message: {}", err} + return None; + } + } + trace!{"Read message {:?}", message} + + Some(message) + } + + async fn transit(&mut self, event: Request) { + match (&self.state, event) { + (State::Start, Request::Hello) => self.state = State::Link, + (State::Link, Request::Get(_)) => { + debug!{"Writing response"} + let get_res = self.coffer.read().await + .get(CofferPath(vec!["a".into(), "b".into(), "c".into()])) + .unwrap(); + + if let CofferValue::Blob(b) = get_res { + let response = Response::OkGet(b); + Self::write_response(response, &mut self.stream).await; + self.state = State::Link; + } + } + (State::Link, Request::Put(p)) => { + self.coffer.write().await + .put(CofferPath(vec!["a".into(), "b".into(), "c".into()]), + CofferValue::Blob(p)); + self.state = State::Link; + } + (_, Request::Bye) => self.state = State::End, + (_, Request::Error) => self.state = State::End, + _ => self.state = State::End + } + } + + async fn write_response(response: Response, writer: &mut T) + where T: AsyncWrite + Unpin + { + match response { + Response::OkGet(get) => { + let frame = Self::make_frame(0x05u8, get).await; + trace!{"OkGet Frame: {:?}", frame} + writer.write_all(&frame).await.unwrap(); + } + } + } + + async fn make_frame(msg_type: u8, data: Vec) -> Vec { + trace!{"Creating frame for type: {:?}, data: {:?}", msg_type, data} + + // TODO magic number + let mut frame: Vec = Vec::with_capacity(data.len() + 72); + unsafe {frame.set_len(8);} + + frame.splice(0..8, u64::try_from(data.len()) + .unwrap() + .to_be_bytes() + .iter() + .cloned()); + + frame.push(msg_type); + frame.extend(&data); + + frame + } +} diff --git a/coffer-server/src/server.rs b/coffer-server/src/server.rs index f326004..40ce3cf 100644 --- a/coffer-server/src/server.rs +++ b/coffer-server/src/server.rs @@ -3,7 +3,7 @@ use log::{debug, error, info, trace, warn}; use quick_error::quick_error; -use tokio::net::{TcpListener, TcpStream}; +use tokio::net::{TcpListener}; use tokio::stream::StreamExt; use tokio::sync::RwLock; @@ -14,6 +14,8 @@ use coffer_common::keyring::Keyring; use coffer_common::coffer::Coffer; use coffer_common::certificate::{Certificate, CertificateError}; +use crate::protocol::Protocol; + quick_error! { #[derive(Debug)] pub enum ServerError { @@ -61,13 +63,17 @@ where C: Coffer + Send + Sync + 'static while let Some(connection) = incoming.next().await { debug!{"New incoming connection"} match connection { - Ok(mut tcp_stream) => { - debug!{"Connection ok"} - debug!{"Spawning off connection handler"} + Ok(tcp_stream) => { + debug!{"Connection ok\nSpawning off connection handler"} let keyring = self.keyring.clone(); let coffer = self.coffer.clone(); - tokio::spawn(Self::handle_connection(keyring, coffer, tcp_stream)); + + let protocol = Protocol::new(tcp_stream, coffer, keyring); + tokio::spawn(async move { + protocol.run().await; + }); + } Err(err) => error!{"Connection could not be established {}", err} } @@ -77,13 +83,6 @@ where C: Coffer + Send + Sync + 'static server.await } - - async fn handle_connection(keyring: Arc>, - coffer: Arc>, - mut tcp_stream: TcpStream) - { - let (reader, mut writer) = tcp_stream.split(); - } } pub struct ServerBuilder