@@ -3,12 +3,14 @@ use std::path::{Path, PathBuf};
33use std:: sync:: atomic:: { AtomicBool , Ordering } ;
44use std:: sync:: Arc ;
55
6+ use arc_swap:: ArcSwapOption ;
67use libsql_sys:: wal:: wrapper:: { WalWrapper , WrapWal , WrappedWal } ;
78use libsql_sys:: wal:: { Wal , WalManager } ;
89use metrics:: { histogram, increment_counter} ;
910use parking_lot:: { Mutex , RwLock } ;
1011use rusqlite:: ffi:: SQLITE_BUSY ;
1112use rusqlite:: { DatabaseName , ErrorCode , OpenFlags , StatementStatus , TransactionState } ;
13+ use std:: collections:: HashSet ;
1214use tokio:: sync:: { watch, Notify } ;
1315use tokio:: time:: { Duration , Instant } ;
1416
@@ -40,6 +42,8 @@ pub struct MakeLibSqlConn<T: WalManager> {
4042 auto_checkpoint : u32 ,
4143 current_frame_no_receiver : watch:: Receiver < Option < FrameNo > > ,
4244 state : Arc < TxnState < T :: Wal > > ,
45+ // Information whether database1 is attached to database2
46+ attachments : ArcSwapOption < HashSet < ( String , String ) > > ,
4347 /// In wal mode, closing the last database takes time, and causes other databases creation to
4448 /// return sqlite busy. To mitigate that, we hold on to one connection
4549 _db : Option < LibSqlConnection < T :: Wal > > ,
7478 _db : None ,
7579 state : Default :: default ( ) ,
7680 wal_manager,
81+ attachments : ArcSwapOption :: from ( None ) ,
7782 } ;
7883
7984 let db = this. try_create_db ( ) . await ?;
@@ -125,6 +130,7 @@ where
125130 } ,
126131 self . current_frame_no_receiver . clone ( ) ,
127132 self . state . clone ( ) ,
133+ self . attachments . load_full ( ) ,
128134 )
129135 . await
130136 }
@@ -141,6 +147,30 @@ where
141147 async fn create ( & self ) -> Result < Self :: Connection , Error > {
142148 self . make_connection ( ) . await
143149 }
150+
151+ fn register_attached ( & self , db_name : String , attached_db_name : String ) {
152+ self . attachments . rcu ( |attachments| {
153+ let mut attachments = if let Some ( attachments) = attachments {
154+ HashSet :: clone ( attachments)
155+ } else {
156+ HashSet :: new ( )
157+ } ;
158+ attachments. insert ( ( db_name. clone ( ) , attached_db_name. clone ( ) ) ) ;
159+ Some ( Arc :: new ( attachments) )
160+ } ) ;
161+ }
162+
163+ fn unregister_attached ( & self , db_name : String , attached_db_name : String ) {
164+ self . attachments . rcu ( |attachments| {
165+ if let Some ( attachments) = attachments {
166+ let mut attachments = HashSet :: clone ( attachments) ;
167+ attachments. remove ( & ( db_name. clone ( ) , attached_db_name. clone ( ) ) ) ;
168+ Some ( Arc :: new ( attachments) )
169+ } else {
170+ None
171+ }
172+ } ) ;
173+ }
144174}
145175
146176pub struct LibSqlConnection < T > {
@@ -180,7 +210,7 @@ impl<W: Wal> WrapWal<W> for InhibitCheckpointWalWrapper {
180210 _buf : & mut [ u8 ] ,
181211 ) -> libsql_sys:: wal:: Result < ( u32 , u32 ) > {
182212 tracing:: warn!(
183- "chackpoint inhibited: this connection is not allowed to perform checkpoints"
213+ "checkpoint inhibited: this connection is not allowed to perform checkpoints"
184214 ) ;
185215 Err ( rusqlite:: ffi:: Error :: new ( SQLITE_BUSY ) )
186216 }
@@ -245,6 +275,43 @@ where
245275 libsql_sys:: Connection :: open ( path. join ( "data" ) , flags, wal_manager, auto_checkpoint)
246276}
247277
278+ fn attach_databases < W : Wal + Send + ' static > (
279+ conn : & libsql_sys:: Connection < W > ,
280+ path : & Path ,
281+ attachments : & HashSet < ( String , String ) > ,
282+ ) {
283+ let db_name = path
284+ . file_name ( )
285+ . map ( std:: ffi:: OsStr :: to_str)
286+ . flatten ( )
287+ . unwrap_or ( "" ) ;
288+ tracing:: info!( "DB name: {db_name}" ) ;
289+
290+ let dbs_path = path
291+ . parent ( )
292+ . unwrap_or_else ( || std:: path:: Path :: new ( ".." ) )
293+ . canonicalize ( )
294+ . unwrap_or_else ( |_| std:: path:: PathBuf :: from ( ".." ) ) ;
295+ // Linear, under the assumption ATTACH is rare (it better be,
296+ // or the amount of open descriptors will quickly blow up).
297+ for ( db1_name, db2_name) in attachments {
298+ tracing:: info!( "attach entry: {db1_name} -> {db2_name}" ) ;
299+ if db1_name == db_name {
300+ let attached_path = dbs_path. join ( & db2_name) . join ( "data" ) ;
301+ tracing:: info!( "Attaching {} to {db_name}" , attached_path. display( ) ) ;
302+ let query = format ! (
303+ "ATTACH DATABASE 'file:{}?mode=ro' AS \" {db2_name}\" " ,
304+ attached_path. display( )
305+ ) ;
306+ if let Err ( e) = conn. execute ( & query, ( ) ) {
307+ tracing:: warn!( "Failed to attach database {}: {e}" , attached_path. display( ) , ) ;
308+ } else {
309+ tracing:: debug!( "Attached {} as {db2_name}" , attached_path. display( ) ) ;
310+ }
311+ }
312+ }
313+ }
314+
248315impl < W > LibSqlConnection < W >
249316where
250317 W : Wal + Send + ' static ,
@@ -258,6 +325,7 @@ where
258325 builder_config : QueryBuilderConfig ,
259326 current_frame_no_receiver : watch:: Receiver < Option < FrameNo > > ,
260327 state : Arc < TxnState < W > > ,
328+ attachments : Option < Arc < HashSet < ( String , String ) > > > ,
261329 ) -> crate :: Result < Self >
262330 where
263331 T : WalManager < Wal = W > + Send + ' static ,
@@ -276,6 +344,11 @@ where
276344 ) ?;
277345 conn. conn
278346 . pragma_update ( None , "max_page_count" , max_db_size) ?;
347+
348+ if let Some ( attachments) = attachments. as_ref ( ) {
349+ attach_databases ( & conn. conn , path. as_ref ( ) , attachments) ;
350+ }
351+
279352 Ok ( conn)
280353 } )
281354 . await
0 commit comments