Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit. Hold shift + click to select a range.
ea6616c
WIP Allow uploading mags, attachments to existing datasets
fm3 Mar 16, 2026
2f4d28d
WIP refactor uploadService
fm3 Mar 31, 2026
0b21b58
Merge branch 'master' into upload-mags-attachments
fm3 Mar 31, 2026
584f4c1
complete the extraction of redis interaction
fm3 Mar 31, 2026
0a9cc69
three metadata stores
fm3 Mar 31, 2026
93d6579
WIP uploadDomain
fm3 Apr 1, 2026
3c95eb6
inject concrete class, not trait
fm3 Apr 1, 2026
d4b0037
Merge branch 'master' into upload-mags-attachments
fm3 Apr 2, 2026
9f96d4d
restructure upload info
fm3 Apr 2, 2026
bd71d24
reserve mag + attachment
fm3 Apr 2, 2026
b1ae126
insert mag + attachment details into redis
fm3 Apr 2, 2026
c90b1c9
rename dao
fm3 Apr 2, 2026
8863dd4
Merge branch 'master' into upload-mags-attachments
fm3 Apr 7, 2026
14181c4
evolution
fm3 Apr 7, 2026
638cdbd
changelog
fm3 Apr 7, 2026
de4c2a3
respect uploadIsPending bool in DAOs
fm3 Apr 7, 2026
03672f5
Merge branch 'master' into upload-mags-attachments
fm3 Apr 7, 2026
ded3c82
wip reserve routes
fm3 Apr 7, 2026
b008682
wip report upload from ds to wk
fm3 Apr 7, 2026
6429e04
Merge branch 'master' into upload-mags-attachments
fm3 Apr 7, 2026
4ef0540
adapt frontend ds upload, wip finish checks
fm3 Apr 7, 2026
d5a038f
report routes
fm3 Apr 7, 2026
fa65477
bump schema version
fm3 Apr 7, 2026
6763bae
changelog
fm3 Apr 7, 2026
05522d4
fix route names
fm3 Apr 7, 2026
bbebdd9
adapt test db
fm3 Apr 7, 2026
0e6b8c1
assert cancel domain
fm3 Apr 7, 2026
3a67e99
Merge branch 'master' into upload-mags-attachments
fm3 Apr 8, 2026
3920836
format
fm3 Apr 8, 2026
59410bb
adapt e2e test; retry on cache lookup failed for type postgres error,…
fm3 Apr 8, 2026
fcdf4f5
fixes
fm3 Apr 8, 2026
118ef59
move to target; misc fixes
fm3 Apr 8, 2026
7da534d
invalidate cache
fm3 Apr 8, 2026
fc6feb7
unused param
fm3 Apr 8, 2026
a11fd2e
v13 api backwards compatibility
fm3 Apr 8, 2026
6d4f3a4
Merge branch 'master' into upload-mags-attachments
fm3 Apr 9, 2026
173fedd
add unversioned upload/test chunk routes for compat with old libs
fm3 Apr 9, 2026
1315546
WIP handle pending; clean up signatures in RedisTemporaryStore
fm3 Apr 9, 2026
3db3091
cleanup, overwritePending
fm3 Apr 9, 2026
b0c8f21
fix copy-paste errors
fm3 Apr 9, 2026
9e0cd3d
format
fm3 Apr 9, 2026
ead0339
implement feedback
fm3 Apr 9, 2026
58af571
typo
fm3 Apr 9, 2026
f06b5dd
format
fm3 Apr 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions app/controllers/DatasetController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ case class ReserveAttachmentUploadToPathRequest(
attachmentName: String,
attachmentType: LayerAttachmentType.Value,
attachmentDataformat: LayerAttachmentDataformat.Value,
pathPrefix: Option[UPath]
pathPrefix: Option[UPath],
overwritePending: Option[Boolean] = None
)

object ReserveAttachmentUploadToPathRequest {
Expand Down Expand Up @@ -166,14 +167,14 @@ class DatasetController @Inject()(userService: UserService,
wKRemoteSegmentAnythingClient: WKRemoteSegmentAnythingClient,
teamService: TeamService,
datasetDAO: DatasetDAO,
datasetLayerAttachmentsDAO: DatasetLayerAttachmentsDAO,
datasetLayerAttachmentsDAO: DatasetLayerAttachmentDAO,
datasetUploadToPathsService: UploadToPathsService,
folderService: FolderService,
thumbnailService: ThumbnailService,
thumbnailCachingService: ThumbnailCachingService,
usedStorageService: UsedStorageService,
conf: WkConf,
datasetMagsDAO: DatasetMagsDAO,
datasetMagsDAO: DatasetMagDAO,
slackNotificationService: SlackNotificationService,
authenticationService: AccessibleBySwitchingService,
analyticsService: AnalyticsService,
Expand Down Expand Up @@ -726,6 +727,7 @@ class DatasetController @Inject()(userService: UserService,
for {
dataset <- datasetDAO.findOne(datasetId) ?~> notFoundMessage(datasetId.toString) ~> NOT_FOUND
_ <- Fox.assertTrue(datasetService.isEditableBy(dataset, Some(request.identity))) ?~> "notAllowed" ~> FORBIDDEN
_ <- datasetMagsDAO.findOneWithPendingUploadToPath(datasetId, request.body.layerName, request.body.mag) ?~> "dataset.finishMagUploadToPath.notPending"
_ <- datasetMagsDAO.finishUploadToPath(datasetId, request.body.layerName, request.body.mag)
dataStoreClient <- datasetService.clientFor(dataset)
_ <- Fox.runIf(!dataset.isVirtual) {
Expand Down Expand Up @@ -753,10 +755,15 @@ class DatasetController @Inject()(userService: UserService,
for {
dataset <- datasetDAO.findOne(datasetId) ?~> notFoundMessage(datasetId.toString) ~> NOT_FOUND
_ <- Fox.assertTrue(datasetService.isEditableBy(dataset, Some(request.identity))) ?~> "notAllowed" ~> FORBIDDEN
_ <- datasetLayerAttachmentsDAO.findOneWithPendingUploadToPath(
datasetId,
request.body.layerName,
request.body.attachmentType,
request.body.attachmentName) ?~> "dataset.finishAttachmentUploadToPath.notPending"
_ <- datasetLayerAttachmentsDAO.finishUploadToPath(datasetId,
request.body.layerName,
request.body.attachmentName,
request.body.attachmentType)
request.body.attachmentType,
request.body.attachmentName)
dataStoreClient <- datasetService.clientFor(dataset)
_ <- Fox.runIf(!dataset.isVirtual) {
for {
Expand Down
143 changes: 127 additions & 16 deletions app/controllers/WKRemoteDataStoreController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,26 @@ import com.scalableminds.util.objectid.ObjectId
import com.scalableminds.util.time.Instant
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.controllers.JobExportProperties
import com.scalableminds.webknossos.datastore.helpers.UPath
import com.scalableminds.webknossos.datastore.models.UnfinishedUpload
import com.scalableminds.webknossos.datastore.models.datasource.{
DataSource,
DataSourceId,
DataSourceStatus,
LayerAttachmentType,
UnusableDataSource
}
import com.scalableminds.webknossos.datastore.services.{DataSourcePathInfo, DataStoreStatus}
import com.scalableminds.webknossos.datastore.services.uploading.{
AttachmentUploadAdditionalInfo,
AttachmentUploadInfo,
DatasetUploadAdditionalInfo,
DatasetUploadInfo,
MagUploadAdditionalInfo,
MagUploadInfo,
ReportAttachmentUploadParameters,
ReportDatasetUploadParameters,
ReserveAdditionalInformation,
ReserveUploadInformation
ReportMagUploadParameters
}
import com.typesafe.scalalogging.LazyLogging
import models.dataset._
Expand Down Expand Up @@ -46,6 +54,9 @@ class WKRemoteDataStoreController @Inject()(
userDAO: UserDAO,
teamDAO: TeamDAO,
jobDAO: JobDAO,
datasetMagDAO: DatasetMagDAO,
datasetAttachmentDAO: DatasetLayerAttachmentDAO,
uploadToPathsService: UploadToPathsService,
credentialDAO: CredentialDAO,
wkSilhouetteEnvironment: WkSilhouetteEnvironment)(implicit ec: ExecutionContext, bodyParsers: PlayBodyParsers)
extends Controller
Expand All @@ -54,45 +65,103 @@ class WKRemoteDataStoreController @Inject()(
val bearerTokenService: WebknossosBearerTokenAuthenticatorService =
wkSilhouetteEnvironment.combinedAuthenticatorService.tokenAuthenticatorService

def reserveDatasetUpload(name: String, key: String, token: String): Action[ReserveUploadInformation] =
Action.async(validateJson[ReserveUploadInformation]) { implicit request =>
def reserveDatasetUpload(name: String, key: String, token: String): Action[DatasetUploadInfo] =
Action.async(validateJson[DatasetUploadInfo]) { implicit request =>
dataStoreService.validateAccess(name, key) { dataStore =>
val uploadInfo = request.body
for {
user <- bearerTokenService.userForToken(token) ~> FORBIDDEN
organization <- organizationDAO.findOne(uploadInfo.organization)(GlobalAccessContext) ?~> Messages(
organization <- organizationDAO.findOne(uploadInfo.organizationId)(GlobalAccessContext) ?~> Messages(
"organization.notFound",
uploadInfo.organization) ~> NOT_FOUND
uploadInfo.organizationId) ~> NOT_FOUND
usedStorageBytes <- organizationDAO.getUsedStorage(organization._id)
_ <- Fox.runOptional(organization.includedStorageBytes)(includedStorage =>
Fox.fromBool(usedStorageBytes + uploadInfo.totalFileSizeInBytes.getOrElse(0L) <= includedStorage)) ?~> "dataset.upload.storageExceeded" ~> FORBIDDEN
Fox.fromBool(usedStorageBytes + uploadInfo.resumableUploadInfo.totalFileSizeInBytes
.getOrElse(0L) <= includedStorage)) ?~> "dataset.upload.storageExceeded" ~> FORBIDDEN
_ <- Fox.fromBool(organization._id == user._organization) ?~> "notAllowed" ~> FORBIDDEN
_ <- datasetService.assertValidDatasetName(uploadInfo.name)
_ <- datasetService.assertValidDatasetName(uploadInfo.datasetName)
_ <- Fox.fromBool(dataStore.onlyAllowedOrganization.forall(_ == organization._id)) ?~> "dataset.upload.Datastore.restricted"
_ <- Fox.serialCombined(uploadInfo.layersToLink.getOrElse(List.empty))(l =>
layerToLinkService.validateLayerToLink(l, user)) ?~> "dataset.upload.invalidLinkedLayers"
_ <- Fox.runIf(request.body.requireUniqueName.getOrElse(false))(
datasetService.assertNewDatasetNameUnique(request.body.name, organization._id))
datasetService.assertNewDatasetNameUnique(request.body.datasetName, organization._id))
preliminaryDataSource = UnusableDataSource(DataSourceId("", ""), None, DataSourceStatus.notYetUploaded)
dataset <- datasetService.createAndSetUpDataset(
uploadInfo.name,
uploadInfo.datasetName,
dataStore,
preliminaryDataSource,
uploadInfo.folderId,
user,
isVirtual = uploadInfo.isVirtual.getOrElse(true),
creationType = DatasetCreationType.Upload
) ?~> "dataset.upload.creation.failed"
_ <- datasetService.addInitialTeams(dataset, uploadInfo.initialTeams, user)(AuthorizedAccessContext(user))
additionalInfo = ReserveAdditionalInformation(dataset._id, dataset.directoryName)
_ <- datasetService.addInitialTeams(dataset, uploadInfo.initialTeamIds, user)(AuthorizedAccessContext(user))
additionalInfo = DatasetUploadAdditionalInfo(dataset._id, dataset.directoryName)
} yield Ok(Json.toJson(additionalInfo))
}
}

def getUnfinishedUploadsForUser(name: String,
key: String,
token: String,
organizationId: String): Action[AnyContent] =
/** Reserves the upload of a single mag into an existing (virtual) dataset.
  * Validates datastore access, user token, dataset ownership and mag uniqueness,
  * then records the mag with a pending-upload marker.
  */
def reserveMagUpload(name: String, key: String, token: String): Action[MagUploadInfo] =
  Action.async(validateJson[MagUploadInfo]) { implicit request =>
    dataStoreService.validateAccess(name, key) { requestingDataStore =>
      // Datastore write access was already asserted before this endpoint is reached.
      val uploadInfo = request.body
      for {
        requestingUser <- bearerTokenService.userForToken(token)
        dataset <- datasetDAO.findOne(uploadInfo.datasetId)(AuthorizedAccessContext(requestingUser))
        // Mags can only be added to virtual datasets.
        _ <- Fox.fromBool(dataset.isVirtual) ?~> "dataset.reserveMagUpload.notVirtual"
        (dataSource, dataLayer) <- datasetService.getDataSourceAndLayerFor(dataset, uploadInfo.layerName)
        // Reject if a mag with the same max dimension already exists in this layer.
        magConflictExists = dataLayer.mags.exists(_.mag.maxDim == uploadInfo.mag.mag.maxDim)
        _ <- Fox.fromBool(!magConflictExists) ?~> s"New mag ${uploadInfo.mag.mag} conflicts with existing mag of the layer."
        // The upload must go through the datastore that hosts the dataset.
        _ <- Fox.fromBool(dataset._dataStore == requestingDataStore.name) ?~> "Cannot upload mag to existing dataset via different datastore."
        // A previously reserved-but-unfinished mag may be overwritten if the client asks for it.
        _ <- uploadToPathsService.handleExistingPendingMag(dataset,
                                                           uploadInfo.layerName,
                                                           uploadInfo.mag.mag,
                                                           uploadInfo.overwritePending)
        _ <- datasetMagDAO.insertWithUploadPending(uploadInfo.datasetId,
                                                   uploadInfo.layerName,
                                                   uploadInfo.mag.mag,
                                                   uploadInfo.mag.axisOrder,
                                                   uploadInfo.mag.channelIndex)
      } yield Ok(Json.toJson(MagUploadAdditionalInfo(dataSource.id)))
    }
  }

/** Reserves the upload of a layer attachment into an existing (virtual) dataset.
  * Validates datastore access, user token, dataset ownership and attachment-name
  * availability, then records the attachment with a pending-upload marker and a
  * placeholder path that is replaced when the upload is reported finished.
  *
  * Fix: removed stray review-page residue lines ("Comment thread" /
  * "fm3 marked this conversation as resolved.") that were embedded in the code
  * and made it syntactically invalid.
  */
def reserveAttachmentUpload(name: String, key: String, token: String): Action[AttachmentUploadInfo] =
  Action.async(validateJson[AttachmentUploadInfo]) { implicit request =>
    dataStoreService.validateAccess(name, key) { dataStore =>
      // DS write access was asserted already at this point.
      for {
        user <- bearerTokenService.userForToken(token)
        dataset <- datasetDAO.findOne(request.body.datasetId)(AuthorizedAccessContext(user))
        // Attachments can only be added to virtual datasets.
        _ <- Fox.fromBool(dataset.isVirtual) ?~> "dataset.reserveAttachmentUpload.notVirtual"
        (dataSource, dataLayer) <- datasetService.getDataSourceAndLayerFor(dataset, request.body.layerName)
        // Singleton attachment types allow at most one instance per layer; pick the matching error key.
        isSingletonAttachment = LayerAttachmentType.isSingletonAttachment(request.body.attachmentType)
        existsError = if (isSingletonAttachment) "attachment.singleton.alreadyFilled" else "attachment.name.taken"
        existingAttachmentOpt = dataLayer.attachments.flatMap(
          _.getByTypeAndNameAlwaysReturnSingletons(request.body.attachmentType, request.body.attachment.name))
        _ <- Fox.fromBool(existingAttachmentOpt.isEmpty) ?~> existsError
        // The upload must go through the datastore that hosts the dataset.
        _ <- Fox.fromBool(dataset._dataStore == dataStore.name) ?~> "Cannot upload attachment to existing dataset via different datastore."
        // Placeholder path; the real path is set when the upload is reported finished.
        dummyAttachmentPath <- UPath.fromString("<pending upload>").toFox
        // A previously reserved-but-unfinished attachment may be overwritten if the client asks for it.
        _ <- uploadToPathsService.handleExistingPendingAttachment(dataset,
                                                                  request.body.layerName,
                                                                  request.body.attachmentType,
                                                                  request.body.attachment.name,
                                                                  request.body.overwritePending)
        _ <- datasetAttachmentDAO.insertWithUploadPending(
          request.body.datasetId,
          request.body.layerName,
          request.body.attachment.name,
          request.body.attachmentType,
          request.body.attachment.dataFormat,
          dummyAttachmentPath
        )
      } yield Ok(Json.toJson(AttachmentUploadAdditionalInfo(dataSource.id)))
    }
  }

def getUnfinishedDatasetUploadsForUser(name: String,
key: String,
token: String,
organizationId: String): Action[AnyContent] =
Action.async { implicit request =>
dataStoreService.validateAccess(name, key) { _ =>
for {
Expand Down Expand Up @@ -149,6 +218,48 @@ class WKRemoteDataStoreController @Inject()(
}
}

/** Called by a datastore to report that a previously reserved mag upload finished.
  * Marks the mag as uploaded, invalidates the datastore-side dataset cache, and
  * refreshes the storage report for the dataset.
  */
def reportMagUpload(name: String, key: String): Action[ReportMagUploadParameters] =
  Action.async(validateJson[ReportMagUploadParameters]) { implicit request =>
    dataStoreService.validateAccess(name, key) { _ =>
      val params = request.body
      for {
        dataset <- datasetDAO.findOne(params.datasetId)(GlobalAccessContext) ?~> Messages(
          "dataset.notFound",
          params.datasetId) ~> NOT_FOUND
        // Only mags that were reserved (pending upload) may be finished.
        _ <- datasetMagDAO.findOneWithPendingUpload(params.datasetId, params.layerName, params.mag.mag) ?~> "dataset.finishMagUpload.notPending"
        // The reported mag must carry the path it was uploaded to.
        _ <- params.mag.path.toFox ?~> "dataset.finishMagUpload.pathNotSet"
        _ <- datasetMagDAO.finishUpload(params.datasetId, params.layerName, params.mag)
        remoteDatastoreClient <- datasetService.clientFor(dataset)(GlobalAccessContext)
        // The datastore caches dataset properties; drop the stale entry.
        _ <- remoteDatastoreClient.invalidateDatasetInDSCache(dataset._id)
        _ <- usedStorageService.refreshStorageReportForDataset(dataset)
      } yield Ok
    }
  }

/** Called by a datastore to report that a previously reserved attachment upload finished.
  * Marks the attachment as uploaded, invalidates the datastore-side dataset cache, and
  * refreshes the storage report for the dataset.
  *
  * Fix: removed stray review-page residue lines ("Comment thread" /
  * "fm3 marked this conversation as resolved.") that were embedded between the
  * closing braces and made the code syntactically invalid.
  */
def reportAttachmentUpload(name: String, key: String): Action[ReportAttachmentUploadParameters] =
  Action.async(validateJson[ReportAttachmentUploadParameters]) { implicit request =>
    dataStoreService.validateAccess(name, key) { _ =>
      for {
        dataset <- datasetDAO.findOne(request.body.datasetId)(GlobalAccessContext) ?~> Messages(
          "dataset.notFound",
          request.body.datasetId) ~> NOT_FOUND
        // Only attachments that were reserved (pending upload) may be finished.
        _ <- datasetAttachmentDAO.findOneWithPendingUpload(
          request.body.datasetId,
          request.body.layerName,
          request.body.attachmentType,
          request.body.attachment.name) ?~> "dataset.finishAttachmentUpload.notPending"
        _ <- datasetAttachmentDAO.finishUpload(request.body.datasetId,
                                               request.body.layerName,
                                               request.body.attachmentType,
                                               request.body.attachment)
        dataStoreClient <- datasetService.clientFor(dataset)(GlobalAccessContext)
        // The datastore caches dataset properties; drop the stale entry.
        _ <- dataStoreClient.invalidateDatasetInDSCache(dataset._id)
        _ <- usedStorageService.refreshStorageReportForDataset(dataset)
      } yield Ok
    }
  }

def statusUpdate(name: String, key: String): Action[DataStoreStatus] = Action.async(validateJson[DataStoreStatus]) {
implicit request =>
dataStoreService.validateAccess(name, key) { _ =>
Expand Down
Loading
Loading