Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 76 additions & 14 deletions crates/walrus-e2e-tests/tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,14 +423,13 @@ async fn test_store_with_existing_blob_resource(

let blob_data = walrus_test_utils::random_data_list(31415, 4);
let blobs: Vec<&[u8]> = blob_data.iter().map(AsRef::as_ref).collect();
let encoding_type = DEFAULT_ENCODING;
let metatdatum = blobs
.iter()
.map(|blob| {
let (_, metadata) = client
.as_ref()
.encoding_config()
.get_for_type(encoding_type)
.get_for_type(DEFAULT_ENCODING)
.encode_with_metadata(blob)
.expect("blob encoding should not fail");
let metadata = metadata.metadata().to_owned();
Expand Down Expand Up @@ -462,8 +461,7 @@ async fn test_store_with_existing_blob_resource(
.collect::<HashMap<_, _>>();

// Now ask the client to store again.
let store_args =
StoreArgs::default_with_epochs(epochs_ahead_required).with_encoding_type(encoding_type);
let store_args = StoreArgs::default_with_epochs(epochs_ahead_required);
let blob_stores = client
.inner
.reserve_and_store_blobs(&blobs, &store_args)
Expand Down Expand Up @@ -717,12 +715,11 @@ async_param_test! {
/// The `epochs_ahead_registered` are the epochs ahead of the already-existing storage resource.
/// The `epochs_ahead_required` are the epochs ahead that are requested to the client when
/// registering anew.
/// `should_match` is a boolean that indicates if the storage object used in the final upload
/// should be the same as the first one registered.
/// `should_reuse` is a boolean that indicates if the storage objects should be reused.
async fn test_store_with_existing_storage_resource(
epochs_ahead_registered: EpochCount,
epochs_ahead_required: EpochCount,
should_match: bool,
should_reuse: bool,
) -> TestResult {
telemetry_subscribers::init_for_testing();

Expand All @@ -746,19 +743,34 @@ async fn test_store_with_existing_storage_resource(
.map(|blob| blob.encoded_size().expect("encoded size should be present"))
.collect::<Vec<_>>();

// The encoded sizes should all be the same.
assert!(encoded_sizes.iter().all(|&size| size == encoded_sizes[0]));
let encoded_size = encoded_sizes[0];

// Reserve space for the blobs. Collect all original storage resource objects ids.
// Each storage object has `EXTRA_BYTES_PER_STORAGE` bytes reserved, such that we can later
// check they were correctly split off.
const EXTRA_BYTES_PER_STORAGE: u64 = 2718;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Praise: Thanks for adding the tests!

I am wondering if we can test with spaces of different sizes pre-reserved: encoded_size - x, encoded_size, and encoded_size + x, so that we can cover the case where some blobs reuse spaces and some register new ones.

let original_storage_resources =
futures::future::join_all(encoded_sizes.iter().map(|encoded_size| async {
let resource = client
client
.as_ref()
.sui_client()
.reserve_space(*encoded_size, epochs_ahead_registered)
.reserve_space(
*encoded_size + EXTRA_BYTES_PER_STORAGE,
epochs_ahead_registered,
)
.await
.expect("reserve space should not fail");
resource.id
.expect("reserve space should not fail")
}))
.await
.await;

let storage_start_epoch = original_storage_resources[0].start_epoch;
let current_epoch = client.as_ref().sui_client().current_epoch().await?;

let original_storage_resource_ids = original_storage_resources
.into_iter()
.map(|resource| resource.id)
.collect::<HashSet<_>>();

let blobs = encoded_blobs
Expand All @@ -780,8 +792,58 @@ async fn test_store_with_existing_storage_resource(
})
.collect::<HashSet<_>>();

// Check the object ids are the same.
assert!(should_match == (original_storage_resources == blob_store));
if should_reuse {
// Check the object ids are the same.
assert_eq!(original_storage_resource_ids, blob_store);

// The storage should have been reused.
// Check that the wallet contains the right number of storage objects, and with the right
// amount of storage left.
let storage_objects = client
.as_ref()
.sui_client()
.owned_storage(ExpirySelectionPolicy::All)
.await?;

if epochs_ahead_registered > epochs_ahead_required {
// We should have split both in length and in size.
assert_eq!(
storage_objects.len(),
original_storage_resource_ids.len() * 2
);
} else {
// We should have split only in size.
assert_eq!(storage_objects.len(), original_storage_resource_ids.len());
}

// There can be either one or two storage objects per original storage resource, depending
// on whether the original storage resource has been split in the number of epochs or not.
//
// In both cases, the "long-thin" piece on top, resulting from the 1st split by size,
// should be present.
//
// size ^ ┌───────────────────────────┐ < encoded_size + EXTRA_BYTES_PER_STORAGE
// │ 1st split by size │ } this is exactly EXTRA_BYTES_PER_STORAGE
// ┌──────────────────┌────────┐ < encoded_size
// │ Used storage │ 2nd │
// │ │ split │
// │ │ by ep. │
// └──────────────────└────────┘
// start ^ required ^ reg ^ epochs >
for storage_object in storage_objects {
assert!(
// The long-thin piece.
(storage_object.start_epoch == storage_start_epoch
&& storage_object.end_epoch == epochs_ahead_registered + current_epoch
&& storage_object.storage_size == EXTRA_BYTES_PER_STORAGE)
// The short-thick piece.
|| (storage_object.start_epoch == epochs_ahead_required + current_epoch
&& storage_object.end_epoch == epochs_ahead_registered + current_epoch
&& storage_object.storage_size == encoded_size)
);
}
}

Ok(())
}

Expand Down
9 changes: 6 additions & 3 deletions crates/walrus-sdk/src/client/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,14 +534,13 @@ impl<'a> ResourceManager<'a> {
}
}

// TODO(giac): consider splitting the storage before reusing it (WAL-208).
let target_epoch = epochs_ahead + self.write_committee_epoch;
if !blob_processing_items.is_empty() {
let all_storage_resources = self
.sui_client
.owned_storage(ExpirySelectionPolicy::Valid)
.await?;

let target_epoch = epochs_ahead + self.write_committee_epoch;
let mut available_resources: Vec<_> = all_storage_resources
.into_iter()
.filter(|storage| storage.end_epoch >= target_epoch)
Expand Down Expand Up @@ -592,7 +591,11 @@ impl<'a> ResourceManager<'a> {
);
let blobs = self
.sui_client
.register_blobs(reused_metadata_with_storage, persistence)
.register_blobs(
reused_metadata_with_storage,
persistence,
Some(target_epoch),
)
.await?;
results.extend(blobs.into_iter().zip(reused_encoded_lengths.iter()).map(
|(blob, &encoded_length)| (blob, RegisterBlobOp::ReuseStorage { encoded_length }),
Expand Down
63 changes: 60 additions & 3 deletions crates/walrus-sui/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,12 +600,13 @@ impl SuiContractClient {
&self,
blob_metadata_and_storage: Vec<(BlobObjectMetadata, StorageResource)>,
persistence: BlobPersistence,
target_epoch: Option<Epoch>,
) -> SuiClientResult<Vec<Blob>> {
Comment on lines +603 to 604
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: will there be calls to register_blobs with target_epoch = None, except in tests? Does it make sense to use Epoch instead of Option?

self.retry_on_wrong_version(|| async {
self.inner
.lock()
.await
.register_blobs(blob_metadata_and_storage.clone(), persistence)
.register_blobs(blob_metadata_and_storage.clone(), persistence, target_epoch)
.await
})
.await
Expand Down Expand Up @@ -1581,16 +1582,63 @@ impl SuiContractClientInner {
self.sui_client().get_sui_object(storage_id[0]).await
}

/// Ensures a storage resource matches the blob's encoded size and the intended store
/// epoch, adding split operations to the PTB builder when the resource is larger than
/// required.
///
/// The size split is applied first (when the resource holds more bytes than the blob
/// needs), followed by the epoch split (when the resource outlives the requested
/// `target_epoch`).
///
/// Returns the [`StorageResource`] trimmed to fit the blob described by `blob_metadata`.
async fn check_and_split_storage(
    pt_builder: &mut WalrusPtbBuilder,
    blob_metadata: &BlobObjectMetadata,
    storage: StorageResource,
    target_epoch: Option<Epoch>,
) -> SuiClientResult<StorageResource> {
    let mut resource = storage;

    // Trim excess bytes: keep exactly `encoded_size` for this blob.
    if resource.storage_size > blob_metadata.encoded_size {
        tracing::debug!(
            storage_size = resource.storage_size,
            blob_size = blob_metadata.encoded_size,
            "splitting existing storage by size",
        );
        let _split_arg = pt_builder
            .split_storage_by_size(resource.id.into(), blob_metadata.encoded_size)
            .await?;
        resource.storage_size = blob_metadata.encoded_size;
    }

    // Trim excess epochs: keep the resource only until `target_epoch`, if one is given.
    match target_epoch {
        Some(target_epoch) if resource.end_epoch > target_epoch => {
            tracing::debug!(
                storage_end_epoch = resource.end_epoch,
                target_epoch,
                "splitting existing storage by epoch",
            );
            let _split_arg = pt_builder
                .split_storage_by_epoch(resource.id.into(), target_epoch)
                .await?;
            resource.end_epoch = target_epoch;
        }
        _ => {}
    }

    Ok(resource)
}

/// Registers a blob with the specified [`BlobId`] using the provided [`StorageResource`],
/// and returns the created blob object.
///
/// `blob_size` is the size of the unencoded blob. The encoded size of the blob must be
/// less than or equal to the size reserved in `storage`.
///
/// Storage resources are automatically split to the correct size and epoch if needed.
#[tracing::instrument(level = Level::DEBUG, skip_all)]
pub async fn register_blobs(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remark/Suggestion:
I'm not sure if this is the main function that we should be concerned with.
We use this function mainly for tests, but as I've noticed now, we also use it in ResourceManager::get_existing_or_register_with_resources as a separate call in addition to reserve_and_register, which we should not do, because that means potentially sending two transactions instead of one.

I think the main focus should be on reserve_and_register to ensure that existing storage is reused if possible (e.g. by passing a vector of storage resources that can then be merged & split, potentially combined with new storage), which should then also reduce the complexity in the ResourceManager.

We can then also do the splitting in this function here, but then I'd prefer an approach where instead of passing pairs of (BlobObjectMetadata, StorageResource), we pass two independent vectors (or slices) of BlobObjectMetadata and StorageResource, which then allows the storage resources to also be combined instead of just split. This would then also allow using the same logic here as in reserve_and_register for splitting & merging (modulo buying new storage, which would just cause an error here if that were needed).

&mut self,
blob_metadata_and_storage: Vec<(BlobObjectMetadata, StorageResource)>,
persistence: BlobPersistence,
target_epoch: Option<Epoch>,
) -> SuiClientResult<Vec<Blob>> {
if blob_metadata_and_storage.is_empty() {
tracing::debug!("no blobs to register");
Expand All @@ -1604,13 +1652,22 @@ impl SuiContractClientInner {
let mut pt_builder = self.transaction_builder()?;
// Build a ptb to include all register blob commands for all blobs.
for (blob_metadata, storage) in blob_metadata_and_storage.into_iter() {
// Check and split storage resources as needed before registering.
let split_storage = Self::check_and_split_storage(
&mut pt_builder,
&blob_metadata,
storage,
target_epoch,
)
.await?;

if with_credits {
pt_builder
.register_blob_with_credits(storage.id.into(), blob_metadata, persistence)
.register_blob_with_credits(split_storage.id.into(), blob_metadata, persistence)
.await?;
} else {
pt_builder
.register_blob(storage.id.into(), blob_metadata, persistence)
.register_blob(split_storage.id.into(), blob_metadata, persistence)
.await?;
};
}
Expand Down
20 changes: 20 additions & 0 deletions crates/walrus-sui/src/client/transaction_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,26 @@ impl WalrusPtbBuilder {
Ok(result_arg)
}

/// Appends a `storage_resource::split_by_epoch` Move call to the PTB and returns the
/// resulting [`Argument`].
///
/// After the call, the input storage resource ends at `split_epoch`, while the returned
/// resource covers the period from `split_epoch` to the original end epoch.
pub async fn split_storage_by_epoch(
    &mut self,
    storage_resource: ArgumentOrOwnedObject,
    split_epoch: u32,
) -> SuiClientResult<Argument> {
    // Resolve both call arguments up front, then issue the Move call with them.
    let storage_arg = self.argument_from_arg_or_obj(storage_resource).await?;
    let epoch_arg = self.pt_builder.pure(split_epoch)?;
    let result_arg = self.walrus_move_call(
        contracts::storage_resource::split_by_epoch,
        vec![storage_arg, epoch_arg],
    )?;
    // The split-off resource must be consumed (e.g. transferred) before the PTB finishes.
    self.add_result_to_be_consumed(result_arg);
    Ok(result_arg)
}

/// Adds a call to `register_blob` to the `pt_builder` and returns the result [`Argument`].
pub async fn register_blob(
&mut self,
Expand Down
2 changes: 2 additions & 0 deletions crates/walrus-sui/tests/test_walrus_sui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ async fn test_register_certify_blob() -> anyhow::Result<()> {
.register_blobs(
vec![(blob_metadata, storage_resource.clone())],
BlobPersistence::Permanent,
None,
)
.await?
.into_iter()
Expand Down Expand Up @@ -263,6 +264,7 @@ async fn test_register_certify_blob() -> anyhow::Result<()> {
.register_blobs(
vec![(blob_metadata, storage_resource)],
BlobPersistence::Permanent,
None,
)
.await?
.into_iter()
Expand Down