Skip to content

Commit

Permalink
refactor: refactor cni-server
Browse files Browse the repository at this point in the history
  • Loading branch information
oilbeater committed Apr 16, 2020
1 parent d99ffff commit 8d85365
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 176 deletions.
7 changes: 7 additions & 0 deletions cmd/daemon/cniserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"github.com/alauda/kube-ovn/pkg/util"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"net/http"
_ "net/http/pprof"
Expand Down Expand Up @@ -32,6 +33,12 @@ func main() {
klog.Fatalf("init node gateway failed %v", err)
}

if config.NetworkType == util.NetworkTypeVlan {
if err = daemon.InitVlan(config); err != nil {
klog.Fatalf("init vlan config failed %v", err)
}
}

stopCh := signals.SetupSignalHandler()
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeClient, 0,
kubeinformers.WithTweakListOptions(func(listOption *v1.ListOptions) {
Expand Down
6 changes: 6 additions & 0 deletions dist/images/cleanup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,10 @@ for ns in $(kubectl get ns -o name |cut -c 11-); do
kubectl annotate pod --all ovn.kubernetes.io/mac_address- -n "$ns"
kubectl annotate pod --all ovn.kubernetes.io/port_name- -n "$ns"
kubectl annotate pod --all ovn.kubernetes.io/allocated- -n "$ns"
kubectl annotate pod --all ovn.kubernetes.io/routed- -n "$ns"
kubectl annotate pod --all ovn.kubernetes.io/vlan_id- -n "$ns"
kubectl annotate pod --all ovn.kubernetes.io/vlan_range- -n "$ns"
kubectl annotate pod --all ovn.kubernetes.io/network_types- -n "$ns"
kubectl annotate pod --all ovn.kubernetes.io/provider_interface_name- -n "$ns"
kubectl annotate pod --all ovn.kubernetes.io/host_interface_name- -n "$ns"
done
2 changes: 2 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,8 @@ spec:
- --encap-checksum=true
- --service-cluster-ip-range=$SVC_CIDR
- --iface=${IFACE}
- --network-type=$NETWORK_TYPE
- --default-interface-name=$VLAN_INTERFACE_NAME
securityContext:
capabilities:
add: ["NET_ADMIN", "SYS_ADMIN", "SYS_PTRACE"]
Expand Down
10 changes: 10 additions & 0 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type Configuration struct {
NodeLocalDNSIP string
EncapChecksum bool
PprofPort int
NetworkType string
DefaultProviderName string
DefaultInterfaceName string
}

// ParseFlags will parse cmd args then init kubeClient and configuration
Expand All @@ -55,6 +58,10 @@ func ParseFlags() (*Configuration, error) {
argNodeLocalDnsIP = pflag.String("node-local-dns-ip", "", "If use nodelocaldns the local dns server ip should be set here, default empty.")
argEncapChecksum = pflag.Bool("encap-checksum", true, "Enable checksum, default: true")
argPprofPort = pflag.Int("pprof-port", 10665, "The port to get profiling data, default: 10665")

argsNetworkType = pflag.String("network-type", "geneve", "The ovn network type, default: geneve")
argsDefaultProviderName = pflag.String("default-provider-name", "provider", "The vlan or xvlan type default provider interface name, default: provider")
argsDefaultInterfaceName = pflag.String("default-interface-name", "", "The default host interface name in the vlan/xvlan type")
)

// mute info log for ipset lib
Expand Down Expand Up @@ -95,6 +102,9 @@ func ParseFlags() (*Configuration, error) {
ServiceClusterIPRange: *argServiceClusterIPRange,
NodeLocalDNSIP: *argNodeLocalDnsIP,
EncapChecksum: *argEncapChecksum,
NetworkType: *argsNetworkType,
DefaultProviderName: *argsDefaultProviderName,
DefaultInterfaceName: *argsDefaultInterfaceName,
}

if err := config.initNicConfig(); err != nil {
Expand Down
208 changes: 37 additions & 171 deletions pkg/daemon/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,21 @@ package daemon

import (
"fmt"
"github.com/alauda/kube-ovn/pkg/ovs"
"github.com/vishvananda/netlink"
"net"
"net/http"
"strings"
"time"

kubeovnv1 "github.com/alauda/kube-ovn/pkg/apis/kubeovn/v1"
clientset "github.com/alauda/kube-ovn/pkg/client/clientset/versioned"
"github.com/alauda/kube-ovn/pkg/request"
"github.com/alauda/kube-ovn/pkg/util"

"github.com/emicklei/go-restful"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"

kubeovnv1 "github.com/alauda/kube-ovn/pkg/apis/kubeovn/v1"
clientset "github.com/alauda/kube-ovn/pkg/client/clientset/versioned"
"github.com/alauda/kube-ovn/pkg/request"
"github.com/alauda/kube-ovn/pkg/util"
)

type cniServerHandler struct {
Expand All @@ -35,26 +32,23 @@ func createCniServerHandler(config *Configuration) *cniServerHandler {

func (csh cniServerHandler) handleAdd(req *restful.Request, resp *restful.Response) {
podRequest := request.CniRequest{}
err := req.ReadEntity(&podRequest)
if err != nil {
if err := req.ReadEntity(&podRequest); err != nil {
errMsg := fmt.Errorf("parse add request failed %v", err)
klog.Error(errMsg)
resp.WriteHeaderAndEntity(http.StatusBadRequest, request.CniResponse{Err: errMsg.Error()})
return
}

klog.Infof("add port request %v", podRequest)

var macAddr, ip, ipAddr, cidr, gw, subnet, ingress, egress, networkType, vlanID, vlanRange, providerInterfaceName, hostInterfaceName string
pod, err := csh.KubeClient.CoreV1().Pods(podRequest.PodNamespace).Get(podRequest.PodName, v1.GetOptions{})
if err != nil {
errMsg := fmt.Errorf("get pod %s/%s failed %v", podRequest.PodNamespace, podRequest.PodName, err)
klog.Error(errMsg)
resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()})
return
}

var macAddr, ip, ipAddr, cidr, gw, subnet, ingress, egress, vlanID string
for i := 0; i < 10; i++ {
pod, err := csh.KubeClient.CoreV1().Pods(podRequest.PodNamespace).Get(podRequest.PodName, v1.GetOptions{})
if err != nil {
errMsg := fmt.Errorf("get pod %s/%s failed %v", podRequest.PodNamespace, podRequest.PodName, err)
klog.Error(errMsg)
resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()})
return
}
if pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podRequest.Provider)] != "true" {
klog.Infof("wait address for pod %s/%s ", podRequest.PodNamespace, podRequest.PodName)
// wait controller assign an address
Expand All @@ -76,21 +70,30 @@ func (csh cniServerHandler) handleAdd(req *restful.Request, resp *restful.Respon
ingress = pod.Annotations[util.IngressRateAnnotation]
egress = pod.Annotations[util.EgressRateAnnotation]
vlanID = pod.Annotations[util.VlanIdAnnotation]
networkType = pod.Annotations[util.NetworkType]
providerInterfaceName = pod.Annotations[util.ProviderInterfaceName]
hostInterfaceName = pod.Annotations[util.HostInterfaceName]
vlanRange = pod.Annotations[util.VlanRangeAnnotation]

ipAddr = fmt.Sprintf("%s/%s", ip, strings.Split(cidr, "/")[1])
break
}

if macAddr == "" || ip == "" || cidr == "" || gw == "" {
errMsg := fmt.Errorf("no available ip for pod %s/%s", podRequest.PodNamespace, podRequest.PodName)
klog.Error(errMsg)
resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()})
if err := csh.createOrUpdateIPCr(podRequest, subnet, ip, macAddr); err != nil {
resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: err.Error()})
return
}

if podRequest.Provider == util.OvnProvider {
klog.Infof("create container mac %s, ip %s, cidr %s, gw %s", macAddr, ipAddr, cidr, gw)
err := csh.configureNic(podRequest.PodName, podRequest.PodNamespace, podRequest.NetNs, podRequest.ContainerID, macAddr, ipAddr, gw, ingress, egress, vlanID)
if err != nil {
errMsg := fmt.Errorf("configure nic failed %v", err)
klog.Error(errMsg)
resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()})
return
}
}

resp.WriteHeaderAndEntity(http.StatusOK, request.CniResponse{Protocol: util.CheckProtocol(ipAddr), IpAddress: strings.Split(ipAddr, "/")[0], MacAddress: macAddr, CIDR: cidr, Gateway: gw})
}

func (csh cniServerHandler) createOrUpdateIPCr(podRequest request.CniRequest, subnet, ip, macAddr string) error {
ipCr, err := csh.KubeOvnClient.KubeovnV1().IPs().Get(fmt.Sprintf("%s.%s", podRequest.PodName, podRequest.PodNamespace), metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
Expand All @@ -115,97 +118,25 @@ func (csh cniServerHandler) handleAdd(req *restful.Request, resp *restful.Respon
if err != nil {
errMsg := fmt.Errorf("failed to create ip crd for %s, %v", ip, err)
klog.Error(errMsg)
resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()})
return
return errMsg
}
} else {
errMsg := fmt.Errorf("failed to get ip crd for %s, %v", ip, err)
klog.Error(errMsg)
resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()})
return
return errMsg
}
} else {
ipCr.Labels[subnet] = ""
ipCr.Spec.AttachSubnets = append(ipCr.Spec.AttachSubnets, subnet)
ipCr.Spec.AttachIPs = append(ipCr.Spec.AttachIPs, ip)
ipCr.Spec.AttachMacs = append(ipCr.Spec.AttachMacs, macAddr)
_, err := csh.KubeOvnClient.KubeovnV1().IPs().Update(ipCr)
if err != nil {
if _, err := csh.KubeOvnClient.KubeovnV1().IPs().Update(ipCr); err != nil {
errMsg := fmt.Errorf("failed to update ip crd for %s, %v", ip, err)
klog.Error(errMsg)
resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()})
return
}
}

ipAddr = fmt.Sprintf("%s/%s", ip, strings.Split(cidr, "/")[1])
if podRequest.Provider == util.OvnProvider {
klog.Infof("create container mac %s, ip %s, cidr %s, gw %s", macAddr, ipAddr, cidr, gw)
err = csh.configureNic(podRequest.PodName, podRequest.PodNamespace, podRequest.NetNs, podRequest.ContainerID, macAddr, ipAddr, gw, ingress, egress)
if err != nil {
errMsg := fmt.Errorf("configure nic failed %v", err)
klog.Error(errMsg)
resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()})
return
}
}

if util.IsProviderVlan(networkType, providerInterfaceName) {
//create patch port
exists, err := providerBridgeExists()
if err != nil {
errMsg := fmt.Errorf("check provider bridge exists failed, %v", err)
klog.Error(errMsg)
resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()})
return
}

if !exists {
//create br-provider
if err = configProviderPort(providerInterfaceName); err != nil {
errMsg := fmt.Errorf("configure patch port br-provider failed %v", err)
klog.Error(errMsg)
resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()})
return
}

//add a host nic to br-provider
ifName := csh.getInterfaceName(hostInterfaceName)
if ifName == "" {
errMsg := fmt.Errorf("failed get host nic to add ovs br-provider")
klog.Error(errMsg)
resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()})
return
}

if err = configProviderNic(ifName); err != nil {
errMsg := fmt.Errorf("add nic %s to port br-provider failed %v", ifName, err)
klog.Error(errMsg)
resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()})
return
}
}

if err = csh.addRoute(ipAddr); err != nil {
errMsg := fmt.Errorf("add pod route failed, %v", err)
klog.Error(errMsg)
resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()})
return
}
}

//set ovs port tag
if util.IsNetworkVlan(networkType, vlanID, vlanRange) {
hostNicName, _ := generateNicName(podRequest.ContainerID)
if err := ovs.SetPortTag(hostNicName, vlanID); err != nil {
errMsg := fmt.Errorf("configure port tag failed %v", err)
klog.Error(errMsg)
resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: errMsg.Error()})
return
return errMsg
}
}

resp.WriteHeaderAndEntity(http.StatusOK, request.CniResponse{Protocol: util.CheckProtocol(ipAddr), IpAddress: strings.Split(ipAddr, "/")[0], MacAddress: macAddr, CIDR: cidr, Gateway: gw})
return nil
}

func (csh cniServerHandler) handleDel(req *restful.Request, resp *restful.Response) {
Expand Down Expand Up @@ -240,68 +171,3 @@ func (csh cniServerHandler) handleDel(req *restful.Request, resp *restful.Respon

resp.WriteHeader(http.StatusNoContent)
}

//get host nic name
func (csh cniServerHandler) getInterfaceName(hostInterfaceName string) string {
var interfaceName string

node, err := csh.Config.KubeClient.CoreV1().Nodes().Get(csh.Config.NodeName, metav1.GetOptions{})
if err == nil {
labels := node.GetLabels()
interfaceName = labels[util.HostInterfaceName]
}

if interfaceName != "" {
return interfaceName
}

if hostInterfaceName != "" {
return hostInterfaceName
}

if csh.Config.Iface != "" {
return csh.Config.Iface
}

return ""
}

//add a static route. If it is not added, the pod will not receive packets from the host nic
func (csh cniServerHandler) addRoute(ipAddr string) error {
nic, err := netlink.LinkByName(util.NodeNic)
if err != nil {
klog.Errorf("failed to get nic %s", util.NodeNic)
return fmt.Errorf("failed to get nic %s", util.NodeNic)
}

existRoutes, err := netlink.RouteList(nic, netlink.FAMILY_V4)
if err != nil {
return err
}

_, cidr, _ := net.ParseCIDR(ipAddr)
for _, route := range existRoutes {
if route.Dst == cidr {
return nil
}
}

node, err := csh.Config.KubeClient.CoreV1().Nodes().Get(csh.Config.NodeName, metav1.GetOptions{})
if err != nil {
klog.Errorf("failed to get node %s %v", csh.Config.NodeName, err)
return err
}

gateway, ok := node.Annotations[util.GatewayAnnotation]
if !ok {
klog.Errorf("annotation for node %s ovn.kubernetes.io/gateway not exists", node.Name)
return fmt.Errorf("annotation for node ovn.kubernetes.io/gateway not exists")
}

gw := net.ParseIP(gateway)
if err = netlink.RouteReplace(&netlink.Route{Dst: cidr, LinkIndex: nic.Attrs().Index, Scope: netlink.SCOPE_UNIVERSE, Gw: gw}); err != nil {
klog.Errorf("failed to add route %v", err)
}

return err
}

0 comments on commit 8d85365

Please sign in to comment.