Skip to content

Commit

Permalink
fix: eip and nat crd can delete even if nat gw pod deleted and ipatab… (
Browse files Browse the repository at this point in the history
#1917)

* fix: eip and nat crd can delete even if nat gw pod deleted and ipatabels nat gw not enable

fix: should delete eip nat successfully after disable nat gw| delete nat gw pod

* fix: 确保add操作不会因pod不存在而跳过,del操作不会因pod不存在而阻塞

Co-authored-by: zhangbingbing <zhangbingbing@yealink.com>
(cherry picked from commit 4882c35)
  • Loading branch information
bobz965 authored and oilbeater committed Sep 19, 2022
1 parent 95ebe00 commit bcaf1e7
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 95 deletions.
54 changes: 8 additions & 46 deletions pkg/controller/vpc_nat_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,7 @@ func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error {
c.vpcNatGwKeyMutex.Lock(key)
defer c.vpcNatGwKeyMutex.Unlock(key)
if vpcNatEnabled != "true" {
// wait and check again
time.Sleep(10 * time.Second)
if vpcNatEnabled != "true" {
return fmt.Errorf("failed to addOrUpdateVpcNatGw, vpcNatEnabled='%s'", vpcNatEnabled)
}
return fmt.Errorf("iptables nat gw not enable")
}
gw, err := c.vpcNatGatewayLister.Get(key)
if err != nil {
Expand Down Expand Up @@ -299,9 +295,6 @@ func (c *Controller) syncVpcNatGwRules(key string) error {
// sync all nat crd
pod, err := c.getNatGwPod(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}

Expand All @@ -319,10 +312,7 @@ func (c *Controller) syncVpcNatGwRules(key string) error {

func (c *Controller) handleInitVpcNatGw(key string) error {
if vpcNatEnabled != "true" {
time.Sleep(10 * time.Second)
if vpcNatEnabled != "true" {
return fmt.Errorf("failed init vpc nat gateway, vpcNatEnabled='%s'", vpcNatEnabled)
}
return fmt.Errorf("iptables nat gw not enable")
}
c.vpcNatGwKeyMutex.Lock(key)
defer c.vpcNatGwKeyMutex.Unlock(key)
Expand All @@ -341,9 +331,6 @@ func (c *Controller) handleInitVpcNatGw(key string) error {

oriPod, err := c.getNatGwPod(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
pod := oriPod.DeepCopy()
Expand Down Expand Up @@ -377,10 +364,7 @@ func (c *Controller) handleInitVpcNatGw(key string) error {

func (c *Controller) handleUpdateVpcFloatingIp(natGwKey string) error {
if vpcNatEnabled != "true" {
time.Sleep(10 * time.Second)
if vpcNatEnabled != "true" {
return fmt.Errorf("failed to update vpc floatingIp, vpcNatEnabled='%s'", vpcNatEnabled)
}
return fmt.Errorf("iptables nat gw not enable")
}
c.vpcNatGwKeyMutex.Lock(natGwKey)
defer c.vpcNatGwKeyMutex.Unlock(natGwKey)
Expand Down Expand Up @@ -412,10 +396,7 @@ func (c *Controller) handleUpdateVpcFloatingIp(natGwKey string) error {

func (c *Controller) handleUpdateVpcEip(natGwKey string) error {
if vpcNatEnabled != "true" {
time.Sleep(10 * time.Second)
if vpcNatEnabled != "true" {
return fmt.Errorf("failed to update vpc eip, vpcNatEnabled='%s'", vpcNatEnabled)
}
return fmt.Errorf("iptables nat gw not enable")
}
c.vpcNatGwKeyMutex.Lock(natGwKey)
defer c.vpcNatGwKeyMutex.Unlock(natGwKey)
Expand Down Expand Up @@ -444,10 +425,7 @@ func (c *Controller) handleUpdateVpcEip(natGwKey string) error {

func (c *Controller) handleUpdateVpcSnat(natGwKey string) error {
if vpcNatEnabled != "true" {
time.Sleep(10 * time.Second)
if vpcNatEnabled != "true" {
return fmt.Errorf("failed to update vpc snat, vpcNatEnabled='%s'", vpcNatEnabled)
}
return fmt.Errorf("iptables nat gw not enable")
}
c.vpcNatGwKeyMutex.Lock(natGwKey)
defer c.vpcNatGwKeyMutex.Unlock(natGwKey)
Expand Down Expand Up @@ -476,10 +454,7 @@ func (c *Controller) handleUpdateVpcSnat(natGwKey string) error {

func (c *Controller) handleUpdateVpcDnat(natGwKey string) error {
if vpcNatEnabled != "true" {
time.Sleep(10 * time.Second)
if vpcNatEnabled != "true" {
return fmt.Errorf("failed update vpc dnat, vpcNatEnabled='%s'", vpcNatEnabled)
}
return fmt.Errorf("iptables nat gw not enable")
}
c.vpcNatGwKeyMutex.Lock(natGwKey)
defer c.vpcNatGwKeyMutex.Unlock(natGwKey)
Expand Down Expand Up @@ -509,26 +484,17 @@ func (c *Controller) handleUpdateVpcDnat(natGwKey string) error {

func (c *Controller) handleUpdateNatGwSubnetRoute(natGwKey string) error {
if vpcNatEnabled != "true" {
time.Sleep(10 * time.Second)
if vpcNatEnabled != "true" {
return fmt.Errorf("failed to update subnet route, vpcNatEnabled='%s'", vpcNatEnabled)
}
return fmt.Errorf("iptables nat gw not enable")
}
c.vpcNatGwKeyMutex.Lock(natGwKey)
defer c.vpcNatGwKeyMutex.Unlock(natGwKey)
gw, err := c.vpcNatGatewayLister.Get(natGwKey)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}

oriPod, err := c.getNatGwPod(natGwKey)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
pod := oriPod.DeepCopy()
Expand Down Expand Up @@ -746,8 +712,7 @@ func (c *Controller) getNatGwPod(name string) (*corev1.Pod, error) {
if err != nil {
return nil, err
} else if len(pods) == 0 {
time.Sleep(2 * time.Second)
return nil, fmt.Errorf("pod '%s' not exist", name)
return nil, k8serrors.NewNotFound(v1.Resource("pod"), name)
} else if len(pods) != 1 {
time.Sleep(5 * time.Second)
return nil, fmt.Errorf("too many pod")
Expand Down Expand Up @@ -776,9 +741,6 @@ func (c *Controller) initCreateAt(key string) (err error) {
}
pod, err := c.getNatGwPod(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
createAt = pod.CreationTimestamp.Format("2006-01-02T15:04:05")
Expand Down
19 changes: 5 additions & 14 deletions pkg/controller/vpc_nat_gw_eip.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,7 @@ func (c *Controller) processNextDeleteIptablesEipWorkItem() bool {

func (c *Controller) handleAddIptablesEip(key string) error {
if vpcNatEnabled != "true" {
time.Sleep(10 * time.Second)
if vpcNatEnabled != "true" {
return fmt.Errorf("failed to add vpc nat eip, vpcNatEnabled='%s'", vpcNatEnabled)
}
return fmt.Errorf("iptables nat gw not enable")
}

c.vpcNatGwKeyMutex.Lock(key)
Expand Down Expand Up @@ -330,15 +327,8 @@ func (c *Controller) handleResetIptablesEip(key string) error {
}

func (c *Controller) handleUpdateIptablesEip(key string) error {
if vpcNatEnabled != "true" {
time.Sleep(10 * time.Second)
if vpcNatEnabled != "true" {
return fmt.Errorf("failed to del vpc nat eip, vpcNatEnabled='%s'", vpcNatEnabled)
}
}
c.vpcNatGwKeyMutex.Lock(key)
defer c.vpcNatGwKeyMutex.Unlock(key)

cachedEip, err := c.iptablesEipsLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
Expand Down Expand Up @@ -367,6 +357,10 @@ func (c *Controller) handleUpdateIptablesEip(key string) error {
}
return nil
}
// add or update should make sure vpc nat enabled
if vpcNatEnabled != "true" {
return fmt.Errorf("iptables nat gw not enable")
}
if eip.Status.IP != "" && eip.Spec.V4ip == "" {
// eip spec V4ip is removed
if err = c.createOrUpdateCrdEip(key, eip.Namespace, eip.Status.IP, eip.Spec.V6ip, eip.Spec.MacAddress, eip.Spec.NatGwDp); err != nil {
Expand Down Expand Up @@ -542,9 +536,6 @@ func (c *Controller) GetEip(eipName string) (*kubeovnv1.IptablesEIP, error) {
func (c *Controller) createEipInPod(dp, gw, v4Cidr string) error {
gwPod, err := c.getNatGwPod(dp)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
var addRules []string
Expand Down
56 changes: 21 additions & 35 deletions pkg/controller/vpc_nat_gw_nat.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,10 +505,7 @@ func (c *Controller) processNextDeleteIptablesSnatRuleWorkItem() bool {

func (c *Controller) handleAddIptablesFip(key string) error {
if vpcNatEnabled != "true" {
time.Sleep(10 * time.Second)
if vpcNatEnabled != "true" {
return fmt.Errorf("failed to add vpc fip rule, vpcNatEnabled='%s'", vpcNatEnabled)
}
return fmt.Errorf("iptables nat gw not enable")
}
c.vpcNatGwKeyMutex.Lock(key)
defer c.vpcNatGwKeyMutex.Unlock(key)
Expand Down Expand Up @@ -571,12 +568,6 @@ func (c *Controller) handleAddIptablesFip(key string) error {
}

func (c *Controller) handleUpdateIptablesFip(key string) error {
if vpcNatEnabled != "true" {
time.Sleep(10 * time.Second)
if vpcNatEnabled != "true" {
return fmt.Errorf("failed to del vpc fip rule, vpcNatEnabled='%s'", vpcNatEnabled)
}
}
c.vpcNatGwKeyMutex.Lock(key)
defer c.vpcNatGwKeyMutex.Unlock(key)

Expand All @@ -603,6 +594,10 @@ func (c *Controller) handleUpdateIptablesFip(key string) error {
c.resetIptablesEipQueue.Add(fip.Spec.EIP)
return nil
}
// add or update should make sure vpc nat enabled
if vpcNatEnabled != "true" {
return fmt.Errorf("iptables nat gw not enable")
}
eipName := cachedFip.Spec.EIP
if len(eipName) == 0 {
klog.Errorf("failed to update fip rule, should set eip ")
Expand Down Expand Up @@ -680,10 +675,7 @@ func (c *Controller) handleDelIptablesFip(key string) error {

func (c *Controller) handleAddIptablesDnatRule(key string) error {
if vpcNatEnabled != "true" {
time.Sleep(10 * time.Second)
if vpcNatEnabled != "true" {
return fmt.Errorf("failed to add vpc dnat rule, vpcNatEnabled='%s'", vpcNatEnabled)
}
return fmt.Errorf("iptables nat gw not enable")
}
c.vpcNatGwKeyMutex.Lock(key)
defer c.vpcNatGwKeyMutex.Unlock(key)
Expand Down Expand Up @@ -749,12 +741,6 @@ func (c *Controller) handleAddIptablesDnatRule(key string) error {
}

func (c *Controller) handleUpdateIptablesDnatRule(key string) error {
if vpcNatEnabled != "true" {
time.Sleep(10 * time.Second)
if vpcNatEnabled != "true" {
return fmt.Errorf("failed to del vpc dnat rule, vpcNatEnabled='%s'", vpcNatEnabled)
}
}
c.vpcNatGwKeyMutex.Lock(key)
defer c.vpcNatGwKeyMutex.Unlock(key)

Expand Down Expand Up @@ -801,6 +787,10 @@ func (c *Controller) handleUpdateIptablesDnatRule(key string) error {
if dup, err := c.isDnatDuplicated(eipName, dnat.Name, dnat.Spec.ExternalPort); dup || err != nil {
return err
}
// add or update should make sure vpc nat enabled
if vpcNatEnabled != "true" {
return fmt.Errorf("iptables nat gw not enable")
}
if c.dnatChangeEip(dnat, eip) {
klog.V(3).Infof("dnat change ip, old ip '%s', new ip %s", dnat.Status.V4ip, eip.Spec.V4ip)
if err = c.deleteDnatInPod(dnat.Status.NatGwDp, dnat.Spec.Protocol,
Expand Down Expand Up @@ -863,10 +853,7 @@ func (c *Controller) handleDelIptablesDnatRule(key string) error {

func (c *Controller) handleAddIptablesSnatRule(key string) error {
if vpcNatEnabled != "true" {
time.Sleep(10 * time.Second)
if vpcNatEnabled != "true" {
return fmt.Errorf("failed to add vpc snat rule, vpcNatEnabled='%s'", vpcNatEnabled)
}
return fmt.Errorf("iptables nat gw not enable")
}
c.vpcNatGwKeyMutex.Lock(key)
defer c.vpcNatGwKeyMutex.Unlock(key)
Expand Down Expand Up @@ -930,12 +917,6 @@ func (c *Controller) handleAddIptablesSnatRule(key string) error {
}

func (c *Controller) handleUpdateIptablesSnatRule(key string) error {
if vpcNatEnabled != "true" {
time.Sleep(10 * time.Second)
if vpcNatEnabled != "true" {
return fmt.Errorf("failed to del vpc snat rule, vpcNatEnabled='%s'", vpcNatEnabled)
}
}
c.vpcNatGwKeyMutex.Lock(key)
defer c.vpcNatGwKeyMutex.Unlock(key)

Expand Down Expand Up @@ -981,6 +962,10 @@ func (c *Controller) handleUpdateIptablesSnatRule(key string) error {
err = fmt.Errorf("failed to update snat %s, eip '%s' is used by %s", key, eipName, eip.Status.Nat)
return err
}
// add or update should make sure vpc nat enabled
if vpcNatEnabled != "true" {
return fmt.Errorf("iptables nat gw not enable")
}
// snat change eip
if c.snatChangeEip(snat, eip) {
klog.V(3).Infof("snat change ip, old ip %s, new ip %s", snat.Status.V4ip, eip.Spec.V4ip)
Expand Down Expand Up @@ -1495,9 +1480,6 @@ func (c *Controller) redoSnat(key, redo string, eipReady bool) error {
func (c *Controller) createFipInPod(dp, v4ip, internalIP string) error {
gwPod, err := c.getNatGwPod(dp)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
var addRules []string
Expand Down Expand Up @@ -1546,7 +1528,9 @@ func (c *Controller) createDnatInPod(dp, protocol, v4ip, internalIp, externalPor
func (c *Controller) deleteDnatInPod(dp, protocol, v4ip, internalIp, externalPort, internalPort string) error {
gwPod, err := c.getNatGwPod(dp)
if err != nil {
klog.Errorf("failed to get nat gw pod, %v", err)
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
// del nat
Expand Down Expand Up @@ -1578,7 +1562,9 @@ func (c *Controller) createSnatInPod(dp, v4ip, internalCIDR string) error {
func (c *Controller) deleteSnatInPod(dp, v4ip, internalCIDR string) error {
gwPod, err := c.getNatGwPod(dp)
if err != nil {
klog.Errorf("failed to get nat gw pod, %v", err)
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
// del nat
Expand Down

0 comments on commit bcaf1e7

Please sign in to comment.