Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions charts/ack-operator-crd/templates/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ spec:
type: boolean
auto_renew_period:
type: integer
auto_scaling_enabled:
nullable: true
type: boolean
data_disk:
items:
properties:
Expand Down Expand Up @@ -149,6 +152,12 @@ spec:
login_password:
nullable: true
type: string
max_instances:
nullable: true
type: integer
min_instances:
nullable: true
type: integer
name:
nullable: true
type: string
Expand Down Expand Up @@ -235,6 +244,12 @@ spec:
type: string
nullable: true
type: array
zoneIds:
items:
nullable: true
type: string
nullable: true
type: array
type: object
status:
properties:
Expand Down
22 changes: 21 additions & 1 deletion controller/ack-cluster-config-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,21 @@ func (h *Handler) checkAndUpdate(config *ackv1.ACKClusterConfig) (*ackv1.ACKClus
if err != nil {
return cfg, err
}
// If the ACK cluster reports no node pools yet, treat the cluster as still in the updating phase
if nodePoolsInfo == nil || len(nodePoolsInfo.Nodepools) == 0 {
if cfg.Status.Phase != ackConfigUpdatingPhase {
cfg = cfg.DeepCopy()
cfg.Status.Phase = ackConfigUpdatingPhase
cfg, err = h.ackCC.UpdateStatus(cfg)
if err != nil {
return cfg, err
}
}

logrus.Infof("waiting for cluster [%s] to update node pools: no nodepool information available yet", cfg.Name)
h.ackEnqueueAfter(cfg.Namespace, cfg.Name, 30*time.Second)
return cfg, nil
}
for _, np := range nodePoolsInfo.Nodepools {
if np == nil {
logrus.Warn("Warning update cluster: The nodepool is nil, indicating no nodepool information is available")
Expand All @@ -257,7 +272,12 @@ func (h *Handler) checkAndUpdate(config *ackv1.ACKClusterConfig) (*ackv1.ACKClus
return cfg, err
}
}
logrus.Infof("waiting for cluster [%s] to update node pool [%s]", cfg.Name, *np.NodepoolInfo.Name)
nodePoolName := "<unknown>"
if np.NodepoolInfo != nil && np.NodepoolInfo.Name != nil {
nodePoolName = *np.NodepoolInfo.Name
}

logrus.Infof("waiting for cluster [%s] to [%s] node pool [%s]", cfg.Name, status, nodePoolName)
h.ackEnqueueAfter(cfg.Namespace, cfg.Name, 30*time.Second)
return cfg, nil
}
Expand Down
7 changes: 6 additions & 1 deletion controller/upstream_rancher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package controller

import (
"fmt"
ackapi "github.com/alibabacloud-go/cs-20151215/v7/client"

ackapi "github.com/alibabacloud-go/cs-20151215/v7/client"
"github.com/alibabacloud-go/tea/tea"
"github.com/cnrancher/ack-operator/internal/ack"
ackv1 "github.com/cnrancher/ack-operator/pkg/apis/ack.pandaria.io/v1"
Expand Down Expand Up @@ -35,6 +35,11 @@ func BuildUpstreamClusterState(secretsCache wranglerv1.SecretCache, configSpec *
newSpec.KubernetesVersion = tea.StringValue(cluster.CurrentVersion)
newSpec.RegionID = tea.StringValue(cluster.RegionId)
newSpec.VpcID = tea.StringValue(cluster.VpcId)
if len(configSpec.ZoneIDs) > 0 {
newSpec.ZoneIDs = configSpec.ZoneIDs
} else {
newSpec.ZoneIDs = []string{tea.StringValue(cluster.ZoneId)}
}
newSpec.PauseClusterUpgrade = pauseClusterUpgrade
newSpec.ClusterIsUpgrading = clusterIsUpgrading
newSpec.NodePoolList, err = GetNodePoolConfigInfo(secretsCache, configSpec)
Expand Down
156 changes: 112 additions & 44 deletions internal/ack/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,14 @@ func validateCreateRequest(configSpec *ackv1.ACKClusterConfigSpec) error {
return fmt.Errorf("cluster display name is required")
} else if configSpec.RegionID == "" {
return fmt.Errorf("region id is required")
} else if configSpec.VpcID == "" && !configSpec.SnatEntry {
return fmt.Errorf("snat entry is required when vpc is auto created")
}

if len(configSpec.ZoneIDs) == 0 {
if configSpec.VpcID == "" {
return fmt.Errorf("vpcId is required if zoneIds are not provided")
}
} else if configSpec.VpcID != "" || len(configSpec.VswitchIds) != 0 {
return fmt.Errorf("zoneIds should not be used together with vpcId and vSwitchIds")
}
return nil
}

Expand All @@ -147,6 +151,7 @@ func newClusterCreateRequest(configSpec *ackv1.ACKClusterConfigSpec) *ackapi.Cre
req.SshFlags = tea.Bool(configSpec.SSHFlags)
req.Addons = ConvertAddons(configSpec)
req.VswitchIds = tea.StringSlice(configSpec.VswitchIds)
req.ZoneIds = tea.StringSlice(configSpec.ZoneIDs)
// PodVswitchIds is marked deprecated, but it still must be passed for now
req.PodVswitchIds = tea.StringSlice(configSpec.PodVswitchIds)

Expand All @@ -157,50 +162,113 @@ func newClusterCreateRequest(configSpec *ackv1.ACKClusterConfigSpec) *ackapi.Cre
}

func getInitWorkerFromDefaultNodePool(configSpec *ackv1.ACKClusterConfigSpec, req *ackapi.CreateClusterRequest) {
nodePools := make([]*ackapi.Nodepool, 0, 1)
nodePools := make([]*ackapi.Nodepool, 0, len(configSpec.NodePoolList))

for _, pool := range configSpec.NodePoolList {
if pool.Name == DefaultNodePoolName {
var dataDiskList []*ackapi.DataDisk
for _, dataDisk := range pool.DataDisk {
dataDiskList = append(dataDiskList, &ackapi.DataDisk{
Category: tea.String(dataDisk.Category),
Size: tea.Int64(dataDisk.Size),
Encrypted: tea.String(dataDisk.Encrypted),
AutoSnapshotPolicyId: tea.String(dataDisk.AutoSnapshotPolicyID),
})
}
nodePools = append(nodePools, &ackapi.Nodepool{
AutoScaling: &ackapi.NodepoolAutoScaling{
Enable: tea.Bool(false),
MaxInstances: tea.Int64(pool.InstancesNum),
MinInstances: tea.Int64(pool.InstancesNum),
Type: tea.String(pool.ScalingType),
},
NodepoolInfo: &ackapi.NodepoolNodepoolInfo{
Name: tea.String(pool.Name),
},
KubernetesConfig: &ackapi.NodepoolKubernetesConfig{
Runtime: tea.String(pool.Runtime),
RuntimeVersion: tea.String(pool.RuntimeVersion),
},
ScalingGroup: &ackapi.NodepoolScalingGroup{
AutoRenew: tea.Bool(pool.AutoRenew),
AutoRenewPeriod: tea.Int64(pool.AutoRenewPeriod),
InstanceChargeType: tea.String(pool.InstanceChargeType),
InstanceTypes: tea.StringSlice(pool.InstanceTypes),
KeyPair: tea.String(pool.KeyPair),
Period: tea.Int64(pool.Period),
PeriodUnit: tea.String(pool.PeriodUnit),
ImageType: tea.String(pool.Platform),
DataDisks: dataDiskList,
SystemDiskCategory: tea.String(pool.SystemDiskCategory),
SystemDiskSize: tea.Int64(pool.SystemDiskSize),
VswitchIds: tea.StringSlice(pool.VSwitchIds),
DesiredSize: tea.Int64(pool.InstancesNum),
},
var dataDiskList []*ackapi.DataDisk
for _, dataDisk := range pool.DataDisk {
dataDiskList = append(dataDiskList, &ackapi.DataDisk{
Category: tea.String(dataDisk.Category),
Size: tea.Int64(dataDisk.Size),
Encrypted: tea.String(dataDisk.Encrypted),
AutoSnapshotPolicyId: tea.String(dataDisk.AutoSnapshotPolicyID),
})
break
}

enable := false
minIns := pool.InstancesNum
maxIns := pool.InstancesNum

if pool.AutoScalingEnabled != nil && *pool.AutoScalingEnabled {
enable = true
if pool.MinInstances != nil {
minIns = *pool.MinInstances
}
if pool.MaxInstances != nil {
maxIns = *pool.MaxInstances
}
}

if enable && minIns > maxIns {
minIns, maxIns = maxIns, minIns
}

scalingGroup := &ackapi.NodepoolScalingGroup{
AutoRenew: tea.Bool(pool.AutoRenew),
AutoRenewPeriod: tea.Int64(pool.AutoRenewPeriod),
InstanceChargeType: tea.String(pool.InstanceChargeType),
InstanceTypes: tea.StringSlice(pool.InstanceTypes),
KeyPair: tea.String(pool.KeyPair),
Period: tea.Int64(pool.Period),
PeriodUnit: tea.String(pool.PeriodUnit),
ImageType: tea.String(pool.Platform),
DataDisks: dataDiskList,
SystemDiskCategory: tea.String(pool.SystemDiskCategory),
SystemDiskSize: tea.Int64(pool.SystemDiskSize),
VswitchIds: tea.StringSlice(pool.VSwitchIds),
}

if !enable {
scalingGroup.DesiredSize = tea.Int64(pool.InstancesNum)
}

nodePools = append(nodePools, &ackapi.Nodepool{
AutoScaling: &ackapi.NodepoolAutoScaling{
Enable: tea.Bool(enable),
MaxInstances: tea.Int64(maxIns),
MinInstances: tea.Int64(minIns),
Type: tea.String(pool.ScalingType),
},
NodepoolInfo: &ackapi.NodepoolNodepoolInfo{
Name: tea.String(pool.Name),
},
KubernetesConfig: &ackapi.NodepoolKubernetesConfig{
Runtime: tea.String(pool.Runtime),
RuntimeVersion: tea.String(pool.RuntimeVersion),
},
ScalingGroup: scalingGroup,
})
}

req.Nodepools = nodePools
}

// cleanStringSlice filters a string slice down to its distinct, non-empty
// elements, preserving first-seen order. The result is never nil: empty
// input yields an empty (zero-length) slice.
func cleanStringSlice(in []string) []string {
	seen := make(map[string]struct{}, len(in))
	result := make([]string, 0, len(in))

	for _, item := range in {
		if item == "" {
			continue
		}
		if _, dup := seen[item]; !dup {
			seen[item] = struct{}{}
			result = append(result, item)
		}
	}

	return result
}

// cleanTeaStringSlice dereferences a slice of tea-style *string values and
// returns the distinct, non-empty strings in first-seen order.
//
// nil pointers and empty strings are skipped; duplicates keep only their
// first occurrence. The result is never nil: empty input yields an empty
// (zero-length) slice.
func cleanTeaStringSlice(in []*string) []string {
	out := make([]string, 0, len(in))
	// Pre-size to avoid rehashing; the common case has few or no duplicates.
	seen := make(map[string]struct{}, len(in))

	for _, p := range in {
		if p == nil {
			continue
		}
		// After the nil check, a direct dereference is exactly equivalent to
		// tea.StringValue(p) (whose only job is nil-safe dereferencing), so
		// the third-party helper call is unnecessary here.
		s := *p
		if s == "" {
			continue
		}
		if _, dup := seen[s]; dup {
			continue
		}
		seen[s] = struct{}{}
		out = append(out, s)
	}

	return out
}
Loading
Loading