From c794920ab8824b56311a8610ff266843ca1c9d03 Mon Sep 17 00:00:00 2001 From: Carl Sverre <82591+carlsverre@users.noreply.github.com> Date: Fri, 5 Jan 2024 18:04:45 -0800 Subject: [PATCH] Server-initiated mutations Fixes #42 --- lib/sqlsync-worker/sqlsync-wasm/src/utils.rs | 1 - lib/sqlsync/examples/end-to-end-local-net.rs | 125 ++++++++++++++----- lib/sqlsync/src/coordinator.rs | 54 ++++++-- lib/sqlsync/src/db.rs | 19 ++- lib/sqlsync/src/error.rs | 14 ++- lib/sqlsync/src/journal/cursor.rs | 12 +- lib/sqlsync/src/journal/journal.rs | 20 +-- lib/sqlsync/src/journal/memory.rs | 30 ++--- lib/sqlsync/src/replication.rs | 27 ++-- lib/sqlsync/src/storage.rs | 69 ++++++---- lib/sqlsync/src/timeline.rs | 21 +--- 11 files changed, 251 insertions(+), 141 deletions(-) diff --git a/lib/sqlsync-worker/sqlsync-wasm/src/utils.rs b/lib/sqlsync-worker/sqlsync-wasm/src/utils.rs index e53df0c..b3345f5 100644 --- a/lib/sqlsync-worker/sqlsync-wasm/src/utils.rs +++ b/lib/sqlsync-worker/sqlsync-wasm/src/utils.rs @@ -95,7 +95,6 @@ impl_from_error!( io::Error, sqlsync::error::Error, sqlsync::sqlite::Error, - sqlsync::JournalError, sqlsync::replication::ReplicationError, sqlsync::JournalIdParseError, sqlsync::ReducerError, diff --git a/lib/sqlsync/examples/end-to-end-local-net.rs b/lib/sqlsync/examples/end-to-end-local-net.rs index d090364..b0b7bb5 100644 --- a/lib/sqlsync/examples/end-to-end-local-net.rs +++ b/lib/sqlsync/examples/end-to-end-local-net.rs @@ -16,7 +16,6 @@ use sqlsync::local::NoopSignal; use sqlsync::replication::ReplicationMsg; use sqlsync::replication::ReplicationProtocol; use sqlsync::JournalId; -use sqlsync::Lsn; use sqlsync::MemoryJournalFactory; use sqlsync::Reducer; @@ -141,6 +140,8 @@ fn handle_client( let mut num_steps = 0; + let mut remaining_direct_mutations = 5; + loop { let msg = receive_msg(&mut socket_reader)?; log::info!("server: received {:?}", msg); @@ -157,6 +158,31 @@ fn handle_client( log::info!("server: stepping doc (steps: {})", num_steps); unlock!(|doc| doc.step()?); + // trigger a direct increment on the server side after every message + if remaining_direct_mutations > 0 { + remaining_direct_mutations -= 1; + unlock!(|doc| { + log::info!("server: running a direct mutation on the doc"); + doc.mutate_direct(|tx| { + match tx.execute( + "INSERT INTO counter (id, value) VALUES (1, 0) + ON CONFLICT (id) DO UPDATE SET value = value + 1", + [], + ) { + Ok(_) => Ok::<_, anyhow::Error>(()), + // ignore missing table error + Err(rusqlite::Error::SqliteFailure(_, Some(msg))) + if msg == "no such table: counter" => + { + log::info!("server: skipping direct mutation"); + Ok(()) + } + Err(err) => Err(err)?, + } + })?; + }); + } + // sync back to the client if needed unlock!(|doc| { if let Some((msg, mut reader)) = protocol.sync(doc)? { @@ -219,7 +245,16 @@ fn start_client( let total_mutations = 10 as usize; let mut remaining_mutations = total_mutations; + // the total number of sync attempts we will make + let total_syncs = 100 as usize; + let mut syncs = 0; + loop { + syncs += 1; + if syncs > total_syncs { + panic!("client({}): too many syncs", timeline_id); + } + let msg = receive_msg(&mut socket_reader)?; log::info!("client({}): received {:?}", timeline_id, msg); @@ -248,25 +283,31 @@ fn start_client( } log::info!("client({}): QUERYING STATE", timeline_id); - doc.query(|conn| { - conn.query_row("select value from counter", [], |row| { - let value: Option = row.get(0)?; - log::info!( - "client({}): counter value: {:?}", - timeline_id, - value - ); - Ok(()) - })?; - - Ok::<_, anyhow::Error>(()) + let current_value = doc.query(|conn| { + let value = conn.query_row( + "select value from counter where id = 0", + [], + |row| { + let value: Option = row.get(0)?; + log::info!( + "client({}): counter value: {:?}", + timeline_id, + value + ); + Ok(value) + }, + )?; + + Ok::<_, anyhow::Error>(value) })?; - if let Some(lsn) = doc.storage_lsn() { - // once the storage has reached (total_mutations+1) * num_clients - // then we have reached the end - log::info!("client({}): storage lsn: {}", timeline_id, lsn); - if lsn >= ((total_mutations * num_clients) + 1) as Lsn { + if let Some(value) = current_value { + log::info!( + "client({}): storage lsn: {:?}", + timeline_id, + doc.storage_lsn() + ); + if value == (total_mutations * num_clients) { break; } } @@ -279,23 +320,47 @@ fn start_client( // final query, value should be total_mutations * num_clients doc.query(|conn| { - conn.query_row_and_then("select value from counter", [], |row| { - let value: Option = row.get(0)?; - log::info!( - "client({}): FINAL counter value: {:?}", - timeline_id, - value - ); - if value != Some(total_mutations * num_clients) { - return Err(anyhow::anyhow!( + conn.query_row_and_then( + "select value from counter where id = 0", + [], + |row| { + let value: Option = row.get(0)?; + log::info!( + "client({}): FINAL counter value: {:?}", + timeline_id, + value + ); + if value != Some(total_mutations * num_clients) { + return Err(anyhow::anyhow!( "client({}): counter value is incorrect: {:?}, expected {}", timeline_id, value, total_mutations * num_clients )); - } - Ok(()) - })?; + } + Ok(()) + }, + )?; + conn.query_row_and_then( + "select value from counter where id = 1", + [], + |row| { + let value: Option = row.get(0)?; + log::info!( + "client({}): FINAL server counter value: {:?}", + timeline_id, + value + ); + if value.is_none() || value == Some(0) { + return Err(anyhow::anyhow!( + "client({}): server counter value is incorrect: {:?}, expected non-zero value", + timeline_id, + value, + )); + } + Ok(()) + }, + )?; Ok::<_, anyhow::Error>(()) })?; diff --git a/lib/sqlsync/src/coordinator.rs b/lib/sqlsync/src/coordinator.rs index c86b26a..ee03e43 100644 --- a/lib/sqlsync/src/coordinator.rs +++ b/lib/sqlsync/src/coordinator.rs @@ -1,19 +1,24 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; +use std::convert::From; use std::fmt::Debug; use std::io; -use crate::db::{open_with_vfs, ConnectionPair}; +use rusqlite::Transaction; + +use crate::db::{open_with_vfs, run_in_tx, ConnectionPair}; use crate::error::Result; use crate::reducer::Reducer; -use crate::replication::{ReplicationDestination, ReplicationError, ReplicationSource}; +use crate::replication::{ + ReplicationDestination, ReplicationError, ReplicationSource, +}; use crate::timeline::{apply_timeline_range, run_timeline_migration}; +use crate::Lsn; use crate::{ journal::{Journal, JournalFactory, JournalId}, lsn::LsnRange, storage::Storage, }; -use crate::{JournalError, Lsn}; struct ReceiveQueueEntry { id: JournalId, @@ -44,10 +49,11 @@ impl CoordinatorDocument { timeline_factory: J::Factory, reducer_wasm_bytes: &[u8], ) -> Result { - let (mut sqlite, storage) = open_with_vfs(storage)?; + let (mut sqlite, mut storage) = open_with_vfs(storage)?; // TODO: this feels awkward here run_timeline_migration(&mut sqlite.readwrite)?; + storage.commit()?; Ok(Self { reducer: Reducer::new(reducer_wasm_bytes)?, @@ -62,10 +68,12 @@ impl CoordinatorDocument { fn get_or_create_timeline_mut( &mut self, id: JournalId, - ) -> std::result::Result<&mut J, JournalError> { + ) -> io::Result<&mut J> { match self.timelines.entry(id) { Entry::Occupied(entry) => Ok(entry.into_mut()), - Entry::Vacant(entry) => Ok(entry.insert(self.timeline_factory.open(id)?)), + Entry::Vacant(entry) => { + Ok(entry.insert(self.timeline_factory.open(id)?)) + } } } @@ -89,12 +97,26 @@ impl CoordinatorDocument { } } + pub fn mutate_direct(&mut self, f: F) -> Result<(), E> + where + F: FnOnce(&mut Transaction) -> Result<(), E>, + E: From + From, + { + run_in_tx(&mut self.sqlite.readwrite, f)?; + self.storage.commit()?; + Ok(()) + } + pub fn step(&mut self) -> Result<()> { // check to see if we have anything in the receive queue let entry = self.timeline_receive_queue.pop_front(); if let Some(entry) = entry { - log::debug!("applying range {} to timeline {}", entry.range, entry.id); + log::debug!( + "applying range {} to timeline {}", + entry.range, + entry.id + ); // get the timeline let timeline = self @@ -119,7 +141,9 @@ impl CoordinatorDocument { } /// CoordinatorDocument knows how to replicate it's storage journal -impl ReplicationSource for CoordinatorDocument { +impl ReplicationSource + for CoordinatorDocument +{ type Reader<'a> = ::Reader<'a> where Self: 'a; @@ -132,14 +156,22 @@ impl ReplicationSource for CoordinatorDocument(&'a self, lsn: crate::Lsn) -> io::Result>> { + fn read_lsn<'a>( + &'a self, + lsn: crate::Lsn, + ) -> io::Result>> { self.storage.read_lsn(lsn) } } /// CoordinatorDocument knows how to receive timeline journals from elsewhere -impl ReplicationDestination for CoordinatorDocument { - fn range(&mut self, id: JournalId) -> std::result::Result { +impl ReplicationDestination + for CoordinatorDocument +{ + fn range( + &mut self, + id: JournalId, + ) -> std::result::Result { let timeline = self.get_or_create_timeline_mut(id)?; ReplicationDestination::range(timeline, id) } diff --git a/lib/sqlsync/src/db.rs b/lib/sqlsync/src/db.rs index e99e70c..1542058 100644 --- a/lib/sqlsync/src/db.rs +++ b/lib/sqlsync/src/db.rs @@ -1,6 +1,8 @@ +use std::convert::From; + use rusqlite::{ hooks::{AuthAction, AuthContext, Authorization}, - Connection, OpenFlags, + Connection, OpenFlags, Transaction, }; use sqlite_vfs::FilePtr; @@ -13,11 +15,9 @@ pub struct ConnectionPair { pub readonly: Connection, } -type Result = std::result::Result; - pub fn open_with_vfs( journal: J, -) -> Result<(ConnectionPair, Box>)> { +) -> rusqlite::Result<(ConnectionPair, Box>)> { let mut storage = Box::new(Storage::new(journal)); let storage_ptr = FilePtr::new(&mut storage); @@ -68,3 +68,14 @@ pub fn open_with_vfs( storage, )) } + +pub fn run_in_tx(sqlite: &mut Connection, f: F) -> Result<(), E> +where + F: FnOnce(&mut Transaction) -> Result<(), E>, + E: From, +{ + let mut txn = sqlite.transaction()?; + f(&mut txn)?; // will cause a rollback on failure + txn.commit()?; + Ok(()) +} diff --git a/lib/sqlsync/src/error.rs b/lib/sqlsync/src/error.rs index 5ed79ba..e9f7c98 100644 --- a/lib/sqlsync/src/error.rs +++ b/lib/sqlsync/src/error.rs @@ -1,8 +1,10 @@ +use std::io; + use thiserror::Error; use crate::{ - reducer::ReducerError, replication::ReplicationError, timeline::TimelineError, JournalError, - JournalIdParseError, + reducer::ReducerError, replication::ReplicationError, + timeline::TimelineError, JournalIdParseError, }; #[derive(Error, Debug)] @@ -10,9 +12,6 @@ pub enum Error { #[error(transparent)] ReplicationError(#[from] ReplicationError), - #[error(transparent)] - JournalError(#[from] JournalError), - #[error(transparent)] JournalIdParseError(#[from] JournalIdParseError), @@ -24,6 +23,9 @@ pub enum Error { #[error(transparent)] SqliteError(#[from] rusqlite::Error), + + #[error("io error: {0}")] + IoError(#[from] io::Error), } -pub type Result = std::result::Result; +pub type Result = std::result::Result; diff --git a/lib/sqlsync/src/journal/cursor.rs b/lib/sqlsync/src/journal/cursor.rs index 1cd7933..efaa495 100644 --- a/lib/sqlsync/src/journal/cursor.rs +++ b/lib/sqlsync/src/journal/cursor.rs @@ -28,11 +28,7 @@ pub struct Cursor<'a, S: Scannable, I> { impl<'a, S: Scannable, I: DoubleEndedIterator> Cursor<'a, S, I> { pub fn new(inner: &'a S, lsn_iter: I) -> Self { - Self { - inner, - lsn_iter, - state: None, - } + Self { inner, lsn_iter, state: None } } /// advance the cursor @@ -60,11 +56,7 @@ impl<'a, S: Scannable, I: DoubleEndedIterator> Cursor<'a, S, I> { /// reverse this cursor pub fn into_rev(self) -> Cursor<'a, S, Rev> { - Cursor { - inner: self.inner, - lsn_iter: self.lsn_iter.rev(), - state: None, - } + Cursor { inner: self.inner, lsn_iter: self.lsn_iter.rev(), state: None } } } diff --git a/lib/sqlsync/src/journal/journal.rs b/lib/sqlsync/src/journal/journal.rs index fb2ca4d..38086ff 100644 --- a/lib/sqlsync/src/journal/journal.rs +++ b/lib/sqlsync/src/journal/journal.rs @@ -1,8 +1,5 @@ use std::fmt::Debug; use std::io; -use std::result::Result; - -use thiserror::Error; use crate::Serializable; use crate::{ @@ -12,17 +9,6 @@ use crate::{ use super::Scannable; -#[derive(Error, Debug)] -pub enum JournalError { - #[error("io error: {0}")] - IoError(#[from] io::Error), - - #[error("failed to serialize object")] - SerializationError(#[source] io::Error), -} - -pub type JournalResult = Result; - pub trait Journal: Scannable + Debug + Sized { type Factory: JournalFactory; @@ -33,12 +19,12 @@ pub trait Journal: Scannable + Debug + Sized { fn range(&self) -> LsnRange; /// append a new journal entry, and then write to it - fn append(&mut self, obj: impl Serializable) -> JournalResult<()>; + fn append(&mut self, obj: impl Serializable) -> io::Result<()>; /// drop the journal's prefix - fn drop_prefix(&mut self, up_to: Lsn) -> JournalResult<()>; + fn drop_prefix(&mut self, up_to: Lsn) -> io::Result<()>; } pub trait JournalFactory { - fn open(&self, id: JournalId) -> JournalResult; + fn open(&self, id: JournalId) -> io::Result; } diff --git a/lib/sqlsync/src/journal/memory.rs b/lib/sqlsync/src/journal/memory.rs index 637a21a..cd78f11 100644 --- a/lib/sqlsync/src/journal/memory.rs +++ b/lib/sqlsync/src/journal/memory.rs @@ -2,10 +2,12 @@ use std::fmt::{Debug, Formatter}; use std::io; use crate::lsn::{Lsn, LsnIter, LsnRange}; -use crate::{JournalError, JournalFactory, Serializable}; +use crate::{JournalFactory, Serializable}; -use super::{Cursor, Journal, JournalId, JournalResult, Scannable}; -use crate::replication::{ReplicationDestination, ReplicationError, ReplicationSource}; +use super::{Cursor, Journal, JournalId, Scannable}; +use crate::replication::{ + ReplicationDestination, ReplicationError, ReplicationSource, +}; pub struct MemoryJournal { id: JournalId, @@ -23,19 +25,15 @@ impl Debug for MemoryJournal { } impl MemoryJournal { - pub fn open(id: JournalId) -> JournalResult { - Ok(MemoryJournal { - id, - range: LsnRange::empty(), - data: vec![], - }) + pub fn open(id: JournalId) -> io::Result { + Ok(MemoryJournal { id, range: LsnRange::empty(), data: vec![] }) } } pub struct MemoryJournalFactory; impl JournalFactory for MemoryJournalFactory { - fn open(&self, id: JournalId) -> JournalResult { + fn open(&self, id: JournalId) -> io::Result { MemoryJournal::open(id) } } @@ -51,11 +49,10 @@ impl Journal for MemoryJournal { self.range } - fn append(&mut self, obj: impl Serializable) -> JournalResult<()> { + fn append(&mut self, obj: impl Serializable) -> io::Result<()> { // serialize the entry let mut entry: Vec = Vec::new(); - obj.serialize_into(&mut entry) - .map_err(|err| JournalError::SerializationError(err))?; + obj.serialize_into(&mut entry)?; // update the journal self.data.push(entry); @@ -64,7 +61,7 @@ impl Journal for MemoryJournal { Ok(()) } - fn drop_prefix(&mut self, up_to: Lsn) -> JournalResult<()> { + fn drop_prefix(&mut self, up_to: Lsn) -> io::Result<()> { let remaining_range = self.range.trim_prefix(up_to); let offsets = self.range.intersection_offsets(&remaining_range); self.data = self.data[offsets].to_vec(); @@ -108,7 +105,10 @@ impl ReplicationSource for MemoryJournal { self.range() } - fn read_lsn<'a>(&'a self, lsn: Lsn) -> io::Result>> { + fn read_lsn<'a>( + &'a self, + lsn: Lsn, + ) -> io::Result>> { match self.range.offset(lsn) { None => Ok(None), Some(offset) => Ok(Some(&self.data[offset][..])), diff --git a/lib/sqlsync/src/replication.rs b/lib/sqlsync/src/replication.rs index e17720e..6341442 100644 --- a/lib/sqlsync/src/replication.rs +++ b/lib/sqlsync/src/replication.rs @@ -3,7 +3,7 @@ use std::{cmp, io}; use serde::{Deserialize, Serialize}; use thiserror::Error; -use crate::{lsn::LsnRange, positioned_io::PositionedReader, JournalError, JournalId, Lsn}; +use crate::{lsn::LsnRange, positioned_io::PositionedReader, JournalId, Lsn}; // maximum number of frames we will send without receiving an acknowledgement // note: this does not affect durability, as we keep don't truncate the source journal until rebase @@ -32,9 +32,6 @@ pub enum ReplicationError { #[error("unknown journal id: {0}")] UnknownJournal(JournalId), - #[error(transparent)] - JournalError(#[from] JournalError), - #[error( "replication must be contiguous, received lsn {received} but expected lsn in range {range}" )] @@ -57,7 +54,10 @@ impl ReplicationProtocol { pub fn start(&self, doc: &D) -> ReplicationMsg { // before we can start sending frames to the destination, we need to know // what frames the destination already has - ReplicationMsg::RangeRequest { id: doc.source_id(), source_range: doc.source_range() } + ReplicationMsg::RangeRequest { + id: doc.source_id(), + source_range: doc.source_range(), + } } /// initialized returns true if we have received a response to our initial range request @@ -86,7 +86,11 @@ impl ReplicationProtocol { // send frame return Ok(Some(( - ReplicationMsg::Frame { id: doc.source_id(), lsn, len: data.size()? as u64 }, + ReplicationMsg::Frame { + id: doc.source_id(), + lsn, + len: data.size()? as u64, + }, data, ))); } @@ -123,14 +127,18 @@ impl ReplicationProtocol { // subsequent range response, update outstanding range |outstanding_range| { let next = range.next(); - assert!(next > 0, "subsequent range responses should never be empty"); + assert!( + next > 0, + "subsequent range responses should never be empty" + ); Some(outstanding_range.trim_prefix(next - 1)) }, ); Ok(None) } ReplicationMsg::Frame { id, lsn, len } => { - let mut reader = LimitedReader { limit: len, inner: connection }; + let mut reader = + LimitedReader { limit: len, inner: connection }; doc.write_lsn(id, lsn, &mut reader)?; Ok(Some(ReplicationMsg::Range { range: doc.range(id)? })) } @@ -150,7 +158,8 @@ pub trait ReplicationSource { fn source_range(&self) -> LsnRange; /// read the given lsn from the source journal if it exists - fn read_lsn<'a>(&'a self, lsn: Lsn) -> io::Result>>; + fn read_lsn<'a>(&'a self, lsn: Lsn) + -> io::Result>>; } pub trait ReplicationDestination { diff --git a/lib/sqlsync/src/storage.rs b/lib/sqlsync/src/storage.rs index c46fa2e..192538f 100644 --- a/lib/sqlsync/src/storage.rs +++ b/lib/sqlsync/src/storage.rs @@ -9,7 +9,7 @@ use crate::{ lsn::LsnRange, page::{Page, PageIdx}, replication::{ReplicationDestination, ReplicationSource}, - JournalResult, Lsn, + Lsn, }; // Useful SQLite header offsets @@ -82,12 +82,13 @@ impl Storage { self.visible_lsn_range.last() < self.journal.range().last() } - pub fn commit(&mut self) -> JournalResult<()> { + pub fn commit(&mut self) -> io::Result<()> { if self.pending.num_pages() > 0 { self.journal.append(std::mem::take(&mut self.pending))?; // calculate the LsnRange between the current visible range and the committed range - let new_lsns = self.journal.range().difference(&self.visible_lsn_range); + let new_lsns = + self.journal.range().difference(&self.visible_lsn_range); // clear the changed pages list (update_changed_root_pages will scan the new lsns) self.changed_pages.clear(); @@ -102,7 +103,7 @@ impl Storage { Ok(()) } - pub fn reset(&mut self) -> JournalResult<()> { + pub fn reset(&mut self) -> io::Result<()> { // mark every page in pending as changed to ensure that we re-run queries that depended on the results of something in pending self.changed_pages = self.pending.page_idxs().copied().collect(); @@ -126,7 +127,7 @@ impl Storage { /// update_changed_root_pages does two things /// 1. it scans the journal, updating changed_root_pages for each frame /// 2. it updates changed_root_pages for every page in self.changed_pages - fn update_changed_root_pages(&mut self, range: LsnRange) -> JournalResult<()> { + fn update_changed_root_pages(&mut self, range: LsnRange) -> io::Result<()> { // scan the journal, updating changed_root_pages for each frame let mut cursor = self.journal.scan_range(range); while cursor.advance()? { @@ -135,9 +136,11 @@ impl Storage { for page_idx in pages.page_idxs()?.iter() { // we need to resolve each page_idx to it's root page by only // looking at ptrmap pages that existed as of this lsn - if let Some(root_page_idx) = - self.resolve_root_page(LsnRange::new(0, lsn), false, *page_idx)? - { + if let Some(root_page_idx) = self.resolve_root_page( + LsnRange::new(0, lsn), + false, + *page_idx, + )? { self.changed_root_pages.insert(root_page_idx); } } @@ -168,7 +171,7 @@ impl Storage { range: LsnRange, include_pending: bool, page_idx: PageIdx, - ) -> JournalResult> { + ) -> io::Result> { const PENDING_BYTE_PAGE_IDX: u64 = (0x40000000 / (PAGESIZE as u64)) + 1; // XXX: SQLSync does not currently support SQLite extensions, so we @@ -184,7 +187,8 @@ impl Storage { // effectively taking into account the ptrmap page itself // math mostly copied from: // https://github.com/sqlite/sqlite/blob/1eca330a08e18fd0930491302802141f5ce6298e/src/btree.c#L989C1-L1001C2 - const PAGES_PER_PTRMAP: u64 = (USABLE_PAGE_SIZE / PTRMAP_ENTRY_SIZE) + 1; + const PAGES_PER_PTRMAP: u64 = + (USABLE_PAGE_SIZE / PTRMAP_ENTRY_SIZE) + 1; if page_idx == 1 { // page 1 is the schema root page @@ -213,13 +217,25 @@ impl Storage { } // calculate the offset of the page_idx within the ptrmap page - let page_idx_offset = (page_idx - ptrmap_page_idx - 1) * PTRMAP_ENTRY_SIZE; + let page_idx_offset = + (page_idx - ptrmap_page_idx - 1) * PTRMAP_ENTRY_SIZE; // convert the relative offset to an absolute offset within the file - let page_idx_pos = ((ptrmap_page_idx - 1) * (PAGESIZE as u64)) + page_idx_offset; + let page_idx_pos = + ((ptrmap_page_idx - 1) * (PAGESIZE as u64)) + page_idx_offset; // read the ptrmap_entry for this page - self.read_at_range(range, include_pending, page_idx_pos, &mut ptrmap_entry)?; + self.read_at_range( + range, + include_pending, + page_idx_pos, + &mut ptrmap_entry, + )?; match ptrmap_entry[0] { + 0 => { + // page is missing, this can happen while we are rebasing + // right after we create a local table or index (for example) + return Ok(None); + } 1 => { // page is a b-tree root page // return the page_idx @@ -242,7 +258,7 @@ impl Storage { } } - fn schema_cookie(&self) -> JournalResult { + fn schema_cookie(&self) -> io::Result { let mut buf = [0; 4]; self.read_at_range( self.visible_lsn_range, @@ -256,10 +272,11 @@ impl Storage { pub fn has_changes(&self) -> bool { // it's not possible for the schema to change without also modifying pages // so we don't have to check the schema cookie here - return self.changed_pages.len() > 0 || self.changed_root_pages.len() > 0; + return self.changed_pages.len() > 0 + || self.changed_root_pages.len() > 0; } - pub fn changes(&mut self) -> JournalResult { + pub fn changes(&mut self) -> io::Result { // check to see if the schema has changed let schema_cookie = self.schema_cookie()?; if schema_cookie != self.last_schema_cookie { @@ -280,7 +297,8 @@ impl Storage { self.update_changed_root_pages(LsnRange::empty())?; // gather changed root pages into sorted vec - let mut root_pages_sorted: Vec<_> = self.changed_root_pages.iter().copied().collect(); + let mut root_pages_sorted: Vec<_> = + self.changed_root_pages.iter().copied().collect(); root_pages_sorted.sort(); // reset variables @@ -325,7 +343,8 @@ impl Storage { { // if pos = 0, then this should be FILE_CHANGE_COUNTER_OFFSET // if pos = FILE_CHANGE_COUNTER_OFFSET, this this should be 0 - let file_change_buf_offset = FILE_CHANGE_COUNTER_OFFSET - page_offset; + let file_change_buf_offset = + FILE_CHANGE_COUNTER_OFFSET - page_offset; buf[file_change_buf_offset..(file_change_buf_offset + 4)] .copy_from_slice(&self.file_change_counter.to_be_bytes()); @@ -351,7 +370,10 @@ impl ReplicationSource for Storage { self.journal.source_range() } - fn read_lsn<'a>(&'a self, lsn: crate::Lsn) -> io::Result>> { + fn read_lsn<'a>( + &'a self, + lsn: crate::Lsn, + ) -> io::Result>> { self.journal.read_lsn(lsn) } } @@ -386,7 +408,8 @@ impl sqlite_vfs::File for Storage { let mut cursor = self.journal.scan_range(self.visible_lsn_range); while cursor.advance().map_err(|_| SQLITE_IOERR)? { let pages = SerializedPagesReader(&cursor); - max_page_idx = max_page_idx.max(Some(pages.max_page_idx().map_err(|_| SQLITE_IOERR)?)); + max_page_idx = max_page_idx + .max(Some(pages.max_page_idx().map_err(|_| SQLITE_IOERR)?)); } Ok(max_page_idx @@ -418,7 +441,11 @@ impl sqlite_vfs::File for Storage { Ok(buf.len()) } - fn read(&mut self, pos: u64, buf: &mut [u8]) -> sqlite_vfs::VfsResult { + fn read( + &mut self, + pos: u64, + buf: &mut [u8], + ) -> sqlite_vfs::VfsResult { self.read_at_range(self.visible_lsn_range, true, pos, buf) .map_err(|_| SQLITE_IOERR) } diff --git a/lib/sqlsync/src/timeline.rs b/lib/sqlsync/src/timeline.rs index 206e572..96cf364 100644 --- a/lib/sqlsync/src/timeline.rs +++ b/lib/sqlsync/src/timeline.rs @@ -1,14 +1,14 @@ use std::io; -use rusqlite::{named_params, Connection, Transaction}; +use rusqlite::{named_params, Connection}; use thiserror::Error; use crate::{ + db::run_in_tx, journal::Journal, lsn::{Lsn, LsnRange}, positioned_io::PositionedReader, reducer::{Reducer, ReducerError}, - JournalError, }; const TIMELINES_TABLE_SQL: &str = " @@ -38,25 +38,12 @@ pub enum TimelineError { #[error(transparent)] Sqlite(#[from] rusqlite::Error), - #[error(transparent)] - JournalError(#[from] JournalError), - #[error(transparent)] ReducerError(#[from] ReducerError), } type Result = std::result::Result; -fn run_in_tx(sqlite: &mut Connection, f: F) -> Result<()> -where - F: FnOnce(&mut Transaction) -> Result<()>, -{ - let mut txn = sqlite.transaction()?; - f(&mut txn)?; // will cause a rollback on failure - txn.commit()?; - Ok(()) -} - pub fn run_timeline_migration(sqlite: &mut Connection) -> Result<()> { sqlite.execute(TIMELINES_TABLE_SQL, [])?; Ok(()) @@ -68,7 +55,7 @@ pub fn apply_mutation( reducer: &mut Reducer, mutation: &[u8], ) -> Result<()> { - run_in_tx(sqlite, |tx| Ok(reducer.apply(tx, &mutation)?))?; + run_in_tx(sqlite, |tx| reducer.apply(tx, &mutation))?; timeline.append(mutation)?; Ok(()) } @@ -103,7 +90,7 @@ pub fn rebase_timeline( let mutation = cursor.read_all()?; reducer.apply(tx, &mutation)?; } - Ok(()) + Ok::<_, TimelineError>(()) })?; Ok(())