1use std::str::FromStr;
2use std::time::Duration;
3
4use futures::stream::StreamExt;
5use http::{HeaderValue, Method};
6use reqwest::{IntoUrl, Response, Url};
7use reqwest_eventsource::{Event, EventSource};
8use serde::{Deserialize, Serialize};
9use tokio::select;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use tracing::error;
13use uuid::Uuid;
14
15use crate::request::RequestBuilder;
16
17const PROOF_TIMEOUT: Duration = Duration::from_millis(1000);
19
20#[derive(Clone)]
21pub struct VerityClientConfig {
22 pub prover_url: String,
23}
24
25#[derive(Clone)]
26pub struct VerityClient {
27 pub(crate) inner: reqwest::Client,
28 pub(crate) config: VerityClientConfig,
29}
30
31pub struct VerityResponse {
32 pub subject: Response,
33 pub proof: String,
34 pub notary_pub_key: String,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38#[serde(rename_all = "camelCase")]
39pub struct NotaryInformation {
40 pub version: String,
41 pub public_key: String,
42 pub git_commit_hash: String,
43 pub git_commit_timestamp: String,
44}
45
46impl VerityClient {
47 pub fn new(config: VerityClientConfig) -> Self {
49 return Self {
50 inner: reqwest::Client::new(),
51 config,
52 };
53 }
54
55 pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder {
61 self.request(Method::GET, url)
62 }
63
64 pub fn post<U: IntoUrl>(&self, url: U) -> RequestBuilder {
70 self.request(Method::POST, url)
71 }
72
73 pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
82 RequestBuilder {
83 client: self.clone(),
84 inner: self.inner.request(method, url),
85 }
86 }
87
88 pub async fn execute(&mut self, request: reqwest::Request) -> anyhow::Result<VerityResponse> {
101 self.execute_request(request).await
102 }
103
104 pub async fn execute_request(
110 &mut self,
111 mut req: reqwest::Request,
112 ) -> anyhow::Result<VerityResponse> {
113 let proxy_url = &String::from(req.url().as_str());
114 let headers = req.headers_mut();
115
116 let request_id = Uuid::new_v4();
117 headers.append(
118 "T-REQUEST-ID",
119 HeaderValue::from_str(&format!("{}", request_id))?,
120 );
121
122 headers.append("T-PROXY-URL", HeaderValue::from_str(proxy_url)?);
123
124 *req.url_mut() = Url::from_str(&format!("{}/proxy", self.config.prover_url))?;
125
126 let req = reqwest::RequestBuilder::from_parts(self.inner.clone(), req);
127
128 let request_cancellation_token = CancellationToken::new();
129 let timeout_cancellation_token = CancellationToken::new();
130
131 let proof_awaiter = self.await_proof(
132 request_id.to_string(),
133 request_cancellation_token.clone(),
134 timeout_cancellation_token.clone(),
135 )?;
136
137 let (response, proof_msg) = tokio::try_join!(
139 self.send_request(req, request_cancellation_token, timeout_cancellation_token),
140 proof_awaiter
141 )
142 .map_err(|e| anyhow::anyhow!("Failed to prove the request: {}", e))?;
143
144 let subject = response?;
145 let (notary_pub_key, proof) = proof_msg?;
146
147 Ok(VerityResponse {
148 subject,
149 proof,
150 notary_pub_key,
151 })
152 }
153
154 fn send_request(
158 &self,
159 request: reqwest::RequestBuilder,
160 request_cancellation_token: CancellationToken,
161 timeout_cancellation_token: CancellationToken,
162 ) -> JoinHandle<anyhow::Result<reqwest::Response>> {
163 tokio::spawn(async move {
164 let result = request.send().await;
165 let response = result.map_err(|e| {
166 error!("{}", e);
167 e
168 })?;
169
170 if response.headers().get("T-PROOF-ID").is_some() {
173 tokio::spawn(async move {
174 tokio::time::sleep(PROOF_TIMEOUT).await;
175 timeout_cancellation_token.cancel();
176 });
177 } else {
178 request_cancellation_token.cancel();
179 return Ok(response);
180 }
181
182 Ok(response)
183 })
184 }
185
186 fn await_proof(
194 &self,
195 request_id: String,
196 request_cancellation_token: CancellationToken,
197 timeout_cancellation_token: CancellationToken,
198 ) -> anyhow::Result<JoinHandle<anyhow::Result<(String, String)>>> {
199 let url = Url::from_str(&format!("{}/proof/{}", self.config.prover_url, request_id))?;
200 let mut event_source = EventSource::get(url);
201
202 let awaiter = tokio::task::spawn(async move {
203 while let Some(event) = event_source.next().await {
204 match event {
205 Ok(Event::Open) => {}
206 Ok(Event::Message(message)) => {
207 let parts: Vec<&str> = message.data.splitn(2, "|").collect();
208 if parts.len() != 2 {
209 anyhow::bail!("Invalid proof response");
210 }
211
212 return Ok((parts[0].to_string(), parts[1].to_string()));
213 }
214 Err(err) => {
215 error!("{}", err);
216 Err(err)?;
217 }
218 }
219 }
220
221 Ok((String::from(""), String::from("")))
222 });
223
224 let join_handle = tokio::spawn(async move {
225 select! {
227 proof = awaiter => {
228 proof.unwrap()
229 }
230 () = timeout_cancellation_token.cancelled() => {
231 anyhow::bail!("Timeout reached while waiting for a proof")
232 }
233 () = request_cancellation_token.cancelled() => {
234 Ok((String::new(), String::new()))
235 }
236 }
237 });
238
239 Ok(join_handle)
240 }
241
242 pub async fn get_notary_info(&self) -> anyhow::Result<NotaryInformation> {
244 let notary_info_url = format!("{}/notaryinfo", self.config.prover_url);
245 let notary_information = reqwest::get(notary_info_url)
246 .await?
247 .json::<NotaryInformation>()
248 .await?;
249
250 Ok(notary_information)
251 }
252}