@@ -42,11 +42,16 @@ module Ouroboros.Consensus.Storage.LedgerDB.V2.LSM
4242 ) where
4343
4444import Codec.Serialise (decode )
45+ import Control.Exception (assert )
4546import qualified Control.Monad as Monad
4647import Control.Monad.Trans (lift )
4748import Control.Monad.Trans.Except
49+ import Control.Monad.Trans.Maybe (MaybeT (.. ), maybeToExceptT )
4850import Control.ResourceRegistry
4951import Control.Tracer
52+ import Data.ByteString (toStrict )
53+ import qualified Data.ByteString.Builder as BS
54+ import Data.ByteString.Char8 (readWord64 )
5055import qualified Data.Foldable as Foldable
5156import Data.Functor.Contravariant ((>$<) )
5257import qualified Data.List as List
@@ -63,6 +68,7 @@ import qualified Data.Vector as V
6368import qualified Data.Vector.Mutable as VM
6469import qualified Data.Vector.Primitive as VP
6570import Data.Void
71+ import Data.Word
6672import Database.LSMTree (Salt , Session , Table )
6773import qualified Database.LSMTree as LSM
6874import GHC.Generics
@@ -87,6 +93,7 @@ import Ouroboros.Consensus.Util.IndexedMemPack
8793import qualified Streaming as S
8894import qualified Streaming.Prelude as S
8995import System.FS.API
96+ import System.FS.API.Lazy (hGetAll , hPutAll )
9097import qualified System.FS.BlockIO.API as BIO
9198import System.FS.BlockIO.IO
9299import System.FilePath (splitDirectories , splitFileName )
@@ -170,21 +177,24 @@ newLSMLedgerTablesHandle ::
170177 , IndexedMemPack (l EmptyMK ) (TxOut l )
171178 ) =>
172179 Tracer m LedgerDBV2Trace ->
180+ -- | The size of the tables
181+ Word64 ->
173182 (ResourceKey m , UTxOTable m ) ->
174183 m (LedgerTablesHandle m l )
-- Wrap an LSM table (together with the resource key it was allocated under)
-- into a 'LedgerTablesHandle'. The handle owns two mutable cells: one holding
-- the resource key and one holding the current size of the table, seeded
-- with the size passed in by the caller.
175- newLSMLedgerTablesHandle tracer (origResKey, t) =
184+ newLSMLedgerTablesHandle tracer origSize (origResKey, t) =
176185 encloseTimedWith (TraceLedgerTablesHandleCreate >$< tracer) $ do
177186 tv <- newTVarIO origResKey
187+ tsize <- newTVarIO origSize
178188 pure
179189 LedgerTablesHandle
180190 { close = implClose tv
181- , duplicate = \ rr -> implDuplicate rr t tracer
-- The size cell is handed to 'implDuplicate' so the copy can start from the
-- size this handle has at duplication time.
191+ , duplicate = \ rr -> implDuplicate rr tsize t tracer
182192 , read = implRead tracer t
183193 , readRange = implReadRange t
184194 , readAll = implReadAll t
185- , pushDiffs = implPushDiffs tracer t
-- 'implPushDiffs' receives the size cell and adjusts it alongside the table.
195+ , pushDiffs = implPushDiffs tracer t tsize
186196 , takeHandleSnapshot = implTakeHandleSnapshot tracer t
187- , tablesSize = pure Nothing
-- The size is served from the cell; the table itself is not consulted.
197+ , tablesSize = fromIntegral <$> readTVarIO tsize
-- Transferring the handle replaces the stored resource key.
188198 , transfer = atomically . writeTVar tv
189199 }
190200
@@ -206,16 +216,18 @@ implDuplicate ::
206216 , IndexedMemPack (l EmptyMK ) (TxOut l )
207217 ) =>
208218 ResourceRegistry m ->
219+ StrictTVar m Word64 ->
209220 UTxOTable m ->
210221 Tracer m LedgerDBV2Trace ->
211222 m (ResourceKey m , LedgerTablesHandle m l )
-- Duplicate the LSM table inside the given registry and wrap the copy in a
-- new handle. The registry closes the duplicated table on release.
212- implDuplicate rr t tracer = do
223+ implDuplicate rr sizeTVar t tracer = do
213224 (rk, table) <-
214225 allocate
215226 rr
216227 (\ _ -> encloseTimedWith (TraceLedgerTablesHandleDuplicate >$< tracer) $ LSM. duplicate t)
217228 (encloseTimedWith (TraceLedgerTablesHandleClose >$< tracer) . LSM. closeTable)
218- (rk,) <$> newLSMLedgerTablesHandle tracer (rk, table)
-- The size is read in a separate step after the table has been duplicated.
-- NOTE(review): if diffs can be pushed to the source handle between the two
-- steps, the copied size would not match the copied table -- confirm that
-- callers serialise duplication and pushes.
229+ size <- readTVarIO sizeTVar
230+ (rk,) <$> newLSMLedgerTablesHandle tracer size (rk, table)
219231
220232implRead ::
221233 forall m l .
@@ -303,8 +315,8 @@ implPushDiffs ::
303315 , HasLedgerTables l
304316 , IndexedMemPack (l EmptyMK ) (TxOut l )
305317 ) =>
306- Tracer m LedgerDBV2Trace -> UTxOTable m -> l mk -> l DiffMK -> m ()
307- implPushDiffs tracer t _ ! st1 =
318+ Tracer m LedgerDBV2Trace -> UTxOTable m -> StrictTVar m Word64 -> l mk -> l DiffMK -> m ()
319+ implPushDiffs tracer t sizeTVar _ ! st1 =
308320 encloseTimedWith (TraceLedgerTablesHandleRead >$< tracer) $ do
309321 let LedgerTables (DiffMK (Diff. Diff diffs)) = projectLedgerTables st1
310322 let vec = V. create $ do
@@ -314,6 +326,22 @@ implPushDiffs tracer t _ !st1 =
314326 0
315327 $ Map. toList diffs
316328 pure vec'
329+ let (ins, dels) =
330+ Map. foldl'
331+ ( \ (i, d) delta -> case delta of
332+ Diff. Insert {} -> (i + 1 , d)
333+ Diff. Delete -> (i, d + 1 )
334+ )
335+ (0 , 0 )
336+ diffs
337+ atomically $
338+ modifyTVar
339+ sizeTVar
340+ ( \ x ->
341+ assert (x + ins >= x) $
342+ assert (x + ins - dels <= x + ins) $
343+ x + ins - dels
344+ )
317345 encloseTimedWith (BackendTrace . SomeBackendTrace . LSMUpdate >$< tracer) $ LSM. updates t vec
318346 where
319347 f (Diff. Insert v) = LSM. Insert (toTxOutBytes (forgetLedgerTables st1) v) Nothing
@@ -394,21 +422,41 @@ implTakeSnapshot ccfg tracer shfs@(SomeHasFS hasFs) suffix st =
394422 then
395423 return Nothing
396424 else do
425+ sz <- tablesSize (tables st)
397426 encloseTimedWith (TookSnapshot snapshot t >$< tracer) $
398- writeSnapshot snapshot
427+ writeSnapshot sz snapshot
399428 return $ Just (snapshot, t)
400429 where
401- writeSnapshot ds = do
430+ writeSnapshot sz ds = do
402431 createDirectoryIfMissing hasFs True $ snapshotToDirPath ds
403432 crc1 <- writeExtLedgerState shfs (encodeDiskExtLedgerState ccfg) (snapshotToStatePath ds) $ state st
404433 crc2 <- takeHandleSnapshot (tables st) (state st) $ snapshotToDirName ds
434+ writeUTxOSizeFile hasFs (snapshotToUTxOSizeFilePath ds) sz
405435 writeSnapshotMetadata shfs ds $
406436 SnapshotMetadata
407437 { snapshotBackend = UTxOHDLSMSnapshot
408438 , snapshotChecksum = maybe crc1 (crcOfConcat crc1) crc2
409439 , snapshotTablesCodecVersion = TablesCodecVersion1
410440 }
411441
-- | Location of the file that records the UTxO table size of a snapshot: a
-- fixed file name appended to the path that 'snapshotToDirPath' returns for
-- that snapshot.
442+ snapshotToUTxOSizeFilePath :: DiskSnapshot -> FsPath
443+ snapshotToUTxOSizeFilePath ds = snapshotToDirPath ds </> mkFsPath [" utxoSize" ]
444+
-- | Store @sz@ at path @p@ as its decimal text rendering.
--
-- The file is opened in write mode with the 'MustBeNew' option, and the
-- byte count reported by 'hPutAll' is discarded.
writeUTxOSizeFile :: MonadThrow f => HasFS f h -> FsPath -> Int -> f ()
writeUTxOSizeFile hasFs p sz =
  withFile hasFs p (WriteMode MustBeNew) writeDecimal
 where
  -- The decimal rendering of the size, as a lazy bytestring.
  rendered = BS.toLazyByteString (BS.intDec sz)

  writeDecimal h = Monad.void (hPutAll hasFs h rendered)
449+
-- | Read back the UTxO table size stored at path @p@.
--
-- The file is expected to contain nothing but the unsigned decimal rendering
-- of the size, which is what 'writeUTxOSizeFile' produces for a non-negative
-- size. The following are all reported as
-- @'InitFailureRead' 'ReadSnapshotDataCorruption'@:
--
-- * the file does not exist;
-- * the contents do not start with a number that 'readWord64' accepts;
-- * any bytes follow the number (previously such trailing bytes were
--   silently ignored, so a damaged file could still yield a size).
readUTxOSizeFile :: MonadThrow m => HasFS m h -> FsPath -> ExceptT (SnapshotFailure blk) m Word64
readUTxOSizeFile hfs p = do
  exists <- lift $ doesFileExist hfs p
  Monad.unless exists $ throwE corrupted
  maybeToExceptT corrupted $
    MaybeT $
      withFile hfs p ReadMode $ \h ->
        wholeFileAsWord64 . toStrict <$> hGetAll hfs h
 where
  corrupted = InitFailureRead ReadSnapshotDataCorruption

  -- Accept the parsed number only when it accounts for the entire file.
  wholeFileAsWord64 bytes = case readWord64 bytes of
    Just (size, rest) | rest == mempty -> Just size
    _ -> Nothing
459+
412460-- | Delete snapshot from disk and also from the LSM tree database.
413461implDeleteSnapshot ::
414462 IOLike m =>
@@ -471,6 +519,7 @@ loadSnapshot tracer rr ccfg fs@(SomeHasFS hfs) session ds =
471519 withExceptT
472520 (InitFailureRead . ReadSnapshotFailed )
473521 $ readExtLedgerState fs (decodeDiskExtLedgerState ccfg) decode (snapshotToStatePath ds)
522+ msz <- readUTxOSizeFile hfs (snapshotToUTxOSizeFilePath ds)
474523 case pointToWithOriginRealPoint (castPoint (getTip extLedgerSt)) of
475524 Origin -> throwE InitFailureGenesis
476525 NotOrigin pt -> do
@@ -492,7 +541,7 @@ loadSnapshot tracer rr ccfg fs@(SomeHasFS hfs) session ds =
492541 $ InitFailureRead
493542 ReadSnapshotDataCorruption
494543 (,pt)
495- <$> lift (empty extLedgerSt (rk, values) (newLSMLedgerTablesHandle tracer))
544+ <$> lift (empty extLedgerSt (rk, values) (newLSMLedgerTablesHandle tracer msz ))
496545
497546-- | Create the initial LSM table from values, which should happen only at
498547-- Genesis.
@@ -504,7 +553,7 @@ tableFromValuesMK ::
504553 Session m ->
505554 l EmptyMK ->
506555 LedgerTables l ValuesMK ->
507- m (ResourceKey m , UTxOTable m )
556+ m (ResourceKey m , UTxOTable m , Word64 )
508557tableFromValuesMK tracer rr session st (LedgerTables (ValuesMK values)) = do
509558 (rk, table) <-
510559 allocate
@@ -515,7 +564,7 @@ tableFromValuesMK tracer rr session st (LedgerTables (ValuesMK values)) = do
515564 )
516565 (encloseTimedWith (TraceLedgerTablesHandleClose >$< tracer) . LSM. closeTable)
517566 mapM_ (go table) $ chunks 1000 $ Map. toList values
518- pure (rk, table)
567+ pure (rk, table, fromIntegral $ Map. size values )
519568 where
520569 go table items =
521570 LSM. inserts table $
@@ -613,9 +662,9 @@ instance
613662 loadSnapshot trcr reg ccfg shfs (sessionResource res) ds
614663
-- Build the LSM table from the given values and wrap it in a handle. The size
-- returned by 'tableFromValuesMK' becomes the handle's initial size.
615664 newHandleFromValues trcr reg res st = do
616- table <-
665+ (rk, table, sz) <-
617666 tableFromValuesMK trcr reg (sessionResource res) (forgetLedgerTables st) (ltprj st)
618- newLSMLedgerTablesHandle trcr table
667+ newLSMLedgerTablesHandle trcr sz (rk, table)
619668
620669 snapshotManager _ res = Ouroboros.Consensus.Storage.LedgerDB.V2.LSM. snapshotManager (sessionResource res)
621670
@@ -636,13 +685,15 @@ instance
636685 = SinkLSM
637686 -- \| Chunk size
638687 Int
639- -- \| Snap name
640- String
688+ -- \| LedgerDB snapshot fs
689+ (SomeHasFS m )
690+ -- \| DiskSnapshot
691+ DiskSnapshot
641692 (Session m )
642693
-- Unpack the yield arguments and delegate to 'yieldLsmS'.
643694 yield _ (YieldLSM chunkSize hdl ) = yieldLsmS chunkSize hdl
644695
-- Unpack the sink arguments (chunk size, snapshot file system, snapshot and
-- LSM session) and delegate to 'sinkLsmS'.
645- sink _ (SinkLSM chunkSize snapName session ) = sinkLsmS chunkSize snapName session
696+ sink _ (SinkLSM chunkSize shfs ds session ) = sinkLsmS chunkSize shfs ds session
646697
647698data SomeHasFSAndBlockIO m where
648699 SomeHasFSAndBlockIO ::
@@ -684,35 +735,37 @@ sinkLsmS ::
684735 , IndexedMemPack (l EmptyMK ) (TxOut l )
685736 ) =>
686737 Int ->
687- String ->
738+ SomeHasFS m ->
739+ DiskSnapshot ->
688740 Session m ->
689741 Sink m l
-- Sink for the LSM backend. Inserts every entry of @stream@ into a freshly
-- created LSM table in batches, saves that table as an LSM snapshot named
-- after @ds@, closes the table, and finally writes the number of entries seen
-- to the snapshot's UTxO size file on the given file system.
--
-- @writeChunkSize@ is the number of entries buffered before each insert. It
-- must be positive: with 0 the first equation of @go@ would flush and recurse
-- without ever consuming the stream.
sinkLsmS writeChunkSize (SomeHasFS hfs) ds session st stream = do
  lsmTable :: UTxOTable m <- lift $ LSM.newTable session
  (r, utxosSize) <- go (0 :: Int) lsmTable writeChunkSize mempty stream
  lift $
    LSM.saveSnapshot
      (LSM.toSnapshotName (snapshotToDirName ds))
      (LSM.SnapshotLabel $ T.pack "UTxO table")
      lsmTable
  lift $ LSM.closeTable lsmTable
  lift $ writeUTxOSizeFile hfs (snapshotToUTxOSizeFilePath ds) utxosSize
  pure (fmap (,Nothing) r)
 where
  -- Serialise one batch of entries and insert it into the table.
  writeToTable lsmTable accUTxOs =
    LSM.inserts lsmTable $
      V.fromList
        [(toTxInBytes (Proxy @l) txin, toTxOutBytes st txout, Nothing) | (txin, txout) <- accUTxOs]

  -- Arguments: entries seen so far, target table, entries still to read
  -- before the next flush, current batch, remaining stream.
  --
  -- The running count is forced on every step. Without the bang it is only
  -- ever returned inside a tuple, so it would pile up one unevaluated @+ 1@
  -- per entry for the whole stream.
  go !utxosSize lsmTable 0 accUTxOs stream' = do
    -- Batch is full: flush it and start a new one.
    lift $ writeToTable lsmTable accUTxOs
    go utxosSize lsmTable writeChunkSize mempty stream'
  go !utxosSize lsmTable numToRead accUTxOs stream' = do
    mItem <- S.uncons stream'
    case mItem of
      Nothing -> do
        -- No more entries: flush the partial batch and return the stream's
        -- result together with the final count.
        lift $ writeToTable lsmTable accUTxOs
        (,utxosSize) <$> S.effects stream'
      Just (item, stream'') -> go (utxosSize + 1) lsmTable (numToRead - 1) (item : accUTxOs) stream''
716769
717770-- | Create Yield arguments for LSM
718771mkLSMYieldArgs ::
@@ -746,15 +799,17 @@ mkLSMYieldArgs fp snapName mkFS mkGen _ reg = do
746799 (LSM. SnapshotLabel $ T. pack " UTxO table" )
747800 )
748801 LSM. closeTable
749- YieldLSM 1000 <$> newLSMLedgerTablesHandle nullTracer tb
802+ YieldLSM 1000 <$> newLSMLedgerTablesHandle nullTracer 0 tb
750803
751804-- | Create Sink arguments for LSM
752805mkLSMSinkArgs ::
753806 IOLike m =>
754807 -- | The filepath in which the LSM database should be opened. Must not have a trailing slash!
755808 FilePath ->
756- -- | The complete name of the snapshot to be created, so @<slotno>[_<suffix>]@.
757- String ->
809+ -- | The filepath to the snapshot to be created, so @.../.../ledger/<slotno>[_<suffix>]@.
810+ FilePath ->
811+ -- | Usually 'ioHasFS'
812+ (MountPoint -> SomeHasFS m ) ->
758813 -- | Usually 'stdMkBlockIOFS'
759814 (FilePath -> ResourceRegistry m -> m (a , SomeHasFSAndBlockIO m )) ->
760815 -- | Usually 'newStdGen'
@@ -764,18 +819,20 @@ mkLSMSinkArgs ::
764819 m (SinkArgs m LSM l )
-- Open a fresh LSM session for the sink and package it with the snapshot
-- file system and the snapshot derived from the snapshot path.
--
-- The first argument is split into the directory in which the block-IO file
-- system is created and the name of the LSM directory inside it. That LSM
-- directory is removed and recreated before the session is opened.
mkLSMSinkArgs
  (splitFileName -> (fp, lsmDir))
  snapFP
  mkFs
  mkBlockIOFS
  mkGen
  _
  reg =
    do
      -- Forced first, so that a snapshot path that cannot be interpreted is
      -- rejected before the existing LSM directory is removed, instead of
      -- failing only once the sink has already consumed the whole stream.
      let !ds = snapshotForSink
      (_, SomeHasFSAndBlockIO hasFS blockIO) <- mkBlockIOFS fp reg
      removeDirectoryRecursive hasFS lsmFsPath
      createDirectory hasFS lsmFsPath
      salt <- fst . genWord64 <$> mkGen
      (_, session) <-
        allocate reg (\_ -> LSM.newSession nullTracer hasFS blockIO salt lsmFsPath) LSM.closeSession
      -- NOTE(review): the file system is mounted at the snapshot path itself,
      -- while the sink writes the size file at a path built from
      -- 'snapshotToDirPath'. If that path already starts with the snapshot
      -- directory name, the file ends up one level too deep -- confirm
      -- whether the mount point should be the parent directory instead.
      let snapFS = mkFs (MountPoint snapFP)
      pure (SinkLSM 1000 snapFS ds session)
 where
  lsmFsPath = mkFsPath [lsmDir]

  -- The snapshot is identified by the last component of the snapshot path.
  -- Total replacement for @fromJust . snapshotFromPath . last@: both the
  -- empty path and an unparseable directory name give a descriptive error.
  snapshotForSink = case reverse (splitDirectories snapFP) of
    dirName : _
      | Just parsed <- snapshotFromPath dirName -> parsed
    _ ->
      error $
        "mkLSMSinkArgs: cannot derive a snapshot from the snapshot path " <> show snapFP
0 commit comments