Skip to content

Commit

Permalink
fix pod annotation may override by patch (#1480)
Browse files Browse the repository at this point in the history
Signed-off-by: Yan Zhu <hackzhuyan@gmail.com>
  • Loading branch information
halfcrazy committed Apr 25, 2022
1 parent e772ee9 commit 881622d
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 63 deletions.
11 changes: 8 additions & 3 deletions pkg/controller/inspection.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func (c *Controller) inspectPod() error {
klog.Errorf("failed to list logical switch port, %v", err)
return err
}
for _, oripod := range pods {
pod := oripod.DeepCopy()
for _, oriPod := range pods {
pod := oriPod.DeepCopy()
if pod.Spec.HostNetwork {
continue
}
Expand Down Expand Up @@ -55,7 +55,12 @@ func (c *Controller) inspectPod() error {
if !isLspExist {
delete(pod.Annotations, fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName))
delete(pod.Annotations, fmt.Sprintf(util.RoutedAnnotationTemplate, podNet.ProviderName))
if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name, types.JSONPatchType, generatePatchPayload(pod.Annotations, "replace"), metav1.PatchOptions{}, ""); err != nil {
patch, err := util.GenerateStrategicMergePatchPayload(oriPod, pod)
if err != nil {
return err
}
if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name,
types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
klog.Errorf("patch pod %s/%s failed %v during inspection", pod.Name, pod.Namespace, err)
return err
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/controller/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@ func (c *Controller) processNextAddNamespaceWorkItem() bool {
}

func (c *Controller) handleAddNamespace(key string) error {
orinamespace, err := c.namespacesLister.Get(key)
oriNamespace, err := c.namespacesLister.Get(key)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
namespace := orinamespace.DeepCopy()
namespace := oriNamespace.DeepCopy()

var ls string
var lss, cidrs, excludeIps []string
Expand Down Expand Up @@ -177,9 +177,7 @@ func (c *Controller) handleAddNamespace(key string) error {
excludeIps = append(excludeIps, strings.Join(subnet.Spec.ExcludeIps, ","))
}

op := "replace"
if namespace.Annotations == nil || len(namespace.Annotations) == 0 {
op = "add"
namespace.Annotations = map[string]string{}
} else {
if namespace.Annotations[util.LogicalSwitchAnnotation] == strings.Join(lss, ",") {
Expand All @@ -190,7 +188,12 @@ func (c *Controller) handleAddNamespace(key string) error {
namespace.Annotations[util.CidrAnnotation] = strings.Join(cidrs, ";")
namespace.Annotations[util.ExcludeIpsAnnotation] = strings.Join(excludeIps, ";")

if _, err = c.config.KubeClient.CoreV1().Namespaces().Patch(context.Background(), key, types.JSONPatchType, generatePatchPayload(namespace.Annotations, op), metav1.PatchOptions{}, ""); err != nil {
patch, err := util.GenerateStrategicMergePatchPayload(oriNamespace, namespace)
if err != nil {
return err
}
if _, err = c.config.KubeClient.CoreV1().Namespaces().Patch(context.Background(), key,
types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
klog.Errorf("patch namespace %s failed %v", key, err)
}
return err
Expand Down
42 changes: 19 additions & 23 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"strings"
"time"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/ipam"
"github.com/kubeovn/kube-ovn/pkg/ovs"
"github.com/kubeovn/kube-ovn/pkg/util"
"gopkg.in/k8snetworkplumbingwg/multus-cni.v3/pkg/logging"
multustypes "gopkg.in/k8snetworkplumbingwg/multus-cni.v3/pkg/types"
v1 "k8s.io/api/core/v1"
Expand All @@ -21,11 +25,6 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/ipam"
"github.com/kubeovn/kube-ovn/pkg/ovs"
"github.com/kubeovn/kube-ovn/pkg/util"
)

func isPodAlive(p *v1.Pod) bool {
Expand Down Expand Up @@ -486,9 +485,8 @@ func (c *Controller) handleAddPod(key string) error {
return err
}

op := "replace"
oriPod := pod.DeepCopy()
if len(pod.Annotations) == 0 {
op = "add"
pod.Annotations = map[string]string{}
}
isVmPod, vmName := isVmPod(pod)
Expand Down Expand Up @@ -579,7 +577,12 @@ func (c *Controller) handleAddPod(key string) error {
}
}

if _, err := c.config.KubeClient.CoreV1().Pods(namespace).Patch(context.Background(), name, types.JSONPatchType, generatePatchPayload(pod.Annotations, op), metav1.PatchOptions{}, ""); err != nil {
patch, err := util.GenerateStrategicMergePatchPayload(oriPod, pod)
if err != nil {
return err
}
if _, err := c.config.KubeClient.CoreV1().Pods(namespace).Patch(context.Background(), name,
types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
if k8serrors.IsNotFound(err) {
// Sometimes pod is deleted between kube-ovn configure ovn-nb and patch pod.
// Then we need to recycle the resource again.
Expand Down Expand Up @@ -756,14 +759,14 @@ func (c *Controller) handleUpdatePod(key string) error {
c.podKeyMutex.Lock(key)
defer c.podKeyMutex.Unlock(key)

oripod, err := c.podsLister.Pods(namespace).Get(name)
oriPod, err := c.podsLister.Pods(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
pod := oripod.DeepCopy()
pod := oriPod.DeepCopy()
podName := c.getNameByPod(pod)

klog.Infof("update pod %s/%s", namespace, name)
Expand Down Expand Up @@ -909,7 +912,12 @@ func (c *Controller) handleUpdatePod(key string) error {

pod.Annotations[fmt.Sprintf(util.RoutedAnnotationTemplate, podNet.ProviderName)] = "true"
}
if _, err := c.config.KubeClient.CoreV1().Pods(namespace).Patch(context.Background(), name, types.JSONPatchType, generatePatchPayload(pod.Annotations, "replace"), metav1.PatchOptions{}, ""); err != nil {
patch, err := util.GenerateStrategicMergePatchPayload(oriPod, pod)
if err != nil {
return err
}
if _, err := c.config.KubeClient.CoreV1().Pods(namespace).Patch(context.Background(), name,
types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
if k8serrors.IsNotFound(err) {
// Sometimes pod is deleted between kube-ovn configure ovn-nb and patch pod.
// Then we need to recycle the resource again.
Expand Down Expand Up @@ -1326,18 +1334,6 @@ func (c *Controller) acquireAddress(pod *v1.Pod, podNet *kubeovnNet) (string, st
return "", "", "", podNet.Subnet, ipam.ErrNoAvailable
}

func generatePatchPayload(annotations map[string]string, op string) []byte {
patchPayloadTemplate :=
`[{
"op": "%s",
"path": "/metadata/annotations",
"value": %s
}]`

raw, _ := json.Marshal(annotations)
return []byte(fmt.Sprintf(patchPayloadTemplate, op, raw))
}

func (c *Controller) acquireStaticAddress(key, nicName, ip, mac, subnet string, liveMigration bool) (string, string, string, error) {
var v4IP, v6IP string
var err error
Expand Down
66 changes: 48 additions & 18 deletions pkg/controller/vpc_nat_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,14 +331,14 @@ func (c *Controller) handleInitVpcNatGw(key string) error {
return fmt.Errorf("failed to initialize vpc nat gateway %s: %v", key, err)
}

oripod, err := c.getNatGwPod(key)
oriPod, err := c.getNatGwPod(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
pod := oripod.DeepCopy()
pod := oriPod.DeepCopy()

if pod.Status.Phase != corev1.PodRunning {
time.Sleep(5 * 1000)
Expand All @@ -353,7 +353,12 @@ func (c *Controller) handleInitVpcNatGw(key string) error {
return err
}
pod.Annotations[util.VpcNatGatewayInitAnnotation] = "true"
if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name, types.JSONPatchType, generatePatchPayload(pod.Annotations, "replace"), metav1.PatchOptions{}, ""); err != nil {
patch, err := util.GenerateStrategicMergePatchPayload(oriPod, pod)
if err != nil {
return err
}
if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name,
types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
klog.Errorf("patch pod %s/%s failed %v", pod.Name, pod.Namespace, err)
return err
}
Expand All @@ -374,14 +379,14 @@ func (c *Controller) handleUpdateVpcEips(natGwKey string) error {
return err
}

oripod, err := c.getNatGwPod(natGwKey)
oriPod, err := c.getNatGwPod(natGwKey)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
pod := oripod.DeepCopy()
pod := oriPod.DeepCopy()

var toBeDelEips, oldEips []*kubeovnv1.Eip
if eipAnnotation, ok := pod.Annotations[util.VpcEipsAnnotation]; ok {
Expand Down Expand Up @@ -431,7 +436,12 @@ func (c *Controller) handleUpdateVpcEips(natGwKey string) error {
return err
}
pod.Annotations[util.VpcEipsAnnotation] = string(eipBytes)
if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name, types.JSONPatchType, generatePatchPayload(pod.Annotations, "replace"), metav1.PatchOptions{}, ""); err != nil {
patch, err := util.GenerateStrategicMergePatchPayload(oriPod, pod)
if err != nil {
return err
}
if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name,
types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
klog.Errorf("patch pod %s/%s failed %v", pod.Name, pod.Namespace, err)
return err
}
Expand All @@ -452,14 +462,14 @@ func (c *Controller) handleUpdateVpcFloatingIp(natGwKey string) error {
return err
}

oripod, err := c.getNatGwPod(natGwKey)
oriPod, err := c.getNatGwPod(natGwKey)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
pod := oripod.DeepCopy()
pod := oriPod.DeepCopy()

// check md5
newMd5 := fmt.Sprintf("%x", structhash.Md5(gw.Spec.FloatingIpRules, 1))
Expand All @@ -480,7 +490,12 @@ func (c *Controller) handleUpdateVpcFloatingIp(natGwKey string) error {

// update annotation
pod.Annotations[util.VpcFloatingIpMd5Annotation] = newMd5
if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name, types.JSONPatchType, generatePatchPayload(pod.Annotations, "replace"), metav1.PatchOptions{}, ""); err != nil {
patch, err := util.GenerateStrategicMergePatchPayload(oriPod, pod)
if err != nil {
return err
}
if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name,
types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
klog.Errorf("patch pod %s/%s failed %v", pod.Name, pod.Namespace, err)
return err
}
Expand All @@ -502,14 +517,14 @@ func (c *Controller) handleUpdateVpcSnat(natGwKey string) error {
return err
}

oripod, err := c.getNatGwPod(natGwKey)
oriPod, err := c.getNatGwPod(natGwKey)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
pod := oripod.DeepCopy()
pod := oriPod.DeepCopy()

// check md5
newMd5 := fmt.Sprintf("%x", structhash.Md5(gw.Spec.SnatRules, 1))
Expand All @@ -530,7 +545,12 @@ func (c *Controller) handleUpdateVpcSnat(natGwKey string) error {

// update annotation
pod.Annotations[util.VpcSnatMd5Annotation] = newMd5
if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name, types.JSONPatchType, generatePatchPayload(pod.Annotations, "replace"), metav1.PatchOptions{}, ""); err != nil {
patch, err := util.GenerateStrategicMergePatchPayload(oriPod, pod)
if err != nil {
return err
}
if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name,
types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
klog.Errorf("patch pod %s/%s failed %v", pod.Name, pod.Namespace, err)
return err
}
Expand All @@ -551,14 +571,14 @@ func (c *Controller) handleUpdateVpcDnat(natGwKey string) error {
return err
}

oripod, err := c.getNatGwPod(natGwKey)
oriPod, err := c.getNatGwPod(natGwKey)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
pod := oripod.DeepCopy()
pod := oriPod.DeepCopy()

// check md5
newMd5 := fmt.Sprintf("%x", structhash.Md5(gw.Spec.DnatRules, 1))
Expand All @@ -579,7 +599,12 @@ func (c *Controller) handleUpdateVpcDnat(natGwKey string) error {

// update annotation
pod.Annotations[util.VpcDnatMd5Annotation] = newMd5
if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name, types.JSONPatchType, generatePatchPayload(pod.Annotations, "replace"), metav1.PatchOptions{}, ""); err != nil {
patch, err := util.GenerateStrategicMergePatchPayload(oriPod, pod)
if err != nil {
return err
}
if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name,
types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
klog.Errorf("patch pod %s/%s failed %v", pod.Name, pod.Namespace, err)
return err
}
Expand All @@ -600,14 +625,14 @@ func (c *Controller) handleUpdateNatGwSubnetRoute(natGwKey string) error {
return err
}

oripod, err := c.getNatGwPod(natGwKey)
oriPod, err := c.getNatGwPod(natGwKey)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
pod := oripod.DeepCopy()
pod := oriPod.DeepCopy()

gwSubnet, err := c.subnetsLister.Get(gw.Spec.Subnet)
if err != nil {
Expand Down Expand Up @@ -671,7 +696,12 @@ func (c *Controller) handleUpdateNatGwSubnetRoute(natGwKey string) error {
return err
}
pod.Annotations[util.VpcCIDRsAnnotation] = string(cidrBytes)
if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name, types.JSONPatchType, generatePatchPayload(pod.Annotations, "replace"), metav1.PatchOptions{}, ""); err != nil {
patch, err := util.GenerateStrategicMergePatchPayload(oriPod, pod)
if err != nil {
return err
}
if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name,
types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
klog.Errorf("patch pod %s/%s failed %v", pod.Name, pod.Namespace, err)
return err
}
Expand Down
29 changes: 15 additions & 14 deletions pkg/ovn_leader_checker/ovn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package ovn_leader_checker

import (
"context"
"encoding/json"
"flag"
"fmt"
"github.com/kubeovn/kube-ovn/pkg/util"
"io/ioutil"
"os"
exec "os/exec"
Expand Down Expand Up @@ -293,20 +293,21 @@ func stealLock() {

}

func generatePatchPayload(labels map[string]string, op string) []byte {
patchPayloadTemplate :=
`[{
"op": "%s",
"path": "/metadata/labels",
"value": %s
}]`

raw, _ := json.Marshal(labels)
return []byte(fmt.Sprintf(patchPayloadTemplate, op, raw))
}

func patchPodLabels(cfg *Configuration, pod *v1.Pod, labels map[string]string) error {
_, err := cfg.KubeClient.CoreV1().Pods(pod.ObjectMeta.Namespace).Patch(context.Background(), pod.ObjectMeta.Name, types.JSONPatchType, generatePatchPayload(labels, "replace"), metav1.PatchOptions{}, "")
oriPod := pod.DeepCopy()
if pod.Labels == nil {
pod.Labels = labels
} else {
for k, v := range labels {
pod.Labels[k] = v
}
}
patch, err := util.GenerateStrategicMergePatchPayload(oriPod, pod)
if err != nil {
return err
}
_, err = cfg.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name,
types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "")
return err
}

Expand Down

0 comments on commit 881622d

Please sign in to comment.