@@ -34,9 +34,10 @@ use snarkos_node_sync::{BlockSync, Ping};
3434use snarkvm:: {
3535 console:: account:: Address ,
3636 ledger:: {
37+ SubdagTransmissions ,
3738 block:: Transaction ,
3839 committee:: Committee ,
39- narwhal:: { BatchCertificate , Data , Subdag , Transmission , TransmissionID } ,
40+ narwhal:: { BatchCertificate , Data , NarwhalCertificate , Subdag , Transmission , TransmissionID } ,
4041 puzzle:: { Solution , SolutionID } ,
4142 } ,
4243 prelude:: { Field , Network , Result , bail, ensure} ,
@@ -636,10 +637,14 @@ impl<N: Network> BFT<N> {
636637 if !IS_SYNCING {
637638 // Initialize a map for the deduped transmissions.
638639 let mut transmissions = IndexMap :: new ( ) ;
640+ // Initialize a map for the deduped prior transmission ids.
641+ let mut prior_included_transmissions = IndexSet :: new ( ) ;
642+ // Initialize a map for transmission ids which could not be read from the ledger.
643+ let mut aborted_transmissions = IndexSet :: new ( ) ;
639644 // Initialize a map for the deduped transaction ids.
640- let mut seen_transaction_ids = IndexSet :: new ( ) ;
645+ let mut seen_transaction_ids: IndexSet < N :: TransactionID > = IndexSet :: new ( ) ;
641646 // Initialize a map for the deduped solution ids.
642- let mut seen_solution_ids = IndexSet :: new ( ) ;
647+ let mut seen_solution_ids: IndexSet < SolutionID < N > > = IndexSet :: new ( ) ;
643648 // Start from the oldest leader certificate.
644649 for certificate in commit_subdag. values ( ) . flatten ( ) {
645650 // Retrieve the transmissions.
@@ -650,7 +655,7 @@ impl<N: Network> BFT<N> {
650655 match transmission_id {
651656 TransmissionID :: Solution ( solution_id, _) => {
652657 // If the solution already exists, skip it.
653- if seen_solution_ids. contains ( & solution_id) {
658+ if seen_solution_ids. contains ( solution_id) {
654659 continue ;
655660 }
656661 }
@@ -670,41 +675,52 @@ impl<N: Network> BFT<N> {
670675 }
671676 // If the transmission already exists in the ledger, skip it.
672677 // Note: On failure to read from the ledger, we skip including this transmission, out of safety.
673- if self . ledger ( ) . contains_transmission ( transmission_id) . unwrap_or ( true ) {
674- continue ;
675- }
676- // Retrieve the transmission.
677- let Some ( transmission) = self . storage ( ) . get_transmission ( * transmission_id) else {
678- bail ! (
679- "BFT failed to retrieve transmission '{}.{}' from round {}" ,
680- fmt_id( transmission_id) ,
681- fmt_id( transmission_id. checksum( ) . unwrap_or_default( ) ) . dimmed( ) ,
682- certificate. round( )
683- ) ;
684- } ;
685- // Insert the transaction ID or solution ID into the map.
686- match transmission_id {
687- TransmissionID :: Solution ( id, _) => {
688- seen_solution_ids. insert ( id) ;
678+ // Check if the ledger contains the transmission already.
679+ match self . ledger ( ) . contains_transmission ( transmission_id) {
680+ // On failure to read from the ledger, we skip including this transmission, out of safety.
681+ Err ( err) => {
682+ warn ! ( "{}" , err) ;
683+ aborted_transmissions. insert ( * transmission_id) ;
684+ }
685+ // If the transmission already exists in the ledger, save just the transmission ID.
686+ Ok ( true ) => {
687+ prior_included_transmissions. insert ( * transmission_id) ;
689688 }
690- TransmissionID :: Transaction ( id, _) => {
691- seen_transaction_ids. insert ( id) ;
689+ // If the transmission does not exist in the ledger, retrieve it from the storage.
690+ Ok ( false ) => {
691+ // Retrieve the transmission.
692+ let Some ( transmission) = self . storage ( ) . get_transmission ( * transmission_id) else {
693+ bail ! (
694+ "BFT failed to retrieve transmission '{}' from round {}" ,
695+ fmt_id( transmission_id) ,
696+ certificate. round( )
697+ ) ;
698+ } ;
699+ // Insert the transaction ID or solution ID into the map.
700+ match transmission_id {
701+ TransmissionID :: Solution ( id, _) => {
702+ seen_solution_ids. insert ( * id) ;
703+ }
704+ TransmissionID :: Transaction ( id, _) => {
705+ seen_transaction_ids. insert ( * id) ;
706+ }
707+ TransmissionID :: Ratification => { }
708+ }
709+ // Add the transmission to the set.
710+ transmissions. insert ( * transmission_id, transmission) ;
692711 }
693- TransmissionID :: Ratification => { }
694712 }
695- // Add the transmission to the set.
696- transmissions. insert ( * transmission_id, transmission) ;
697713 }
698714 }
699715 // Trigger consensus, as this will build a new block for the ledger.
700716 // Construct the subdag.
701- let subdag = Subdag :: from ( commit_subdag. clone ( ) ) ?;
717+ let subdag = Subdag :: from_full ( commit_subdag. clone ( ) ) ?;
702718 // Retrieve the anchor round.
703719 let anchor_round = subdag. anchor_round ( ) ;
704720 // Retrieve the number of transmissions.
705721 let num_transmissions = transmissions. len ( ) ;
706722 // Retrieve metadata about the subdag.
707- let subdag_metadata = subdag. iter ( ) . map ( | ( round , c ) | ( * round , c . len ( ) ) ) . collect :: < Vec < _ > > ( ) ;
723+ let subdag_metadata = subdag. num_certificates_rounds ( ) ;
708724
709725 // Ensure the subdag anchor round matches the leader round.
710726 ensure ! (
@@ -714,10 +730,13 @@ impl<N: Network> BFT<N> {
714730
715731 // Trigger consensus.
716732 if let Some ( consensus_sender) = self . consensus_sender . get ( ) {
733+ // Construct subdag transmissions.
734+ let subdag_transmissions =
735+ SubdagTransmissions { transmissions, prior_included_transmissions, aborted_transmissions } ;
717736 // Initialize a callback sender and receiver.
718737 let ( callback_sender, callback_receiver) = oneshot:: channel ( ) ;
719738 // Send the subdag and transmissions to consensus.
720- consensus_sender. tx_consensus_subdag . send ( ( subdag, transmissions , callback_sender) ) . await ?;
739+ consensus_sender. tx_consensus_subdag . send ( ( subdag, subdag_transmissions , callback_sender) ) . await ?;
721740 // Await the callback to continue.
722741 match callback_receiver. await {
723742 Ok ( Ok ( ( ) ) ) => ( ) , // continue
@@ -733,7 +752,7 @@ impl<N: Network> BFT<N> {
733752 }
734753
735754 info ! (
736- "\n \n Committing a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n "
755+ "\n \n Committing a subdag from round {anchor_round} with {num_transmissions} new transmissions: {subdag_metadata:?}\n "
737756 ) ;
738757 }
739758
@@ -745,7 +764,7 @@ impl<N: Network> BFT<N> {
745764
746765 // Update the validator telemetry.
747766 #[ cfg( feature = "telemetry" ) ]
748- self . primary ( ) . gateway ( ) . validator_telemetry ( ) . insert_subdag ( & Subdag :: from ( commit_subdag) ?) ;
767+ self . primary ( ) . gateway ( ) . validator_telemetry ( ) . insert_subdag ( & Subdag :: from_full ( commit_subdag) ?) ;
749768 }
750769
751770 // Perform garbage collection based on the latest committed leader round.
@@ -960,7 +979,10 @@ mod tests {
960979 console:: account:: { Address , PrivateKey } ,
961980 ledger:: {
962981 committee:: Committee ,
963- narwhal:: batch_certificate:: test_helpers:: { sample_batch_certificate, sample_batch_certificate_for_round} ,
982+ narwhal:: {
983+ NarwhalCertificate ,
984+ batch_certificate:: test_helpers:: { sample_batch_certificate, sample_batch_certificate_for_round} ,
985+ } ,
964986 } ,
965987 utilities:: TestRng ,
966988 } ;
0 commit comments