Coverage Report

Created: 2022-07-04 16:17

src/client/connection.rs
Line
Count
Source (jump to first uncovered line)
1
use std::collections::{HashMap, VecDeque};
2
3
use futures_util::{Sink, SinkExt, Stream, StreamExt};
4
use tokio::sync::{oneshot, Mutex};
5
use tokio_tungstenite::tungstenite::Message;
6
use tracing::debug;
7
8
use super::InnerError;
9
use crate::{
10
    requests::{ClientRequest, EventSubscription, Identify},
11
    responses::{Hello, Identified, RequestResponse, ServerMessage, Status},
12
};
13
14
/// Wrapper for the list of ongoing requests that wait for response.
15
18
#[derive(Default)]
<obws::client::connection::ReceiverList as core::default::Default>::default
Line
Count
Source
15
18
#[derive(Default)]
Unexecuted instantiation: <obws::client::connection::ReceiverList as core::default::Default>::default
Unexecuted instantiation: <obws::client::connection::ReceiverList as core::default::Default>::default
16
pub(super) struct ReceiverList(Mutex<HashMap<u64, oneshot::Sender<(Status, serde_json::Value)>>>);
17
18
impl ReceiverList {
19
    /// Add a new receiver to the wait list, that will be notified once a request with the given
20
    /// ID is received.
21
346
    pub async fn add(&self, id: u64) -> oneshot::Receiver<(Status, serde_json::Value)> {
<obws::client::connection::ReceiverList>::add
Line
Count
Source
21
346
    pub async fn add(&self, id: u64) -> oneshot::Receiver<(Status, serde_json::Value)> {
Unexecuted instantiation: <obws::client::connection::ReceiverList>::add
22
346
        let (tx, rx) = oneshot::channel();
23
346
        self.0.lock()
.await0
.insert(id, tx);
24
346
        rx
25
346
    }
<obws::client::connection::ReceiverList>::add::{closure#0}
Line
Count
Source
21
346
    pub async fn add(&self, id: u64) -> oneshot::Receiver<(Status, serde_json::Value)> {
22
346
        let (tx, rx) = oneshot::channel();
23
346
        self.0.lock()
.await0
.insert(id, tx);
24
346
        rx
25
346
    }
Unexecuted instantiation: <obws::client::connection::ReceiverList>::add::{closure#0}
Unexecuted instantiation: <obws::client::connection::ReceiverList>::add::{closure#0}
26
27
    /// Remove a previously added receiver. Used to free up resources, in case sending the request
28
    /// failed.
29
0
    pub async fn remove(&self, id: u64) {
Unexecuted instantiation: <obws::client::connection::ReceiverList>::remove
Unexecuted instantiation: <obws::client::connection::ReceiverList>::remove
30
0
        self.0.lock().await.remove(&id);
31
0
    }
Unexecuted instantiation: <obws::client::connection::ReceiverList>::remove::{closure#0}
Unexecuted instantiation: <obws::client::connection::ReceiverList>::remove::{closure#0}
Unexecuted instantiation: <obws::client::connection::ReceiverList>::remove::{closure#0}
32
33
    /// Notify a waiting receiver with the response to a request.
34
346
    pub async fn notify(&self, response: RequestResponse) -> Result<(), InnerError> {
<obws::client::connection::ReceiverList>::notify
Line
Count
Source
34
346
    pub async fn notify(&self, response: RequestResponse) -> Result<(), InnerError> {
Unexecuted instantiation: <obws::client::connection::ReceiverList>::notify
35
346
        let RequestResponse {
36
346
            r#type: _,
37
346
            id,
38
346
            status,
39
346
            data,
40
        } = response;
41
42
346
        let id = id
43
346
            .parse()
44
346
            .map_err(|e| 
InnerError::InvalidRequestId(e, id)0
)
?0
;
Unexecuted instantiation: <obws::client::connection::ReceiverList>::notify::{closure#0}::{closure#0}
Unexecuted instantiation: <obws::client::connection::ReceiverList>::notify::{closure#0}::{closure#0}
Unexecuted instantiation: <obws::client::connection::ReceiverList>::notify::{closure#0}::{closure#0}
45
46
346
        if let Some(tx) = self.0.lock()
.await0
.remove(&id) {
47
346
            tx.send((status, data)).ok();
48
346
        }
0
49
50
346
        Ok(())
51
346
    }
<obws::client::connection::ReceiverList>::notify::{closure#0}
Line
Count
Source
34
346
    pub async fn notify(&self, response: RequestResponse) -> Result<(), InnerError> {
35
346
        let RequestResponse {
36
346
            r#type: _,
37
346
            id,
38
346
            status,
39
346
            data,
40
        } = response;
41
42
346
        let id = id
43
346
            .parse()
44
346
            .map_err(|e| InnerError::InvalidRequestId(e, id))
?0
;
45
46
346
        if let Some(tx) = self.0.lock()
.await0
.remove(&id) {
47
346
            tx.send((status, data)).ok();
48
346
        }
0
49
50
346
        Ok(())
51
346
    }
Unexecuted instantiation: <obws::client::connection::ReceiverList>::notify::{closure#0}
Unexecuted instantiation: <obws::client::connection::ReceiverList>::notify::{closure#0}
52
53
    /// Reset the list, canceling any outstanding receivers.
54
0
    pub async fn reset(&self) {
Unexecuted instantiation: <obws::client::connection::ReceiverList>::reset
Unexecuted instantiation: <obws::client::connection::ReceiverList>::reset
55
0
        self.0.lock().await.clear();
56
0
    }
Unexecuted instantiation: <obws::client::connection::ReceiverList>::reset::{closure#0}
Unexecuted instantiation: <obws::client::connection::ReceiverList>::reset::{closure#0}
Unexecuted instantiation: <obws::client::connection::ReceiverList>::reset::{closure#0}
57
}
58
59
/// Wrapper around a thread-safe queue to park and notify re-identify listener.
60
18
#[derive(Default)]
<obws::client::connection::ReidentifyReceiverList as core::default::Default>::default
Line
Count
Source
60
18
#[derive(Default)]
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList as core::default::Default>::default
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList as core::default::Default>::default
61
pub(super) struct ReidentifyReceiverList(Mutex<VecDeque<oneshot::Sender<Identified>>>);
62
63
impl ReidentifyReceiverList {
64
    /// Add a new receiver to the wait list, returning a channel to await the result on.
65
0
    pub async fn add(&self) -> oneshot::Receiver<Identified> {
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::add
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::add
66
0
        let (tx, rx) = oneshot::channel();
67
0
        self.0.lock().await.push_back(tx);
68
0
        rx
69
0
    }
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::add::{closure#0}
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::add::{closure#0}
70
71
    /// Notify the next listener in the queue, transferring it the response.
72
0
    pub async fn notify(&self, identified: Identified) {
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::notify
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::notify
73
0
        if let Some(tx) = self.0.lock().await.pop_front() {
74
0
            tx.send(identified).ok();
75
0
        }
76
0
    }
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::notify::{closure#0}
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::notify::{closure#0}
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::notify::{closure#0}
77
78
    /// Reset the list, canceling any outstanding receivers.
79
0
    pub async fn reset(&self) {
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::reset
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::reset
80
0
        self.0.lock().await.clear();
81
0
    }
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::reset::{closure#0}
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::reset::{closure#0}
Unexecuted instantiation: <obws::client::connection::ReidentifyReceiverList>::reset::{closure#0}
82
}
83
84
/// Errors that can occur while performing the initial handshake with obs-websocket.
85
0
#[derive(Debug, thiserror::Error)]
Unexecuted instantiation: <obws::client::connection::HandshakeError as core::fmt::Debug>::fmt
Unexecuted instantiation: <obws::client::connection::HandshakeError as core::fmt::Debug>::fmt
Unexecuted instantiation: <obws::client::connection::HandshakeError as core::fmt::Display>::fmt
Unexecuted instantiation: <obws::client::connection::HandshakeError as std::error::Error>::source
Unexecuted instantiation: <obws::client::connection::HandshakeError as std::error::Error>::source
Unexecuted instantiation: <obws::client::connection::HandshakeError as core::fmt::Display>::fmt
86
pub enum HandshakeError {
87
    /// The connection to obs-websocket was interrupted while trying to read a message.
88
    #[error("connection to obs-websocket was closed")]
89
    ConnectionClosed,
90
    /// Receiving a message did not succeed.
91
    #[error("failed reading websocket message")]
92
    Receive(#[source] tokio_tungstenite::tungstenite::Error),
93
    /// The web-socket message was not convertible to text.
94
    #[error("websocket message not convertible to text")]
95
    IntoText(#[source] tokio_tungstenite::tungstenite::Error),
96
    /// A message from obs-websocket could not be deserialized.
97
    #[error("failed deserializing message")]
98
    DeserializeMessage(#[source] serde_json::Error),
99
    /// A message could not be serialized for sending.
100
    #[error("failed serializing message")]
101
    SerializeMessage(#[source] serde_json::Error),
102
    /// Sending a message to obs-websocket failed.
103
    #[error("failed to send message to obs-websocket")]
104
    Send(#[source] tokio_tungstenite::tungstenite::Error),
105
    /// Didn't receive the initial `Hello` message from obs-websocket after connecting.
106
    #[error("didn't receive a `Hello` message after connecting")]
107
    NoHello,
108
    /// Didn't receive a `Identified` message from obs-websocket after authentication.
109
    #[error("didn't receive a `Identified` message")]
110
    NoIdentified,
111
}
112
113
18
pub(super) async fn handshake(
114
18
    write: &mut (impl Sink<Message, Error = tokio_tungstenite::tungstenite::Error> + Unpin),
115
18
    read: &mut (impl Stream<Item = tokio_tungstenite::tungstenite::Result<Message>> + Unpin),
116
18
    password: Option<&str>,
117
18
    event_subscriptions: Option<EventSubscription>,
118
18
) -> Result<(), HandshakeError> {
obws::client::connection::handshake::<futures_util::stream::stream::split::SplitSink<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>, tungstenite::protocol::message::Message>, futures_util::stream::stream::split::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>>>
Line
Count
Source
113
18
pub(super) async fn handshake(
114
18
    write: &mut (impl Sink<Message, Error = tokio_tungstenite::tungstenite::Error> + Unpin),
115
18
    read: &mut (impl Stream<Item = tokio_tungstenite::tungstenite::Result<Message>> + Unpin),
116
18
    password: Option<&str>,
117
18
    event_subscriptions: Option<EventSubscription>,
118
18
) -> Result<(), HandshakeError> {
Unexecuted instantiation: obws::client::connection::handshake::<_, _>
Unexecuted instantiation: obws::client::connection::handshake::<_, _>
119
36
    async fn read_message(
120
36
        read: &mut (impl Stream<Item = tokio_tungstenite::tungstenite::Result<Message>> + Unpin),
121
36
    ) -> Result<ServerMessage, HandshakeError> {
obws::client::connection::handshake::{closure#0}::read_message::<futures_util::stream::stream::split::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>>>
Line
Count
Source
119
36
    async fn read_message(
120
36
        read: &mut (impl Stream<Item = tokio_tungstenite::tungstenite::Result<Message>> + Unpin),
121
36
    ) -> Result<ServerMessage, HandshakeError> {
Unexecuted instantiation: obws::client::connection::handshake::{closure#0}::read_message::<_>
Unexecuted instantiation: obws::client::connection::handshake::{closure#0}::read_message::<_>
122
36
        let message = read
123
36
            .next()
124
23
            .await
125
36
            .ok_or(HandshakeError::ConnectionClosed)
?0
126
36
            .map_err(HandshakeError::Receive)
?0
127
36
            .into_text()
128
36
            .map_err(HandshakeError::IntoText)
?0
;
129
130
36
        serde_json::from_str::<ServerMessage>(&message).map_err(HandshakeError::DeserializeMessage)
131
36
    }
obws::client::connection::handshake::{closure#0}::read_message::<futures_util::stream::stream::split::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>>>::{closure#0}
Line
Count
Source
122
36
        let message = read
123
36
            .next()
124
23
            .await
125
36
            .ok_or(HandshakeError::ConnectionClosed)
?0
126
36
            .map_err(HandshakeError::Receive)
?0
127
36
            .into_text()
128
36
            .map_err(HandshakeError::IntoText)
?0
;
129
130
36
        serde_json::from_str::<ServerMessage>(&message).map_err(HandshakeError::DeserializeMessage)
131
36
    }
Unexecuted instantiation: obws::client::connection::handshake::{closure#0}::read_message::<_>::{closure#0}
Unexecuted instantiation: obws::client::connection::handshake::{closure#0}::read_message::<_>::{closure#0}
132
133
18
    match read_message(read)
.await5
?0
{
134
        ServerMessage::Hello(Hello {
135
            obs_web_socket_version: _,
136
18
            rpc_version,
137
18
            authentication,
138
18
        }) => {
139
18
            let authentication = authentication.zip(password).map(|(auth, password)| {
140
18
                create_auth_response(&auth.challenge, &auth.salt, password)
141
18
            });
obws::client::connection::handshake::<futures_util::stream::stream::split::SplitSink<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>, tungstenite::protocol::message::Message>, futures_util::stream::stream::split::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>>>::{closure#0}::{closure#0}
Line
Count
Source
139
18
            let authentication = authentication.zip(password).map(|(auth, password)| {
140
18
                create_auth_response(&auth.challenge, &auth.salt, password)
141
18
            });
Unexecuted instantiation: obws::client::connection::handshake::<_, _>::{closure#0}::{closure#0}
Unexecuted instantiation: obws::client::connection::handshake::<_, _>::{closure#0}::{closure#0}
142
143
18
            let req = serde_json::to_string(&ClientRequest::Identify(Identify {
144
18
                rpc_version,
145
18
                authentication,
146
18
                event_subscriptions,
147
18
            }))
148
18
            .map_err(HandshakeError::SerializeMessage)
?0
;
149
150
18
            write
151
18
                .send(Message::Text(req))
152
0
                .await
153
18
                .map_err(HandshakeError::Send)
?0
;
154
        }
155
0
        _ => return Err(HandshakeError::NoHello),
156
    }
157
158
18
    match read_message(read).await
?0
{
159
        ServerMessage::Identified(Identified {
160
18
            negotiated_rpc_version,
161
        }) => {
162
18
            debug!(rpc_version = %negotiated_rpc_version, "identified against obs-websocket");
obws::client::connection::handshake::<futures_util::stream::stream::split::SplitSink<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>, tungstenite::protocol::message::Message>, futures_util::stream::stream::split::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>>>::{closure#0}::{closure#1}
Line
Count
Source
162
18
            debug!(rpc_version = %negotiated_rpc_version, "identified against obs-websocket");
Unexecuted instantiation: obws::client::connection::handshake::<_, _>::{closure#0}::{closure#1}
Unexecuted instantiation: obws::client::connection::handshake::<_, _>::{closure#0}::{closure#1}
163
        }
164
0
        _ => return Err(HandshakeError::NoIdentified),
165
    }
166
167
18
    Ok(())
168
18
}
obws::client::connection::handshake::<futures_util::stream::stream::split::SplitSink<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>, tungstenite::protocol::message::Message>, futures_util::stream::stream::split::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::MaybeTlsStream<tokio::net::tcp::stream::TcpStream>>>>::{closure#0}
Line
Count
Source
133
18
    match read_message(read)
.await5
?0
{
134
        ServerMessage::Hello(Hello {
135
            obs_web_socket_version: _,
136
18
            rpc_version,
137
18
            authentication,
138
18
        }) => {
139
18
            let authentication = authentication.zip(password).map(|(auth, password)| {
140
                create_auth_response(&auth.challenge, &auth.salt, password)
141
18
            });
142
143
18
            let req = serde_json::to_string(&ClientRequest::Identify(Identify {
144
18
                rpc_version,
145
18
                authentication,
146
18
                event_subscriptions,
147
18
            }))
148
18
            .map_err(HandshakeError::SerializeMessage)
?0
;
149
150
18
            write
151
18
                .send(Message::Text(req))
152
0
                .await
153
18
                .map_err(HandshakeError::Send)
?0
;
154
        }
155
0
        _ => return Err(HandshakeError::NoHello),
156
    }
157
158
18
    match read_message(read).await
?0
{
159
        ServerMessage::Identified(Identified {
160
18
            negotiated_rpc_version,
161
        }) => {
162
18
            debug!(rpc_version = %negotiated_rpc_version, "identified against obs-websocket");
163
        }
164
0
        _ => return Err(HandshakeError::NoIdentified),
165
    }
166
167
18
    Ok(())
168
18
}
Unexecuted instantiation: obws::client::connection::handshake::<_, _>::{closure#0}
Unexecuted instantiation: obws::client::connection::handshake::<_, _>::{closure#0}
169
170
18
fn create_auth_response(challenge: &str, salt: &str, password: &str) -> String {
171
18
    use sha2::{Digest, Sha256};
172
18
173
18
    let mut hasher = Sha256::new();
174
18
    hasher.update(password.as_bytes());
175
18
    hasher.update(salt.as_bytes());
176
18
177
18
    let mut auth = String::with_capacity(Sha256::output_size() * 4 / 3 + 4);
178
18
179
18
    base64::encode_config_buf(hasher.finalize_reset(), base64::STANDARD, &mut auth);
180
18
181
18
    hasher.update(auth.as_bytes());
182
18
    hasher.update(challenge.as_bytes());
183
18
    auth.clear();
184
18
185
18
    base64::encode_config_buf(hasher.finalize(), base64::STANDARD, &mut auth);
186
18
187
18
    auth
188
18
}
obws::client::connection::create_auth_response
Line
Count
Source
170
18
fn create_auth_response(challenge: &str, salt: &str, password: &str) -> String {
171
18
    use sha2::{Digest, Sha256};
172
18
173
18
    let mut hasher = Sha256::new();
174
18
    hasher.update(password.as_bytes());
175
18
    hasher.update(salt.as_bytes());
176
18
177
18
    let mut auth = String::with_capacity(Sha256::output_size() * 4 / 3 + 4);
178
18
179
18
    base64::encode_config_buf(hasher.finalize_reset(), base64::STANDARD, &mut auth);
180
18
181
18
    hasher.update(auth.as_bytes());
182
18
    hasher.update(challenge.as_bytes());
183
18
    auth.clear();
184
18
185
18
    base64::encode_config_buf(hasher.finalize(), base64::STANDARD, &mut auth);
186
18
187
18
    auth
188
18
}
Unexecuted instantiation: obws::client::connection::create_auth_response