verity_client/
client.rs

1use std::str::FromStr;
2use std::time::Duration;
3
4use futures::stream::StreamExt;
5use http::{HeaderValue, Method};
6use reqwest::{IntoUrl, Response, Url};
7use reqwest_eventsource::{Event, EventSource};
8use serde::{Deserialize, Serialize};
9use tokio::select;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use tracing::error;
13use uuid::Uuid;
14
15use crate::request::RequestBuilder;
16
17/// Time to wait for a proof received over SSE connection since receiving HTTP response
18const PROOF_TIMEOUT: Duration = Duration::from_millis(1000);
19
20#[derive(Clone)]
21pub struct VerityClientConfig {
22    pub prover_url: String,
23}
24
25#[derive(Clone)]
26pub struct VerityClient {
27    pub(crate) inner: reqwest::Client,
28    pub(crate) config: VerityClientConfig,
29}
30
31pub struct VerityResponse {
32    pub subject: Response,
33    pub proof: String,
34    pub notary_pub_key: String,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38#[serde(rename_all = "camelCase")]
39pub struct NotaryInformation {
40    pub version: String,
41    pub public_key: String,
42    pub git_commit_hash: String,
43    pub git_commit_timestamp: String,
44}
45
46impl VerityClient {
47    /// Creates a new `VerityClient` with the given configuration.
48    pub fn new(config: VerityClientConfig) -> Self {
49        return Self {
50            inner: reqwest::Client::new(),
51            config,
52        };
53    }
54
55    /// Convenience method to make a `GET` request to a URL.
56    ///
57    /// # Errors
58    ///
59    /// This method fails whenever the supplied `Url` cannot be parsed.
60    pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder {
61        self.request(Method::GET, url)
62    }
63
64    /// Convenience method to make a `POST` request to a URL.
65    ///
66    /// # Errors
67    ///
68    /// This method fails whenever the supplied `Url` cannot be parsed.
69    pub fn post<U: IntoUrl>(&self, url: U) -> RequestBuilder {
70        self.request(Method::POST, url)
71    }
72
73    /// Starts building a `Request` with the specified `Method` and `Url`.
74    ///
75    /// Returns a `RequestBuilder`, which allows setting headers and
76    /// the request body before sending.
77    ///
78    /// # Errors
79    ///
80    /// This method fails whenever the supplied `Url` cannot be parsed.
81    pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
82        RequestBuilder {
83            client: self.clone(),
84            inner: self.inner.request(method, url),
85        }
86    }
87
88    /// Executes a `Request` and returns a `VerityResponse`.
89    ///
90    /// A `Request` can be built manually with `Request::new()` or obtained
91    /// from a RequestBuilder with `RequestBuilder::build()`.
92    ///
93    /// You should prefer to use the `RequestBuilder` and
94    /// `RequestBuilder::send()`.
95    ///
96    /// # Errors
97    ///
98    /// This method fails if there was an error while sending the request,
99    /// a redirect loop was detected, or the redirect limit was exhausted.
100    pub async fn execute(&mut self, request: reqwest::Request) -> anyhow::Result<VerityResponse> {
101        self.execute_request(request).await
102    }
103
104    /// Executes the given request and awaits proof of execution.
105    ///
106    /// # Errors
107    ///
108    /// This method fails if the request cannot be sent or if proof cannot be obtained.
109    pub async fn execute_request(
110        &mut self,
111        mut req: reqwest::Request,
112    ) -> anyhow::Result<VerityResponse> {
113        let proxy_url = &String::from(req.url().as_str());
114        let headers = req.headers_mut();
115
116        let request_id = Uuid::new_v4();
117        headers.append(
118            "T-REQUEST-ID",
119            HeaderValue::from_str(&format!("{}", request_id))?,
120        );
121
122        headers.append("T-PROXY-URL", HeaderValue::from_str(proxy_url)?);
123
124        *req.url_mut() = Url::from_str(&format!("{}/proxy", self.config.prover_url))?;
125
126        let req = reqwest::RequestBuilder::from_parts(self.inner.clone(), req);
127
128        let request_cancellation_token = CancellationToken::new();
129        let timeout_cancellation_token = CancellationToken::new();
130
131        let proof_awaiter = self.await_proof(
132            request_id.to_string(),
133            request_cancellation_token.clone(),
134            timeout_cancellation_token.clone(),
135        )?;
136
137        // prettier-ignore
138        let (response, proof_msg) = tokio::try_join!(
139            self.send_request(req, request_cancellation_token, timeout_cancellation_token),
140            proof_awaiter
141        )
142        .map_err(|e| anyhow::anyhow!("Failed to prove the request: {}", e))?;
143
144        let subject = response?;
145        let (notary_pub_key, proof) = proof_msg?;
146
147        Ok(VerityResponse {
148            subject,
149            proof,
150            notary_pub_key,
151        })
152    }
153
154    /// Sends the request and handles cancellation tokens.
155    ///
156    /// Returns a `JoinHandle` that resolves to the response or an error.
157    fn send_request(
158        &self,
159        request: reqwest::RequestBuilder,
160        request_cancellation_token: CancellationToken,
161        timeout_cancellation_token: CancellationToken,
162    ) -> JoinHandle<anyhow::Result<reqwest::Response>> {
163        tokio::spawn(async move {
164            let result = request.send().await;
165            let response = result.map_err(|e| {
166                error!("{}", e);
167                e
168            })?;
169
170            // If T-PROOF-ID header has value, wait for the proof with the timeout,
171            // otherwise stop waiting
172            if response.headers().get("T-PROOF-ID").is_some() {
173                tokio::spawn(async move {
174                    tokio::time::sleep(PROOF_TIMEOUT).await;
175                    timeout_cancellation_token.cancel();
176                });
177            } else {
178                request_cancellation_token.cancel();
179                return Ok(response);
180            }
181
182            Ok(response)
183        })
184    }
185
186    /// Awaits proof of request execution.
187    ///
188    /// Returns a `JoinHandle` that resolves to the proof or an error.
189    ///
190    /// # Errors
191    ///
192    /// This method fails if the proof cannot be obtained.
193    fn await_proof(
194        &self,
195        request_id: String,
196        request_cancellation_token: CancellationToken,
197        timeout_cancellation_token: CancellationToken,
198    ) -> anyhow::Result<JoinHandle<anyhow::Result<(String, String)>>> {
199        let url = Url::from_str(&format!("{}/proof/{}", self.config.prover_url, request_id))?;
200        let mut event_source = EventSource::get(url);
201
202        let awaiter = tokio::task::spawn(async move {
203            while let Some(event) = event_source.next().await {
204                match event {
205                    Ok(Event::Open) => {}
206                    Ok(Event::Message(message)) => {
207                        let parts: Vec<&str> = message.data.splitn(2, "|").collect();
208                        if parts.len() != 2 {
209                            anyhow::bail!("Invalid proof response");
210                        }
211
212                        return Ok((parts[0].to_string(), parts[1].to_string()));
213                    }
214                    Err(err) => {
215                        error!("{}", err);
216                        Err(err)?;
217                    }
218                }
219            }
220
221            Ok((String::from(""), String::from("")))
222        });
223
224        let join_handle = tokio::spawn(async move {
225            // Wait for either SSE message, timeout or cancellation
226            select! {
227                proof = awaiter => {
228                    proof.unwrap()
229                }
230                () = timeout_cancellation_token.cancelled() => {
231                    anyhow::bail!("Timeout reached while waiting for a proof")
232                }
233                () = request_cancellation_token.cancelled() => {
234                    Ok((String::new(), String::new()))
235                }
236            }
237        });
238
239        Ok(join_handle)
240    }
241
242    /// Get the information of the connected notary
243    pub async fn get_notary_info(&self) -> anyhow::Result<NotaryInformation> {
244        let notary_info_url = format!("{}/notaryinfo", self.config.prover_url);
245        let notary_information = reqwest::get(notary_info_url)
246            .await?
247            .json::<NotaryInformation>()
248            .await?;
249
250        Ok(notary_information)
251    }
252}