Skip to content

Commit d243ef5

Browse files
authored
v0.3.0 (#41)
* diffing kickoff * v0.3.0 on the way * Fix example. Renamings * Fix initial bootstraping && remove prints * fmt & clippy * Update example. Add drop for ReadReceiver to make sure all future wakers are consumed and called * Move all blocking ops to spawned thread * Redo async bridge API * Name things properly, remove missleading comment, snapshots are explicit now * Rebase, fmt & clippy
1 parent ed2419c commit d243ef5

15 files changed

Lines changed: 424 additions & 288 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/sync-backend/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ publish = false
66

77
[dependencies]
88
libsqlite-sys = { path = "../../libsqlite-sys" }
9-
journal = { path = "../../journal" }
9+
journal = { path = "../../journal", features = ["async"] }
1010
serde_sqlite = { path = "../../serde_sqlite" }
1111

1212
tokio = { version = "1", features = ["full"] }

examples/sync-backend/src/main.rs

Lines changed: 28 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,7 @@
22
//!
33
//! ** Strictly for the demo purposes only **
44
//! ** Known issues **:
5-
//! - It works only for single database and single client
6-
//! - There is no Sync procotol implemented here at all – direct stream of journal.
7-
//! - The server assumes the client sends only new snapshots so the local version is not checked, and it's
8-
//! possible to write the same snapshots multiple times.
9-
//! - No sanity checks that the client actually sends valid data not random garbage
10-
//! - Calling Journal::add_page directly is a hack and rewrites snapshot timestamps/numbers.
11-
//! - The Journal API doesn't allow to write headers directly (yet).
12-
//! - The Journal is experimental, it only supports blocking IO so the scheduler is blocked on the journal IO ops.
5+
//! - It works only for single database
136
//!
147
//! Run with
158
//!
@@ -19,21 +12,20 @@
1912
2013
use axum::{
2114
extract::{BodyStream, Path, State, Query},
15+
http::StatusCode,
16+
body,
2217
response,
2318
routing::get,
2419
Router, Server,
2520
};
2621
use futures::StreamExt;
27-
use journal::{Journal, Protocol, Stream};
28-
use serde_sqlite::de;
29-
use std::io::Read;
30-
use std::sync::Arc;
31-
use tokio::sync::Mutex;
22+
use journal::{Journal, AsyncReadJournalStream, AsyncWriteJournalStream};
23+
use tokio::io::AsyncWriteExt;
3224
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
3325
use serde::Deserialize;
3426

35-
fn to_error<T: std::fmt::Debug>(e: T) -> String {
36-
format!("{e:?}")
27+
fn to_error<T: std::fmt::Debug>(_e: T) -> StatusCode {
28+
StatusCode::INTERNAL_SERVER_ERROR
3729
}
3830

3931
#[derive(Debug, Default, Deserialize)]
@@ -48,52 +40,27 @@ async fn post_snapshot(
4840
State(state): State<AppState>,
4941
Path(_domain): Path<String>,
5042
mut stream: BodyStream,
51-
) -> Result<&'static str, String> {
52-
let mut whole_body = vec![];
43+
) -> Result<&'static str, StatusCode> {
44+
let mut write_stream = AsyncWriteJournalStream::new(state.journal_path).spawn();
5345
while let Some(chunk) = stream.next().await {
54-
whole_body.extend(chunk.map_err(to_error)?);
55-
}
56-
if whole_body.is_empty() {
57-
return Ok("");
58-
}
59-
let mut whole_body = std::io::Cursor::new(whole_body);
60-
let mut journal = state.journal.lock().await;
61-
62-
loop {
63-
match de::from_reader::<Protocol, _>(&mut whole_body) {
64-
Ok(Protocol::SnapshotHeader(snapshot_header)) => {
65-
journal.commit().map_err(to_error)?;
66-
journal.add_snapshot(&snapshot_header).map_err(to_error)?;
67-
tracing::info!("snapshot: {:?}", snapshot_header.id);
68-
}
69-
Ok(Protocol::PageHeader(page_header)) => {
70-
let mut page = vec![0; page_header.page_size as usize];
71-
whole_body.read_exact(page.as_mut_slice()).map_err(to_error)?;
72-
journal.add_page(&page_header, page.as_slice()).map_err(to_error)?;
73-
tracing::info!(" page: {:?}", page_header.page_num);
74-
},
75-
Ok(Protocol::EndOfStream(_)) => {
76-
journal.commit().map_err(to_error)?;
77-
tracing::info!("end of stream");
78-
break;
79-
},
80-
Err(e) => return Err(to_error(e)),
81-
}
82-
}
46+
let chunk = chunk.map_err(to_error)?;
47+
write_stream.write_all(&chunk).await.map_err(to_error)?;
48+
};
8349
Ok("OK")
8450
}
8551

8652
/// get latest knowns snapshot num
8753
async fn head_snapshot(
8854
State(state): State<AppState>,
8955
Path(_domain): Path<String>,
90-
) -> Result<impl response::IntoResponse, String> {
91-
let journal = state.journal.lock().await;
92-
let snapshot_id = match journal.current_snapshot() {
93-
Some(v) => format!("{v}"),
94-
None => "".into(),
95-
};
96-
let headers = response::AppendHeaders([("x-snapshot-id", snapshot_id)]);
56+
) -> Result<impl response::IntoResponse, StatusCode> {
57+
let res = tokio::task::spawn_blocking(move ||{
58+
let journal = Journal::try_from(state.journal_path)
59+
.or_else(|_e| Journal::create(state.journal_path))?;
60+
Ok::<_, journal::Error>(journal.get_header().snapshot_counter)
61+
});
62+
let snapshot_id = res.await.map_err(to_error)?.map_err(to_error)?;
63+
let headers = response::AppendHeaders([("x-snapshot-id", snapshot_id.to_string())]);
9764
Ok((headers, "head"))
9865
}
9966

@@ -102,30 +69,23 @@ async fn get_snapshot(
10269
State(state): State<AppState>,
10370
Path(_domain): Path<String>,
10471
params: Option<Query<Params>>,
105-
) -> Result<impl response::IntoResponse, String> {
106-
let snapshot_id: u64 = params.unwrap_or_default().snapshot_id;
107-
let mut journal = state.journal.lock().await;
108-
let iter = journal
109-
.into_iter()
110-
.skip_snapshots(snapshot_id);
111-
let mut buf = vec![];
112-
Stream::new(iter).read_to_end(&mut buf).map_err(to_error)?;
113-
Ok(buf)
72+
) -> Result<impl response::IntoResponse, StatusCode> {
73+
let stream = AsyncReadJournalStream::new(
74+
state.journal_path,
75+
params.map(|p| p.snapshot_id).unwrap_or(0)
76+
).spawn();
77+
Ok(body::StreamBody::new(tokio_util::io::ReaderStream::new(stream)))
11478
}
11579

11680
#[derive(Debug, Clone)]
11781
struct AppState {
118-
journal: Arc<Mutex<journal::Journal>>,
82+
journal_path: &'static str
11983
}
12084

12185
impl AppState {
12286
fn new() -> Self {
123-
let journal_path = "/tmp/journal";
124-
let journal = Journal::try_from(journal_path)
125-
.or_else(|_e| Journal::create(journal_path))
126-
.unwrap();
12787
Self {
128-
journal: Arc::new(Mutex::new(journal)),
88+
journal_path: "/tmp/journal"
12989
}
13090
}
13191
}

0 commit comments

Comments
 (0)