diff --git a/heed/src/db/uniform.rs b/heed/src/db/database.rs similarity index 99% rename from heed/src/db/uniform.rs rename to heed/src/db/database.rs index 6cbd2548..6f31a8da 100644 --- a/heed/src/db/uniform.rs +++ b/heed/src/db/database.rs @@ -114,13 +114,13 @@ use crate::*; /// wtxn.commit()?; /// # Ok(()) } /// ``` -pub struct Database { - pub(crate) dyndb: PolyDatabase, - marker: marker::PhantomData<(KC, DC)>, +pub struct Database<'t, KC, DC> { + pub(crate) dyndb: PolyDatabase<'t>, + pub(crate) marker: marker::PhantomData<(KC, DC)>, } -impl Database { - pub(crate) fn new(env_ident: usize, dbi: ffi::MDB_dbi) -> Database { +impl Database<'_, KC, DC> { + pub(crate) fn new<'t>(env_ident: usize, dbi: ffi::MDB_dbi) -> Database<'t, KC, DC> { Database { dyndb: PolyDatabase::new(env_ident, dbi), marker: std::marker::PhantomData } } @@ -1604,15 +1604,15 @@ impl Database { } } -impl Clone for Database { - fn clone(&self) -> Database { +impl<'t, KC, DC> Clone for Database<'t, KC, DC> { + fn clone(&self) -> Database<'t, KC, DC> { Database { dyndb: self.dyndb, marker: marker::PhantomData } } } -impl Copy for Database {} +impl Copy for Database<'_, KC, DC> {} -impl fmt::Debug for Database { +impl fmt::Debug for Database<'_, KC, DC> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Database") .field("key_codec", &any::type_name::()) diff --git a/heed/src/db/mod.rs b/heed/src/db/mod.rs index ca6056a2..26c56881 100644 --- a/heed/src/db/mod.rs +++ b/heed/src/db/mod.rs @@ -1,5 +1,5 @@ -mod polymorph; -mod uniform; +mod database; +mod poly_database; -pub use self::polymorph::PolyDatabase; -pub use self::uniform::Database; +pub use self::database::Database; +pub use self::poly_database::PolyDatabase; diff --git a/heed/src/db/polymorph.rs b/heed/src/db/poly_database.rs similarity index 99% rename from heed/src/db/polymorph.rs rename to heed/src/db/poly_database.rs index cc918bc8..ec48d652 100644 --- a/heed/src/db/polymorph.rs +++ b/heed/src/db/poly_database.rs @@ -1,6 +1,6 @@ use std::borrow::Cow; use std::ops::{Bound, RangeBounds}; -use std::{fmt, mem, ptr}; +use std::{fmt, marker, mem, ptr}; use crate::mdb::error::mdb_result; use crate::mdb::ffi; @@ -103,14 +103,15 @@ use crate::*; /// # Ok(()) } /// ``` #[derive(Copy, Clone)] -pub struct PolyDatabase { +pub struct PolyDatabase<'t> { pub(crate) env_ident: usize, + pub(crate) txn: marker::PhantomData<&'t ()>, pub(crate) dbi: ffi::MDB_dbi, } -impl PolyDatabase { - pub(crate) fn new(env_ident: usize, dbi: ffi::MDB_dbi) -> PolyDatabase { - PolyDatabase { env_ident, dbi } +impl PolyDatabase<'_> { + pub(crate) fn new<'t>(env_ident: usize, dbi: ffi::MDB_dbi) -> PolyDatabase<'t> { + PolyDatabase { env_ident, txn: marker::PhantomData, dbi } } /// Retrieves the value associated with a key. @@ -1856,7 +1857,7 @@ impl PolyDatabase { } } -impl fmt::Debug for PolyDatabase { +impl fmt::Debug for PolyDatabase<'_> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("PolyDatabase").finish() } diff --git a/heed/src/env.rs b/heed/src/env.rs index 5bab61fa..e67568be 100644 --- a/heed/src/env.rs +++ b/heed/src/env.rs @@ -284,7 +284,7 @@ pub fn env_closing_event>(path: P) -> Option { /// An environment handle constructed by using [`EnvOpenOptions`]. #[derive(Clone)] -pub struct Env(Arc); +pub struct Env(pub(crate) Arc); impl fmt::Debug for Env { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -295,7 +295,9 @@ impl fmt::Debug for Env { struct EnvInner { env: *mut ffi::MDB_env, - dbi_open_mutex: sync::Mutex>>, + /// The first state of a database is `None` until an [`open_database`] + /// method is called to define the type for the first time. + pub(crate) dbi_open_mutex: sync::Mutex, (u32, Option)>>, path: PathBuf, } @@ -320,6 +322,16 @@ impl Drop for EnvInner { } } +/// The type of the database. +// TODO replace the UnknownYet by an Option in the HashMap +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum DatabaseType { + /// Defines the types of a [`Database`]. + Typed { key_type: TypeId, data_type: TypeId }, + /// Defines the types of a [`PolyDatabase`]. + Untyped, +} + /// Whether to perform compaction while copying an environment. #[derive(Debug, Copy, Clone)] pub enum CompactionOption { @@ -431,7 +443,7 @@ impl Env { size += compute_size(stat); // if the db wasn’t already opened - if !dbi_open.contains_key(&dbi) { + if !dbi_open.contains_key(&Some(key)) { unsafe { ffi::mdb_dbi_close(self.env_mut_ptr(), dbi) } } } @@ -440,148 +452,105 @@ impl Env { Ok(size) } - /// Opens a typed database that already exists in this environment. - /// - /// If the database was previously opened in this program run, types will be checked. - /// - /// ## Important Information - /// - /// LMDB have an important restriction on the unnamed database when named ones are opened, - /// the names of the named databases are stored as keys in the unnamed one and are immutable, - /// these keys can only be read and not written. - pub fn open_database( - &self, - rtxn: &RoTxn, - name: Option<&str>, - ) -> Result>> - where - KC: 'static, - DC: 'static, - { - assert_eq_env_txn!(self, rtxn); - - let types = (TypeId::of::(), TypeId::of::()); - match self.raw_init_database(rtxn.txn, name, Some(types), false) { - Ok(dbi) => Ok(Some(Database::new(self.env_mut_ptr() as _, dbi))), - Err(Error::Mdb(e)) if e.not_found() => Ok(None), - Err(e) => Err(e), - } - } - - /// Opens an untyped database that already exists in this environment. - /// - /// If the database was previously opened as a typed one, an error will be returned. - /// - /// ## Important Information - /// - /// LMDB have an important restriction on the unnamed database when named ones are opened, - /// the names of the named databases are stored as keys in the unnamed one and are immutable, - /// these keys can only be read and not written. - pub fn open_poly_database( - &self, - rtxn: &RoTxn, - name: Option<&str>, - ) -> Result> { - assert_eq_env_txn!(self, rtxn); - - match self.raw_init_database(rtxn.txn, name, None, false) { - Ok(dbi) => Ok(Some(PolyDatabase::new(self.env_mut_ptr() as _, dbi))), - Err(Error::Mdb(e)) if e.not_found() => Ok(None), - Err(e) => Err(e), - } - } - - /// Creates a typed database that can already exist in this environment. - /// - /// If the database was previously opened in this program run, types will be checked. - /// - /// ## Important Information - /// - /// LMDB have an important restriction on the unnamed database when named ones are opened, - /// the names of the named databases are stored as keys in the unnamed one and are immutable, - /// these keys can only be read and not written. - pub fn create_database( - &self, - wtxn: &mut RwTxn, - name: Option<&str>, - ) -> Result> - where - KC: 'static, - DC: 'static, - { - assert_eq_env_txn!(self, wtxn); - - let types = (TypeId::of::(), TypeId::of::()); - match self.raw_init_database(wtxn.txn.txn, name, Some(types), true) { - Ok(dbi) => Ok(Database::new(self.env_mut_ptr() as _, dbi)), - Err(e) => Err(e), - } - } - - /// Creates a typed database that can already exist in this environment. - /// - /// If the database was previously opened as a typed one, an error will be returned. - /// - /// ## Important Information - /// - /// LMDB have an important restriction on the unnamed database when named ones are opened, - /// the names of the named databases are stored as keys in the unnamed one and are immutable, - /// these keys can only be read and not written. - pub fn create_poly_database( - &self, - wtxn: &mut RwTxn, - name: Option<&str>, - ) -> Result { - assert_eq_env_txn!(self, wtxn); - - match self.raw_init_database(wtxn.txn.txn, name, None, true) { - Ok(dbi) => Ok(PolyDatabase::new(self.env_mut_ptr() as _, dbi)), - Err(e) => Err(e), - } - } - - fn raw_open_dbi( - &self, - raw_txn: *mut ffi::MDB_txn, - name: Option<&str>, - flags: u32, - ) -> std::result::Result { - let mut dbi = 0; - let name = name.map(|n| CString::new(n).unwrap()); - let name_ptr = match name { - Some(ref name) => name.as_bytes_with_nul().as_ptr() as *const _, - None => ptr::null(), - }; - - // safety: The name cstring is cloned by LMDB, we can drop it after. - // If a read-only is used with the MDB_CREATE flag, LMDB will throw an error. - unsafe { mdb_result(ffi::mdb_dbi_open(raw_txn, name_ptr, flags, &mut dbi))? }; - - Ok(dbi) - } - - fn raw_init_database( - &self, - raw_txn: *mut ffi::MDB_txn, - name: Option<&str>, - types: Option<(TypeId, TypeId)>, - create: bool, - ) -> Result { - let mut lock = self.0.dbi_open_mutex.lock().unwrap(); - - let flags = if create { ffi::MDB_CREATE } else { 0 }; - match self.raw_open_dbi(raw_txn, name, flags) { - Ok(dbi) => { - let old_types = lock.entry(dbi).or_insert(types); - if *old_types == types { - Ok(dbi) - } else { - Err(Error::InvalidDatabaseTyping) - } - } - Err(e) => Err(e.into()), - } - } + // /// Opens a typed database that already exists in this environment. + // /// + // /// If the database was previously opened in this program run, types will be checked. + // /// + // /// ## Important Information + // /// + // /// LMDB have an important restriction on the unnamed database when named ones are opened, + // /// the names of the named databases are stored as keys in the unnamed one and are immutable, + // /// these keys can only be read and not written. + // pub fn open_database<'t, KC, DC>( + // &self, + // rtxn: &'t RoTxn, + // name: Option<&str>, + // ) -> Result>> + // where + // KC: 'static, + // DC: 'static, + // { + // assert_eq_env_txn!(self, rtxn); + + // let types = (TypeId::of::(), TypeId::of::()); + // match self.raw_init_database(rtxn.txn, name, Some(types), false) { + // Ok(dbi) => Ok(Some(Database::new(self.env_mut_ptr() as _, dbi))), + // Err(Error::Mdb(e)) if e.not_found() => Ok(None), + // Err(e) => Err(e), + // } + // } + + // /// Opens an untyped database that already exists in this environment. + // /// + // /// If the database was previously opened as a typed one, an error will be returned. + // /// + // /// ## Important Information + // /// + // /// LMDB have an important restriction on the unnamed database when named ones are opened, + // /// the names of the named databases are stored as keys in the unnamed one and are immutable, + // /// these keys can only be read and not written. + // pub fn open_poly_database<'t>( + // &self, + // rtxn: &'t RoTxn, + // name: Option<&str>, + // ) -> Result>> { + // assert_eq_env_txn!(self, rtxn); + + // match self.raw_init_database(rtxn.txn, name, None, false) { + // Ok(dbi) => Ok(Some(PolyDatabase::new(self.env_mut_ptr() as _, dbi))), + // Err(Error::Mdb(e)) if e.not_found() => Ok(None), + // Err(e) => Err(e), + // } + // } + + // /// Creates a typed database that can already exist in this environment. + // /// + // /// If the database was previously opened in this program run, types will be checked. + // /// + // /// ## Important Information + // /// + // /// LMDB have an important restriction on the unnamed database when named ones are opened, + // /// the names of the named databases are stored as keys in the unnamed one and are immutable, + // /// these keys can only be read and not written. + // pub fn create_database<'t, KC, DC>( + // &self, + // wtxn: &'t mut RwTxn, + // name: Option<&str>, + // ) -> Result> + // where + // KC: 'static, + // DC: 'static, + // { + // assert_eq_env_txn!(self, wtxn); + + // let types = (TypeId::of::(), TypeId::of::()); + // match self.raw_init_database(wtxn.txn.txn, name, Some(types), true) { + // Ok(dbi) => Ok(Database::new(self.env_mut_ptr() as _, dbi)), + // Err(e) => Err(e), + // } + // } + + // /// Creates a typed database that can already exist in this environment. + // /// + // /// If the database was previously opened as a typed one, an error will be returned. + // /// + // /// ## Important Information + // /// + // /// LMDB have an important restriction on the unnamed database when named ones are opened, + // /// the names of the named databases are stored as keys in the unnamed one and are immutable, + // /// these keys can only be read and not written. + // pub fn create_poly_database<'t>( + // &self, + // wtxn: &'t mut RwTxn, + // name: Option<&str>, + // ) -> Result> { + // assert_eq_env_txn!(self, wtxn); + + // match self.raw_init_database(wtxn.txn.txn, name, None, true) { + // Ok(dbi) => Ok(PolyDatabase::new(self.env_mut_ptr() as _, dbi)), + // Err(e) => Err(e), + // } + // } /// Create a transaction with read and write access for use with the environment. pub fn write_txn(&self) -> Result { diff --git a/heed/src/txn.rs b/heed/src/txn.rs index 1c4364d2..c4c29c1f 100644 --- a/heed/src/txn.rs +++ b/heed/src/txn.rs @@ -1,9 +1,14 @@ +use std::any::TypeId; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::ffi::CString; use std::ops::Deref; -use std::ptr; +use std::{ptr, sync}; +use crate::env::DatabaseType::{self, Typed, Untyped}; use crate::mdb::error::mdb_result; use crate::mdb::ffi; -use crate::{Env, Result}; +use crate::{Database, Env, Error, PolyDatabase, Result}; /// A read-only transaction. pub struct RoTxn<'e> { @@ -30,6 +35,60 @@ impl<'e> RoTxn<'e> { pub(crate) fn env_mut_ptr(&self) -> *mut ffi::MDB_env { self.env.env_mut_ptr() } + + // get the dbi from the env without any call to LMDB + pub fn open_database( + &self, + name: Option<&str>, + ) -> Result>> + where + KC: 'static, + DC: 'static, + { + let mut lock = self.env.0.dbi_open_mutex.lock().unwrap(); + + match lock.entry(name.map(ToOwned::to_owned)) { + Entry::Occupied(mut entry) => { + let (dbi, t) = entry.get_mut(); + let types = DatabaseType::Typed { + key_type: TypeId::of::(), + data_type: TypeId::of::(), + }; + + if t.is_none() { + *t = Some(types); + Ok(Some(Database::new(self.env_mut_ptr() as _, *dbi))) + } else if *t != Some(types) { + Err(Error::InvalidDatabaseTyping) + } else { + Ok(Some(Database::new(self.env_mut_ptr() as _, *dbi))) + } + } + Entry::Vacant(entry) => Ok(None), + } + } + + // get the dbi from the env without any call to LMDB + pub fn open_poly_database(&self, name: Option<&str>) -> Result>> { + let mut lock = self.env.0.dbi_open_mutex.lock().unwrap(); + + match lock.entry(name.map(ToOwned::to_owned)) { + Entry::Occupied(mut entry) => { + let types = DatabaseType::Untyped; + + let (dbi, t) = entry.get_mut(); + if t.is_none() { + *t = Some(types); + Ok(Some(PolyDatabase::new(self.env_mut_ptr() as _, *dbi))) + } else if *t != Some(types) { + Err(Error::InvalidDatabaseTyping) + } else { + Ok(Some(PolyDatabase::new(self.env_mut_ptr() as _, *dbi))) + } + } + Entry::Vacant(entry) => Ok(None), + } + } } impl Drop for RoTxn<'_> { @@ -52,34 +111,158 @@ fn abort_txn(txn: *mut ffi::MDB_txn) { /// A read-write transaction. pub struct RwTxn<'p> { pub(crate) txn: RoTxn<'p>, + // The list of dbi only available while the transaction is alive + // this list will be added to the global list if the transaction is successfully committed. + local_opened_dbi: sync::Mutex, (u32, DatabaseType)>>, } impl<'p> RwTxn<'p> { pub(crate) fn new(env: &'p Env) -> Result> { let mut txn: *mut ffi::MDB_txn = ptr::null_mut(); - unsafe { mdb_result(ffi::mdb_txn_begin(env.env_mut_ptr(), ptr::null_mut(), 0, &mut txn))? }; - - Ok(RwTxn { txn: RoTxn { txn, env } }) + Ok(RwTxn { txn: RoTxn { txn, env }, local_opened_dbi: Default::default() }) } pub(crate) fn nested(env: &'p Env, parent: &'p mut RwTxn) -> Result> { let mut txn: *mut ffi::MDB_txn = ptr::null_mut(); let parent_ptr: *mut ffi::MDB_txn = parent.txn.txn; - unsafe { mdb_result(ffi::mdb_txn_begin(env.env_mut_ptr(), parent_ptr, 0, &mut txn))? }; - - Ok(RwTxn { txn: RoTxn { txn, env } }) + Ok(RwTxn { txn: RoTxn { txn, env }, local_opened_dbi: Default::default() }) } pub(crate) fn env_mut_ptr(&self) -> *mut ffi::MDB_env { self.txn.env.env_mut_ptr() } + // TODO document me + pub fn open_database<'t, KC, DC>( + &'t self, + name: Option<&str>, + ) -> Result>> + where + KC: 'static, + DC: 'static, + { + let types = Typed { key_type: TypeId::of::(), data_type: TypeId::of::() }; + match self.raw_init_database(name, types, false) { + Ok(dbi) => Ok(Some(Database::new(self.env_mut_ptr() as _, dbi))), + Err(Error::Mdb(e)) if e.not_found() => Ok(None), + Err(e) => Err(e), + } + } + + // TODO document me + pub fn open_poly_database<'t>( + &'t self, + name: Option<&str>, + ) -> Result>> { + match self.raw_init_database(name, Untyped, false) { + Ok(dbi) => Ok(Some(PolyDatabase::new(self.env_mut_ptr() as _, dbi))), + Err(Error::Mdb(e)) if e.not_found() => Ok(None), + Err(e) => Err(e), + } + } + + // TODO document me + pub fn create_database<'t, KC, DC>(&'t self, name: Option<&str>) -> Result> + where + KC: 'static, + DC: 'static, + { + let types = Typed { key_type: TypeId::of::(), data_type: TypeId::of::() }; + match self.raw_init_database(name, types, true) { + Ok(dbi) => Ok(Database::new(self.env_mut_ptr() as _, dbi)), + Err(e) => Err(e), + } + } + + // TODO document me + pub fn create_poly_database<'t>(&'t self, name: Option<&str>) -> Result> { + match self.raw_init_database(name, Untyped, true) { + Ok(dbi) => Ok(PolyDatabase::new(self.env_mut_ptr() as _, dbi)), + Err(e) => Err(e), + } + } + + fn raw_init_database( + &self, + name: Option<&str>, + types: DatabaseType, + create: bool, + ) -> Result { + let mut global = self.txn.env.0.dbi_open_mutex.lock().unwrap(); + let mut local = self.local_opened_dbi.lock().unwrap(); + + let raw_txn = self.txn.txn; + let flags = if create { ffi::MDB_CREATE } else { 0 }; + match raw_open_dbi(raw_txn, name, flags) { + Ok(dbi) => { + let name = name.map(ToOwned::to_owned); + if let Some((_, Some(t))) = global.get(&name) { + if *t == types { + Ok(dbi) + } else { + Err(Error::InvalidDatabaseTyping) + } + } else { + match local.entry(name) { + Entry::Occupied(mut entry) => { + let (dbi, t) = entry.get_mut(); + if *t != types { + Err(Error::InvalidDatabaseTyping) + } else { + Ok(*dbi) + } + } + Entry::Vacant(entry) => { + entry.insert((dbi, types)); + Ok(dbi) + } + } + } + } + Err(e) => Err(e.into()), + } + } + pub fn commit(mut self) -> Result<()> { let result = unsafe { mdb_result(ffi::mdb_txn_commit(self.txn.txn)) }; self.txn.txn = ptr::null_mut(); - result.map_err(Into::into) + match result { + Ok(()) => { + for (name, (dbi, types)) in self.local_opened_dbi.into_inner().unwrap() { + // ... + } + + // let mut lock = self.txn.env.0.dbi_open_mutex.lock().unwrap(); + + // let raw_txn = self.txn.txn; + // let flags = if create { ffi::MDB_CREATE } else { 0 }; + // match raw_open_dbi(raw_txn, name, flags) { + // Ok(dbi) => match lock.entry(name.map(ToOwned::to_owned)) { + // Entry::Occupied(mut entry) => { + // let (dbi, t) = entry.get_mut(); + // if t.is_none() { + // *t = Some(types); + // Ok(*dbi) + // } else if *t != Some(types) { + // Err(Error::InvalidDatabaseTyping) + // } else { + // Ok(*dbi) + // } + // } + // Entry::Vacant(entry) => { + // entry.insert((dbi, Some(types))); + // Ok(dbi) + // } + // }, + // Err(e) => Err(e.into()), + // } + + todo!("store the opened databases into the env") + } + Err(e) => Err(e.into()), + } } pub fn abort(mut self) { @@ -95,3 +278,22 @@ impl<'p> Deref for RwTxn<'p> { &self.txn } } + +fn raw_open_dbi( + raw_txn: *mut ffi::MDB_txn, + name: Option<&str>, + flags: u32, +) -> std::result::Result { + let mut dbi = 0; + let name = name.map(|n| CString::new(n).unwrap()); + let name_ptr = match name { + Some(ref name) => name.as_bytes_with_nul().as_ptr() as *const _, + None => ptr::null(), + }; + + // safety: The name cstring is cloned by LMDB, we can drop it after. + // If a read-only is used with the MDB_CREATE flag, LMDB will throw an error. + unsafe { mdb_result(ffi::mdb_dbi_open(raw_txn, name_ptr, flags, &mut dbi))? }; + + Ok(dbi) +}