116 changes: 91 additions & 25 deletions pkg/cluster/cluster.go
@@ -3,6 +3,7 @@ package cluster
// Postgres CustomResourceDefinition object i.e. Spilo

import (
"context"
"database/sql"
"encoding/json"
"fmt"
@@ -70,7 +71,7 @@ type kubeResources struct {
CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget
LogicalBackupJob *batchv1.CronJob
Streams map[string]*zalandov1.FabricEventStream
//Pods are treated separately
// Pods are treated separately
}

// Cluster describes postgresql cluster
@@ -88,14 +89,19 @@ type Cluster struct {
podSubscribersMu sync.RWMutex
pgDb *sql.DB
mu sync.Mutex
ctx context.Context
cancelFunc context.CancelFunc
syncMu sync.Mutex // protects syncRunning and needsResync
syncRunning bool
needsResync bool
userSyncStrategy spec.UserSyncer
deleteOptions metav1.DeleteOptions
podEventsQueue *cache.FIFO
replicationSlots map[string]interface{}

teamsAPIClient teams.Interface
oauthTokenGetter OAuthTokenGetter
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
KubeClient k8sutil.KubernetesClient // TODO: move clients to the better place?
currentProcess Process
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
@@ -120,9 +126,12 @@ type compareLogicalBackupJobResult struct {
}

// New creates a new cluster. This function should be called from a controller.
func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry, eventRecorder record.EventRecorder) *Cluster {
func New(ctx context.Context, cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry, eventRecorder record.EventRecorder) *Cluster {
deletePropagationPolicy := metav1.DeletePropagationOrphan

// Create a cancellable context for this cluster
clusterCtx, cancelFunc := context.WithCancel(ctx)

podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) {
e, ok := obj.(PodEvent)
if !ok {
@@ -137,6 +146,8 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
}

cluster := &Cluster{
ctx: clusterCtx,
cancelFunc: cancelFunc,
Config: cfg,
Postgresql: pgSpec,
pgUsers: make(map[string]spec.PgUser),
@@ -149,7 +160,8 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
PatroniEndpoints: make(map[string]*v1.Endpoints),
PatroniConfigMaps: make(map[string]*v1.ConfigMap),
VolumeClaims: make(map[types.UID]*v1.PersistentVolumeClaim),
Streams: make(map[string]*zalandov1.FabricEventStream)},
Streams: make(map[string]*zalandov1.FabricEventStream),
},
userSyncStrategy: users.DefaultUserSyncStrategy{
PasswordEncryption: passwordEncryption,
RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix,
@@ -175,6 +187,62 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
return cluster
}

// Cancel cancels the cluster's context, which will cause any ongoing
// context-aware operations (like Sync) to return early.
func (c *Cluster) Cancel() {
if c.cancelFunc != nil {
c.cancelFunc()
}
}

// StartSync attempts to start a sync operation. It returns true if the sync can start
// (no sync currently running and the context not cancelled). It returns false if a sync
// is already running (in which case needsResync is set) or if the context is cancelled (deletion in progress).
func (c *Cluster) StartSync() bool {
c.syncMu.Lock()
defer c.syncMu.Unlock()

// Check if context is cancelled (deletion in progress)
select {
case <-c.ctx.Done():
return false
default:
}

if c.syncRunning {
c.needsResync = true
return false
}
c.syncRunning = true
c.needsResync = false
return true
}

// EndSync marks the sync operation as complete.
func (c *Cluster) EndSync() {
c.syncMu.Lock()
defer c.syncMu.Unlock()
c.syncRunning = false
}

// NeedsResync returns true if a resync was requested while a sync was running,
// and clears the flag. It returns false if the context is cancelled (deletion in progress).
func (c *Cluster) NeedsResync() bool {
c.syncMu.Lock()
defer c.syncMu.Unlock()

// Check if context is cancelled (deletion in progress)
select {
case <-c.ctx.Done():
return false
default:
}

result := c.needsResync
c.needsResync = false
return result
}
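
Reviewer note, not part of the diff: below is a minimal sketch of how a caller might drive this lifecycle, assuming the context wired up in New above and the existing Cluster.Sync(newSpec) method; the helper name syncUntilSettled is hypothetical.

// Illustrative sketch only. StartSync/EndSync/NeedsResync coordinate overlapping
// sync requests: a request that arrives mid-sync is recorded and replayed once.
func syncUntilSettled(cl *cluster.Cluster, newSpec *acidv1.Postgresql) {
	for {
		if !cl.StartSync() {
			// Either a sync is already running (needsResync was recorded for it)
			// or the cluster's context was cancelled because deletion started.
			return
		}
		if err := cl.Sync(newSpec); err != nil {
			// error handling elided in this sketch
		}
		cl.EndSync()
		if !cl.NeedsResync() {
			return
		}
		// Another change arrived while syncing; run one more pass.
	}
}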

func (c *Cluster) clusterName() spec.NamespacedName {
return util.NameFromMeta(c.ObjectMeta)
}
@@ -276,7 +344,7 @@ func (c *Cluster) Create() (err error) {
errStatus error
)
if err == nil {
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) // TODO: are you sure it's running?
} else {
c.logger.Warningf("cluster created failed: %v", err)
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed)
@@ -440,7 +508,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
var match, needsRollUpdate, needsReplace bool

match = true
//TODO: improve me
// TODO: improve me
if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas {
match = false
reasons = append(reasons, "new statefulset's number of replicas does not match the current one")
@@ -672,7 +740,6 @@ func compareResourcesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.Resourc
}
}
return true

}

func compareEnv(a, b []v1.EnvVar) bool {
@@ -707,9 +774,7 @@ func compareEnv(a, b []v1.EnvVar) bool {
}

func compareSpiloConfiguration(configa, configb string) bool {
var (
oa, ob spiloConfiguration
)
var oa, ob spiloConfiguration

var err error
err = json.Unmarshal([]byte(configa), &oa)
@@ -818,7 +883,6 @@ func (c *Cluster) compareAnnotations(old, new map[string]string, removedList *[]
}

return reason != "", reason

}

func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
@@ -895,7 +959,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) *compareLog
}

func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
//TODO: improve comparison
// TODO: improve comparison
if !reflect.DeepEqual(new.Spec, cur.Spec) {
return false, "new PDB's spec does not match the current one"
}
@@ -944,8 +1008,17 @@ func (c *Cluster) removeFinalizer() error {
}

c.logger.Infof("removing finalizer %s", finalizerName)
finalizers := util.RemoveString(c.ObjectMeta.Finalizers, finalizerName)
newSpec, err := c.KubeClient.SetFinalizer(c.clusterName(), c.DeepCopy(), finalizers)

// Fetch the latest version of the object to avoid resourceVersion conflicts
clusterName := c.clusterName()
latestPg, err := c.KubeClient.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Get(
context.TODO(), clusterName.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error fetching latest postgresql for finalizer removal: %v", err)
}

finalizers := util.RemoveString(latestPg.ObjectMeta.Finalizers, finalizerName)
newSpec, err := c.KubeClient.SetFinalizer(clusterName, latestPg, finalizers)
if err != nil {
return fmt.Errorf("error removing finalizer: %v", err)
}
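
Editor note on the hunk above, not part of the diff: re-reading the object before the update avoids resourceVersion conflicts from a stale local copy. A hedged refinement would additionally retry on conflicts via client-go's retry helper; the helper name removeFinalizerWithRetry, its finalizerName parameter, and the extra import "k8s.io/client-go/util/retry" are assumptions.

// Sketch only: retry finalizer removal when a concurrent update wins the race.
// Mirrors the Get/SetFinalizer calls used in removeFinalizer above.
func (c *Cluster) removeFinalizerWithRetry(finalizerName string) error {
	clusterName := c.clusterName()
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Fetch the latest object on every attempt so a conflict simply
		// triggers another retry instead of failing the deletion path.
		latestPg, err := c.KubeClient.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Get(
			context.TODO(), clusterName.Name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		finalizers := util.RemoveString(latestPg.ObjectMeta.Finalizers, finalizerName)
		_, err = c.KubeClient.SetFinalizer(clusterName, latestPg, finalizers)
		return err
	})
}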
@@ -1063,7 +1136,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}

c.logger.Debug("syncing secrets")
//TODO: mind the secrets of the deleted/new users
// TODO: mind the secrets of the deleted/new users
if err := c.syncSecrets(); err != nil {
c.logger.Errorf("could not sync secrets: %v", err)
updateFailed = true
@@ -1101,7 +1174,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {

// logical backup job
func() {

// create if it did not exist
if !oldSpec.Spec.EnableLogicalBackup && newSpec.Spec.EnableLogicalBackup {
c.logger.Debug("creating backup cron job")
@@ -1129,7 +1201,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
updateFailed = true
}
}

}()

// Roles and Databases
@@ -1206,7 +1277,7 @@ func syncResources(a, b *v1.ResourceRequirements) bool {
// before the pods, it will be re-created by the current master pod and will remain, obstructing the
// creation of the new cluster with the same name. Therefore, the endpoints should be deleted last.
func (c *Cluster) Delete() error {
var anyErrors = false
anyErrors := false
c.mu.Lock()
defer c.mu.Unlock()
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")
@@ -1297,7 +1368,6 @@ func (c *Cluster) NeedsRepair() (bool, acidv1.PostgresStatus) {
c.specMu.RLock()
defer c.specMu.RUnlock()
return !c.Status.Success(), c.Status

}

// ReceivePodEvent is called back by the controller in order to add the cluster's pod event to the queue.
@@ -1406,7 +1476,6 @@ func (c *Cluster) initSystemUsers() {
}

func (c *Cluster) initPreparedDatabaseRoles() error {

if c.Spec.PreparedDatabases != nil && len(c.Spec.PreparedDatabases) == 0 { // TODO: add option to disable creating such a default DB
c.Spec.PreparedDatabases = map[string]acidv1.PreparedDatabase{strings.Replace(c.Name, "-", "_", -1): {}}
}
@@ -1472,10 +1541,9 @@ func (c *Cluster) initPreparedDatabaseRoles() error {
}

func (c *Cluster) initDefaultRoles(defaultRoles map[string]string, admin, prefix, searchPath, secretNamespace string) error {

for defaultRole, inherits := range defaultRoles {
namespace := c.Namespace
//if namespaced secrets are allowed
// if namespaced secrets are allowed
if secretNamespace != "" {
if c.Config.OpConfig.EnableCrossNamespaceSecret {
namespace = secretNamespace
@@ -1543,7 +1611,7 @@ func (c *Cluster) initRobotUsers() error {
}
}

//if namespaced secrets are allowed
// if namespaced secrets are allowed
if c.Config.OpConfig.EnableCrossNamespaceSecret {
if strings.Contains(username, ".") {
splits := strings.Split(username, ".")
@@ -1594,7 +1662,6 @@ func (c *Cluster) initAdditionalOwnerRoles() {

func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) error {
teamMembers, err := c.getTeamMembers(teamID)

if err != nil {
return fmt.Errorf("could not get list of team members for team %q: %v", teamID, err)
}
@@ -1633,7 +1700,6 @@ func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) e
}

func (c *Cluster) initHumanUsers() error {

var clusterIsOwnedBySuperuserTeam bool
superuserTeams := []string{}

8 changes: 4 additions & 4 deletions pkg/cluster/cluster_test.go
@@ -43,7 +43,7 @@ var logger = logrus.New().WithField("test", "cluster")
// 1 cluster, primary endpoint, 2 services, the secrets, the statefulset and pods being ready
var eventRecorder = record.NewFakeRecorder(7)

var cl = New(
var cl = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@@ -135,7 +135,7 @@ func TestCreate(t *testing.T) {
client.Postgresqls(clusterNamespace).Create(context.TODO(), &pg, metav1.CreateOptions{})
client.Pods(clusterNamespace).Create(context.TODO(), &pod, metav1.CreateOptions{})

var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@@ -1629,7 +1629,7 @@ func TestCompareLogicalBackupJob(t *testing.T) {
},
}

var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
@@ -1778,7 +1778,7 @@ func TestCrossNamespacedSecrets(t *testing.T) {
},
}

var cluster = New(
var cluster = New(context.Background(),
Config{
OpConfig: config.Config{
ConnectionPooler: config.ConnectionPooler{