Line | Count | Source (jump to first uncovered line) |
1 | | use std::collections::{HashMap, VecDeque}; |
2 | | |
3 | | use futures_util::{Sink, SinkExt, Stream, StreamExt}; |
4 | | use tokio::sync::{oneshot, Mutex}; |
5 | | use tokio_tungstenite::tungstenite::Message; |
6 | | use tracing::debug; |
7 | | |
8 | | use super::InnerError; |
9 | | use crate::{ |
10 | | requests::{ClientRequest, EventSubscription, Identify}, |
11 | | responses::{Hello, Identified, RequestResponse, ServerMessage, Status}, |
12 | | }; |
13 | | |
14 | | /// Wrapper for the list of ongoing requests that wait for response. |
15 | 18 | #[derive(Default)] <obws::client::connection::ReceiverList as core::default::Default>::default Line | Count | Source | 15 | 18 | #[derive(Default)] |
Unexecuted instantiation: <obws::client::connection::ReceiverList as core::default::Default>::default Unexecuted instantiation: <obws::client::connection::ReceiverList as core::default::Default>::default |
16 | | pub(super) struct ReceiverList(Mutex<HashMap<u64, oneshot::Sender<(Status, serde_json::Value)>>>); |
17 | | |
18 | | impl ReceiverList { |
19 | | /// Add a new receiver to the wait list, that will be notified once a request with the given |
20 | | /// ID is received. |
21 | 346 | pub async fn add(&self, id: u64) -> oneshot::Receiver<(Status, serde_json::Value)> { <obws::client::connection::ReceiverList>::add Line | Count | Source | 21 | 346 | pub async fn add(&self, id: u64) -> oneshot::Receiver<(Status, serde_json::Value)> { |
Unexecuted instantiation: <obws::client::connection::ReceiverList>::add |
22 | 346 | let (tx, rx) = oneshot::channel(); |
23 | 346 | self.0.lock().await0 .insert(id, tx); |
24 | 346 | rx |
25 | 346 | } <obws::client::connection::ReceiverList>::add::{closure#0} Line | Count | Source | 21 | 346 | pub async fn add(&self, id: u64) -> oneshot::Receiver<(Status, serde_json::Value)> { | 22 | 346 | let (tx, rx) = oneshot::channel(); | 23 | 346 | self.0.lock().await0 .insert(id, tx); | 24 | 346 | rx | 25 | 346 | } |
Unexecuted instantiation: <obws::client::connection::ReceiverList>::add::{closure#0} Unexecuted instantiation: <obws::client::connection::ReceiverList>::add::{closure#0} |
26 | | |
27 | | /// Remove a previously added receiver. Used to free up resources, in case sending the request |
28 | | /// failed. |
29 | 0 | pub async fn remove(&self, id: u64) { Unexecuted instantiation: <obws::client::connection::ReceiverList>::remove Unexecuted instantiation: <obws::client::connection::ReceiverList>::remove |
30 | 0 | self.0.lock().await.remove(&id); |
31 | 0 | } Unexecuted instantiation: <obws::client::connection::ReceiverList>::remove::{closure#0} Unexecuted instantiation: <obws::client::connection::ReceiverList>::remove::{closure#0} Unexecuted instantiation: <obws::client::connection::ReceiverList>::remove::{closure#0} |
32 | | |
33 | | /// Notify a waiting receiver with the response to a request. |
34 | 346 | pub async fn notify(&self, response: RequestResponse) -> Result<(), InnerError> { <obws::client::connection::ReceiverList>::notify Line | Count | Source | 34 | 346 | pub async fn notify(&self, response: RequestResponse) -> Result<(), InnerError> { |
Unexecuted instantiation: <obws::client::connection::ReceiverList>::notify |
35 | 346 | let RequestResponse { |
36 | 346 | r#type: _, |
37 | 346 | id, |
38 | 346 | status, |
39 | 346 | data, |
40 | | } = response; |
41 | | |
42 | 346 | let id = id |
43 | 346 | .parse() |
44 | 346 | .map_err(|e| InnerError::InvalidRequestId(e, id)0 )?0 ; Unexecuted instantiation: <obws::client::connection::ReceiverList>::notify::{closure#0}::{closure#0} Unexecuted instantiation: <obws::client::connection::ReceiverList>::notify::{closure#0}::{closure#0} Unexecuted instantiation: <obws::client::connection::ReceiverList>::notify::{closure#0}::{closure#0} |
45 | | |
46 | 346 | if let Some(tx) = self.0.lock().await0 .remove(&id) { |
47 | 346 | tx.send((status, data)).ok(); |
48 | 346 | }0 |
49 | | |
50 | 346 | Ok(()) |
51 | 346 | } <obws::client::connection::ReceiverList>::notify::{closure#0} Line | Count | Source | 34 | 346 | pub async fn notify(&self, response: RequestResponse) -> Result<(), InnerError> { | 35 | 346 | let RequestResponse { | 36 | 346 | r#type: _, | 37 | 346 | id, | 38 | 346 | status, | 39 | 346 | data, | 40 | | } = response; | 41 | | | 42 | 346 | let id = id | 43 | 346 | .parse() | 44 | 346 | .map_err(|e| InnerError::InvalidRequestId(e, id))?0 ; | 45 | | | 46 | 346 | if let Some(tx) = self.0.lock().await0 .remove(&id) { | 47 | 346 | tx.send((status, data)).ok(); | 48 | 346 | }0 | 49 | | | 50 | 346 | Ok(()) | 51 | 346 | } |
Unexecuted instantiation: <obws::client::connection::ReceiverList>::notify::{closure#0} Unexecuted instantiation: <obws::client::connection::ReceiverList>::notify::{closure#0} |
52 | | |
53 | | /// Reset the list, canceling any outstanding receivers. |
54 | 0 | pub async fn reset(&self) { Unexecuted instantiation: <obws::client::connection::ReceiverList>::reset Unexecuted instantiation: <obws::client::connection::ReceiverList>::reset |
55 | 0 | self.0.lock().await.clear(); |
56 | 0 | } Unexecuted instantiation: <obws::client::connection::ReceiverList>::reset::{closure#0} Unexecuted instantiation: <obws::client::connection::ReceiverList>::reset::{closure#0} Unexecuted instantiation: <obws::client::connection::ReceiverList>::reset::{closure#0} |
57 | | } |
58 | | |
59 | | /// Wrapper around a thread-safe queue to park and notify re-identify listener. |
60 | 18 | #[derive(Default)] <obws::client::connection::ReidentifyReceiverList as core::default::Default>::default Line | Count | Source | 60 | 18 | #[derive(Default)] |
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList as core::default::Default>::default Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList as core::default::Default>::default |
61 | | pub(super) struct ReidentifyReceiverList(Mutex<VecDeque<oneshot::Sender<Identified>>>); |
62 | | |
63 | | impl ReidentifyReceiverList { |
64 | | /// Add a new receiver to the wait list, returning a channel to await the result on. |
65 | 0 | pub async fn add(&self) -> oneshot::Receiver<Identified> { Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::add Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::add |
66 | 0 | let (tx, rx) = oneshot::channel(); |
67 | 0 | self.0.lock().await.push_back(tx); |
68 | 0 | rx |
69 | 0 | } Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::add::{closure#0} Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::add::{closure#0} |
70 | | |
71 | | /// Notify the next listener in the queue, transferring it the response. |
72 | 0 | pub async fn notify(&self, identified: Identified) { Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::notify Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::notify |
73 | 0 | if let Some(tx) = self.0.lock().await.pop_front() { |
74 | 0 | tx.send(identified).ok(); |
75 | 0 | } |
76 | 0 | } Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::notify::{closure#0} Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::notify::{closure#0} Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::notify::{closure#0} |
77 | | |
78 | | /// Reset the list, canceling any outstanding receivers. |
79 | 0 | pub async fn reset(&self) { Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::reset Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::reset |
80 | 0 | self.0.lock().await.clear(); |
81 | 0 | } Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::reset::{closure#0} Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::reset::{closure#0} Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::reset::{closure#0} |
82 | | } |
83 | | |
84 | | /// Errors that can occur while performing the initial handshake with obs-websocket. |
85 | 0 | #[derive(Debug, thiserror::Error)] Unexecuted instantiation: <obws::client::connection::HandshakeError as core::fmt::Debug>::fmt Unexecuted instantiation: <obws::client::connection::HandshakeError as core::fmt::Debug>::fmt Unexecuted instantiation: <obws::client::connection::HandshakeError as core::fmt::Display>::fmt Unexecuted instantiation: <obws::client::connection::HandshakeError as std::error::Error>::source Unexecuted instantiation: <obws::client::connection::HandshakeError as std::error::Error>::source Unexecuted instantiation: <obws::client::connection::HandshakeError as core::fmt::Display>::fmt |
86 | | pub enum HandshakeError { |
87 | | /// The connection to obs-websocket was interrupted while trying to read a message. |
88 | | #[error("connection to obs-websocket was closed")] |
89 | | ConnectionClosed, |
90 | | /// Receiving a message did not succeed. |
91 | | #[error("failed reading websocket message")] |
92 | | Receive(#[source] tokio_tungstenite::tungstenite::Error), |
93 | | /// The web-socket message was not convertible to text. |
94 | | #[error("websocket message not convertible to text")] |
95 | | IntoText(#[source] tokio_tungstenite::tungstenite::Error), |
96 | | /// A message from obs-websocket could not be deserialized. |
97 | | #[error("failed deserializing message")] |
98 | | DeserializeMessage(#[source] serde_json::Error), |
99 | | /// A message could not be serialized for sending. |
100 | | #[error("failed serializing message")] |
101 | | SerializeMessage(#[source] serde_json::Error), |
102 | | /// Sending a message to obs-websocket failed. |
103 | | #[error("failed to send message to obs-websocket")] |
104 | | Send(#[source] tokio_tungstenite::tungstenite::Error), |
105 | | /// Didn't receive the initial `Hello` message from obs-websocket after connecting. |
106 | | #[error("didn't receive a `Hello` message after connecting")] |
107 | | NoHello, |
108 | | /// Didn't receive a `Identified` message from obs-websocket after authentication. |
109 | | #[error("didn't receive a `Identified` message")] |
110 | | NoIdentified, |
111 | | } |
112 | | |
113 | 18 | pub(super) async fn handshake( |
114 | 18 | write: &mut (impl Sink<Message, Error = tokio_tungstenite::tungstenite::Error> + Unpin), |
115 | 18 | read: &mut (impl Stream<Item = tokio_tungstenite::tungstenite::Result<Message>> + Unpin), |
116 | 18 | password: Option<&str>, |
117 | 18 | event_subscriptions: Option<EventSubscription>, |
118 | 18 | ) -> Result<(), HandshakeError> { obws::client::connection::handshake::<futures_util::stream::stream::split::SplitSink<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>, tungstenite::protocol::message::Message>, futures_util::stream::stream::split::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>>> Line | Count | Source | 113 | 18 | pub(super) async fn handshake( | 114 | 18 | write: &mut (impl Sink<Message, Error = tokio_tungstenite::tungstenite::Error> + Unpin), | 115 | 18 | read: &mut (impl Stream<Item = tokio_tungstenite::tungstenite::Result<Message>> + Unpin), | 116 | 18 | password: Option<&str>, | 117 | 18 | event_subscriptions: Option<EventSubscription>, | 118 | 18 | ) -> Result<(), HandshakeError> { |
Unexecuted instantiation: obws::client::connection::handshake::<_, _> Unexecuted instantiation: obws::client::connection::handshake::<_, _> |
119 | 36 | async fn read_message( |
120 | 36 | read: &mut (impl Stream<Item = tokio_tungstenite::tungstenite::Result<Message>> + Unpin), |
121 | 36 | ) -> Result<ServerMessage, HandshakeError> { obws::client::connection::handshake::{closure#0}::read_message::<futures_util::stream::stream::split::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>>> Line | Count | Source | 119 | 36 | async fn read_message( | 120 | 36 | read: &mut (impl Stream<Item = tokio_tungstenite::tungstenite::Result<Message>> + Unpin), | 121 | 36 | ) -> Result<ServerMessage, HandshakeError> { |
Unexecuted instantiation: obws::client::connection::handshake::{closure#0}::read_message::<_> Unexecuted instantiation: obws::client::connection::handshake::{closure#0}::read_message::<_> |
122 | 36 | let message = read |
123 | 36 | .next() |
124 | 23 | .await |
125 | 36 | .ok_or(HandshakeError::ConnectionClosed)?0 |
126 | 36 | .map_err(HandshakeError::Receive)?0 |
127 | 36 | .into_text() |
128 | 36 | .map_err(HandshakeError::IntoText)?0 ; |
129 | | |
130 | 36 | serde_json::from_str::<ServerMessage>(&message).map_err(HandshakeError::DeserializeMessage) |
131 | 36 | } obws::client::connection::handshake::{closure#0}::read_message::<futures_util::stream::stream::split::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>>>::{closure#0} Line | Count | Source | 122 | 36 | let message = read | 123 | 36 | .next() | 124 | 23 | .await | 125 | 36 | .ok_or(HandshakeError::ConnectionClosed)?0 | 126 | 36 | .map_err(HandshakeError::Receive)?0 | 127 | 36 | .into_text() | 128 | 36 | .map_err(HandshakeError::IntoText)?0 ; | 129 | | | 130 | 36 | serde_json::from_str::<ServerMessage>(&message).map_err(HandshakeError::DeserializeMessage) | 131 | 36 | } |
Unexecuted instantiation: obws::client::connection::handshake::{closure#0}::read_message::<_>::{closure#0} Unexecuted instantiation: obws::client::connection::handshake::{closure#0}::read_message::<_>::{closure#0} |
132 | | |
133 | 18 | match read_message(read).await5 ?0 { |
134 | | ServerMessage::Hello(Hello { |
135 | | obs_web_socket_version: _, |
136 | 18 | rpc_version, |
137 | 18 | authentication, |
138 | 18 | }) => { |
139 | 18 | let authentication = authentication.zip(password).map(|(auth, password)| { |
140 | 18 | create_auth_response(&auth.challenge, &auth.salt, password) |
141 | 18 | }); obws::client::connection::handshake::<futures_util::stream::stream::split::SplitSink<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>, tungstenite::protocol::message::Message>, futures_util::stream::stream::split::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>>>::{closure#0}::{closure#0} Line | Count | Source | 139 | 18 | let authentication = authentication.zip(password).map(|(auth, password)| { | 140 | 18 | create_auth_response(&auth.challenge, &auth.salt, password) | 141 | 18 | }); |
Unexecuted instantiation: obws::client::connection::handshake::<_, _>::{closure#0}::{closure#0} Unexecuted instantiation: obws::client::connection::handshake::<_, _>::{closure#0}::{closure#0} |
142 | | |
143 | 18 | let req = serde_json::to_string(&ClientRequest::Identify(Identify { |
144 | 18 | rpc_version, |
145 | 18 | authentication, |
146 | 18 | event_subscriptions, |
147 | 18 | })) |
148 | 18 | .map_err(HandshakeError::SerializeMessage)?0 ; |
149 | | |
150 | 18 | write |
151 | 18 | .send(Message::Text(req)) |
152 | 0 | .await |
153 | 18 | .map_err(HandshakeError::Send)?0 ; |
154 | | } |
155 | 0 | _ => return Err(HandshakeError::NoHello), |
156 | | } |
157 | | |
158 | 18 | match read_message(read).await?0 { |
159 | | ServerMessage::Identified(Identified { |
160 | 18 | negotiated_rpc_version, |
161 | | }) => { |
162 | 18 | debug!(rpc_version = %negotiated_rpc_version, "identified against obs-websocket"); obws::client::connection::handshake::<futures_util::stream::stream::split::SplitSink<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>, tungstenite::protocol::message::Message>, futures_util::stream::stream::split::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>>>::{closure#0}::{closure#1} Line | Count | Source | 162 | 18 | debug!(rpc_version = %negotiated_rpc_version, "identified against obs-websocket"); |
Unexecuted instantiation: obws::client::connection::handshake::<_, _>::{closure#0}::{closure#1} Unexecuted instantiation: obws::client::connection::handshake::<_, _>::{closure#0}::{closure#1} |
163 | | } |
164 | 0 | _ => return Err(HandshakeError::NoIdentified), |
165 | | } |
166 | | |
167 | 18 | Ok(()) |
168 | 18 | } obws::client::connection::handshake::<futures_util::stream::stream::split::SplitSink<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>, tungstenite::protocol::message::Message>, futures_util::stream::stream::split::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>>>::{closure#0} Line | Count | Source | 133 | 18 | match read_message(read).await5 ?0 { | 134 | | ServerMessage::Hello(Hello { | 135 | | obs_web_socket_version: _, | 136 | 18 | rpc_version, | 137 | 18 | authentication, | 138 | 18 | }) => { | 139 | 18 | let authentication = authentication.zip(password).map(|(auth, password)| { | 140 | | create_auth_response(&auth.challenge, &auth.salt, password) | 141 | 18 | }); | 142 | | | 143 | 18 | let req = serde_json::to_string(&ClientRequest::Identify(Identify { | 144 | 18 | rpc_version, | 145 | 18 | authentication, | 146 | 18 | event_subscriptions, | 147 | 18 | })) | 148 | 18 | .map_err(HandshakeError::SerializeMessage)?0 ; | 149 | | | 150 | 18 | write | 151 | 18 | .send(Message::Text(req)) | 152 | 0 | .await | 153 | 18 | .map_err(HandshakeError::Send)?0 ; | 154 | | } | 155 | 0 | _ => return Err(HandshakeError::NoHello), | 156 | | } | 157 | | | 158 | 18 | match read_message(read).await?0 { | 159 | | ServerMessage::Identified(Identified { | 160 | 18 | negotiated_rpc_version, | 161 | | }) => { | 162 | 18 | debug!(rpc_version = %negotiated_rpc_version, "identified against obs-websocket"); | 163 | | } | 164 | 0 | _ => return Err(HandshakeError::NoIdentified), | 165 | | } | 166 | | | 167 | 18 | Ok(()) | 168 | 18 | } |
Unexecuted instantiation: obws::client::connection::handshake::<_, _>::{closure#0} Unexecuted instantiation: obws::client::connection::handshake::<_, _>::{closure#0} |
169 | | |
170 | 18 | fn create_auth_response(challenge: &str, salt: &str, password: &str) -> String { |
171 | 18 | use sha2::{Digest, Sha256}; |
172 | 18 | |
173 | 18 | let mut hasher = Sha256::new(); |
174 | 18 | hasher.update(password.as_bytes()); |
175 | 18 | hasher.update(salt.as_bytes()); |
176 | 18 | |
177 | 18 | let mut auth = String::with_capacity(Sha256::output_size() * 4 / 3 + 4); |
178 | 18 | |
179 | 18 | base64::encode_config_buf(hasher.finalize_reset(), base64::STANDARD, &mut auth); |
180 | 18 | |
181 | 18 | hasher.update(auth.as_bytes()); |
182 | 18 | hasher.update(challenge.as_bytes()); |
183 | 18 | auth.clear(); |
184 | 18 | |
185 | 18 | base64::encode_config_buf(hasher.finalize(), base64::STANDARD, &mut auth); |
186 | 18 | |
187 | 18 | auth |
188 | 18 | } obws::client::connection::create_auth_response Line | Count | Source | 170 | 18 | fn create_auth_response(challenge: &str, salt: &str, password: &str) -> String { | 171 | 18 | use sha2::{Digest, Sha256}; | 172 | 18 | | 173 | 18 | let mut hasher = Sha256::new(); | 174 | 18 | hasher.update(password.as_bytes()); | 175 | 18 | hasher.update(salt.as_bytes()); | 176 | 18 | | 177 | 18 | let mut auth = String::with_capacity(Sha256::output_size() * 4 / 3 + 4); | 178 | 18 | | 179 | 18 | base64::encode_config_buf(hasher.finalize_reset(), base64::STANDARD, &mut auth); | 180 | 18 | | 181 | 18 | hasher.update(auth.as_bytes()); | 182 | 18 | hasher.update(challenge.as_bytes()); | 183 | 18 | auth.clear(); | 184 | 18 | | 185 | 18 | base64::encode_config_buf(hasher.finalize(), base64::STANDARD, &mut auth); | 186 | 18 | | 187 | 18 | auth | 188 | 18 | } |
Unexecuted instantiation: obws::client::connection::create_auth_response |