4 changes: 4 additions & 0 deletions internal/restore/start.go
@@ -10,6 +10,7 @@ import (
"github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http"
restore "github.com/cloudnative-pg/cnpg-i/pkg/restore/job"
"github.com/cloudnative-pg/cnpg-i/pkg/wal"
"github.com/dalibo/cnpg-i-pgbackrest/internal/utils"
wal_pgbackrest "github.com/dalibo/cnpg-i-pgbackrest/internal/wal"
"google.golang.org/grpc"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -39,6 +40,9 @@ func (c *CNPGI) Start(ctx context.Context) error {
PgDataPath: c.PGDataPath,
PgWalFolderToSymlink: "/var/lib/postgresql/wal/pg_wal",
})

utils.AddHealthCheck(server)

return nil
}

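The AddHealthCheck helper referenced above lives in internal/utils and is not shown in this diff. As a hedged sketch only (assuming it registers the standard gRPC health service on the plugin's server), it could look roughly like this:

package utils

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

// AddHealthCheck registers the standard gRPC health service on the given
// server and marks it as serving, so liveness/readiness probes can query it.
// Hypothetical sketch: the real implementation in internal/utils may differ.
func AddHealthCheck(server *grpc.Server) {
	hs := health.NewServer()
	hs.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
	healthpb.RegisterHealthServer(server, hs)
}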
19 changes: 17 additions & 2 deletions test/e2e/e2e_test.go
@@ -211,7 +211,7 @@ func TestDeployInstance(t *testing.T) {
// create a test CloudNativePG Cluster
clusterName := "cluster-demo"
p := maps.Clone(cluster.DefaultParamater)
c, err := cluster.Create(ctx, k8sClient, ns, clusterName, 1, "100M", p)
c, err := cluster.Create(ctx, k8sClient, ns, clusterName, 1, "100M", p, false)
if err != nil {
t.Fatalf("failed to create cluster: %v", err)
}
@@ -220,7 +220,7 @@
t.Fatal("can't delete cluster")
}
}()
if ready, err := k8sClient.PodsIsReady(ctx, ns, clusterName+"-1", 80, 3); err != nil {
if ready, err := k8sClient.PodIsReady(ctx, ns, clusterName+"-1", 80, 3); err != nil {
t.Fatalf("error when requesting pod status, %s", err.Error())
} else if !ready {
t.Fatal("pod not ready")
@@ -261,4 +261,19 @@ func TestDeployInstance(t *testing.T) {
if fBackup.Timestamp.Start == 0 || fBackup == lBackup {
t.Fatal("registered backup data are invalid after second backup")
}
// delete the cluster; we will recreate it from backup
if err := k8sClient.Delete(ctx, c); err != nil {
t.Fatal("can't delete cluster")
}
if _, err = k8sClient.PodIsAbsent(ctx, ns, clusterName+"-1", 10, 3); err != nil {
t.Fatal("can't ensure cluster is absent")
}
if _, err = cluster.Create(ctx, k8sClient, ns, clusterName, 1, "100M", p, true); err != nil {
t.Fatal("can't recreate cluster from backup")
}
if ready, err := k8sClient.PodIsReady(ctx, ns, clusterName+"-1", 80, 3); err != nil {
t.Fatalf("error when requesting pod status, %s", err.Error())
} else if !ready {
t.Fatal("pod not ready")
}
}
35 changes: 26 additions & 9 deletions test/e2e/internal/cluster/cluster.go
@@ -27,8 +27,14 @@ func New(
nbrInstances int,
size string,
pluginParam map[string]string,
restore bool,
) *cloudnativepgv1.Cluster {

pluginConfig := []cloudnativepgv1.PluginConfiguration{
{
Name: "pgbackrest.dalibo.com",
Parameters: pluginParam,
},
}
cluster := &cloudnativepgv1.Cluster{
TypeMeta: metav1.TypeMeta{
Kind: "Cluster",
@@ -41,19 +47,29 @@
Spec: cloudnativepgv1.ClusterSpec{
Instances: nbrInstances,
ImagePullPolicy: corev1.PullIfNotPresent,
Plugins: []cloudnativepgv1.PluginConfiguration{
{
Name: "pgbackrest.dalibo.com",
Parameters: pluginParam,
},
},
Plugins: pluginConfig,
PostgresConfiguration: cloudnativepgv1.PostgresConfiguration{
Parameters: map[string]string{},
},
StorageConfiguration: cloudnativepgv1.StorageConfiguration{
Size: size,
},
}}
},
}
if restore {
externalName := "origin"
cluster.Spec.Bootstrap = &cloudnativepgv1.BootstrapConfiguration{
Recovery: &cloudnativepgv1.BootstrapRecovery{
Source: externalName,
},
}
cluster.Spec.ExternalClusters = []cloudnativepgv1.ExternalCluster{
{
Name: externalName,
PluginConfiguration: &pluginConfig[0],
},
}
}
return cluster
}
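For illustration only, a caller that wants the cluster bootstrapped from an existing pgBackRest backup now passes the new restore flag. This hypothetical snippet reuses identifiers from the e2e test (cluster.DefaultParamater, the "cluster-demo" name):

// Build a manifest that bootstraps via recovery from the external cluster
// "origin" instead of initdb, reusing the same plugin configuration.
p := maps.Clone(cluster.DefaultParamater)
restored := cluster.New("test-ns", "cluster-demo", 1, "100M", p, true)
_ = restored // in the tests this goes through cluster.Create / k8sClient.Create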

@@ -65,8 +81,9 @@ func Create(
nbrInstances int,
size string,
pluginParam map[string]string,
recovery bool,
) (*cloudnativepgv1.Cluster, error) {
m := New(namespace, name, nbrInstances, size, pluginParam)
m := New(namespace, name, nbrInstances, size, pluginParam, recovery)
if err := k8sClient.Create(ctx, m); err != nil {
return nil, err
}
124 changes: 98 additions & 26 deletions test/e2e/internal/kubernetes/client.go
@@ -20,6 +20,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
@@ -164,41 +165,112 @@ func (cl K8sClient) DeploymentIsReady(
)
}

func (cl K8sClient) PodsIsReady(
type PodConditionFunc func(pod *corev1.Pod, err error) (done bool, errOut error)

func (cl K8sClient) waitForPod(
ctx context.Context,
podName types.NamespacedName,
maxRetry uint,
retryInterval uint,
condition PodConditionFunc,
) error {
if maxRetry == 0 {
return fmt.Errorf("maxRetry should be non-zero value")
}

pod := &corev1.Pod{}
timeout := time.Duration(maxRetry) * time.Duration(retryInterval) * time.Second
// could perhaps be replaced by calling PollUntilContextTimeout directly, letting the caller build the adequate ctx
err := wait.PollUntilContextTimeout(
ctx,
time.Duration(retryInterval)*time.Second,
timeout,
true,
func(ctx context.Context) (bool, error) {
err := cl.client.Get(ctx, podName, pod)
return condition(pod, err)
},
)

if err != nil {
return fmt.Errorf(
"timeout waiting for pod %s in namespace %s: %w",
podName.Name,
podName.Namespace,
err,
)
}

return nil
}
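Factoring the polling loop into waitForPod with a PodConditionFunc keeps the retry and timeout handling in one place. As a hypothetical illustration (not part of this change), another helper could reuse it, for example to wait until a pod reports the Succeeded phase:

// PodHasSucceeded is a hypothetical helper built on the same waitForPod loop:
// it polls until the pod reaches the Succeeded phase or the timeout expires.
func (cl K8sClient) PodHasSucceeded(
	ctx context.Context,
	namespace string,
	name string,
	maxRetry uint,
	retryInterval uint,
) (bool, error) {
	podName := types.NamespacedName{Name: name, Namespace: namespace}
	err := cl.waitForPod(ctx, podName, maxRetry, retryInterval,
		func(pod *corev1.Pod, err error) (bool, error) {
			if err != nil {
				if errors.IsNotFound(err) {
					return false, nil // not created yet, keep polling
				}
				return true, err
			}
			return pod.Status.Phase == corev1.PodSucceeded, nil
		},
	)
	if err != nil {
		return false, err
	}
	return true, nil
}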

func (cl K8sClient) PodIsReady(
ctx context.Context,
namespace string,
name string,
maxRetry uint,
retryInterval uint,
) (bool, error) {
waitedRessource := &corev1.Pod{}
podFqdn := types.NamespacedName{Name: name, Namespace: namespace}
if maxRetry == 0 {
return false, fmt.Errorf("maxRetry should be non-zero value")
}
for range maxRetry {
err := cl.client.Get(ctx, podFqdn, waitedRessource)
if errors.IsNotFound(err) {
time.Sleep(2 * time.Second) // Deployment not created yet, wait and retry
continue
}
if err != nil {
return false, fmt.Errorf("error to get deployment information %w", err)
}
switch waitedRessource.Status.Phase {
case corev1.PodRunning:
return true, nil
case corev1.PodFailed:
return false, fmt.Errorf("pod in failed status")
}
time.Sleep(time.Duration(retryInterval) * time.Second)
podName := types.NamespacedName{Name: name, Namespace: namespace}

err := cl.waitForPod(
ctx,
podName,
maxRetry,
retryInterval,
func(pod *corev1.Pod, err error) (bool, error) {
if err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return true, err
}
switch pod.Status.Phase {
case corev1.PodRunning:
return true, nil
case corev1.PodFailed:
return true, fmt.Errorf("pod in failed status")
default:
return false, nil
}
},
)
if err != nil {
return false, err
}
return true, nil
}

func (cl K8sClient) PodIsAbsent(
ctx context.Context,
namespace string,
name string,
maxRetry uint,
retryInterval uint,
) (bool, error) {
podName := types.NamespacedName{Name: name, Namespace: namespace}

err := cl.waitForPod(
ctx,
podName,
maxRetry,
retryInterval,
func(_ *corev1.Pod, err error) (bool, error) {
if err != nil {
if errors.IsNotFound(err) {
return true, nil // pod is gone
}
return true, err
}
return false, nil // still exists
},
)

if err != nil {
return false, err
}

return true, nil
}

func (cl K8sClient) CreateSelfsignedIssuer(