diff --git a/cmd/uncloud/service/run.go b/cmd/uncloud/service/run.go index 45dc04af..3df13f15 100644 --- a/cmd/uncloud/service/run.go +++ b/cmd/uncloud/service/run.go @@ -26,6 +26,8 @@ type runOptions struct { image string machines []string memory dockeropts.MemBytes + memoryReservation dockeropts.MemBytes + cpuReservation dockeropts.NanoCPUs mode string name string privileged bool @@ -62,6 +64,8 @@ func NewRunCommand() *cobra.Command { cmd.Flags().VarP(&opts.cpu, "cpu", "", "Maximum number of CPU cores a service container can use. Fractional values are allowed: "+ "0.5 for half a core or 2.25 for two and a quarter cores.") + cmd.Flags().Var(&opts.cpuReservation, "reserve-cpu", + "Minimum CPU cores to reserve for placement (nanocores). Fractional values are allowed, e.g. 0.5 for half a core.") cmd.Flags().StringVar(&opts.entrypoint, "entrypoint", "", "Overwrite the default ENTRYPOINT of the image. Pass an empty string \"\" to reset it.") cmd.Flags().StringSliceVarP(&opts.env, "env", "e", nil, @@ -78,6 +82,8 @@ func NewRunCommand() *cobra.Command { "Maximum amount of memory a service container can use. Value is a positive integer with optional unit suffix "+ "(b, k, m, g). Default unit is bytes if no suffix specified.\n"+ "Examples: 1073741824, 1024m, 1g (all equal 1 gibibyte)") + cmd.Flags().Var(&opts.memoryReservation, "reserve-memory", + "Minimum memory to reserve for placement. Value is a positive integer with optional unit suffix (b, k, m, g).") cmd.Flags().StringVarP(&opts.name, "name", "n", "", "Assign a name to the service. A random name is generated if not specified.") cmd.Flags().BoolVar(&opts.privileged, "privileged", false, @@ -210,8 +216,10 @@ func prepareServiceSpec(opts runOptions) (api.ServiceSpec, error) { Privileged: opts.privileged, PullPolicy: opts.pull, Resources: api.ContainerResources{ - CPU: opts.cpu.Value(), - Memory: opts.memory.Value(), + CPU: opts.cpu.Value(), + Memory: opts.memory.Value(), + CPUReservation: opts.cpuReservation.Value(), + MemoryReservation: opts.memoryReservation.Value(), }, User: opts.user, VolumeMounts: mounts, diff --git a/internal/machine/api/pb/machine.pb.go b/internal/machine/api/pb/machine.pb.go index 4119dbca..c84fc831 100644 --- a/internal/machine/api/pb/machine.pb.go +++ b/internal/machine/api/pb/machine.pb.go @@ -30,6 +30,11 @@ type MachineInfo struct { Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` Network *NetworkConfig `protobuf:"bytes,3,opt,name=network,proto3" json:"network,omitempty"` PublicIp *IP `protobuf:"bytes,4,opt,name=public_ip,json=publicIp,proto3" json:"public_ip,omitempty"` + // Resource capacity and reservations for scheduling. 
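+ // Total values describe the machine's capacity; Reserved values sum the reservations of service containers already running on the machine.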
+ TotalCpuNanos int64 `protobuf:"varint,5,opt,name=total_cpu_nanos,json=totalCpuNanos,proto3" json:"total_cpu_nanos,omitempty"` // Total CPU in nanocores (1e9 = 1 core) + TotalMemoryBytes int64 `protobuf:"varint,6,opt,name=total_memory_bytes,json=totalMemoryBytes,proto3" json:"total_memory_bytes,omitempty"` + ReservedCpuNanos int64 `protobuf:"varint,7,opt,name=reserved_cpu_nanos,json=reservedCpuNanos,proto3" json:"reserved_cpu_nanos,omitempty"` + ReservedMemoryBytes int64 `protobuf:"varint,8,opt,name=reserved_memory_bytes,json=reservedMemoryBytes,proto3" json:"reserved_memory_bytes,omitempty"` } func (x *MachineInfo) Reset() { @@ -92,6 +97,34 @@ func (x *MachineInfo) GetPublicIp() *IP { return nil } +func (x *MachineInfo) GetTotalCpuNanos() int64 { + if x != nil { + return x.TotalCpuNanos + } + return 0 +} + +func (x *MachineInfo) GetTotalMemoryBytes() int64 { + if x != nil { + return x.TotalMemoryBytes + } + return 0 +} + +func (x *MachineInfo) GetReservedCpuNanos() int64 { + if x != nil { + return x.ReservedCpuNanos + } + return 0 +} + +func (x *MachineInfo) GetReservedMemoryBytes() int64 { + if x != nil { + return x.ReservedMemoryBytes + } + return 0 +} + type NetworkConfig struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -735,7 +768,7 @@ var file_internal_machine_api_pb_machine_proto_rawDesc = []byte{ 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x24, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, - 0x85, 0x01, 0x0a, 0x0b, 0x4d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x12, + 0xbd, 0x02, 0x0a, 0x0b, 0x4d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x2c, 0x0a, 0x07, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x18, 0x03, @@ -743,102 +776,114 @@ var file_internal_machine_api_pb_machine_proto_rawDesc = []byte{ 0x72, 0x6b, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x07, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x12, 0x24, 0x0a, 0x09, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x5f, 0x69, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x07, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x49, 0x50, 0x52, 0x08, 0x70, - 0x75, 0x62, 0x6c, 0x69, 0x63, 0x49, 0x70, 0x22, 0xae, 0x01, 0x0a, 0x0d, 0x4e, 0x65, 0x74, 0x77, - 0x6f, 0x72, 0x6b, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x25, 0x0a, 0x06, 0x73, 0x75, 0x62, - 0x6e, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x61, 0x70, 0x69, 0x2e, - 0x49, 0x50, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78, 0x52, 0x06, 0x73, 0x75, 0x62, 0x6e, 0x65, 0x74, - 0x12, 0x2c, 0x0a, 0x0d, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x69, - 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x07, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x49, 0x50, - 0x52, 0x0c, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x49, 0x70, 0x12, 0x29, - 0x0a, 0x09, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, - 0x0b, 0x32, 0x0b, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x49, 0x50, 0x50, 0x6f, 0x72, 0x74, 0x52, 0x09, - 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x12, 0x1d, 0x0a, 0x0a, 0x70, 0x75, 0x62, - 0x6c, 0x69, 0x63, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 
0x0c, 0x52, 0x09, 0x70, - 0x75, 0x62, 0x6c, 0x69, 0x63, 0x4b, 0x65, 0x79, 0x22, 0x50, 0x0a, 0x1a, 0x43, 0x68, 0x65, 0x63, - 0x6b, 0x50, 0x72, 0x65, 0x72, 0x65, 0x71, 0x75, 0x69, 0x73, 0x69, 0x74, 0x65, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x61, 0x74, 0x69, 0x73, 0x66, - 0x69, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x73, 0x61, 0x74, 0x69, 0x73, - 0x66, 0x69, 0x65, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0xc3, 0x01, 0x0a, 0x12, 0x49, + 0x75, 0x62, 0x6c, 0x69, 0x63, 0x49, 0x70, 0x12, 0x26, 0x0a, 0x0f, 0x74, 0x6f, 0x74, 0x61, 0x6c, + 0x5f, 0x63, 0x70, 0x75, 0x5f, 0x6e, 0x61, 0x6e, 0x6f, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, + 0x52, 0x0d, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x70, 0x75, 0x4e, 0x61, 0x6e, 0x6f, 0x73, 0x12, + 0x2c, 0x0a, 0x12, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x5f, + 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x10, 0x74, 0x6f, 0x74, + 0x61, 0x6c, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x2c, 0x0a, + 0x12, 0x72, 0x65, 0x73, 0x65, 0x72, 0x76, 0x65, 0x64, 0x5f, 0x63, 0x70, 0x75, 0x5f, 0x6e, 0x61, + 0x6e, 0x6f, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x10, 0x72, 0x65, 0x73, 0x65, 0x72, + 0x76, 0x65, 0x64, 0x43, 0x70, 0x75, 0x4e, 0x61, 0x6e, 0x6f, 0x73, 0x12, 0x32, 0x0a, 0x15, 0x72, + 0x65, 0x73, 0x65, 0x72, 0x76, 0x65, 0x64, 0x5f, 0x6d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x5f, 0x62, + 0x79, 0x74, 0x65, 0x73, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x13, 0x72, 0x65, 0x73, 0x65, + 0x72, 0x76, 0x65, 0x64, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x42, 0x79, 0x74, 0x65, 0x73, 0x22, + 0xae, 0x01, 0x0a, 0x0d, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x12, 0x25, 0x0a, 0x06, 0x73, 0x75, 0x62, 0x6e, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x0d, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x49, 0x50, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78, + 0x52, 0x06, 0x73, 0x75, 0x62, 0x6e, 0x65, 0x74, 0x12, 0x2c, 0x0a, 0x0d, 0x6d, 0x61, 0x6e, 0x61, + 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x07, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x49, 0x50, 0x52, 0x0c, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, + 0x6d, 0x65, 0x6e, 0x74, 0x49, 0x70, 0x12, 0x29, 0x0a, 0x09, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, + 0x6e, 0x74, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x61, 0x70, 0x69, 0x2e, + 0x49, 0x50, 0x50, 0x6f, 0x72, 0x74, 0x52, 0x09, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, + 0x73, 0x12, 0x1d, 0x0a, 0x0a, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x5f, 0x6b, 0x65, 0x79, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x4b, 0x65, 0x79, + 0x22, 0x50, 0x0a, 0x1a, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x50, 0x72, 0x65, 0x72, 0x65, 0x71, 0x75, + 0x69, 0x73, 0x69, 0x74, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1c, + 0x0a, 0x09, 0x73, 0x61, 0x74, 0x69, 0x73, 0x66, 0x69, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x09, 0x73, 0x61, 0x74, 0x69, 0x73, 0x66, 0x69, 0x65, 0x64, 0x12, 0x14, 0x0a, 0x05, + 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, + 0x6f, 0x72, 0x22, 0xc3, 0x01, 0x0a, 0x12, 0x49, 0x6e, 0x69, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x20, 0x0a, 0x0b, 0x6d, 0x61, 0x63, + 
0x68, 0x69, 0x6e, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, + 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x27, 0x0a, 0x07, 0x6e, + 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x61, + 0x70, 0x69, 0x2e, 0x49, 0x50, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78, 0x52, 0x07, 0x6e, 0x65, 0x74, + 0x77, 0x6f, 0x72, 0x6b, 0x12, 0x26, 0x0a, 0x09, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x5f, 0x69, + 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x07, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x49, 0x50, + 0x48, 0x00, 0x52, 0x08, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x49, 0x70, 0x12, 0x26, 0x0a, 0x0e, + 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x5f, 0x69, 0x70, 0x5f, 0x61, 0x75, 0x74, 0x6f, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x0c, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x49, 0x70, + 0x41, 0x75, 0x74, 0x6f, 0x42, 0x12, 0x0a, 0x10, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x5f, 0x69, + 0x70, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x41, 0x0a, 0x13, 0x49, 0x6e, 0x69, 0x74, + 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x2a, 0x0a, 0x07, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x10, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x49, 0x6e, + 0x66, 0x6f, 0x52, 0x07, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x22, 0x79, 0x0a, 0x12, 0x4a, + 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x2a, 0x0a, 0x07, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, + 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x07, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x12, 0x37, 0x0a, + 0x0e, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x5f, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x73, 0x18, + 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4d, 0x61, 0x63, 0x68, + 0x69, 0x6e, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0d, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x4d, 0x61, + 0x63, 0x68, 0x69, 0x6e, 0x65, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x0e, 0x0a, + 0x0c, 0x52, 0x65, 0x73, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xc3, 0x01, + 0x0a, 0x07, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, + 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6d, 0x6f, 0x64, + 0x65, 0x12, 0x36, 0x0a, 0x0a, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x73, 0x18, + 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x52, 0x0a, 0x63, + 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x73, 0x1a, 0x48, 0x0a, 0x09, 0x43, 0x6f, 0x6e, + 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x12, 0x1d, 0x0a, 0x0a, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, + 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6d, 0x61, 0x63, 0x68, + 0x69, 0x6e, 0x65, 0x49, 
0x64, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, + 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, + 0x6e, 0x65, 0x72, 0x22, 0x27, 0x0a, 0x15, 0x49, 0x6e, 0x73, 0x70, 0x65, 0x63, 0x74, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x40, 0x0a, 0x16, + 0x49, 0x6e, 0x73, 0x70, 0x65, 0x63, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x26, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x32, 0xc3, + 0x03, 0x0a, 0x07, 0x4d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x12, 0x4d, 0x0a, 0x12, 0x43, 0x68, + 0x65, 0x63, 0x6b, 0x50, 0x72, 0x65, 0x72, 0x65, 0x71, 0x75, 0x69, 0x73, 0x69, 0x74, 0x65, 0x73, + 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1f, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x43, + 0x68, 0x65, 0x63, 0x6b, 0x50, 0x72, 0x65, 0x72, 0x65, 0x71, 0x75, 0x69, 0x73, 0x69, 0x74, 0x65, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x40, 0x0a, 0x0b, 0x49, 0x6e, 0x69, + 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x17, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x20, 0x0a, 0x0b, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x4e, 0x61, 0x6d, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x4e, - 0x61, 0x6d, 0x65, 0x12, 0x27, 0x0a, 0x07, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x49, 0x50, 0x50, 0x72, 0x65, - 0x66, 0x69, 0x78, 0x52, 0x07, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x12, 0x26, 0x0a, 0x09, - 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x5f, 0x69, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x07, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x49, 0x50, 0x48, 0x00, 0x52, 0x08, 0x70, 0x75, 0x62, 0x6c, - 0x69, 0x63, 0x49, 0x70, 0x12, 0x26, 0x0a, 0x0e, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x5f, 0x69, - 0x70, 0x5f, 0x61, 0x75, 0x74, 0x6f, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x0c, - 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x49, 0x70, 0x41, 0x75, 0x74, 0x6f, 0x42, 0x12, 0x0a, 0x10, - 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x5f, 0x69, 0x70, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x22, 0x41, 0x0a, 0x13, 0x49, 0x6e, 0x69, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x07, 0x6d, 0x61, 0x63, 0x68, 0x69, - 0x6e, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4d, - 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x07, 0x6d, 0x61, 0x63, 0x68, - 0x69, 0x6e, 0x65, 0x22, 0x79, 0x0a, 0x12, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2a, 0x0a, 0x07, 0x6d, 0x61, 0x63, - 0x68, 0x69, 0x6e, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x61, 0x70, 0x69, - 0x2e, 0x4d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x07, 0x6d, 0x61, - 0x63, 0x68, 0x69, 0x6e, 0x65, 0x12, 0x37, 0x0a, 
0x0e, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x5f, 0x6d, - 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, - 0x61, 0x70, 0x69, 0x2e, 0x4d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, - 0x0d, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x4d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x73, 0x22, 0x25, - 0x0a, 0x0d, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, - 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x0e, 0x0a, 0x0c, 0x52, 0x65, 0x73, 0x65, 0x74, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xc3, 0x01, 0x0a, 0x07, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, - 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, - 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x12, 0x36, 0x0a, 0x0a, 0x63, 0x6f, 0x6e, - 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, - 0x61, 0x70, 0x69, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x43, 0x6f, 0x6e, 0x74, - 0x61, 0x69, 0x6e, 0x65, 0x72, 0x52, 0x0a, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, - 0x73, 0x1a, 0x48, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x12, 0x1d, - 0x0a, 0x0a, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x09, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x49, 0x64, 0x12, 0x1c, 0x0a, - 0x09, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x09, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x22, 0x27, 0x0a, 0x15, 0x49, - 0x6e, 0x73, 0x70, 0x65, 0x63, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x02, 0x69, 0x64, 0x22, 0x40, 0x0a, 0x16, 0x49, 0x6e, 0x73, 0x70, 0x65, 0x63, 0x74, 0x53, - 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x26, - 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x0c, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x07, 0x73, - 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x32, 0xc3, 0x03, 0x0a, 0x07, 0x4d, 0x61, 0x63, 0x68, 0x69, - 0x6e, 0x65, 0x12, 0x4d, 0x0a, 0x12, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x50, 0x72, 0x65, 0x72, 0x65, - 0x71, 0x75, 0x69, 0x73, 0x69, 0x74, 0x65, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, - 0x1a, 0x1f, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x50, 0x72, 0x65, 0x72, - 0x65, 0x71, 0x75, 0x69, 0x73, 0x69, 0x74, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x40, 0x0a, 0x0b, 0x49, 0x6e, 0x69, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, - 0x12, 0x17, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x61, 0x70, 0x69, 0x2e, - 0x49, 0x6e, 0x69, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x3e, 0x0a, 0x0b, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 
0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x12, 0x17, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, - 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, + 0x74, 0x1a, 0x18, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x43, 0x6c, 0x75, 0x73, + 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3e, 0x0a, 0x0b, 0x4a, + 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x17, 0x2e, 0x61, 0x70, 0x69, + 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x33, 0x0a, 0x05, 0x54, + 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x12, 0x2e, 0x61, + 0x70, 0x69, 0x2e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x33, 0x0a, 0x07, 0x49, 0x6e, 0x73, 0x70, 0x65, 0x63, 0x74, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, - 0x70, 0x74, 0x79, 0x12, 0x33, 0x0a, 0x05, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x16, 0x2e, 0x67, - 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x12, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x33, 0x0a, 0x07, 0x49, 0x6e, 0x73, 0x70, - 0x65, 0x63, 0x74, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x10, 0x2e, 0x61, 0x70, - 0x69, 0x2e, 0x4d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x32, 0x0a, - 0x05, 0x52, 0x65, 0x73, 0x65, 0x74, 0x12, 0x11, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x52, 0x65, 0x73, - 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, - 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, - 0x79, 0x12, 0x49, 0x0a, 0x0e, 0x49, 0x6e, 0x73, 0x70, 0x65, 0x63, 0x74, 0x53, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x12, 0x1a, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x49, 0x6e, 0x73, 0x70, 0x65, 0x63, - 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x1b, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x49, 0x6e, 0x73, 0x70, 0x65, 0x63, 0x74, 0x53, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x37, 0x5a, 0x35, - 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x73, 0x76, 0x69, 0x64, - 0x65, 0x72, 0x73, 0x6b, 0x69, 0x2f, 0x75, 0x6e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2f, 0x69, 0x6e, - 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x2f, 0x61, - 0x70, 0x69, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x70, 0x74, 0x79, 0x1a, 0x10, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4d, 0x61, 0x63, 0x68, 0x69, 0x6e, + 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x32, 0x0a, 0x05, 0x52, 0x65, 0x73, 0x65, 0x74, 0x12, 0x11, + 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x52, 0x65, 0x73, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 
0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x49, 0x0a, 0x0e, 0x49, 0x6e, 0x73, + 0x70, 0x65, 0x63, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x1a, 0x2e, 0x61, 0x70, + 0x69, 0x2e, 0x49, 0x6e, 0x73, 0x70, 0x65, 0x63, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x49, 0x6e, + 0x73, 0x70, 0x65, 0x63, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x70, 0x73, 0x76, 0x69, 0x64, 0x65, 0x72, 0x73, 0x6b, 0x69, 0x2f, 0x75, 0x6e, + 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x6d, + 0x61, 0x63, 0x68, 0x69, 0x6e, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/internal/machine/api/pb/machine.proto b/internal/machine/api/pb/machine.proto index bfa126f3..763683b3 100644 --- a/internal/machine/api/pb/machine.proto +++ b/internal/machine/api/pb/machine.proto @@ -25,6 +25,11 @@ message MachineInfo { string name = 2; NetworkConfig network = 3; IP public_ip = 4; + // Resource capacity and reservations for scheduling. + int64 total_cpu_nanos = 5; // Total CPU in nanocores (1e9 = 1 core) + int64 total_memory_bytes = 6; + int64 reserved_cpu_nanos = 7; + int64 reserved_memory_bytes = 8; } message NetworkConfig { diff --git a/internal/machine/machine.go b/internal/machine/machine.go index a2d1749b..0d74cb53 100644 --- a/internal/machine/machine.go +++ b/internal/machine/machine.go @@ -15,6 +15,7 @@ import ( "strconv" "sync" + "github.com/docker/docker/api/types/container" "github.com/docker/docker/client" "github.com/docker/go-connections/sockets" "github.com/psviderski/uncloud/internal/corrosion" @@ -864,8 +865,8 @@ func (m *Machine) Token(_ context.Context, _ *emptypb.Empty) (*pb.TokenResponse, return &pb.TokenResponse{Token: tokenStr}, nil } -func (m *Machine) Inspect(_ context.Context, _ *emptypb.Empty) (*pb.MachineInfo, error) { - return &pb.MachineInfo{ +func (m *Machine) Inspect(ctx context.Context, _ *emptypb.Empty) (*pb.MachineInfo, error) { + info := &pb.MachineInfo{ Id: m.state.ID, Name: m.state.Name, Network: &pb.NetworkConfig{ @@ -873,7 +874,41 @@ func (m *Machine) Inspect(_ context.Context, _ *emptypb.Empty) (*pb.MachineInfo, ManagementIp: pb.NewIP(m.state.Network.ManagementIP), PublicKey: m.state.Network.PublicKey, }, - }, nil + } + + // Populate resource capacity and reservations for scheduling. + if m.dockerService != nil { + if err := m.populateResources(ctx, info); err != nil { + slog.Warn("Failed to populate machine resources.", "err", err) + } + } + + return info, nil +} + +// populateResources fills in the resource capacity and reservation fields of MachineInfo. +func (m *Machine) populateResources(ctx context.Context, info *pb.MachineInfo) error { + // Get system info for total CPU and memory. + dockerInfo, err := m.dockerService.Client.Info(ctx) + if err != nil { + return fmt.Errorf("get docker info: %w", err) + } + + info.TotalCpuNanos = int64(dockerInfo.NCPU) * 1e9 + info.TotalMemoryBytes = dockerInfo.MemTotal + + // Sum up reserved resources from running containers. 
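+ // Containers without explicit CPU/memory reservations contribute zero to these totals.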
+ containers, err := m.dockerService.ListServiceContainers(ctx, "", container.ListOptions{}) + if err != nil { + return fmt.Errorf("list containers: %w", err) + } + + for _, ctr := range containers { + info.ReservedCpuNanos += ctr.ServiceSpec.Container.Resources.CPUReservation + info.ReservedMemoryBytes += ctr.ServiceSpec.Container.Resources.MemoryReservation + } + + return nil } // IsNetworkReady returns true if the Docker network is ready for containers. diff --git a/pkg/api/resources.go b/pkg/api/resources.go index 8aece7d3..ba13e859 100644 --- a/pkg/api/resources.go +++ b/pkg/api/resources.go @@ -12,10 +12,13 @@ const ( type ContainerResources struct { // CPU is the maximum amount of CPU nanocores (1000000000 = 1 CPU core) the container can use. CPU int64 + // CPUReservation is the minimum amount of CPU nanocores the container needs to run efficiently. + // Used by the scheduler to ensure machines have sufficient available CPU before placement. + CPUReservation int64 // Memory is the maximum amount of memory (in bytes) the container can use. Memory int64 // MemoryReservation is the minimum amount of memory (in bytes) the container needs to run efficiently. - // TODO: implement a placement constraint that checks available memory on machines. + // Used by the scheduler to ensure machines have sufficient available memory before placement. MemoryReservation int64 // Device reservations/requests for access to things like GPUs DeviceReservations []container.DeviceRequest diff --git a/pkg/client/compose/service.go b/pkg/client/compose/service.go index f5be0656..a11be72e 100644 --- a/pkg/client/compose/service.go +++ b/pkg/client/compose/service.go @@ -140,6 +140,10 @@ func resourcesFromCompose(service types.ServiceConfig) api.ContainerResources { } } if service.Deploy.Resources.Reservations != nil { + if service.Deploy.Resources.Reservations.NanoCPUs > 0 { + // NanoCPUs is actually a CPU fraction, not nanocores. + resources.CPUReservation = int64(service.Deploy.Resources.Reservations.NanoCPUs * 1e9) + } if service.Deploy.Resources.Reservations.MemoryBytes > 0 { resources.MemoryReservation = int64(service.Deploy.Resources.Reservations.MemoryBytes) } diff --git a/pkg/client/deploy/scheduler/constraint.go b/pkg/client/deploy/scheduler/constraint.go index dc762155..e31675cd 100644 --- a/pkg/client/deploy/scheduler/constraint.go +++ b/pkg/client/deploy/scheduler/constraint.go @@ -1,6 +1,7 @@ package scheduler import ( + "fmt" "reflect" "slices" "strings" @@ -45,6 +46,15 @@ func constraintsFromSpec(spec api.ServiceSpec) []Constraint { }) } + // Add resource constraint if CPU or memory reservations are specified. + resources := spec.Container.Resources + if resources.CPUReservation > 0 || resources.MemoryReservation > 0 { + constraints = append(constraints, &ResourceConstraint{ + RequiredCPU: resources.CPUReservation, + RequiredMemory: resources.MemoryReservation, + }) + } + return constraints } @@ -138,3 +148,45 @@ func (c *VolumesConstraint) Description() string { return "Volumes: " + strings.Join(volumeNames, ", ") } + +// ResourceConstraint restricts container placement to machines that have sufficient available resources. +// This is opt-in: if no reservations are set (both values are 0), the constraint always passes. +type ResourceConstraint struct { + // RequiredCPU is the CPU reservation in nanocores (1e9 = 1 core). + RequiredCPU int64 + // RequiredMemory is the memory reservation in bytes. + RequiredMemory int64 +} + +// Evaluate determines if a machine has sufficient available resources. 
+// Returns true if the machine has enough unreserved CPU and memory, or if no reservations are required. +// This accounts for both running containers and containers scheduled during this planning session. +func (c *ResourceConstraint) Evaluate(machine *Machine) bool { + // If no reservations are set, constraint always passes (opt-in behavior). + if c.RequiredCPU == 0 && c.RequiredMemory == 0 { + return true + } + + if c.RequiredCPU > 0 && machine.AvailableCPU() < c.RequiredCPU { + return false + } + if c.RequiredMemory > 0 && machine.AvailableMemory() < c.RequiredMemory { + return false + } + return true +} + +func (c *ResourceConstraint) Description() string { + if c.RequiredCPU == 0 && c.RequiredMemory == 0 { + return "No resource constraint" + } + + var parts []string + if c.RequiredCPU > 0 { + parts = append(parts, fmt.Sprintf("CPU: %.2f cores", float64(c.RequiredCPU)/1e9)) + } + if c.RequiredMemory > 0 { + parts = append(parts, fmt.Sprintf("Memory: %d MB", c.RequiredMemory/(1024*1024))) + } + return "Resource reservation: " + strings.Join(parts, ", ") +} diff --git a/pkg/client/deploy/scheduler/constraint_test.go b/pkg/client/deploy/scheduler/constraint_test.go new file mode 100644 index 00000000..d8f4e27c --- /dev/null +++ b/pkg/client/deploy/scheduler/constraint_test.go @@ -0,0 +1,618 @@ +package scheduler + +import ( + "testing" + + "github.com/docker/docker/api/types/mount" + "github.com/docker/docker/api/types/volume" + "github.com/psviderski/uncloud/internal/machine/api/pb" + "github.com/psviderski/uncloud/pkg/api" + "github.com/stretchr/testify/assert" +) + +func TestResourceConstraint_Evaluate(t *testing.T) { + t.Parallel() + + const ( + gb = 1024 * 1024 * 1024 + core = int64(1e9) + ) + + tests := []struct { + name string + constraint *ResourceConstraint + machine *Machine + want bool + }{ + { + name: "no resource requirements - always passes", + constraint: &ResourceConstraint{ + RequiredCPU: 0, + RequiredMemory: 0, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4 * core, + TotalMemoryBytes: 8 * gb, + }, + }, + want: true, + }, + { + name: "no resource requirements - passes even with zero resources", + constraint: &ResourceConstraint{ + RequiredCPU: 0, + RequiredMemory: 0, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 0, + TotalMemoryBytes: 0, + }, + }, + want: true, + }, + { + name: "CPU only requirement - sufficient resources", + constraint: &ResourceConstraint{ + RequiredCPU: 2 * core, + RequiredMemory: 0, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4 * core, + TotalMemoryBytes: 8 * gb, + }, + }, + want: true, + }, + { + name: "CPU only requirement - insufficient resources", + constraint: &ResourceConstraint{ + RequiredCPU: 4 * core, + RequiredMemory: 0, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 2 * core, + TotalMemoryBytes: 8 * gb, + }, + }, + want: false, + }, + { + name: "memory only requirement - sufficient resources", + constraint: &ResourceConstraint{ + RequiredCPU: 0, + RequiredMemory: 4 * gb, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4 * core, + TotalMemoryBytes: 8 * gb, + }, + }, + want: true, + }, + { + name: "memory only requirement - insufficient resources", + constraint: &ResourceConstraint{ + RequiredCPU: 0, + RequiredMemory: 16 * gb, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4 * core, + TotalMemoryBytes: 8 * gb, + }, + }, + want: false, + }, + { + name: "both CPU and memory required - both sufficient", + 
constraint: &ResourceConstraint{ + RequiredCPU: 2 * core, + RequiredMemory: 4 * gb, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4 * core, + TotalMemoryBytes: 8 * gb, + }, + }, + want: true, + }, + { + name: "both required - insufficient CPU", + constraint: &ResourceConstraint{ + RequiredCPU: 8 * core, + RequiredMemory: 4 * gb, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4 * core, + TotalMemoryBytes: 8 * gb, + }, + }, + want: false, + }, + { + name: "both required - insufficient memory", + constraint: &ResourceConstraint{ + RequiredCPU: 2 * core, + RequiredMemory: 16 * gb, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4 * core, + TotalMemoryBytes: 8 * gb, + }, + }, + want: false, + }, + { + name: "exactly matching resources - passes", + constraint: &ResourceConstraint{ + RequiredCPU: 4 * core, + RequiredMemory: 8 * gb, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4 * core, + TotalMemoryBytes: 8 * gb, + }, + }, + want: true, + }, + { + name: "accounts for scheduled CPU resources", + constraint: &ResourceConstraint{ + RequiredCPU: 2 * core, + RequiredMemory: 0, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4 * core, + TotalMemoryBytes: 8 * gb, + }, + ScheduledCPU: 3 * core, // Only 1 core available + }, + want: false, + }, + { + name: "accounts for scheduled memory resources", + constraint: &ResourceConstraint{ + RequiredCPU: 0, + RequiredMemory: 4 * gb, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4 * core, + TotalMemoryBytes: 8 * gb, + }, + ScheduledMemory: 6 * gb, // Only 2 GB available + }, + want: false, + }, + { + name: "accounts for both reserved and scheduled CPU resources", + constraint: &ResourceConstraint{ + RequiredCPU: 2 * core, + RequiredMemory: 0, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4 * core, + ReservedCpuNanos: 1 * core, // 1 core reserved + TotalMemoryBytes: 8 * gb, + }, + ScheduledCPU: 2 * core, // 2 cores scheduled, only 1 available + }, + want: false, + }, + { + name: "accounts for both reserved and scheduled memory resources", + constraint: &ResourceConstraint{ + RequiredCPU: 0, + RequiredMemory: 4 * gb, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4 * core, + TotalMemoryBytes: 8 * gb, + ReservedMemoryBytes: 2 * gb, // 2 GB reserved + }, + ScheduledMemory: 4 * gb, // 4 GB scheduled, only 2 GB available + }, + want: false, + }, + { + name: "passes with reserved resources when enough available", + constraint: &ResourceConstraint{ + RequiredCPU: 1 * core, + RequiredMemory: 2 * gb, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4 * core, + ReservedCpuNanos: 1 * core, + TotalMemoryBytes: 8 * gb, + ReservedMemoryBytes: 2 * gb, + }, + ScheduledCPU: 1 * core, + ScheduledMemory: 2 * gb, + }, + want: true, // 1 core and 2 GB still available + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.constraint.Evaluate(tt.machine) + assert.Equal(t, tt.want, result) + }) + } +} + +func TestResourceConstraint_Description(t *testing.T) { + t.Parallel() + + const ( + mb = 1024 * 1024 + core = int64(1e9) + ) + + tests := []struct { + name string + constraint *ResourceConstraint + want string + }{ + { + name: "no requirements", + constraint: &ResourceConstraint{ + RequiredCPU: 0, + RequiredMemory: 0, + }, + want: "No resource constraint", + }, + { + name: "CPU only", + constraint: &ResourceConstraint{ + RequiredCPU: 2 * core, + RequiredMemory: 
0, + }, + want: "Resource reservation: CPU: 2.00 cores", + }, + { + name: "memory only", + constraint: &ResourceConstraint{ + RequiredCPU: 0, + RequiredMemory: 512 * mb, + }, + want: "Resource reservation: Memory: 512 MB", + }, + { + name: "both CPU and memory", + constraint: &ResourceConstraint{ + RequiredCPU: 1500000000, // 1.5 cores + RequiredMemory: 1024 * mb, + }, + want: "Resource reservation: CPU: 1.50 cores, Memory: 1024 MB", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.constraint.Description() + assert.Equal(t, tt.want, result) + }) + } +} + +func TestPlacementConstraint_Evaluate(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + constraint *PlacementConstraint + machine *Machine + want bool + }{ + { + name: "machine matches by ID", + constraint: &PlacementConstraint{ + Machines: []string{"machine-1"}, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + Id: "machine-1", + Name: "node1", + }, + }, + want: true, + }, + { + name: "machine matches by name", + constraint: &PlacementConstraint{ + Machines: []string{"node1"}, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + Id: "machine-1", + Name: "node1", + }, + }, + want: true, + }, + { + name: "machine not in list", + constraint: &PlacementConstraint{ + Machines: []string{"machine-2", "machine-3"}, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + Id: "machine-1", + Name: "node1", + }, + }, + want: false, + }, + { + name: "multiple machines - matches one", + constraint: &PlacementConstraint{ + Machines: []string{"machine-1", "machine-2"}, + }, + machine: &Machine{ + Info: &pb.MachineInfo{ + Id: "machine-2", + Name: "node2", + }, + }, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.constraint.Evaluate(tt.machine) + assert.Equal(t, tt.want, result) + }) + } +} + +func TestVolumesConstraint_Evaluate(t *testing.T) { + t.Parallel() + + volumeSpec := api.VolumeSpec{ + Name: "data", + Type: api.VolumeTypeVolume, + VolumeOptions: &api.VolumeOptions{Driver: &mount.Driver{Name: api.VolumeDriverLocal}}, + } + + tests := []struct { + name string + machine *Machine + want bool + }{ + { + name: "passes when volume exists on machine", + machine: &Machine{ + Info: &pb.MachineInfo{Id: "m1"}, + Volumes: []volume.Volume{{ + Name: volumeSpec.DockerVolumeName(), + Driver: api.VolumeDriverLocal, + }}, + }, + want: true, + }, + { + name: "passes when volume is scheduled on machine", + machine: &Machine{ + Info: &pb.MachineInfo{Id: "m1"}, + ScheduledVolumes: []api.VolumeSpec{volumeSpec}, + }, + want: true, + }, + { + name: "fails when volume missing", + machine: &Machine{Info: &pb.MachineInfo{Id: "m1"}}, + want: false, + }, + { + name: "fails when scheduled volume driver mismatches", + machine: &Machine{ + Info: &pb.MachineInfo{Id: "m1"}, + ScheduledVolumes: []api.VolumeSpec{{ + Name: "data", + Type: api.VolumeTypeVolume, + VolumeOptions: &api.VolumeOptions{Driver: &mount.Driver{Name: "nfs"}}, + }}, + }, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &VolumesConstraint{Volumes: []api.VolumeSpec{volumeSpec}} + assert.Equal(t, tt.want, c.Evaluate(tt.machine)) + }) + } +} + +func TestConstraintsFromSpec(t *testing.T) { + t.Parallel() + + const ( + core = int64(1e9) + mb = int64(1024 * 1024) + ) + + tests := []struct { + name string + spec api.ServiceSpec + wantPlacement bool + wantVolumes bool + wantResources bool + wantResourceConstraint *ResourceConstraint + }{ + { + name: 
"empty spec - no constraints", + spec: api.ServiceSpec{}, + wantPlacement: false, + wantVolumes: false, + wantResources: false, + }, + { + name: "placement machines set", + spec: api.ServiceSpec{ + Placement: api.Placement{ + Machines: []string{"machine-1"}, + }, + }, + wantPlacement: true, + wantVolumes: false, + wantResources: false, + }, + { + name: "volume mounts with VolumeTypeVolume", + spec: api.ServiceSpec{ + Container: api.ContainerSpec{ + VolumeMounts: []api.VolumeMount{ + {VolumeName: "data", ContainerPath: "/data"}, + }, + }, + Volumes: []api.VolumeSpec{ + {Name: "data", Type: api.VolumeTypeVolume}, + }, + }, + wantPlacement: false, + wantVolumes: true, + wantResources: false, + }, + { + name: "CPU reservation only", + spec: api.ServiceSpec{ + Container: api.ContainerSpec{ + Resources: api.ContainerResources{ + CPUReservation: 2 * core, + }, + }, + }, + wantPlacement: false, + wantVolumes: false, + wantResources: true, + wantResourceConstraint: &ResourceConstraint{ + RequiredCPU: 2 * core, + RequiredMemory: 0, + }, + }, + { + name: "memory reservation only", + spec: api.ServiceSpec{ + Container: api.ContainerSpec{ + Resources: api.ContainerResources{ + MemoryReservation: 512 * mb, + }, + }, + }, + wantPlacement: false, + wantVolumes: false, + wantResources: true, + wantResourceConstraint: &ResourceConstraint{ + RequiredCPU: 0, + RequiredMemory: 512 * mb, + }, + }, + { + name: "both CPU and memory reservations", + spec: api.ServiceSpec{ + Container: api.ContainerSpec{ + Resources: api.ContainerResources{ + CPUReservation: 2 * core, + MemoryReservation: 512 * mb, + }, + }, + }, + wantPlacement: false, + wantVolumes: false, + wantResources: true, + wantResourceConstraint: &ResourceConstraint{ + RequiredCPU: 2 * core, + RequiredMemory: 512 * mb, + }, + }, + { + name: "combined placement + volumes + resources", + spec: api.ServiceSpec{ + Placement: api.Placement{ + Machines: []string{"machine-1"}, + }, + Container: api.ContainerSpec{ + VolumeMounts: []api.VolumeMount{ + {VolumeName: "data", ContainerPath: "/data"}, + }, + Resources: api.ContainerResources{ + CPUReservation: 1 * core, + MemoryReservation: 256 * mb, + }, + }, + Volumes: []api.VolumeSpec{ + {Name: "data", Type: api.VolumeTypeVolume}, + }, + }, + wantPlacement: true, + wantVolumes: true, + wantResources: true, + wantResourceConstraint: &ResourceConstraint{ + RequiredCPU: 1 * core, + RequiredMemory: 256 * mb, + }, + }, + { + name: "bind volume does not create volumes constraint", + spec: api.ServiceSpec{ + Container: api.ContainerSpec{ + VolumeMounts: []api.VolumeMount{ + {VolumeName: "config", ContainerPath: "/config"}, + }, + }, + Volumes: []api.VolumeSpec{ + {Name: "config", Type: api.VolumeTypeBind, BindOptions: &api.BindOptions{HostPath: "/host/config"}}, + }, + }, + wantPlacement: false, + wantVolumes: false, + wantResources: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + constraints := constraintsFromSpec(tt.spec) + + var foundPlacement, foundVolumes, foundResources bool + var resourceConstraint *ResourceConstraint + + for _, c := range constraints { + switch v := c.(type) { + case *PlacementConstraint: + foundPlacement = true + case *VolumesConstraint: + foundVolumes = true + case *ResourceConstraint: + foundResources = true + resourceConstraint = v + } + } + + assert.Equal(t, tt.wantPlacement, foundPlacement, "PlacementConstraint") + assert.Equal(t, tt.wantVolumes, foundVolumes, "VolumesConstraint") + assert.Equal(t, tt.wantResources, foundResources, "ResourceConstraint") 
+ + if tt.wantResourceConstraint != nil { + assert.Equal(t, tt.wantResourceConstraint, resourceConstraint) + } + }) + } +} diff --git a/pkg/client/deploy/scheduler/service.go b/pkg/client/deploy/scheduler/service.go index 04527b41..5718a441 100644 --- a/pkg/client/deploy/scheduler/service.go +++ b/pkg/client/deploy/scheduler/service.go @@ -1,30 +1,82 @@ package scheduler import ( + "container/heap" "errors" - "github.com/psviderski/uncloud/internal/machine/api/pb" "github.com/psviderski/uncloud/pkg/api" ) +// MachineRanker provides a comparison function for sorting machines during scheduling. +type MachineRanker interface { + // Less returns true if machine a should be preferred over machine b for scheduling. + Less(a, b *Machine) bool +} + +// MachineRankerFunc is an adapter to allow ordinary functions to be used as MachineRankers. +type MachineRankerFunc func(a, b *Machine) bool + +func (f MachineRankerFunc) Less(a, b *Machine) bool { + return f(a, b) +} + +// SpreadRanker prefers machines with fewer total containers (existing + scheduled), spreading load evenly. +// This provides round-robin-like behavior across machines. +var SpreadRanker = MachineRankerFunc(func(a, b *Machine) bool { + aTotal := a.ExistingContainers + a.ScheduledContainers + bTotal := b.ExistingContainers + b.ScheduledContainers + return aTotal < bTotal +}) + type ServiceScheduler struct { state *ClusterState spec api.ServiceSpec constraints []Constraint + ranker MachineRanker + heap *machineHeap } // NewServiceScheduler creates a new ServiceScheduler with the given cluster state and service specification. func NewServiceScheduler(state *ClusterState, spec api.ServiceSpec) *ServiceScheduler { + return NewServiceSchedulerWithRanker(state, spec, defaultRanker(spec)) +} + +// NewServiceSchedulerWithRanker creates a new ServiceScheduler with a custom machine ranker. +func NewServiceSchedulerWithRanker(state *ClusterState, spec api.ServiceSpec, ranker MachineRanker) *ServiceScheduler { constraints := constraintsFromSpec(spec) return &ServiceScheduler{ state: state, spec: spec, constraints: constraints, + ranker: ranker, + } +} + +// defaultRanker selects the ranker based on resource reservations. When no CPU/memory +// reservations are set, use a round-robin-like ranker that ignores existing containers +// to preserve HA spread even if a machine already hosts other workloads. When any +// reservation is set, fall back to SpreadRanker which considers existing + scheduled. +func defaultRanker(spec api.ServiceSpec) MachineRanker { + resources := spec.Container.Resources + if resources.CPUReservation == 0 && resources.MemoryReservation == 0 { + return NoReservationRanker } + return SpreadRanker } -// EligibleMachines returns a list of machines that satisfy all constraints for the next scheduled container. +// NoReservationRanker ignores existing containers and balances only the containers +// being scheduled in the current plan. This behaves like round-robin across eligible +// machines when no resource reservations are requested. +var NoReservationRanker = MachineRankerFunc(func(a, b *Machine) bool { + if a.ScheduledContainers != b.ScheduledContainers { + return a.ScheduledContainers < b.ScheduledContainers + } + return a.Info.Id < b.Info.Id +}) + +// EligibleMachines returns a list of machines that satisfy all constraints. +// Returns an error if no machines are eligible. 
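+// Eligibility is determined by the constraints derived from the service spec (placement, volumes, resource reservations).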
func (s *ServiceScheduler) EligibleMachines() ([]*Machine, error) { var available []*Machine for _, machine := range s.state.Machines { @@ -33,7 +85,7 @@ func (s *ServiceScheduler) EligibleMachines() ([]*Machine, error) { } } if len(available) == 0 { - return nil, errors.New("no machines available that satisfy all constraints") + return nil, errors.New("no eligible machines") } return available, nil } @@ -47,8 +99,93 @@ func (s *ServiceScheduler) evaluateConstraints(machine *Machine) bool { return true } -func (s *ServiceScheduler) ScheduleContainer() ([]*pb.MachineInfo, error) { - // TODO: organise machines in a heap and supply a sort function from the strategy. Each scheduled container - // should update the machine and reorder it in the heap. - return nil, errors.New("not implemented") +// ScheduleContainer finds the best eligible machine for the next container, reserves resources on it, +// and returns the machine. Returns an error if no machine can accommodate the container. +func (s *ServiceScheduler) ScheduleContainer() (*Machine, error) { + // Initialize heap on first call. + if s.heap == nil { + eligible, err := s.EligibleMachines() + if err != nil { + return nil, err + } + s.heap = &machineHeap{machines: eligible, ranker: s.ranker} + heap.Init(s.heap) + } + + // Re-filter the heap to remove machines that no longer satisfy constraints (e.g., out of resources). + s.heap.machines = filterEligible(s.heap.machines, s.evaluateConstraints) + if len(s.heap.machines) == 0 { + return nil, errors.New("no eligible machines with sufficient resources") + } + heap.Init(s.heap) + + // Pop the best machine. + m := heap.Pop(s.heap).(*Machine) + + // Reserve resources for this container. + resources := s.spec.Container.Resources + m.ReserveResources(resources.CPUReservation, resources.MemoryReservation) + m.ScheduledContainers++ + + // Push back with updated state so it gets re-ranked. + heap.Push(s.heap, m) + + return m, nil +} + +// UnscheduleContainer rolls back a previous reservation for a container on the given machine. +// Useful when scheduling determined that no new container needs to run (e.g., an existing one is up-to-date). +func (s *ServiceScheduler) UnscheduleContainer(m *Machine) { + if s.heap == nil { + return + } + + resources := s.spec.Container.Resources + m.ScheduledCPU -= resources.CPUReservation + m.ScheduledMemory -= resources.MemoryReservation + if m.ScheduledContainers > 0 { + m.ScheduledContainers-- + } + + // Re-rank machines after resource adjustment. + s.heap.machines = filterEligible(s.heap.machines, s.evaluateConstraints) + heap.Init(s.heap) +} + +func filterEligible(machines []*Machine, predicate func(*Machine) bool) []*Machine { + result := machines[:0] + for _, m := range machines { + if predicate(m) { + result = append(result, m) + } + } + return result +} + +// machineHeap implements heap.Interface for scheduling machines. 
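+// Ordering is delegated to the scheduler's ranker, so Pop returns the currently most preferred machine.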
+type machineHeap struct { + machines []*Machine + ranker MachineRanker +} + +func (h *machineHeap) Len() int { return len(h.machines) } + +func (h *machineHeap) Less(i, j int) bool { + return h.ranker.Less(h.machines[i], h.machines[j]) +} + +func (h *machineHeap) Swap(i, j int) { + h.machines[i], h.machines[j] = h.machines[j], h.machines[i] +} + +func (h *machineHeap) Push(x any) { + h.machines = append(h.machines, x.(*Machine)) +} + +func (h *machineHeap) Pop() any { + old := h.machines + n := len(old) + m := old[n-1] + h.machines = old[0 : n-1] + return m } diff --git a/pkg/client/deploy/scheduler/service_test.go b/pkg/client/deploy/scheduler/service_test.go new file mode 100644 index 00000000..d5a8e548 --- /dev/null +++ b/pkg/client/deploy/scheduler/service_test.go @@ -0,0 +1,504 @@ +package scheduler + +import ( + "testing" + + "github.com/psviderski/uncloud/internal/machine/api/pb" + "github.com/psviderski/uncloud/pkg/api" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSpreadRanker(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + machineA *Machine + machineB *Machine + wantLess bool // true if A should be preferred over B + }{ + { + name: "prefers machine with fewer total containers", + machineA: &Machine{ + ExistingContainers: 1, + ScheduledContainers: 0, + }, + machineB: &Machine{ + ExistingContainers: 3, + ScheduledContainers: 0, + }, + wantLess: true, + }, + { + name: "considers scheduled containers too", + machineA: &Machine{ + ExistingContainers: 1, + ScheduledContainers: 2, + }, + machineB: &Machine{ + ExistingContainers: 2, + ScheduledContainers: 0, + }, + wantLess: false, // A has 3 total, B has 2 total + }, + { + name: "considers both existing and scheduled", + machineA: &Machine{ + ExistingContainers: 2, + ScheduledContainers: 1, + }, + machineB: &Machine{ + ExistingContainers: 1, + ScheduledContainers: 3, + }, + wantLess: true, // A has 3 total, B has 4 total + }, + { + name: "equal counts", + machineA: &Machine{ + ExistingContainers: 2, + ScheduledContainers: 1, + }, + machineB: &Machine{ + ExistingContainers: 2, + ScheduledContainers: 1, + }, + wantLess: false, // Not less when equal + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := SpreadRanker.Less(tt.machineA, tt.machineB) + assert.Equal(t, tt.wantLess, result) + }) + } +} + +func TestServiceScheduler_EligibleMachines(t *testing.T) { + t.Parallel() + + const ( + core = int64(1e9) + gb = int64(1024 * 1024 * 1024) + ) + + tests := []struct { + name string + state *ClusterState + spec api.ServiceSpec + wantCount int + wantErr string + wantMachine string // Optional: check specific machine is included + }{ + { + name: "all machines eligible - no constraints", + state: &ClusterState{ + Machines: []*Machine{ + {Info: &pb.MachineInfo{Id: "m1", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + {Info: &pb.MachineInfo{Id: "m2", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + {Info: &pb.MachineInfo{Id: "m3", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + }, + }, + spec: api.ServiceSpec{}, + wantCount: 3, + }, + { + name: "placement constraint filters machines", + state: &ClusterState{ + Machines: []*Machine{ + {Info: &pb.MachineInfo{Id: "m1", Name: "node1", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + {Info: &pb.MachineInfo{Id: "m2", Name: "node2", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + {Info: &pb.MachineInfo{Id: "m3", Name: "node3", TotalCpuNanos: 4 * core, TotalMemoryBytes: 
8 * gb}}, + }, + }, + spec: api.ServiceSpec{ + Placement: api.Placement{Machines: []string{"node1", "node3"}}, + }, + wantCount: 2, + }, + { + name: "resource constraint filters machines without capacity", + state: &ClusterState{ + Machines: []*Machine{ + {Info: &pb.MachineInfo{Id: "m1", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + {Info: &pb.MachineInfo{Id: "m2", TotalCpuNanos: 1 * core, TotalMemoryBytes: 8 * gb}}, // Not enough CPU + {Info: &pb.MachineInfo{Id: "m3", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + }, + }, + spec: api.ServiceSpec{ + Container: api.ContainerSpec{ + Resources: api.ContainerResources{CPUReservation: 2 * core}, + }, + }, + wantCount: 2, + }, + { + name: "multiple constraints combined", + state: &ClusterState{ + Machines: []*Machine{ + {Info: &pb.MachineInfo{Id: "m1", Name: "node1", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + {Info: &pb.MachineInfo{Id: "m2", Name: "node2", TotalCpuNanos: 1 * core, TotalMemoryBytes: 8 * gb}}, // Not enough CPU + {Info: &pb.MachineInfo{Id: "m3", Name: "node3", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, // Not in placement + }, + }, + spec: api.ServiceSpec{ + Placement: api.Placement{Machines: []string{"node1", "node2"}}, + Container: api.ContainerSpec{ + Resources: api.ContainerResources{CPUReservation: 2 * core}, + }, + }, + wantCount: 1, + wantMachine: "m1", + }, + { + name: "no eligible machines - returns error", + state: &ClusterState{ + Machines: []*Machine{ + {Info: &pb.MachineInfo{Id: "m1", TotalCpuNanos: 1 * core, TotalMemoryBytes: 8 * gb}}, + {Info: &pb.MachineInfo{Id: "m2", TotalCpuNanos: 1 * core, TotalMemoryBytes: 8 * gb}}, + }, + }, + spec: api.ServiceSpec{ + Container: api.ContainerSpec{ + Resources: api.ContainerResources{CPUReservation: 4 * core}, + }, + }, + wantErr: "no eligible machines", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sched := NewServiceScheduler(tt.state, tt.spec) + machines, err := sched.EligibleMachines() + + if tt.wantErr != "" { + assert.Error(t, err) + assert.Contains(t, err.Error(), tt.wantErr) + return + } + + require.NoError(t, err) + assert.Len(t, machines, tt.wantCount) + + if tt.wantMachine != "" { + found := false + for _, m := range machines { + if m.Info.Id == tt.wantMachine { + found = true + break + } + } + assert.True(t, found, "Expected machine %s to be in eligible list", tt.wantMachine) + } + }) + } +} + +func TestServiceScheduler_ScheduleContainer(t *testing.T) { + t.Parallel() + + const ( + core = int64(1e9) + gb = int64(1024 * 1024 * 1024) + ) + + t.Run("single machine single container", func(t *testing.T) { + state := &ClusterState{ + Machines: []*Machine{ + {Info: &pb.MachineInfo{Id: "m1", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + }, + } + spec := api.ServiceSpec{ + Container: api.ContainerSpec{ + Resources: api.ContainerResources{CPUReservation: 1 * core}, + }, + } + + sched := NewServiceScheduler(state, spec) + m, err := sched.ScheduleContainer() + + require.NoError(t, err) + assert.Equal(t, "m1", m.Info.Id) + assert.Equal(t, 1*core, m.ScheduledCPU) + assert.Equal(t, 1, m.ScheduledContainers) + }) + + t.Run("multiple machines prefers least loaded when reservations set", func(t *testing.T) { + state := &ClusterState{ + Machines: []*Machine{ + { + Info: &pb.MachineInfo{Id: "m1", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}, + ExistingContainers: 3, + }, + { + Info: &pb.MachineInfo{Id: "m2", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}, + ExistingContainers: 1, + }, + { + 
Info: &pb.MachineInfo{Id: "m3", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}, + ExistingContainers: 2, + }, + }, + } + spec := api.ServiceSpec{ + Container: api.ContainerSpec{Resources: api.ContainerResources{CPUReservation: 1 * core}}, + } + + sched := NewServiceScheduler(state, spec) + m, err := sched.ScheduleContainer() + + require.NoError(t, err) + assert.Equal(t, "m2", m.Info.Id) // Least loaded machine + }) + + t.Run("no reservations round robins ignoring existing load", func(t *testing.T) { + state := &ClusterState{ + Machines: []*Machine{ + {Info: &pb.MachineInfo{Id: "m1", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}, ExistingContainers: 6}, + {Info: &pb.MachineInfo{Id: "m2", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}, ExistingContainers: 2}, + }, + } + spec := api.ServiceSpec{} // no reservations + + sched := NewServiceScheduler(state, spec) + + for i := 0; i < 4; i++ { + _, err := sched.ScheduleContainer() + require.NoError(t, err) + } + + assert.Equal(t, 2, state.Machines[0].ScheduledContainers) + assert.Equal(t, 2, state.Machines[1].ScheduledContainers) + }) + + t.Run("reserves resources on selected machine", func(t *testing.T) { + state := &ClusterState{ + Machines: []*Machine{ + {Info: &pb.MachineInfo{Id: "m1", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + }, + } + spec := api.ServiceSpec{ + Container: api.ContainerSpec{ + Resources: api.ContainerResources{ + CPUReservation: 2 * core, + MemoryReservation: 1 * gb, + }, + }, + } + + sched := NewServiceScheduler(state, spec) + m, err := sched.ScheduleContainer() + + require.NoError(t, err) + assert.Equal(t, 2*core, m.ScheduledCPU) + assert.Equal(t, 1*gb, m.ScheduledMemory) + assert.Equal(t, 1, m.ScheduledContainers) + }) + + t.Run("multiple calls spread across machines", func(t *testing.T) { + state := &ClusterState{ + Machines: []*Machine{ + {Info: &pb.MachineInfo{Id: "m1", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + {Info: &pb.MachineInfo{Id: "m2", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + }, + } + spec := api.ServiceSpec{} + + sched := NewServiceScheduler(state, spec) + + m1, err := sched.ScheduleContainer() + require.NoError(t, err) + m2, err := sched.ScheduleContainer() + require.NoError(t, err) + m3, err := sched.ScheduleContainer() + require.NoError(t, err) + m4, err := sched.ScheduleContainer() + require.NoError(t, err) + + // Should alternate between machines (due to spread ranking) + scheduled := map[string]int{ + m1.Info.Id: 1, + m2.Info.Id: 1, + m3.Info.Id: 1, + m4.Info.Id: 1, + } + // Each machine should have 2 containers scheduled + assert.Equal(t, 2, state.Machines[0].ScheduledContainers) + assert.Equal(t, 2, state.Machines[1].ScheduledContainers) + _ = scheduled + }) + + t.Run("returns error when no capacity - resource exhaustion", func(t *testing.T) { + state := &ClusterState{ + Machines: []*Machine{ + {Info: &pb.MachineInfo{Id: "m1", TotalCpuNanos: 2 * core, TotalMemoryBytes: 8 * gb}}, + }, + } + spec := api.ServiceSpec{ + Container: api.ContainerSpec{ + Resources: api.ContainerResources{CPUReservation: 1 * core}, + }, + } + + sched := NewServiceScheduler(state, spec) + + // First two should succeed + _, err := sched.ScheduleContainer() + require.NoError(t, err) + _, err = sched.ScheduleContainer() + require.NoError(t, err) + + // Third should fail - no more CPU capacity + _, err = sched.ScheduleContainer() + assert.Error(t, err) + assert.Contains(t, err.Error(), "no eligible machines") + }) + + t.Run("re-filters after resource exhaustion on some machines", 
func(t *testing.T) { + state := &ClusterState{ + Machines: []*Machine{ + {Info: &pb.MachineInfo{Id: "m1", TotalCpuNanos: 2 * core, TotalMemoryBytes: 8 * gb}}, + {Info: &pb.MachineInfo{Id: "m2", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + }, + } + spec := api.ServiceSpec{ + Container: api.ContainerSpec{ + Resources: api.ContainerResources{CPUReservation: 1 * core}, + }, + } + + sched := NewServiceScheduler(state, spec) + + // Schedule 4 containers - first 2 per machine, then m1 runs out + for i := 0; i < 4; i++ { + m, err := sched.ScheduleContainer() + require.NoError(t, err) + t.Logf("Scheduled container %d on %s", i+1, m.Info.Id) + } + + // m1 should have 2 containers, m2 should have 2 containers + // (spread ranking keeps them balanced until m1 runs out) + assert.Equal(t, 2*core, state.Machines[0].ScheduledCPU) + assert.Equal(t, 2*core, state.Machines[1].ScheduledCPU) + + // Next 2 containers should go to m2 only (m1 is exhausted) + for i := 0; i < 2; i++ { + m, err := sched.ScheduleContainer() + require.NoError(t, err) + assert.Equal(t, "m2", m.Info.Id) + } + + // m2 should now be at capacity + _, err := sched.ScheduleContainer() + assert.Error(t, err) + }) +} + +func TestServiceScheduler_UnscheduleContainer(t *testing.T) { + t.Parallel() + + const ( + core = int64(1e9) + gb = int64(1024 * 1024 * 1024) + ) + + t.Run("decrements scheduled resources", func(t *testing.T) { + state := &ClusterState{ + Machines: []*Machine{ + {Info: &pb.MachineInfo{Id: "m1", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + }, + } + spec := api.ServiceSpec{ + Container: api.ContainerSpec{ + Resources: api.ContainerResources{ + CPUReservation: 1 * core, + MemoryReservation: 512 * 1024 * 1024, + }, + }, + } + + sched := NewServiceScheduler(state, spec) + m, err := sched.ScheduleContainer() + require.NoError(t, err) + + assert.Equal(t, 1*core, m.ScheduledCPU) + assert.Equal(t, int64(512*1024*1024), m.ScheduledMemory) + assert.Equal(t, 1, m.ScheduledContainers) + + sched.UnscheduleContainer(m) + + assert.Equal(t, int64(0), m.ScheduledCPU) + assert.Equal(t, int64(0), m.ScheduledMemory) + assert.Equal(t, 0, m.ScheduledContainers) + }) + + t.Run("scheduled containers minimum is 0", func(t *testing.T) { + state := &ClusterState{ + Machines: []*Machine{ + {Info: &pb.MachineInfo{Id: "m1", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + }, + } + spec := api.ServiceSpec{} + + sched := NewServiceScheduler(state, spec) + m, err := sched.ScheduleContainer() + require.NoError(t, err) + + // Unschedule twice - should not go negative + sched.UnscheduleContainer(m) + sched.UnscheduleContainer(m) + + assert.Equal(t, 0, m.ScheduledContainers) + }) + + t.Run("no-op if heap not initialized", func(t *testing.T) { + state := &ClusterState{ + Machines: []*Machine{ + {Info: &pb.MachineInfo{Id: "m1", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + }, + } + spec := api.ServiceSpec{ + Container: api.ContainerSpec{ + Resources: api.ContainerResources{CPUReservation: 1 * core}, + }, + } + + sched := NewServiceScheduler(state, spec) + + // Call UnscheduleContainer before any ScheduleContainer call + // Should not panic + sched.UnscheduleContainer(state.Machines[0]) + }) +} + +func TestServiceScheduler_CustomRanker(t *testing.T) { + t.Parallel() + + const ( + core = int64(1e9) + gb = int64(1024 * 1024 * 1024) + ) + + // Custom ranker that prefers machines with more memory + memoryRanker := MachineRankerFunc(func(a, b *Machine) bool { + return a.AvailableMemory() > b.AvailableMemory() + }) + + state := 
&ClusterState{ + Machines: []*Machine{ + {Info: &pb.MachineInfo{Id: "m1", TotalCpuNanos: 4 * core, TotalMemoryBytes: 4 * gb}}, + {Info: &pb.MachineInfo{Id: "m2", TotalCpuNanos: 4 * core, TotalMemoryBytes: 8 * gb}}, + {Info: &pb.MachineInfo{Id: "m3", TotalCpuNanos: 4 * core, TotalMemoryBytes: 16 * gb}}, + }, + } + spec := api.ServiceSpec{} + + sched := NewServiceSchedulerWithRanker(state, spec, memoryRanker) + m, err := sched.ScheduleContainer() + + require.NoError(t, err) + assert.Equal(t, "m3", m.Info.Id) // Should pick machine with most memory +} diff --git a/pkg/client/deploy/scheduler/state.go b/pkg/client/deploy/scheduler/state.go index 17eec0de..1de1de2e 100644 --- a/pkg/client/deploy/scheduler/state.go +++ b/pkg/client/deploy/scheduler/state.go @@ -18,6 +18,32 @@ type Machine struct { Info *pb.MachineInfo Volumes []volume.Volume ScheduledVolumes []api.VolumeSpec + // ExistingContainers is the number of containers already running on this machine (for ranking). + ExistingContainers int + // ScheduledCPU tracks CPU nanocores reserved by containers scheduled during this planning session. + ScheduledCPU int64 + // ScheduledMemory tracks memory bytes reserved by containers scheduled during this planning session. + ScheduledMemory int64 + // ScheduledContainers tracks the number of containers scheduled on this machine during this planning session. + ScheduledContainers int +} + +// AvailableCPU returns the available CPU nanocores on the machine after accounting for +// both running containers and containers scheduled during this planning session. +func (m *Machine) AvailableCPU() int64 { + return m.Info.TotalCpuNanos - m.Info.ReservedCpuNanos - m.ScheduledCPU +} + +// AvailableMemory returns the available memory bytes on the machine after accounting for +// both running containers and containers scheduled during this planning session. +func (m *Machine) AvailableMemory() int64 { + return m.Info.TotalMemoryBytes - m.Info.ReservedMemoryBytes - m.ScheduledMemory +} + +// ReserveResources reserves the given CPU and memory for a container scheduled on this machine. 
+func (m *Machine) ReserveResources(cpu, memory int64) { + m.ScheduledCPU += cpu + m.ScheduledMemory += memory } type Client interface { diff --git a/pkg/client/deploy/scheduler/state_test.go b/pkg/client/deploy/scheduler/state_test.go new file mode 100644 index 00000000..02e42e52 --- /dev/null +++ b/pkg/client/deploy/scheduler/state_test.go @@ -0,0 +1,305 @@ +package scheduler + +import ( + "testing" + + "github.com/psviderski/uncloud/internal/machine/api/pb" + "github.com/stretchr/testify/assert" +) + +func TestMachine_AvailableCPU(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + machine *Machine + wantCPU int64 + }{ + { + name: "fresh machine with no reservations", + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4e9, // 4 cores + ReservedCpuNanos: 0, + }, + }, + wantCPU: 4e9, + }, + { + name: "machine with existing reserved resources", + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4e9, + ReservedCpuNanos: 1e9, // 1 core reserved + }, + }, + wantCPU: 3e9, + }, + { + name: "machine with scheduled resources", + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4e9, + ReservedCpuNanos: 0, + }, + ScheduledCPU: 2e9, // 2 cores scheduled + }, + wantCPU: 2e9, + }, + { + name: "machine with both reserved and scheduled resources", + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4e9, + ReservedCpuNanos: 1e9, // 1 core reserved + }, + ScheduledCPU: 1e9, // 1 core scheduled + }, + wantCPU: 2e9, + }, + { + name: "fully utilized machine", + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4e9, + ReservedCpuNanos: 2e9, + }, + ScheduledCPU: 2e9, + }, + wantCPU: 0, + }, + { + name: "over-committed machine returns negative", + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalCpuNanos: 4e9, + ReservedCpuNanos: 3e9, + }, + ScheduledCPU: 2e9, + }, + wantCPU: -1e9, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.machine.AvailableCPU() + assert.Equal(t, tt.wantCPU, result) + }) + } +} + +func TestMachine_AvailableMemory(t *testing.T) { + t.Parallel() + + const gb = 1024 * 1024 * 1024 + + tests := []struct { + name string + machine *Machine + wantMemory int64 + }{ + { + name: "fresh machine with no reservations", + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalMemoryBytes: 8 * gb, + ReservedMemoryBytes: 0, + }, + }, + wantMemory: 8 * gb, + }, + { + name: "machine with existing reserved resources", + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalMemoryBytes: 8 * gb, + ReservedMemoryBytes: 2 * gb, + }, + }, + wantMemory: 6 * gb, + }, + { + name: "machine with scheduled resources", + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalMemoryBytes: 8 * gb, + ReservedMemoryBytes: 0, + }, + ScheduledMemory: 3 * gb, + }, + wantMemory: 5 * gb, + }, + { + name: "machine with both reserved and scheduled resources", + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalMemoryBytes: 8 * gb, + ReservedMemoryBytes: 2 * gb, + }, + ScheduledMemory: 2 * gb, + }, + wantMemory: 4 * gb, + }, + { + name: "fully utilized machine", + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalMemoryBytes: 8 * gb, + ReservedMemoryBytes: 4 * gb, + }, + ScheduledMemory: 4 * gb, + }, + wantMemory: 0, + }, + { + name: "over-committed machine returns negative", + machine: &Machine{ + Info: &pb.MachineInfo{ + TotalMemoryBytes: 8 * gb, + ReservedMemoryBytes: 6 * gb, + }, + ScheduledMemory: 4 * gb, + }, + wantMemory: -2 * gb, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + 
result := tt.machine.AvailableMemory() + assert.Equal(t, tt.wantMemory, result) + }) + } +} + +func TestMachine_ReserveResources(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + initialCPU int64 + initialMemory int64 + reserveCPU int64 + reserveMemory int64 + wantScheduledCPU int64 + wantScheduledMem int64 + }{ + { + name: "reserve CPU only", + initialCPU: 0, + initialMemory: 0, + reserveCPU: 1e9, + reserveMemory: 0, + wantScheduledCPU: 1e9, + wantScheduledMem: 0, + }, + { + name: "reserve memory only", + initialCPU: 0, + initialMemory: 0, + reserveCPU: 0, + reserveMemory: 512 * 1024 * 1024, + wantScheduledCPU: 0, + wantScheduledMem: 512 * 1024 * 1024, + }, + { + name: "reserve both CPU and memory", + initialCPU: 0, + initialMemory: 0, + reserveCPU: 2e9, + reserveMemory: 1024 * 1024 * 1024, + wantScheduledCPU: 2e9, + wantScheduledMem: 1024 * 1024 * 1024, + }, + { + name: "accumulate reservations", + initialCPU: 1e9, + initialMemory: 512 * 1024 * 1024, + reserveCPU: 1e9, + reserveMemory: 512 * 1024 * 1024, + wantScheduledCPU: 2e9, + wantScheduledMem: 1024 * 1024 * 1024, + }, + { + name: "zero reservation has no effect", + initialCPU: 1e9, + initialMemory: 512 * 1024 * 1024, + reserveCPU: 0, + reserveMemory: 0, + wantScheduledCPU: 1e9, + wantScheduledMem: 512 * 1024 * 1024, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + machine := &Machine{ + Info: &pb.MachineInfo{}, + ScheduledCPU: tt.initialCPU, + ScheduledMemory: tt.initialMemory, + } + + machine.ReserveResources(tt.reserveCPU, tt.reserveMemory) + + assert.Equal(t, tt.wantScheduledCPU, machine.ScheduledCPU) + assert.Equal(t, tt.wantScheduledMem, machine.ScheduledMemory) + }) + } +} + +func TestClusterState_Machine(t *testing.T) { + t.Parallel() + + state := &ClusterState{ + Machines: []*Machine{ + { + Info: &pb.MachineInfo{ + Id: "machine-1", + Name: "node1", + }, + }, + { + Info: &pb.MachineInfo{ + Id: "machine-2", + Name: "node2", + }, + }, + }, + } + + tests := []struct { + name string + nameOrID string + wantFound bool + wantID string + }{ + { + name: "find by ID", + nameOrID: "machine-1", + wantFound: true, + wantID: "machine-1", + }, + { + name: "find by name", + nameOrID: "node2", + wantFound: true, + wantID: "machine-2", + }, + { + name: "not found", + nameOrID: "nonexistent", + wantFound: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + machine, found := state.Machine(tt.nameOrID) + assert.Equal(t, tt.wantFound, found) + if tt.wantFound { + assert.Equal(t, tt.wantID, machine.Info.Id) + } + }) + } +} diff --git a/pkg/client/deploy/strategy.go b/pkg/client/deploy/strategy.go index 09cd07aa..49f99a30 100644 --- a/pkg/client/deploy/strategy.go +++ b/pkg/client/deploy/strategy.go @@ -2,10 +2,8 @@ package deploy import ( "fmt" - "math/rand/v2" "slices" - "github.com/psviderski/uncloud/internal/machine/api/pb" "github.com/psviderski/uncloud/internal/secret" "github.com/psviderski/uncloud/pkg/api" "github.com/psviderski/uncloud/pkg/client/deploy/scheduler" @@ -65,21 +63,6 @@ func (s *RollingStrategy) planReplicated(svc *api.Service, spec api.ServiceSpec) } sched := scheduler.NewServiceScheduler(s.state, spec) - // TODO: return a detailed report on required constraints and which ones are satisfied? 
- availableMachines, err := sched.EligibleMachines() - if err != nil { - return plan, err - } - - var matchedMachines []*pb.MachineInfo - for _, m := range availableMachines { - matchedMachines = append(matchedMachines, m.Info) - } - - // Randomise the order of machines to avoid always deploying to the same machines first. - rand.Shuffle(len(matchedMachines), func(i, j int) { - matchedMachines[i], matchedMachines[j] = matchedMachines[j], matchedMachines[i] - }) // Organise existing containers by machine. containersOnMachine := make(map[string][]api.ServiceContainer) @@ -119,44 +102,40 @@ func (s *RollingStrategy) planReplicated(svc *api.Service, spec api.ServiceSpec) for _, c := range svc.Containers { containersOnMachine[c.MachineID] = append(containersOnMachine[c.MachineID], c.Container) } + } - // Sort machines such that machines with the most up-to-date containers are first, followed by machines with - // existing containers, and finally machines without containers. - slices.SortFunc(matchedMachines, func(m1, m2 *pb.MachineInfo) int { - if upToDateContainersOnMachine[m1.Id] > 0 && upToDateContainersOnMachine[m2.Id] > 0 { - return upToDateContainersOnMachine[m2.Id] - upToDateContainersOnMachine[m1.Id] - } - if upToDateContainersOnMachine[m1.Id] > 0 { - return -1 - } - if upToDateContainersOnMachine[m2.Id] > 0 { - return 1 - } - return len(containersOnMachine[m2.Id]) - len(containersOnMachine[m1.Id]) - }) + // Set existing container counts on machines for the scheduler's spread ranking. + for _, m := range s.state.Machines { + m.ExistingContainers = len(containersOnMachine[m.Info.Id]) } - // Spread the containers across the available machines evenly using a simple round-robin approach, starting with - // machines that already have containers and prioritising machines with containers that match the desired spec. + // Schedule containers across eligible machines using the heap-based scheduler. for i := 0; i < int(spec.Replicas); i++ { - m := matchedMachines[i%len(matchedMachines)] - containers := containersOnMachine[m.Id] + // Get the best eligible machine for this container (accounts for resources and spreading). + m, err := sched.ScheduleContainer() + if err != nil { + return plan, fmt.Errorf("cannot schedule replica %d of service '%s': %w", i+1, spec.Name, err) + } + + containers := containersOnMachine[m.Info.Id] if len(containers) == 0 { - // No more existing containers on this machine, create a new one. + // No existing containers on this machine, create a new one. plan.Operations = append(plan.Operations, &RunContainerOperation{ ServiceID: plan.ServiceID, Spec: spec, - MachineID: m.Id, + MachineID: m.Info.Id, }) continue } ctr := containers[0] - containersOnMachine[m.Id] = containers[1:] + containersOnMachine[m.Info.Id] = containers[1:] if status, ok := containerSpecStatuses[ctr.ID]; ok { // Contains statuses for only running containers. if status == ContainerUpToDate { + sched.UnscheduleContainer(m) + // Container is already up-to-date, no changes needed. continue } // TODO: handle ContainerNeedsUpdate when update of mutable fields on a container is supported. 
@@ -167,7 +146,7 @@ func (s *RollingStrategy) planReplicated(svc *api.Service, spec api.ServiceSpec) plan.Operations = append(plan.Operations, &StopContainerOperation{ ServiceID: plan.ServiceID, ContainerID: ctr.ID, - MachineID: m.Id, + MachineID: m.Info.Id, }) } } @@ -176,12 +155,12 @@ func (s *RollingStrategy) planReplicated(svc *api.Service, spec api.ServiceSpec) plan.Operations = append(plan.Operations, &RunContainerOperation{ ServiceID: plan.ServiceID, Spec: spec, - MachineID: m.Id, + MachineID: m.Info.Id, }) // Remove the old container. plan.Operations = append(plan.Operations, &RemoveContainerOperation{ - MachineID: m.Id, + MachineID: m.Info.Id, Container: ctr, }) } @@ -226,6 +205,20 @@ func (s *RollingStrategy) planGlobal(svc *api.Service, spec api.ServiceSpec) (Pl return plan, err } + // Global mode requires all target machines to satisfy the constraints. + // If placement constraints specify machines, use that count; otherwise use all cluster machines. + targetMachineCount := len(s.state.Machines) + if len(spec.Placement.Machines) > 0 { + targetMachineCount = len(spec.Placement.Machines) + } + + if len(availableMachines) != targetMachineCount { + return plan, fmt.Errorf( + "global service '%s' requires all machines to satisfy constraints, but only %d of %d machines are eligible", + spec.Name, len(availableMachines), targetMachineCount, + ) + } + for _, m := range availableMachines { containers := containersOnMachine[m.Info.Id] ops, err := reconcileGlobalContainer(containers, spec, plan.ServiceID, m.Info.Id, s.ForceRecreate) diff --git a/pkg/client/deploy/strategy_test.go b/pkg/client/deploy/strategy_test.go new file mode 100644 index 00000000..57843063 --- /dev/null +++ b/pkg/client/deploy/strategy_test.go @@ -0,0 +1,627 @@ +package deploy + +import ( + "testing" + + "github.com/docker/docker/api/types/container" + "github.com/psviderski/uncloud/internal/machine/api/pb" + "github.com/psviderski/uncloud/pkg/api" + "github.com/psviderski/uncloud/pkg/client/deploy/scheduler" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + core = int64(1e9) + gb = int64(1024 * 1024 * 1024) +) + +// newTestClusterState creates a ClusterState with the given machines for testing. +func newTestClusterState(machines ...*scheduler.Machine) *scheduler.ClusterState { + return &scheduler.ClusterState{Machines: machines} +} + +// newTestMachine creates a Machine with the given parameters for testing. +func newTestMachine(id, name string, cpuCores, memoryGB int64) *scheduler.Machine { + return &scheduler.Machine{ + Info: &pb.MachineInfo{ + Id: id, + Name: name, + TotalCpuNanos: cpuCores * core, + TotalMemoryBytes: memoryGB * gb, + }, + } +} + +// countOperationsByType counts operations by their type. +func countOperationsByType(ops []Operation) map[string]int { + counts := make(map[string]int) + for _, op := range ops { + switch op.(type) { + case *RunContainerOperation: + counts["run"]++ + case *StopContainerOperation: + counts["stop"]++ + case *RemoveContainerOperation: + counts["remove"]++ + } + } + return counts +} + +// getMachineIDsFromRunOps extracts machine IDs from RunContainerOperations. 
+func getMachineIDsFromRunOps(ops []Operation) []string { + var machineIDs []string + for _, op := range ops { + if runOp, ok := op.(*RunContainerOperation); ok { + machineIDs = append(machineIDs, runOp.MachineID) + } + } + return machineIDs +} + +func TestRollingStrategy_planReplicated_WithResources(t *testing.T) { + t.Parallel() + + t.Run("schedule replicas across machines with sufficient resources", func(t *testing.T) { + state := newTestClusterState( + newTestMachine("m1", "node1", 4, 8), + newTestMachine("m2", "node2", 4, 8), + ) + + spec := api.ServiceSpec{ + Name: "test-service", + Mode: api.ServiceModeReplicated, + Replicas: 4, + Container: api.ContainerSpec{ + Image: "nginx:latest", + Resources: api.ContainerResources{ + CPUReservation: 1 * core, + MemoryReservation: 1 * gb, + }, + }, + } + + strategy := &RollingStrategy{} + plan, err := strategy.Plan(state, nil, spec) + + require.NoError(t, err) + assert.NotEmpty(t, plan.ServiceID) + assert.Equal(t, "test-service", plan.ServiceName) + + // Should have 4 RunContainerOperations + counts := countOperationsByType(plan.Operations) + assert.Equal(t, 4, counts["run"]) + + // Should spread across machines + machineIDs := getMachineIDsFromRunOps(plan.Operations) + m1Count := 0 + m2Count := 0 + for _, mid := range machineIDs { + if mid == "m1" { + m1Count++ + } else if mid == "m2" { + m2Count++ + } + } + assert.Equal(t, 2, m1Count, "Should schedule 2 containers on m1") + assert.Equal(t, 2, m2Count, "Should schedule 2 containers on m2") + }) + + t.Run("error when replica count exceeds cluster capacity", func(t *testing.T) { + state := newTestClusterState( + newTestMachine("m1", "node1", 2, 8), // Can only fit 2 containers + newTestMachine("m2", "node2", 2, 8), // Can only fit 2 containers + ) + + spec := api.ServiceSpec{ + Name: "test-service", + Mode: api.ServiceModeReplicated, + Replicas: 5, // Need 5 but cluster can only handle 4 + Container: api.ContainerSpec{ + Image: "nginx:latest", + Resources: api.ContainerResources{ + CPUReservation: 1 * core, + }, + }, + } + + strategy := &RollingStrategy{} + _, err := strategy.Plan(state, nil, spec) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "cannot schedule replica") + }) + + t.Run("respects resource constraints during scheduling", func(t *testing.T) { + state := newTestClusterState( + newTestMachine("m1", "node1", 2, 8), // Small machine + newTestMachine("m2", "node2", 8, 16), // Large machine + ) + + spec := api.ServiceSpec{ + Name: "test-service", + Mode: api.ServiceModeReplicated, + Replicas: 4, + Container: api.ContainerSpec{ + Image: "nginx:latest", + Resources: api.ContainerResources{ + CPUReservation: 2 * core, // Requires 2 cores per container + }, + }, + } + + strategy := &RollingStrategy{} + plan, err := strategy.Plan(state, nil, spec) + + require.NoError(t, err) + + // m1 can only fit 1 container (2 cores total, 2 cores per container) + // m2 can fit 3 containers (8 cores total, 2 cores per container) + // We need 4 containers, so: m1 gets 1, m2 gets 3 + machineIDs := getMachineIDsFromRunOps(plan.Operations) + m1Count := 0 + m2Count := 0 + for _, mid := range machineIDs { + if mid == "m1" { + m1Count++ + } else if mid == "m2" { + m2Count++ + } + } + assert.Equal(t, 1, m1Count, "m1 should have 1 container (limited by resources)") + assert.Equal(t, 3, m2Count, "m2 should have 3 containers") + }) + + t.Run("up-to-date containers dont consume additional scheduled resources", func(t *testing.T) { + // Create a machine with limited capacity + state := newTestClusterState( + 
newTestMachine("m1", "node1", 2, 8), + ) + // Simulate that this machine already has reserved resources for 1 running container + state.Machines[0].Info.ReservedCpuNanos = 1 * core + + existingService := &api.Service{ + ID: "svc-123", + Name: "test-service", + Mode: api.ServiceModeReplicated, + Containers: []api.MachineServiceContainer{ + { + MachineID: "m1", + Container: api.ServiceContainer{ + Container: api.Container{ + InspectResponse: container.InspectResponse{ + ContainerJSONBase: &container.ContainerJSONBase{ + ID: "container-1", + State: &container.State{Running: true}, + }, + }, + }, + ServiceSpec: api.ServiceSpec{ + Name: "test-service", + Mode: api.ServiceModeReplicated, + Replicas: 1, + Container: api.ContainerSpec{ + Image: "nginx:latest", + Resources: api.ContainerResources{ + CPUReservation: 1 * core, + }, + }, + }, + }, + }, + }, + } + + spec := api.ServiceSpec{ + Name: "test-service", + Mode: api.ServiceModeReplicated, + Replicas: 2, // Scale from 1 to 2 + Container: api.ContainerSpec{ + Image: "nginx:latest", + Resources: api.ContainerResources{ + CPUReservation: 1 * core, + }, + }, + } + + strategy := &RollingStrategy{} + plan, err := strategy.Plan(state, existingService, spec) + + require.NoError(t, err) + + // Should only run 1 new container (existing is up-to-date) + counts := countOperationsByType(plan.Operations) + assert.Equal(t, 1, counts["run"], "Should only run 1 new container") + assert.Equal(t, 0, counts["remove"], "Should not remove any containers") + }) + + t.Run("spread behavior distributes containers across machines", func(t *testing.T) { + state := newTestClusterState( + newTestMachine("m1", "node1", 8, 16), + newTestMachine("m2", "node2", 8, 16), + newTestMachine("m3", "node3", 8, 16), + ) + + spec := api.ServiceSpec{ + Name: "test-service", + Mode: api.ServiceModeReplicated, + Replicas: 6, + Container: api.ContainerSpec{ + Image: "nginx:latest", + }, + } + + strategy := &RollingStrategy{} + plan, err := strategy.Plan(state, nil, spec) + + require.NoError(t, err) + + machineIDs := getMachineIDsFromRunOps(plan.Operations) + machineCounts := make(map[string]int) + for _, mid := range machineIDs { + machineCounts[mid]++ + } + + // Each machine should get exactly 2 containers (even spread) + assert.Equal(t, 2, machineCounts["m1"], "m1 should have 2 containers") + assert.Equal(t, 2, machineCounts["m2"], "m2 should have 2 containers") + assert.Equal(t, 2, machineCounts["m3"], "m3 should have 2 containers") + }) + + t.Run("spread ranking prefers machines with fewer containers", func(t *testing.T) { + state := newTestClusterState( + newTestMachine("m1", "node1", 8, 16), + newTestMachine("m2", "node2", 8, 16), + ) + + // Existing service with 2 containers on m1 + existingService := &api.Service{ + ID: "svc-123", + Name: "test-service", + Mode: api.ServiceModeReplicated, + Containers: []api.MachineServiceContainer{ + { + MachineID: "m1", + Container: api.ServiceContainer{ + Container: api.Container{ + InspectResponse: container.InspectResponse{ + ContainerJSONBase: &container.ContainerJSONBase{ + ID: "c1", + State: &container.State{Running: true}, + }, + Config: &container.Config{ + Labels: map[string]string{}, + }, + }, + }, + ServiceSpec: api.ServiceSpec{ + Name: "test-service", + Container: api.ContainerSpec{Image: "nginx:old"}, + }, + }, + }, + { + MachineID: "m1", + Container: api.ServiceContainer{ + Container: api.Container{ + InspectResponse: container.InspectResponse{ + ContainerJSONBase: &container.ContainerJSONBase{ + ID: "c2", + State: 
&container.State{Running: true}, + }, + Config: &container.Config{ + Labels: map[string]string{}, + }, + }, + }, + ServiceSpec: api.ServiceSpec{ + Name: "test-service", + Container: api.ContainerSpec{Image: "nginx:old"}, + }, + }, + }, + }, + } + + spec := api.ServiceSpec{ + Name: "test-service", + Mode: api.ServiceModeReplicated, + Replicas: 4, + Container: api.ContainerSpec{ + Image: "nginx:latest", // Image changed - needs recreate + Resources: api.ContainerResources{CPUReservation: 1 * core}, // trigger spread ranker that considers existing load + }, + } + + strategy := &RollingStrategy{} + plan, err := strategy.Plan(state, existingService, spec) + + require.NoError(t, err) + + // Spread ranking prefers machines with fewer containers: + // - m1 starts with 2 existing containers, m2 starts with 0 + // - Scheduler prefers m2 initially (0 < 2), then balances as containers are scheduled + // - Result: m2 gets more new containers since it started empty + machineIDs := getMachineIDsFromRunOps(plan.Operations) + machineCounts := make(map[string]int) + for _, mid := range machineIDs { + machineCounts[mid]++ + } + + // m2 should get more new containers since it started with 0 + // m1's existing containers get replaced/removed as part of the plan + assert.Greater(t, machineCounts["m2"], machineCounts["m1"], + "m2 should get more new containers since it was initially empty") + + // Total should still be 4 replicas + assert.Equal(t, 4, machineCounts["m1"]+machineCounts["m2"], "Total should be 4 replicas") + }) + + t.Run("port conflicts stop existing container before running new one", func(t *testing.T) { + state := newTestClusterState( + newTestMachine("m1", "node1", 2, 4), + ) + + existingService := &api.Service{ + ID: "svc-123", + Name: "test-service", + Mode: api.ServiceModeReplicated, + Containers: []api.MachineServiceContainer{ + { + MachineID: "m1", + Container: api.ServiceContainer{ + Container: api.Container{InspectResponse: container.InspectResponse{ + ContainerJSONBase: &container.ContainerJSONBase{ + ID: "c1", + State: &container.State{Running: true}, + }, + Config: &container.Config{Labels: map[string]string{ + api.LabelServicePorts: "8080:80/tcp@host", + }}, + }}, + ServiceSpec: api.ServiceSpec{ // old spec + Name: "test-service", + Mode: api.ServiceModeReplicated, + Container: api.ContainerSpec{Image: "nginx:old"}, + }, + }, + }, + }, + } + + spec := api.ServiceSpec{ + Name: "test-service", + Mode: api.ServiceModeReplicated, + Replicas: 1, + Container: api.ContainerSpec{ + Image: "nginx:new", + }, + Ports: []api.PortSpec{{ + Mode: api.PortModeHost, + PublishedPort: 8080, + ContainerPort: 80, + Protocol: api.ProtocolTCP, + }}, + } + + strategy := &RollingStrategy{} + plan, err := strategy.Plan(state, existingService, spec) + + require.NoError(t, err) + counts := countOperationsByType(plan.Operations) + assert.Equal(t, 1, counts["stop"], "conflicting container should be stopped") + assert.Equal(t, 1, counts["run"], "new container should be run") + assert.Equal(t, 1, counts["remove"], "old container should be removed") + }) + + t.Run("force recreate replaces even up-to-date containers", func(t *testing.T) { + state := newTestClusterState( + newTestMachine("m1", "node1", 2, 4), + ) + + existingService := &api.Service{ + ID: "svc-123", + Name: "test-service", + Mode: api.ServiceModeReplicated, + Containers: []api.MachineServiceContainer{ + { + MachineID: "m1", + Container: api.ServiceContainer{ + Container: api.Container{InspectResponse: container.InspectResponse{ + ContainerJSONBase: 
&container.ContainerJSONBase{ + ID: "c1", + State: &container.State{Running: true}, + }, + Config: &container.Config{Labels: map[string]string{}}, + }}, + ServiceSpec: api.ServiceSpec{ + Name: "test-service", + Mode: api.ServiceModeReplicated, + Container: api.ContainerSpec{Image: "nginx:latest"}, + }, + }, + }, + }, + } + + spec := api.ServiceSpec{ + Name: "test-service", + Mode: api.ServiceModeReplicated, + Replicas: 1, + Container: api.ContainerSpec{Image: "nginx:latest"}, + } + + strategy := &RollingStrategy{ForceRecreate: true} + plan, err := strategy.Plan(state, existingService, spec) + + require.NoError(t, err) + counts := countOperationsByType(plan.Operations) + assert.Equal(t, 1, counts["run"], "up-to-date container should still be recreated") + assert.Equal(t, 1, counts["remove"], "old container should be removed") + }) +} + +func TestRollingStrategy_planGlobal_WithResources(t *testing.T) { + t.Parallel() + + t.Run("all machines must satisfy constraints", func(t *testing.T) { + state := newTestClusterState( + newTestMachine("m1", "node1", 4, 8), + newTestMachine("m2", "node2", 4, 8), + newTestMachine("m3", "node3", 1, 8), // Not enough CPU + ) + + spec := api.ServiceSpec{ + Name: "test-service", + Mode: api.ServiceModeGlobal, + Container: api.ContainerSpec{ + Image: "nginx:latest", + Resources: api.ContainerResources{ + CPUReservation: 2 * core, + }, + }, + } + + strategy := &RollingStrategy{} + _, err := strategy.Plan(state, nil, spec) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "global service") + assert.Contains(t, err.Error(), "2 of 3 machines are eligible") + }) + + t.Run("global service with resource requirements on sufficient cluster", func(t *testing.T) { + state := newTestClusterState( + newTestMachine("m1", "node1", 4, 8), + newTestMachine("m2", "node2", 4, 8), + newTestMachine("m3", "node3", 4, 8), + ) + + spec := api.ServiceSpec{ + Name: "test-service", + Mode: api.ServiceModeGlobal, + Container: api.ContainerSpec{ + Image: "nginx:latest", + Resources: api.ContainerResources{ + CPUReservation: 2 * core, + MemoryReservation: 4 * gb, + }, + }, + } + + strategy := &RollingStrategy{} + plan, err := strategy.Plan(state, nil, spec) + + require.NoError(t, err) + + // Should have 3 RunContainerOperations (one per machine) + counts := countOperationsByType(plan.Operations) + assert.Equal(t, 3, counts["run"]) + }) + + t.Run("error when any machine lacks required resources", func(t *testing.T) { + state := newTestClusterState( + newTestMachine("m1", "node1", 4, 8), + newTestMachine("m2", "node2", 4, 2), // Not enough memory + ) + + spec := api.ServiceSpec{ + Name: "test-service", + Mode: api.ServiceModeGlobal, + Container: api.ContainerSpec{ + Image: "nginx:latest", + Resources: api.ContainerResources{ + MemoryReservation: 4 * gb, + }, + }, + } + + strategy := &RollingStrategy{} + _, err := strategy.Plan(state, nil, spec) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "1 of 2 machines are eligible") + }) + + t.Run("global service without resource constraints succeeds", func(t *testing.T) { + state := newTestClusterState( + newTestMachine("m1", "node1", 1, 1), + newTestMachine("m2", "node2", 1, 1), + ) + + spec := api.ServiceSpec{ + Name: "test-service", + Mode: api.ServiceModeGlobal, + Container: api.ContainerSpec{ + Image: "nginx:latest", + // No resource constraints + }, + } + + strategy := &RollingStrategy{} + plan, err := strategy.Plan(state, nil, spec) + + require.NoError(t, err) + counts := countOperationsByType(plan.Operations) + assert.Equal(t, 
2, counts["run"]) + }) +} + +func TestRollingStrategy_Plan_RequiresClusterState(t *testing.T) { + t.Parallel() + + spec := api.ServiceSpec{ + Name: "test-service", + Mode: api.ServiceModeReplicated, + Container: api.ContainerSpec{ + Image: "nginx:latest", + }, + } + + strategy := &RollingStrategy{} + _, err := strategy.Plan(nil, nil, spec) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "cluster state must be provided") +} + +func TestRollingStrategy_planGlobal_WithPlacementConstraints(t *testing.T) { + t.Parallel() + + t.Run("global service with x-machine deploys only to specified machines", func(t *testing.T) { + // Cluster has 3 machines, but we only want to deploy to 2 of them using x-machine + state := newTestClusterState( + newTestMachine("m1", "node1", 4, 8), + newTestMachine("m2", "node2", 4, 8), + newTestMachine("m3", "node3", 4, 8), + ) + + spec := api.ServiceSpec{ + Name: "test-service", + Mode: api.ServiceModeGlobal, + Container: api.ContainerSpec{ + Image: "nginx:latest", + }, + Placement: api.Placement{ + Machines: []string{"node1", "node2"}, // x-machine constraint: only deploy to node1 and node2 + }, + } + + strategy := &RollingStrategy{} + plan, err := strategy.Plan(state, nil, spec) + + // This should succeed - global with x-machine should deploy to all machines in the constraint list + require.NoError(t, err, "global service with x-machine should succeed when all specified machines are eligible") + + // Should have 2 RunContainerOperations (one per machine in the constraint) + counts := countOperationsByType(plan.Operations) + assert.Equal(t, 2, counts["run"], "should run containers on both specified machines") + + // Verify containers are scheduled on the correct machines + machineIDs := getMachineIDsFromRunOps(plan.Operations) + assert.Contains(t, machineIDs, "m1", "should deploy to node1") + assert.Contains(t, machineIDs, "m2", "should deploy to node2") + assert.NotContains(t, machineIDs, "m3", "should not deploy to node3") + }) +} diff --git a/website/docs/3-concepts/5-scheduling.md b/website/docs/3-concepts/5-scheduling.md new file mode 100644 index 00000000..4908a525 --- /dev/null +++ b/website/docs/3-concepts/5-scheduling.md @@ -0,0 +1,65 @@ +# Scheduling + +How Uncloud decides where to run your containers, and what to do when resources or constraints get in the way. + +## Overview + +- One container is placed at a time on the "best" eligible machine. +- Eligibility comes from constraints (machines, volumes, resources). +- Ranking depends on whether you request CPU/memory reservations: + - **With reservations:** prefer machines with fewer total containers (existing + already scheduled in this plan). + - **Without reservations:** round-robin across eligible machines (creating a HA setup), ignoring existing containers to keep the spread even. + +## Eligibility checks + +Before each placement Uncloud filters machines: + +- **`x-machines`:** only listed machines are allowed. +- **Volumes:** required Docker volumes must exist on, or be scheduled for, the machine. +- **Resources:** if CPU/memory reservations are set, the machine must have enough available capacity. + +If a machine fails any constraint mid-plan (for example, runs out of CPU), it drops out for the remaining placements. 
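+
+For example, a service can be pinned to a subset of machines with the `x-machines` extension. A minimal sketch, assuming the list form of the extension (the service and machine names are placeholders):
+
+```yaml
+services:
+  api:
+    image: myapp:latest
+    # Only the listed machines are considered for placement; all others are filtered out
+    # before ranking. Combine with resource reservations (next section) to also check capacity.
+    x-machines:
+      - node1
+      - node2
+```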
+
+## Resource reservations
+
+When you set reservations, Uncloud only places a container on machines with enough headroom:
+
+```
+available = total - reserved_by_running - reserved_by_containers_already_scheduled_in_this_plan
+```
+
+Example:
+
+```yaml
+services:
+  api:
+    image: myapp:latest
+    deploy:
+      replicas: 3
+      resources:
+        reservations:
+          cpus: '0.5'
+          memory: 512M
+```
+
+If a machine runs out of capacity partway through scheduling, the remaining replicas are placed on other eligible machines.
+
+## Service modes
+
+- **Replicated (`deploy.mode: replicated`):** run `replicas` containers across eligible machines. If some machines are ineligible, the others are still used.
+- **Global (`deploy.mode: global`):** run exactly one container on every target machine (all cluster machines, or only those listed in `x-machines`). If any target machine is ineligible, the deployment fails.
+
+## Volumes
+
+Services that mount named Docker volumes can only run on machines where those volumes exist or are scheduled to be created. Volume constraints are applied together with the other placement rules.
+
+## Port conflicts and replacements
+
+- If a running container conflicts with requested host ports, Uncloud stops it before starting the new one and removes it afterward.
+- `force_recreate` replaces containers even when they already match the requested spec.
+
+## Troubleshooting
+
+- **"No eligible machines":** check `x-machines`, required volumes on target machines, and CPU/memory reservations versus available capacity.
+- **Uneven spread:** happens when some machines are ineligible (constraints or capacity) or already host more containers; otherwise the ranker evens out placements.
+- **Reservations ignored:** reservations are opt-in; set `resources.reservations` to make capacity part of eligibility and ranking.
diff --git a/website/docs/4-guides/1-deployments/3-deploy-global-services.md b/website/docs/4-guides/1-deployments/3-deploy-global-services.md
index d7208c29..72585b17 100644
--- a/website/docs/4-guides/1-deployments/3-deploy-global-services.md
+++ b/website/docs/4-guides/1-deployments/3-deploy-global-services.md
@@ -58,8 +58,8 @@ The default mode is `replicated`, where you specify the number of replicas.
 | Mode | Replicas | Placement |
 |------------------------|-----------------------------------------------|---------------------------------------------------------------------|
-| `replicated` (default) | You specify with `scale` or `deploy.replicas` | Uncloud evenly spreads replicas across all machines or `x-machines` |
-| `global` | Always one per machine | One replica on each machine or each `x-machines` machine |
+| `replicated` (default) | You specify with `scale` or `deploy.replicas` | Spreads across eligible machines or `x-machines` (balances by existing container count when reservations are set; round-robins when they are not) |
+| `global` | Always one per machine | One replica on each eligible machine or each `x-machines` machine; fails if any required machine is ineligible |
 
 ## See also
diff --git a/website/docs/8-compose-file-reference/1-support-matrix.md b/website/docs/8-compose-file-reference/1-support-matrix.md
index dd155e05..808c3d17 100644
--- a/website/docs/8-compose-file-reference/1-support-matrix.md
+++ b/website/docs/8-compose-file-reference/1-support-matrix.md
@@ -40,7 +40,7 @@ The following table shows the support status for main Compose features:
 | `mode` | ✅ Supported | Either `global` or `replicated` |
 | `placement` | ❌ Not supported | Use `x-machines` extension |
 | `replicas` | ✅ Supported | Number of container replicas |
-| `resources` | ⚠️ Limited | CPU, memory limits and device reservations |
+| `resources` | ✅ Supported | CPU/memory limits and reservations, device requests |
 | `restart_policy` | ❌ Not supported | Defaults to `unless-stopped` |
 | `rollback_config` | ❌ Not supported | See [#151](https://github.com/psviderski/uncloud/issues/151) |
 | `update_config` | ❌ Not supported | See [#151](https://github.com/psviderski/uncloud/issues/151) |