Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions cmd/uncloud/service/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type runOptions struct {
image string
machines []string
memory dockeropts.MemBytes
memoryReservation dockeropts.MemBytes
cpuReservation dockeropts.NanoCPUs
mode string
name string
privileged bool
Expand Down Expand Up @@ -62,6 +64,8 @@ func NewRunCommand() *cobra.Command {
cmd.Flags().VarP(&opts.cpu, "cpu", "",
"Maximum number of CPU cores a service container can use. Fractional values are allowed: "+
"0.5 for half a core or 2.25 for two and a quarter cores.")
cmd.Flags().Var(&opts.cpuReservation, "reserve-cpu",
"Minimum CPU cores to reserve for placement (nanocores). Fractional values are allowed, e.g. 0.5 for half a core.")
cmd.Flags().StringVar(&opts.entrypoint, "entrypoint", "",
"Overwrite the default ENTRYPOINT of the image. Pass an empty string \"\" to reset it.")
cmd.Flags().StringSliceVarP(&opts.env, "env", "e", nil,
Expand All @@ -78,6 +82,8 @@ func NewRunCommand() *cobra.Command {
"Maximum amount of memory a service container can use. Value is a positive integer with optional unit suffix "+
"(b, k, m, g). Default unit is bytes if no suffix specified.\n"+
"Examples: 1073741824, 1024m, 1g (all equal 1 gibibyte)")
cmd.Flags().Var(&opts.memoryReservation, "reserve-memory",
"Minimum memory to reserve for placement. Value is a positive integer with optional unit suffix (b, k, m, g).")
cmd.Flags().StringVarP(&opts.name, "name", "n", "",
"Assign a name to the service. A random name is generated if not specified.")
cmd.Flags().BoolVar(&opts.privileged, "privileged", false,
Expand Down Expand Up @@ -210,8 +216,10 @@ func prepareServiceSpec(opts runOptions) (api.ServiceSpec, error) {
Privileged: opts.privileged,
PullPolicy: opts.pull,
Resources: api.ContainerResources{
CPU: opts.cpu.Value(),
Memory: opts.memory.Value(),
CPU: opts.cpu.Value(),
Memory: opts.memory.Value(),
CPUReservation: opts.cpuReservation.Value(),
MemoryReservation: opts.memoryReservation.Value(),
},
User: opts.user,
VolumeMounts: mounts,
Expand Down
235 changes: 140 additions & 95 deletions internal/machine/api/pb/machine.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions internal/machine/api/pb/machine.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ message MachineInfo {
string name = 2;
NetworkConfig network = 3;
IP public_ip = 4;
// Resource capacity and reservations for scheduling.
int64 total_cpu_nanos = 5; // Total CPU in nanocores (1e9 = 1 core)
int64 total_memory_bytes = 6;
int64 reserved_cpu_nanos = 7;
int64 reserved_memory_bytes = 8;
}

message NetworkConfig {
Expand Down
41 changes: 38 additions & 3 deletions internal/machine/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strconv"
"sync"

"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/go-connections/sockets"
"github.com/psviderski/uncloud/internal/corrosion"
Expand Down Expand Up @@ -864,16 +865,50 @@ func (m *Machine) Token(_ context.Context, _ *emptypb.Empty) (*pb.TokenResponse,
return &pb.TokenResponse{Token: tokenStr}, nil
}

// Inspect builds a MachineInfo from m.state: the machine ID and name plus its network
// subnet, management IP and public key. When m.dockerService is set it additionally asks
// populateResources to fill in the resource fields; a failure there is only logged as a
// warning, and the MachineInfo is still returned with a nil error.
//
// NOTE(review): this span looks like a rendered diff with the removed and added lines
// interleaved (two function signatures, and both "}, nil" and "}" closing the struct
// literal) — confirm against the real file before treating it as compilable Go.
func (m *Machine) Inspect(_ context.Context, _ *emptypb.Empty) (*pb.MachineInfo, error) {
return &pb.MachineInfo{
func (m *Machine) Inspect(ctx context.Context, _ *emptypb.Empty) (*pb.MachineInfo, error) {
info := &pb.MachineInfo{
Id: m.state.ID,
Name: m.state.Name,
Network: &pb.NetworkConfig{
Subnet: pb.NewIPPrefix(m.state.Network.Subnet),
ManagementIp: pb.NewIP(m.state.Network.ManagementIP),
PublicKey: m.state.Network.PublicKey,
},
}, nil
}

// Populate resource capacity and reservations for scheduling.
// This step is best-effort: it is skipped without a Docker service, and an error
// from it does not fail the Inspect call.
if m.dockerService != nil {
if err := m.populateResources(ctx, info); err != nil {
slog.Warn("Failed to populate machine resources.", "err", err)
}
}

return info, nil
}

// populateResources fills in the resource capacity and reservation fields of info.
//
// Total CPU is the Docker daemon's reported CPU count converted to nanocores
// (1e9 per core) and total memory is the daemon's reported MemTotal. Reserved CPU
// and memory are the sums of the CPUReservation and MemoryReservation values found
// in the service specs of the containers returned by ListServiceContainers (called
// with default list options).
//
// Every value is gathered before anything is written, so when an error is returned
// info is left untouched. This avoids handing back a machine that advertises its
// full capacity with zero reservations merely because the container listing failed.
// The reserved fields are assigned rather than accumulated, so calling the method
// again on the same info does not double-count.
func (m *Machine) populateResources(ctx context.Context, info *pb.MachineInfo) error {
	// Get system info for total CPU and memory.
	dockerInfo, err := m.dockerService.Client.Info(ctx)
	if err != nil {
		return fmt.Errorf("get docker info: %w", err)
	}

	// Fetch the service containers whose reservations count against this machine.
	containers, err := m.dockerService.ListServiceContainers(ctx, "", container.ListOptions{})
	if err != nil {
		return fmt.Errorf("list containers: %w", err)
	}

	var reservedCPU, reservedMemory int64
	for _, ctr := range containers {
		reservedCPU += ctr.ServiceSpec.Container.Resources.CPUReservation
		reservedMemory += ctr.ServiceSpec.Container.Resources.MemoryReservation
	}

	// Commit only after every query succeeded.
	info.TotalCpuNanos = int64(dockerInfo.NCPU) * 1e9
	info.TotalMemoryBytes = dockerInfo.MemTotal
	info.ReservedCpuNanos = reservedCPU
	info.ReservedMemoryBytes = reservedMemory

	return nil
}

// IsNetworkReady returns true if the Docker network is ready for containers.
Expand Down
5 changes: 4 additions & 1 deletion pkg/api/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ const (
type ContainerResources struct {
// CPU is the maximum amount of CPU nanocores (1000000000 = 1 CPU core) the container can use.
CPU int64
// CPUReservation is the minimum amount of CPU nanocores the container needs to run efficiently.
// Used by the scheduler to ensure machines have sufficient available CPU before placement.
CPUReservation int64
// Memory is the maximum amount of memory (in bytes) the container can use.
Memory int64
// MemoryReservation is the minimum amount of memory (in bytes) the container needs to run efficiently.
// TODO: implement a placement constraint that checks available memory on machines.
// Used by the scheduler to ensure machines have sufficient available memory before placement.
MemoryReservation int64
// Device reservations/requests for access to things like GPUs
DeviceReservations []container.DeviceRequest
Expand Down
4 changes: 4 additions & 0 deletions pkg/client/compose/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ func resourcesFromCompose(service types.ServiceConfig) api.ContainerResources {
}
}
if service.Deploy.Resources.Reservations != nil {
if service.Deploy.Resources.Reservations.NanoCPUs > 0 {
// NanoCPUs is actually a CPU fraction, not nanocores.
resources.CPUReservation = int64(service.Deploy.Resources.Reservations.NanoCPUs * 1e9)
}
if service.Deploy.Resources.Reservations.MemoryBytes > 0 {
resources.MemoryReservation = int64(service.Deploy.Resources.Reservations.MemoryBytes)
}
Expand Down
52 changes: 52 additions & 0 deletions pkg/client/deploy/scheduler/constraint.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scheduler

import (
"fmt"
"reflect"
"slices"
"strings"
Expand Down Expand Up @@ -45,6 +46,15 @@ func constraintsFromSpec(spec api.ServiceSpec) []Constraint {
})
}

// Add resource constraint if CPU or memory reservations are specified.
resources := spec.Container.Resources
if resources.CPUReservation > 0 || resources.MemoryReservation > 0 {
constraints = append(constraints, &ResourceConstraint{
RequiredCPU: resources.CPUReservation,
RequiredMemory: resources.MemoryReservation,
})
}

return constraints
}

Expand Down Expand Up @@ -138,3 +148,45 @@ func (c *VolumesConstraint) Description() string {

return "Volumes: " + strings.Join(volumeNames, ", ")
}

// ResourceConstraint limits container placement to machines with enough unreserved
// CPU and memory. It is opt-in: with both fields left at 0 nothing is required and
// the constraint is satisfied by any machine.
type ResourceConstraint struct {
	// RequiredCPU is the amount of CPU to reserve, in nanocores (1e9 nanocores = 1 core).
	RequiredCPU int64
	// RequiredMemory is the amount of memory to reserve, in bytes.
	RequiredMemory int64
}

// Evaluate reports whether machine can accommodate the requested reservations.
//
// A dimension (CPU or memory) is checked only when its requirement is positive, so a
// constraint that requests nothing accepts every machine without querying it. CPU is
// checked first, against machine.AvailableCPU(); a CPU shortfall rejects the machine
// without looking at memory. Memory is then compared with machine.AvailableMemory().
func (c *ResourceConstraint) Evaluate(machine *Machine) bool {
	needsCPU := c.RequiredCPU > 0
	if needsCPU && c.RequiredCPU > machine.AvailableCPU() {
		return false
	}

	needsMemory := c.RequiredMemory > 0
	return !needsMemory || c.RequiredMemory <= machine.AvailableMemory()
}

// Description returns a human-readable summary of the constraint.
//
// When both requirements are 0 it returns "No resource constraint". Otherwise it
// returns "Resource reservation: " followed by a comma-separated list with one entry
// per positive requirement: CPU as cores with two decimals (nanocores / 1e9), and
// memory as a whole number of 1024*1024-byte units labelled "MB" (integer division,
// so any remainder is dropped).
func (c *ResourceConstraint) Description() string {
	cpu, mem := c.RequiredCPU, c.RequiredMemory
	if cpu == 0 && mem == 0 {
		return "No resource constraint"
	}

	const bytesPerUnit = 1024 * 1024

	var details []string
	if cpu > 0 {
		cores := float64(cpu) / 1e9
		details = append(details, fmt.Sprintf("CPU: %.2f cores", cores))
	}
	if mem > 0 {
		details = append(details, fmt.Sprintf("Memory: %d MB", mem/bytesPerUnit))
	}

	return "Resource reservation: " + strings.Join(details, ", ")
}
Loading