Skip to content

Commit

Permalink
When netpol is added to a workload, the workload's POD can be accesse…
Browse files Browse the repository at this point in the history
…d using service

Co-authored-by: wang_yudong <wang_yudong@inspur.com>
  • Loading branch information
zhangzujian and wangyd1988 committed Dec 16, 2021
1 parent 0be7e33 commit d8e84cf
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 23 deletions.
1 change: 1 addition & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ func NewController(config *Configuration) *Controller {
})

serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueAddService,
DeleteFunc: controller.enqueueDeleteService,
UpdateFunc: controller.enqueueUpdateService,
})
Expand Down
234 changes: 213 additions & 21 deletions pkg/controller/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,58 @@ func (c *Controller) handleUpdateNp(key string) error {
return err
}

// set svc address_set
svcAsNameIPv4 := strings.Replace(fmt.Sprintf("%s.%s.service.%s", np.Name, np.Namespace, kubeovnv1.ProtocolIPv4), "-", ".", -1)
svcAsNameIPv6 := strings.Replace(fmt.Sprintf("%s.%s.service.%s", np.Name, np.Namespace, kubeovnv1.ProtocolIPv6), "-", ".", -1)
svcIpv4s, svcIpv6s, err := c.fetchSelectedSvc(np.Namespace, &np.Spec.PodSelector)
if err != nil {
klog.Errorf("failed to fetchSelectedSvc svcIPs result %v", err)
return err
}
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
protocol := util.CheckProtocol(cidrBlock)
svcAsName := svcAsNameIPv4
svcIPs := svcIpv4s
if protocol == kubeovnv1.ProtocolIPv6 {
svcAsName = svcAsNameIPv6
svcIPs = svcIpv6s
}
if err := c.ovnClient.CreateNpAddressSet(svcAsName, np.Namespace, np.Name, "service"); err != nil {
klog.Errorf("failed to create address_set %s, %v", svcAsNameIPv4, err)
return err
}
if err := c.ovnClient.SetAddressesToAddressSet(svcIPs, svcAsName); err != nil {
klog.Errorf("failed to set netpol svc, %v", err)
return err
}
}

// before update or add ingress info,we should first delete acl and address_set
if err := c.ovnClient.DeleteACL(pgName, "to-lport"); err != nil {
klog.Errorf("failed to delete np %s ingress acls, %v", key, err)
return err
}

ingressAsNames, err := c.ovnClient.ListNpAddressSet(np.Namespace, np.Name, "ingress")
if err != nil {
klog.Errorf("failed to list ingress address_set, %v", err)
return err
}
for _, ingressAsName := range ingressAsNames {
if err := c.ovnClient.DeleteAddressSet(ingressAsName); err != nil {
klog.Errorf("failed to delete np %s address set, %v", key, err)
return err
}
}

if hasIngressRule(np) {
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
protocol := util.CheckProtocol(cidrBlock)
svcAsName := svcAsNameIPv4
if protocol == kubeovnv1.ProtocolIPv6 {
svcAsName = svcAsNameIPv6
}

for idx, npr := range np.Spec.Ingress {
// A single address set must contain addresses of the same type and the name must be unique within table, so IPv4 and IPv6 address set should be different
ingressAllowAsName := fmt.Sprintf("%s.%s.%d", ingressAllowAsNamePrefix, protocol, idx)
Expand Down Expand Up @@ -256,7 +305,7 @@ func (c *Controller) handleUpdateNp(key string) error {
}

if len(allows) != 0 || len(excepts) != 0 {
if err := c.ovnClient.CreateIngressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, ingressAllowAsName, ingressExceptAsName, protocol, npr.Ports); err != nil {
if err := c.ovnClient.CreateIngressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, ingressAllowAsName, ingressExceptAsName, svcAsName, protocol, npr.Ports); err != nil {
klog.Errorf("failed to create ingress acls for np %s, %v", key, err)
return err
}
Expand All @@ -275,7 +324,7 @@ func (c *Controller) handleUpdateNp(key string) error {
return err
}
ingressPorts := []netv1.NetworkPolicyPort{}
if err := c.ovnClient.CreateIngressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, ingressAllowAsName, ingressExceptAsName, protocol, ingressPorts); err != nil {
if err := c.ovnClient.CreateIngressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, ingressAllowAsName, ingressExceptAsName, svcAsName, protocol, ingressPorts); err != nil {
klog.Errorf("failed to create ingress acls for np %s, %v", key, err)
return err
}
Expand Down Expand Up @@ -324,9 +373,31 @@ func (c *Controller) handleUpdateNp(key string) error {
}
}

// before update or add egress info, we should first delete acl and address_set
if err := c.ovnClient.DeleteACL(pgName, "from-lport"); err != nil {
klog.Errorf("failed to delete np %s egress acls, %v", key, err)
return err
}

egressAsNames, err := c.ovnClient.ListNpAddressSet(np.Namespace, np.Name, "egress")
if err != nil {
klog.Errorf("failed to list egress address_set, %v", err)
return err
}
for _, egressAsName := range egressAsNames {
if err := c.ovnClient.DeleteAddressSet(egressAsName); err != nil {
klog.Errorf("failed to delete np %s address set, %v", key, err)
return err
}
}
if hasEgressRule(np) {
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
protocol := util.CheckProtocol(cidrBlock)
svcAsName := svcAsNameIPv4
if protocol == kubeovnv1.ProtocolIPv6 {
svcAsName = svcAsNameIPv6
}

for idx, npr := range np.Spec.Egress {
// A single address set must contain addresses of the same type and the name must be unique within table, so IPv4 and IPv6 address set should be different
egressAllowAsName := fmt.Sprintf("%s.%s.%d", egressAllowAsNamePrefix, protocol, idx)
Expand Down Expand Up @@ -372,7 +443,7 @@ func (c *Controller) handleUpdateNp(key string) error {
}

if len(allows) != 0 || len(excepts) != 0 {
if err := c.ovnClient.CreateEgressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, egressAllowAsName, egressExceptAsName, protocol, npr.Ports); err != nil {
if err := c.ovnClient.CreateEgressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, egressAllowAsName, egressExceptAsName, protocol, npr.Ports, svcAsName); err != nil {
klog.Errorf("failed to create egress acls for np %s, %v", key, err)
return err
}
Expand All @@ -391,7 +462,7 @@ func (c *Controller) handleUpdateNp(key string) error {
return err
}
egressPorts := []netv1.NetworkPolicyPort{}
if err := c.ovnClient.CreateEgressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, egressAllowAsName, egressExceptAsName, protocol, egressPorts); err != nil {
if err := c.ovnClient.CreateEgressACL(fmt.Sprintf("%s/%s", np.Namespace, np.Name), pgName, egressAllowAsName, egressExceptAsName, protocol, egressPorts, svcAsName); err != nil {
klog.Errorf("failed to create egress acls for np %s, %v", key, err)
return err
}
Expand Down Expand Up @@ -422,24 +493,8 @@ func (c *Controller) handleUpdateNp(key string) error {
}
}
}
} else {
if err := c.ovnClient.DeleteACL(pgName, "from-lport"); err != nil {
klog.Errorf("failed to delete np %s egress acls, %v", key, err)
return err
}

asNames, err := c.ovnClient.ListNpAddressSet(np.Namespace, np.Name, "egress")
if err != nil {
klog.Errorf("failed to list address_set, %v", err)
return err
}
for _, asName := range asNames {
if err := c.ovnClient.DeleteAddressSet(asName); err != nil {
klog.Errorf("failed to delete np %s address set, %v", key, err)
return err
}
}
}

if err := c.ovnClient.CreateGatewayACL(pgName, subnet.Spec.Gateway, subnet.Spec.CIDRBlock); err != nil {
klog.Errorf("failed to create gateway acl, %v", err)
return err
Expand All @@ -459,6 +514,18 @@ func (c *Controller) handleDeleteNp(key string) error {
klog.Errorf("failed to delete np %s port group, %v", key, err)
}

svcAsNames, err := c.ovnClient.ListNpAddressSet(namespace, name, "service")
if err != nil {
klog.Errorf("failed to list svc address_set, %v", err)
return err
}
for _, asName := range svcAsNames {
if err := c.ovnClient.DeleteAddressSet(asName); err != nil {
klog.Errorf("failed to delete np %s address set, %v", key, err)
return err
}
}

ingressAsNames, err := c.ovnClient.ListNpAddressSet(namespace, name, "ingress")
if err != nil {
klog.Errorf("failed to list address_set, %v", err)
Expand Down Expand Up @@ -518,6 +585,45 @@ func (c *Controller) fetchSelectedPorts(namespace string, selector *metav1.Label
return ports, nil
}

func (c *Controller) fetchSelectedSvc(namespace string, selector *metav1.LabelSelector) ([]string, []string, error) {
sel, err := metav1.LabelSelectorAsSelector(selector)
if err != nil {
return nil, nil, fmt.Errorf("error creating label selector, %v", err)
}
pods, err := c.podsLister.Pods(namespace).List(sel)
if err != nil {
return nil, nil, fmt.Errorf("failed to list pods, %v", err)
}

svcIpv4s := make([]string, 0)
svcIpv6s := make([]string, 0)
svcs, err := c.servicesLister.Services(namespace).List(labels.Everything())
if err != nil {
klog.Errorf("failed to list svc, %v", err)
return nil, nil, err
}

for _, pod := range pods {
if !isPodAlive(pod) {
continue
}
if !pod.Spec.HostNetwork && pod.Annotations[util.AllocatedAnnotation] == "true" {
svcIpv4, err := svcMatchPods(svcs, pod, kubeovnv1.ProtocolIPv4)
if err != nil {
return nil, nil, err
}
svcIpv4s = append(svcIpv4s, svcIpv4...)

svcIpv6, err := svcMatchPods(svcs, pod, kubeovnv1.ProtocolIPv6)
if err != nil {
return nil, nil, err
}
svcIpv6s = append(svcIpv6s, svcIpv6...)
}
}
return svcIpv4s, svcIpv6s, nil
}

func hasIngressRule(np *netv1.NetworkPolicy) bool {
for _, pt := range np.Spec.PolicyTypes {
if strings.Contains(string(pt), string(netv1.PolicyTypeIngress)) {
Expand Down Expand Up @@ -580,17 +686,78 @@ func (c *Controller) fetchPolicySelectedAddresses(namespace, protocol string, np
if err != nil {
return nil, nil, fmt.Errorf("failed to list pod, %v", err)
}
svcs, err := c.servicesLister.Services(ns).List(labels.Everything())
if err != nil {
klog.Errorf("failed to list svc, %v", err)
return nil, nil, fmt.Errorf("failed to list svc, %v", err)
}

for _, pod := range pods {
for _, podIP := range pod.Status.PodIPs {
if podIP.IP != "" && util.CheckProtocol(podIP.IP) == protocol {
selectedAddresses = append(selectedAddresses, podIP.IP)
if len(svcs) == 0 {
continue
}
klog.Infof("svc is %v", svcs)
svcIPs, err := svcMatchPods(svcs, pod, protocol)
if err != nil {
return nil, nil, err
}
klog.Infof("svcIPs is %v", svcIPs)
selectedAddresses = append(selectedAddresses, svcIPs...)
}
}
}
}
return selectedAddresses, exceptAddresses, nil
}

func svcMatchPods(svcs []*corev1.Service, pod *corev1.Pod, protocol string) ([]string, error) {
matchSvcs := []string{}
// find svc ip by pod's info
for _, svc := range svcs {
isMatch, err := isSvcMatchPod(svc, pod)
if err != nil {
return nil, err
}
if isMatch {
clusterIPs := svc.Spec.ClusterIPs
if len(clusterIPs) == 0 && svc.Spec.ClusterIP != "" && svc.Spec.ClusterIP != corev1.ClusterIPNone {
clusterIPs = []string{svc.Spec.ClusterIP}
}
protocolClusterIPs := getProtocolSvcIp(clusterIPs, protocol)
if len(protocolClusterIPs) != 0 {
matchSvcs = append(matchSvcs, protocolClusterIPs...)
}
}
}
return matchSvcs, nil
}
func getProtocolSvcIp(clusterIPs []string, protocol string) []string {
protocolClusterIPs := []string{}
for _, clusterIP := range clusterIPs {
if clusterIP != "" && clusterIP != corev1.ClusterIPNone && util.CheckProtocol(clusterIP) == protocol {
protocolClusterIPs = append(protocolClusterIPs, clusterIP)
}
}
return protocolClusterIPs
}
func isSvcMatchPod(svc *corev1.Service, pod *corev1.Pod) (bool, error) {
ss := metav1.SetAsLabelSelector(svc.Spec.Selector)
sel, err := metav1.LabelSelectorAsSelector(ss)
if err != nil {
return false, fmt.Errorf("error fetch label selector, %v", err)
}
if pod.Labels == nil {
return false, nil
}
if sel.Matches(labels.Set(pod.Labels)) {
return true, nil
}
return false, nil
}

func (c *Controller) podMatchNetworkPolicies(pod *corev1.Pod) []string {
podNs, _ := c.namespacesLister.Get(pod.Namespace)
nps, _ := c.npsLister.NetworkPolicies(corev1.NamespaceAll).List(labels.Everything())
Expand All @@ -603,6 +770,31 @@ func (c *Controller) podMatchNetworkPolicies(pod *corev1.Pod) []string {
return match
}

func (c *Controller) svcMatchNetworkPolicies(svc *corev1.Service) ([]string, error) {
// find all match pod
pods, err := c.podsLister.Pods(svc.Namespace).List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to list pods, %v", err)
}

// find all match netpol
nps, err := c.npsLister.NetworkPolicies(corev1.NamespaceAll).List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to list netpols, %v", err)
}
match := []string{}
for _, pod := range pods {
podNs, _ := c.namespacesLister.Get(pod.Namespace)
for _, np := range nps {
if isPodMatchNetworkPolicy(pod, *podNs, np, np.Namespace) {
match = append(match, fmt.Sprintf("%s/%s", np.Namespace, np.Name))
}
}
}
klog.Infof("match svc is %v", match)
return match, nil
}

func isPodMatchNetworkPolicy(pod *corev1.Pod, podNs corev1.Namespace, policy *netv1.NetworkPolicy, policyNs string) bool {
sel, _ := metav1.LabelSelectorAsSelector(&policy.Spec.PodSelector)
if pod.Labels == nil {
Expand Down
36 changes: 36 additions & 0 deletions pkg/controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,30 @@ type vpcService struct {
Protocol v1.Protocol
}

func (c *Controller) enqueueAddService(obj interface{}) {
if !c.isLeader() {
return
}
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
svc := obj.(*v1.Service)
klog.V(3).Infof("enqueue update service %s", key)

var netpols []string
if netpols, err = c.svcMatchNetworkPolicies(svc); err != nil {
utilruntime.HandleError(err)
return
}

for _, np := range netpols {
c.updateNpQueue.Add(np)
}
}

func (c *Controller) enqueueDeleteService(obj interface{}) {
if !c.isLeader() {
return
Expand All @@ -29,6 +53,18 @@ func (c *Controller) enqueueDeleteService(obj interface{}) {
//klog.V(3).Infof("enqueue delete service %s/%s", svc.Namespace, svc.Name)
klog.Infof("enqueue delete service %s/%s", svc.Namespace, svc.Name)
if svc.Spec.ClusterIP != v1.ClusterIPNone && svc.Spec.ClusterIP != "" {

var netpols []string
var err error
if netpols, err = c.svcMatchNetworkPolicies(svc); err != nil {
utilruntime.HandleError(err)
return
}

for _, np := range netpols {
c.updateNpQueue.Add(np)
}

for _, port := range svc.Spec.Ports {
vpcSvc := &vpcService{
Vip: fmt.Sprintf("%s:%d", svc.Spec.ClusterIP, port.Port),
Expand Down

0 comments on commit d8e84cf

Please sign in to comment.