/
endpoint.go
975 lines (813 loc) · 30.9 KB
/
endpoint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
// Copyright 2016-2020 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bytes"
"context"
"fmt"
"net"
"sync"
"time"
"github.com/cilium/cilium/api/v1/models"
. "github.com/cilium/cilium/api/v1/server/restapi/endpoint"
"github.com/cilium/cilium/pkg/api"
"github.com/cilium/cilium/pkg/completion"
"github.com/cilium/cilium/pkg/endpoint"
endpointid "github.com/cilium/cilium/pkg/endpoint/id"
"github.com/cilium/cilium/pkg/endpoint/regeneration"
"github.com/cilium/cilium/pkg/endpointmanager"
"github.com/cilium/cilium/pkg/k8s"
k8sConst "github.com/cilium/cilium/pkg/k8s/apis/cilium.io"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/maps/lxcmap"
monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
"github.com/cilium/cilium/pkg/option"
"github.com/cilium/cilium/pkg/workloads"
"github.com/go-openapi/runtime/middleware"
)
// getEndpoint is the handler for GET /endpoint requests.
type getEndpoint struct {
	d *Daemon
}

// NewGetEndpointHandler returns a GetEndpointHandler bound to the daemon d.
func NewGetEndpointHandler(d *Daemon) GetEndpointHandler {
	handler := getEndpoint{d: d}
	return &handler
}

// Handle lists the endpoints selected by params. A not-found response is
// produced only when a label filter was supplied and nothing matched it;
// otherwise the (possibly empty) list is returned with an OK response.
func (h *getEndpoint) Handle(params GetEndpointParams) middleware.Responder {
	log.WithField(logfields.Params, logfields.Repr(params)).Debug("GET /endpoint request")

	matched := getEndpointList(params)
	filtered := params.Labels != nil
	if filtered && len(matched) == 0 {
		return NewGetEndpointNotFound()
	}
	return NewGetEndpointOK().WithPayload(matched)
}
// getEndpointList returns the API models of the endpoints known to the
// endpoint manager for which ep.HasLabels reports true against the labels
// converted from params.Labels (the filter stays at its zero value when
// params.Labels is nil). The models are built concurrently, one goroutine per
// endpoint, so the order of the result is not deterministic.
func getEndpointList(params GetEndpointParams) []*models.Endpoint {
	var filter labels.Labels
	if params.Labels != nil {
		// Convert params.Labels to a model that can be compared with the
		// endpoint's labels.
		filter = labels.NewLabelsFromModel(params.Labels)
	}

	endpoints := endpointmanager.GetEndpoints()
	modelCh := make(chan *models.Endpoint, len(endpoints))

	var producers sync.WaitGroup
	producers.Add(len(endpoints))
	for _, candidate := range endpoints {
		go func(ep *endpoint.Endpoint) {
			defer producers.Done()
			if ep.HasLabels(filter) {
				modelCh <- ep.GetModel()
			}
		}(candidate)
	}

	// Results are gathered over a channel because the number of models is not
	// known up front: endpoints that do not pass the label filter send nothing.
	var result []*models.Endpoint
	collected := make(chan struct{})
	go func() {
		defer close(collected)
		for model := range modelCh {
			result = append(result, model)
		}
	}()

	producers.Wait()
	close(modelCh)
	<-collected
	return result
}
// getEndpointID is the handler for GET /endpoint/{id} requests.
type getEndpointID struct {
	d *Daemon
}

// NewGetEndpointIDHandler returns a GetEndpointIDHandler bound to the daemon d.
func NewGetEndpointIDHandler(d *Daemon) GetEndpointIDHandler {
	handler := getEndpointID{d: d}
	return &handler
}

// Handle looks up the endpoint identified by params.ID and returns its model.
// A lookup error is reported with GetEndpointIDInvalidCode; an unknown
// endpoint yields a not-found response.
func (h *getEndpointID) Handle(params GetEndpointIDParams) middleware.Responder {
	log.WithField(logfields.EndpointID, params.ID).Debug("GET /endpoint/{id} request")

	ep, err := endpointmanager.Lookup(params.ID)
	switch {
	case err != nil:
		return api.Error(GetEndpointIDInvalidCode, err)
	case ep == nil:
		return NewGetEndpointIDNotFound()
	}
	return NewGetEndpointIDOK().WithPayload(ep.GetModel())
}
// putEndpointID is the handler for PUT /endpoint/{id} requests.
type putEndpointID struct {
	d *Daemon
}

// NewPutEndpointIDHandler returns a PutEndpointIDHandler bound to the daemon d.
func NewPutEndpointIDHandler(d *Daemon) PutEndpointIDHandler {
	handler := putEndpointID{d: d}
	return &handler
}
// fetchK8sLabels asks the k8s package for the labels of the pod named by the
// endpoint's namespace and pod name, converts them to k8s-sourced labels and
// splits them with labels.FilterLabels into identity labels and info labels.
// It implements endpoint.MetadataResolverCB.
func fetchK8sLabels(ep *endpoint.Endpoint) (labels.Labels, labels.Labels, error) {
	namespace, podName := ep.GetK8sNamespace(), ep.GetK8sPodName()
	podLabels, err := k8s.GetPodLabels(namespace, podName)
	if err != nil {
		return nil, nil, err
	}

	identity, info := labels.FilterLabels(labels.Map2Labels(podLabels, labels.LabelSourceK8s))
	return identity, info, nil
}
// invalidDataError builds the return values for an endpoint creation request
// that was rejected because of invalid data: a nil endpoint,
// PutEndpointIDInvalidCode and the given error.
//
// When an endpoint object is available, a warning is logged through it and it
// is moved to the invalid state. ep may be nil: createEndpoint passes the
// result of endpoint.NewEndpointFromChangeModel here even when that call
// failed, so the endpoint must not be dereferenced unconditionally. In that
// case the error is still returned to the caller, which reports it.
func invalidDataError(ep *endpoint.Endpoint, err error) (*endpoint.Endpoint, int, error) {
	if ep != nil {
		ep.Logger(daemonSubsys).WithError(err).Warning("Creation of endpoint failed due to invalid data")
		ep.SetState(endpoint.StateInvalid, "Invalid endpoint")
	}
	return nil, PutEndpointIDInvalidCode, err
}
// errorDuringCreation tears down a partially created endpoint and builds the
// return values for the failed creation: a nil endpoint,
// PutEndpointIDFailedCode and the given error. Errors reported by the
// teardown itself are not propagated.
func (d *Daemon) errorDuringCreation(ep *endpoint.Endpoint, err error) (*endpoint.Endpoint, int, error) {
	// The IP has been provided by the caller and must be released by the
	// caller, so it is not released here.
	cleanup := endpoint.DeleteConfig{NoIPRelease: true}
	d.deleteEndpointQuiet(ep, cleanup)

	ep.Logger(daemonSubsys).WithError(err).Warning("Creation of endpoint failed")
	return nil, PutEndpointIDFailedCode, err
}
// createEndpoint attempts to create the endpoint corresponding to the change
// request that was specified.
func (d *Daemon) createEndpoint(ctx context.Context, epTemplate *models.EndpointChangeRequest) (*endpoint.Endpoint, int, error) {
if option.Config.EnableEndpointRoutes {
if epTemplate.DatapathConfiguration == nil {
epTemplate.DatapathConfiguration = &models.EndpointDatapathConfiguration{}
}
// Indicate to insert a per endpoint route instead of routing
// via cilium_host interface
epTemplate.DatapathConfiguration.InstallEndpointRoute = true
// Since routing occurs via endpoint interface directly, BPF
// program is needed on that device at egress as BPF program on
// cilium_host interface is bypassed
epTemplate.DatapathConfiguration.RequireEgressProg = true
// Delegate routing to the Linux stack rather than tail-calling
// between BPF programs.
disabled := false
epTemplate.DatapathConfiguration.RequireRouting = &disabled
}
ep, err := endpoint.NewEndpointFromChangeModel(d, epTemplate)
if err != nil {
return invalidDataError(ep, fmt.Errorf("unable to parse endpoint parameters: %s", err))
}
oldEp := endpointmanager.LookupCiliumID(ep.ID)
if oldEp != nil {
return invalidDataError(ep, fmt.Errorf("endpoint ID %d already exists", ep.ID))
}
oldEp = endpointmanager.LookupContainerID(ep.ContainerID)
if oldEp != nil {
return invalidDataError(ep, fmt.Errorf("endpoint for container %s already exists", ep.ContainerID))
}
var checkIDs []string
if ep.IPv4.IsSet() {
checkIDs = append(checkIDs, endpointid.NewID(endpointid.IPv4Prefix, ep.IPv4.String()))
}
if ep.IPv6.IsSet() {
checkIDs = append(checkIDs, endpointid.NewID(endpointid.IPv6Prefix, ep.IPv6.String()))
}
for _, id := range checkIDs {
oldEp, err := endpointmanager.Lookup(id)
if err != nil {
return invalidDataError(ep, err)
} else if oldEp != nil {
return invalidDataError(ep, fmt.Errorf("IP %s is already in use", id))
}
}
if err = endpoint.APICanModify(ep); err != nil {
return invalidDataError(ep, err)
}
addLabels := labels.NewLabelsFromModel(epTemplate.Labels)
infoLabels := labels.NewLabelsFromModel([]string{})
if len(addLabels) > 0 {
if lbls := addLabels.FindReserved(); lbls != nil {
return invalidDataError(ep, fmt.Errorf("not allowed to add reserved labels: %s", lbls))
}
addLabels, _, _ = checkLabels(addLabels, nil)
if len(addLabels) == 0 {
return invalidDataError(ep, fmt.Errorf("no valid labels provided"))
}
}
if ep.K8sNamespaceAndPodNameIsSet() && k8s.IsEnabled() {
identityLabels, info, err := fetchK8sLabels(ep)
if err != nil {
ep.Logger("api").WithError(err).Warning("Unable to fetch kubernetes labels")
} else {
addLabels.MergeLabels(identityLabels)
infoLabels.MergeLabels(info)
}
}
if len(addLabels) == 0 {
// If the endpoint has no labels, give the endpoint a special identity with
// label reserved:init so we can generate a custom policy for it until we
// get its actual identity.
addLabels = labels.Labels{
labels.IDNameInit: labels.NewLabel(labels.IDNameInit, "", labels.LabelSourceReserved),
}
}
// Static pods (mirror pods) might be configured before the apiserver
// is available or has received the notification that includes the
// static pod's labels. In this case, start a controller to attempt to
// resolve the labels.
if ep.K8sNamespaceAndPodNameIsSet() && k8s.IsEnabled() {
// If there are labels, but no pod namespace, then it's
// likely that there are no k8s labels at all. Resolve.
if _, k8sLabelsConfigured := addLabels[k8sConst.PodNamespaceLabel]; !k8sLabelsConfigured {
ep.RunMetadataResolver(fetchK8sLabels)
}
}
err = endpointmanager.AddEndpoint(d, ep, "Create endpoint from API PUT")
logger := ep.Logger(daemonSubsys)
if err != nil {
return d.errorDuringCreation(ep, fmt.Errorf("unable to insert endpoint into manager: %s", err))
}
ep.UpdateLabels(ctx, addLabels, infoLabels, true)
select {
case <-ctx.Done():
return d.errorDuringCreation(ep, fmt.Errorf("request cancelled while resolving identity"))
default:
}
if err := ep.LockAlive(); err != nil {
return d.errorDuringCreation(ep, fmt.Errorf("endpoint was deleted while processing the request"))
}
// Now that we have ep.ID we can pin the map from this point. This
// also has to happen before the first build took place.
if err = ep.PinDatapathMap(); err != nil {
ep.Unlock()
return d.errorDuringCreation(ep, fmt.Errorf("unable to pin datapath maps: %s", err))
}
build := ep.GetStateLocked() == endpoint.StateReady
if build {
ep.SetStateLocked(endpoint.StateWaitingToRegenerate, "Identity is known at endpoint creation time")
}
ep.Unlock()
if build {
// Do not synchronously regenerate the endpoint when first creating it.
// We have custom logic later for waiting for specific checkpoints to be
// reached upon regeneration later (checking for when BPF programs have
// been compiled), as opposed to waiting for the entire regeneration to
// be complete (including proxies being configured). This is done to
// avoid a chicken-and-egg problem with L7 policies are imported which
// select the endpoint being generated, as when such policies are
// imported, regeneration blocks on waiting for proxies to be
// configured. When Cilium is used with Istio, though, the proxy is
// started as a sidecar, and is not launched yet when this specific code
// is executed; if we waited for regeneration to be complete, including
// proxy configuration, this code would effectively deadlock addition
// of endpoints.
ep.Regenerate(®eneration.ExternalRegenerationMetadata{
Reason: "Initial build on endpoint creation",
ParentContext: ctx,
})
}
// Only used for CRI-O since it does not support events.
if d.workloadsEventsCh != nil && ep.GetContainerID() != "" {
d.workloadsEventsCh <- &workloads.EventMessage{
WorkloadID: ep.GetContainerID(),
EventType: workloads.EventTypeStart,
}
}
// Wait for endpoint to be in "ready" state if specified in API call.
if !epTemplate.SyncBuildEndpoint {
return ep, 0, nil
}
logger.Info("Waiting for endpoint to be generated")
// Default timeout for PUT /endpoint/{id} is 60 seconds, so put timeout
// in this function a bit below that timeout. If the timeout for clients
// in API is below this value, they will get a message containing
// "context deadline exceeded" if the operation takes longer than the
// client's configured timeout value.
ctx, cancel := context.WithTimeout(ctx, endpoint.EndpointGenerationTimeout)
// Check the endpoint's state and labels periodically.
ticker := time.NewTicker(1 * time.Second)
defer func() {
cancel()
ticker.Stop()
}()
// Wait for any successful BPF regeneration, which is indicated by any
// positive policy revision (>0). As long as at least one BPF
// regeneration is successful, the endpoint has network connectivity
// so we can return from the creation API call.
revCh := ep.WaitForPolicyRevision(ctx, 1, nil)
waitForSuccessfulBuild:
for {
select {
case <-revCh:
if ctx.Err() == nil {
// At least one BPF regeneration has successfully completed.
break waitForSuccessfulBuild
}
case <-ctx.Done():
case <-ticker.C:
if err := ep.RLockAlive(); err != nil {
return d.errorDuringCreation(ep, fmt.Errorf("endpoint was deleted while waiting for initial endpoint generation to complete"))
}
hasSidecarProxy := ep.HasSidecarProxy()
ep.RUnlock()
if hasSidecarProxy && ep.HasBPFProgram() {
// If the endpoint is determined to have a sidecar proxy,
// return immediately to let the sidecar container start,
// in case it is required to enforce L7 rules.
logger.Info("Endpoint has sidecar proxy, returning from synchronous creation request before regeneration has succeeded")
break waitForSuccessfulBuild
}
}
if ctx.Err() != nil {
return d.errorDuringCreation(ep, fmt.Errorf("timeout while waiting for initial endpoint generation to complete"))
}
}
// The endpoint has been successfully created, stop the expiration
// timers of all attached IPs
if addressing := epTemplate.Addressing; addressing != nil {
if uuid := addressing.IPV4ExpirationUUID; uuid != "" {
if ip := net.ParseIP(addressing.IPV4); ip != nil {
if err := d.ipam.StopExpirationTimer(ip, uuid); err != nil {
return d.errorDuringCreation(ep, err)
}
}
}
if uuid := addressing.IPV6ExpirationUUID; uuid != "" {
if ip := net.ParseIP(addressing.IPV6); ip != nil {
if err := d.ipam.StopExpirationTimer(ip, uuid); err != nil {
return d.errorDuringCreation(ep, err)
}
}
}
}
return ep, 0, nil
}
// Handle creates the endpoint described by params.Endpoint using the HTTP
// request's context. A creation failure is reported with the code returned by
// createEndpoint; success yields a Created response.
func (h *putEndpointID) Handle(params PutEndpointIDParams) middleware.Responder {
	log.WithField(logfields.Params, logfields.Repr(params)).Debug("PUT /endpoint/{id} request")

	reqCtx := params.HTTPRequest.Context()
	created, code, err := h.d.createEndpoint(reqCtx, params.Endpoint)
	if err != nil {
		return api.Error(code, err)
	}

	created.Logger(daemonSubsys).Info("Successful endpoint creation")
	return NewPutEndpointIDCreated()
}
// patchEndpointID is the handler for PATCH /endpoint/{id} requests.
type patchEndpointID struct {
	d *Daemon
}

// NewPatchEndpointIDHandler returns a PatchEndpointIDHandler bound to the
// daemon d.
func NewPatchEndpointIDHandler(d *Daemon) PatchEndpointIDHandler {
	handler := patchEndpointID{d: d}
	return &handler
}
// validPatchTransitionState reports whether state is one a PATCH request may
// ask for: the empty state, waiting-for-identity or ready.
func validPatchTransitionState(state models.EndpointState) bool {
	requested := string(state)
	return requested == "" ||
		requested == endpoint.StateWaitingForIdentity ||
		requested == endpoint.StateReady
}
// Handle applies the changes in params.Endpoint to the existing endpoint
// identified by params.ID.
//
// The template is first validated by building a throw-away endpoint from it;
// only the interface index/name, MAC addresses, IP addresses and state are
// then copied over, and only when the template sets them and they differ.
// If anything changed, policy computation is forced and, depending on the
// resulting state, the handler waits for the endpoint to regenerate.
//
// Responses: PatchEndpointIDInvalidCode for an invalid template, a failed
// lookup or an endpoint that may not be modified through the API; not-found
// when the endpoint does not exist or is no longer alive;
// PatchEndpointIDFailedCode when waiting for regeneration fails; OK otherwise.
func (h *patchEndpointID) Handle(params PatchEndpointIDParams) middleware.Responder {
	scopedLog := log.WithField(logfields.Params, logfields.Repr(params))
	scopedLog.Debug("PATCH /endpoint/{id} request")

	epTemplate := params.Endpoint

	// Validate the template. Assignment afterwards is atomic.
	// Note: newEp's labels are ignored.
	newEp, err2 := endpoint.NewEndpointFromChangeModel(h.d, epTemplate)
	if err2 != nil {
		return api.Error(PatchEndpointIDInvalidCode, err2)
	}

	// Log invalid state transitions, but do not error out for backwards
	// compatibility.
	if !validPatchTransitionState(epTemplate.State) {
		scopedLog.Debugf("PATCH /endpoint/{id} to invalid state '%s'", epTemplate.State)
	}

	ep, err := endpointmanager.Lookup(params.ID)
	if err != nil {
		return api.Error(PatchEndpointIDInvalidCode, err)
	}
	if ep == nil {
		return NewPatchEndpointIDNotFound()
	}
	if err = endpoint.APICanModify(ep); err != nil {
		return api.Error(PatchEndpointIDInvalidCode, err)
	}

	// FIXME: Support changing these?
	//  - container ID
	//  - docker network id
	//  - docker endpoint id
	//
	//  Support arbitrary changes? Support only if unset?

	if err := ep.LockAlive(); err != nil {
		return NewPatchEndpointIDNotFound()
	}

	changed := false

	if epTemplate.InterfaceIndex != 0 && ep.IfIndex != newEp.IfIndex {
		ep.IfIndex = newEp.IfIndex
		changed = true
	}

	if epTemplate.InterfaceName != "" && ep.IfName != newEp.IfName {
		ep.IfName = newEp.IfName
		changed = true
	}

	// Only support transition to waiting-for-identity state, also
	// if the request is for ready state, as we will check the
	// existence of the security label below. Other transitions
	// are always internally managed, but we do not error out for
	// backwards compatibility.
	if epTemplate.State != "" &&
		validPatchTransitionState(epTemplate.State) &&
		ep.GetStateLocked() != endpoint.StateWaitingForIdentity {
		// Will not change state if the current state does not allow the transition.
		if ep.SetStateLocked(endpoint.StateWaitingForIdentity, "Update endpoint from API PATCH") {
			changed = true
		}
	}

	if epTemplate.Mac != "" && !bytes.Equal(ep.LXCMAC, newEp.LXCMAC) {
		ep.LXCMAC = newEp.LXCMAC
		changed = true
	}

	if epTemplate.HostMac != "" && !bytes.Equal(ep.GetNodeMAC(), newEp.NodeMAC) {
		ep.SetNodeMACLocked(newEp.NodeMAC)
		changed = true
	}

	if epTemplate.Addressing != nil {
		if ip := epTemplate.Addressing.IPV6; ip != "" && !bytes.Equal(ep.IPv6, newEp.IPv6) {
			ep.IPv6 = newEp.IPv6
			changed = true
		}

		if ip := epTemplate.Addressing.IPV4; ip != "" && !bytes.Equal(ep.IPv4, newEp.IPv4) {
			ep.IPv4 = newEp.IPv4
			changed = true
		}
	}

	// TODO: Do something with the labels?
	// addLabels := labels.NewLabelsFromModel(params.Endpoint.Labels)

	// If desired state is waiting-for-identity but identity is already
	// known, bump it to ready state immediately to force re-generation
	if ep.GetStateLocked() == endpoint.StateWaitingForIdentity && ep.SecurityIdentity != nil {
		ep.SetStateLocked(endpoint.StateReady, "Preparing to force endpoint regeneration because identity is known while handling API PATCH")
		changed = true
	}

	// reason stays empty unless the endpoint ends up in a state for which the
	// handler has to wait on a regeneration after releasing the lock.
	reason := ""
	if changed {
		// Force policy regeneration as endpoint's configuration was changed.
		// Other endpoints need not be regenerated as no labels were changed.
		// Note that we still need to (eventually) regenerate the endpoint for
		// the changes to take effect.
		ep.ForcePolicyCompute()

		// Transition to waiting-to-regenerate if ready.
		if ep.GetStateLocked() == endpoint.StateReady {
			ep.SetStateLocked(endpoint.StateWaitingToRegenerate, "Forcing endpoint regeneration because identity is known while handling API PATCH")
		}

		switch ep.GetStateLocked() {
		case endpoint.StateWaitingToRegenerate:
			reason = "Waiting on endpoint regeneration because identity is known while handling API PATCH"
		case endpoint.StateWaitingForIdentity:
			reason = "Waiting on endpoint initial program regeneration while handling API PATCH"
		}
	}

	ep.UpdateLogger(nil)
	ep.Unlock()

	if reason != "" {
		if err := ep.RegenerateWait(reason); err != nil {
			return api.Error(PatchEndpointIDFailedCode, err)
		}
		// FIXME: Special return code to indicate regeneration happened?
	}

	return NewPatchEndpointIDOK()
}
// deleteEndpoint deletes ep via deleteEndpointQuiet, logs every error that
// the deletion reported as a warning, and returns how many errors there were.
func (d *Daemon) deleteEndpoint(ep *endpoint.Endpoint) int {
	logger := log.WithField(logfields.EndpointID, ep.ID)

	conf := endpoint.DeleteConfig{
		// If the IP is managed by an external IPAM, it does not need to be released
		NoIPRelease: ep.DatapathConfiguration.ExternalIPAM,
	}
	deleteErrs := d.deleteEndpointQuiet(ep, conf)

	for i := range deleteErrs {
		logger.WithError(deleteErrs[i]).Warn("Ignoring error while deleting endpoint")
	}
	return len(deleteErrs)
}
// deleteEndpointQuiet sets the endpoint into disconnecting state and removes
// it from Cilium, releasing all resources associated with it such as its
// visibility in the endpointmanager, its BPF programs and maps, (optional) IP,
// L7 policy configuration, directories and controllers.
//
// The endpoint's IPs are released through the daemon's IPAM unless
// conf.NoIPRelease is set; callers whose IP is owned by someone else (for
// example the API caller or an external IPAM) set it to keep the IP allocated.
//
// It returns the errors collected during the teardown; the slice is empty
// when the endpoint was already being deleted by an earlier request. Nothing
// is logged here; callers decide what to do with the returned errors.
func (d *Daemon) deleteEndpointQuiet(ep *endpoint.Endpoint, conf endpoint.DeleteConfig) []error {
// Only used for CRI-O since it does not support events.
if d.workloadsEventsCh != nil && ep.GetContainerID() != "" {
d.workloadsEventsCh <- &workloads.EventMessage{
WorkloadID: ep.GetContainerID(),
EventType: workloads.EventTypeDelete,
}
}
errs := []error{}
// Since the endpoint is being deleted, we no longer need to run events
// in its event queue. This is a no-op if the queue has already been
// closed elsewhere.
ep.EventQueue.Stop()
// Wait for the queue to be drained in case an event which is currently
// running for the endpoint tries to acquire the lock - we cannot be sure
// what types of events will be pushed onto the EventQueue for an endpoint
// and when they will happen. After this point, no events for the endpoint
// will be processed on its EventQueue, specifically regenerations.
ep.EventQueue.WaitToBeDrained()
// Wait for existing builds to complete and prevent further builds.
// The build mutex stays held until the very end of this function.
ep.BuildMutex.Lock()
// Given that we are deleting the endpoint and that no more builds are
// going to occur for this endpoint, close the channel which signals whether
// the endpoint has its BPF program compiled or not to avoid it persisting
// if anything is blocking on it. If a delete request has already been
// enqueued for this endpoint, this is a no-op.
ep.CloseBPFProgramChannel()
// Lock out any other writers to the endpoint. In case multiple delete
// requests have been enqueued, have all of them except the first
// return here. Ignore the request if the endpoint is already
// disconnected.
if err := ep.LockAlive(); err != nil {
ep.BuildMutex.Unlock()
return []error{}
}
ep.SetStateLocked(endpoint.StateDisconnecting, "Deleting endpoint")
// Remove the endpoint before we clean up. This ensures it is no longer
// listed or queued for rebuilds.
endpointmanager.Remove(ep)
// Runs when the function returns, i.e. after the teardown below.
defer func() {
repr, err := monitorAPI.EndpointDeleteRepr(ep)
// Only send the deletion notification if building its representation
// succeeded; a failure to build it is silently ignored.
if err == nil {
d.SendNotification(monitorAPI.AgentNotifyEndpointDeleted, repr)
}
}()
// If dry mode is enabled, no changes to BPF maps are performed
if !option.Config.DryMode {
if errs2 := lxcmap.DeleteElement(ep); errs2 != nil {
errs = append(errs, errs2...)
}
if errs2 := ep.DeleteMapsLocked(); errs2 != nil {
errs = append(errs, errs2...)
}
}
// Release the endpoint's addresses for each enabled address family unless
// the caller asked to keep them.
if !conf.NoIPRelease {
if option.Config.EnableIPv4 {
if err := d.ipam.ReleaseIP(ep.IPv4.IP()); err != nil {
errs = append(errs, fmt.Errorf("unable to release ipv4 address: %s", err))
}
}
if option.Config.EnableIPv6 {
if err := d.ipam.ReleaseIP(ep.IPv6.IP()); err != nil {
errs = append(errs, fmt.Errorf("unable to release ipv6 address: %s", err))
}
}
}
// The wait group is bound to a context that expires after 10 seconds; it is
// cancelled explicitly below once the wait has returned.
completionCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
proxyWaitGroup := completion.NewWaitGroup(completionCtx)
errs = append(errs, ep.LeaveLocked(proxyWaitGroup, conf)...)
// The endpoint lock is released before waiting on the proxy completions.
ep.Unlock()
err := ep.WaitForProxyCompletions(proxyWaitGroup)
if err != nil {
errs = append(errs, fmt.Errorf("unable to remove proxy redirects: %s", err))
}
cancel()
// NOTE(review): DeleteBPFProgramLocked is called after ep.Unlock() while only
// BuildMutex is still held — confirm which lock the "Locked" suffix refers to.
if option.Config.IsFlannelMasterDeviceSet() &&
option.Config.FlannelUninstallOnExit {
ep.DeleteBPFProgramLocked()
}
ep.BuildMutex.Unlock()
return errs
}
// DeleteEndpoint deletes the endpoint identified by id. It returns the number
// of errors encountered during the deletion together with a nil error, or
// zero and an API error when the lookup fails, the endpoint does not exist,
// or the endpoint may not be modified through the API.
func (d *Daemon) DeleteEndpoint(id string) (int, error) {
	ep, err := endpointmanager.Lookup(id)
	if err != nil {
		return 0, api.Error(DeleteEndpointIDInvalidCode, err)
	}
	if ep == nil {
		return 0, api.New(DeleteEndpointIDNotFoundCode, "endpoint not found")
	}
	if err = endpoint.APICanModify(ep); err != nil {
		return 0, api.Error(DeleteEndpointIDInvalidCode, err)
	}
	return d.deleteEndpoint(ep), nil
}
// deleteEndpointID is the handler for DELETE /endpoint/{id} requests.
type deleteEndpointID struct {
	daemon *Daemon
}

// NewDeleteEndpointIDHandler returns a DeleteEndpointIDHandler bound to the
// daemon d.
func NewDeleteEndpointIDHandler(d *Daemon) DeleteEndpointIDHandler {
	handler := deleteEndpointID{daemon: d}
	return &handler
}

// Handle deletes the endpoint identified by params.ID. An *api.APIError from
// the deletion is returned as the response directly; any other error is
// wrapped with DeleteEndpointIDErrorsCode. If the deletion itself reported
// errors, their count is returned as the payload of an errors response.
func (h *deleteEndpointID) Handle(params DeleteEndpointIDParams) middleware.Responder {
	log.WithField(logfields.Params, logfields.Repr(params)).Debug("DELETE /endpoint/{id} request")

	failures, err := h.daemon.DeleteEndpoint(params.ID)
	if err != nil {
		if apiErr, isAPIErr := err.(*api.APIError); isAPIErr {
			return apiErr
		}
		return api.Error(DeleteEndpointIDErrorsCode, err)
	}
	if failures > 0 {
		return NewDeleteEndpointIDErrors().WithPayload(int64(failures))
	}
	return NewDeleteEndpointIDOK()
}
// EndpointUpdate applies the configuration cfg to the endpoint identified by
// id via ep.Update and then refreshes the endpoint manager's references to it.
//
// It returns an API error when the lookup fails, the endpoint does not exist
// or may not be modified through the API, the update is rejected
// (endpoint.UpdateValidationError maps to PatchEndpointIDConfigInvalidCode,
// anything else to PatchEndpointIDConfigFailedCode), or the endpoint is no
// longer alive after the update.
func (d *Daemon) EndpointUpdate(id string, cfg *models.EndpointConfigurationSpec) error {
	ep, err := endpointmanager.Lookup(id)
	if err != nil {
		return api.Error(PatchEndpointIDInvalidCode, err)
	}
	if ep == nil {
		return api.New(PatchEndpointIDConfigNotFoundCode, "endpoint %s not found", id)
	}
	if err = endpoint.APICanModify(ep); err != nil {
		return api.Error(PatchEndpointIDInvalidCode, err)
	}

	if updateErr := ep.Update(cfg); updateErr != nil {
		if _, invalid := updateErr.(endpoint.UpdateValidationError); invalid {
			return api.Error(PatchEndpointIDConfigInvalidCode, updateErr)
		}
		return api.Error(PatchEndpointIDConfigFailedCode, updateErr)
	}

	if lockErr := ep.RLockAlive(); lockErr != nil {
		return api.Error(PatchEndpointIDNotFoundCode, lockErr)
	}
	endpointmanager.UpdateReferences(ep)
	ep.RUnlock()

	return nil
}
// patchEndpointIDConfig is the handler for PATCH /endpoint/{id}/config
// requests.
type patchEndpointIDConfig struct {
	daemon *Daemon
}

// NewPatchEndpointIDConfigHandler returns a PatchEndpointIDConfigHandler bound
// to the daemon d.
func NewPatchEndpointIDConfigHandler(d *Daemon) PatchEndpointIDConfigHandler {
	handler := patchEndpointIDConfig{daemon: d}
	return &handler
}

// Handle applies params.EndpointConfiguration to the endpoint identified by
// params.ID. An *api.APIError from the update is returned as the response
// directly; any other error is wrapped with PatchEndpointIDFailedCode.
func (h *patchEndpointIDConfig) Handle(params PatchEndpointIDConfigParams) middleware.Responder {
	log.WithField(logfields.Params, logfields.Repr(params)).Debug("PATCH /endpoint/{id}/config request")

	err := h.daemon.EndpointUpdate(params.ID, params.EndpointConfiguration)
	if err == nil {
		return NewPatchEndpointIDConfigOK()
	}
	if apiErr, isAPIErr := err.(*api.APIError); isAPIErr {
		return apiErr
	}
	return api.Error(PatchEndpointIDFailedCode, err)
}
// getEndpointIDConfig is the handler for GET /endpoint/{id}/config requests.
type getEndpointIDConfig struct {
	daemon *Daemon
}

// NewGetEndpointIDConfigHandler returns a GetEndpointIDConfigHandler bound to
// the daemon d.
func NewGetEndpointIDConfigHandler(d *Daemon) GetEndpointIDConfigHandler {
	handler := getEndpointIDConfig{daemon: d}
	return &handler
}

// Handle returns the configuration status of the endpoint identified by
// params.ID: the realized spec (custom labels and mutable options) plus the
// immutable options. A lookup error is reported with
// GetEndpointIDInvalidCode; an unknown endpoint yields a not-found response.
func (h *getEndpointIDConfig) Handle(params GetEndpointIDConfigParams) middleware.Responder {
	log.WithField(logfields.Params, logfields.Repr(params)).Debug("GET /endpoint/{id}/config")

	ep, err := endpointmanager.Lookup(params.ID)
	if err != nil {
		return api.Error(GetEndpointIDInvalidCode, err)
	}
	if ep == nil {
		return NewGetEndpointIDConfigNotFound()
	}

	realized := &models.EndpointConfigurationSpec{
		LabelConfiguration: &models.LabelConfigurationSpec{
			User: ep.OpLabels.Custom.GetModel(),
		},
		Options: *ep.Options.GetMutableModel(),
	}
	status := &models.EndpointConfigurationStatus{
		Realized:  realized,
		Immutable: *ep.Options.GetImmutableModel(),
	}
	return NewGetEndpointIDConfigOK().WithPayload(status)
}
// getEndpointIDLabels is the handler for GET /endpoint/{id}/labels requests.
type getEndpointIDLabels struct {
	daemon *Daemon
}

// NewGetEndpointIDLabelsHandler returns a GetEndpointIDLabelsHandler bound to
// the daemon d.
func NewGetEndpointIDLabelsHandler(d *Daemon) GetEndpointIDLabelsHandler {
	handler := getEndpointIDLabels{daemon: d}
	return &handler
}

// Handle returns the label configuration of the endpoint identified by
// params.ID. The endpoint's labels are read while holding its read lock. The
// custom labels are reported both as the spec and as the realized status.
func (h *getEndpointIDLabels) Handle(params GetEndpointIDLabelsParams) middleware.Responder {
	log.WithField(logfields.Params, logfields.Repr(params)).Debug("GET /endpoint/{id}/labels")

	ep, err := endpointmanager.Lookup(params.ID)
	switch {
	case err != nil:
		return api.Error(GetEndpointIDInvalidCode, err)
	case ep == nil:
		return NewGetEndpointIDLabelsNotFound()
	}

	if lockErr := ep.RLockAlive(); lockErr != nil {
		return api.Error(GetEndpointIDInvalidCode, lockErr)
	}
	userLabels := &models.LabelConfigurationSpec{
		User: ep.OpLabels.Custom.GetModel(),
	}
	status := &models.LabelConfigurationStatus{
		Realized:         userLabels,
		SecurityRelevant: ep.OpLabels.OrchestrationIdentity.GetModel(),
		Derived:          ep.OpLabels.OrchestrationInfo.GetModel(),
		Disabled:         ep.OpLabels.Disabled.GetModel(),
	}
	ep.RUnlock()

	payload := &models.LabelConfiguration{
		Spec:   userLabels,
		Status: status,
	}
	return NewGetEndpointIDLabelsOK().WithPayload(payload)
}
// getEndpointIDLog is the handler for GET /endpoint/{id}/log requests.
type getEndpointIDLog struct {
	d *Daemon
}

// NewGetEndpointIDLogHandler returns a GetEndpointIDLogHandler bound to the
// daemon d.
func NewGetEndpointIDLogHandler(d *Daemon) GetEndpointIDLogHandler {
	handler := getEndpointIDLog{d: d}
	return &handler
}

// Handle returns the model of the status of the endpoint identified by
// params.ID. A lookup error is reported with GetEndpointIDLogInvalidCode; an
// unknown endpoint yields a not-found response.
func (h *getEndpointIDLog) Handle(params GetEndpointIDLogParams) middleware.Responder {
	log.WithField(logfields.EndpointID, params.ID).Debug("GET /endpoint/{id}/log request")

	ep, err := endpointmanager.Lookup(params.ID)
	switch {
	case err != nil:
		return api.Error(GetEndpointIDLogInvalidCode, err)
	case ep == nil:
		return NewGetEndpointIDLogNotFound()
	}
	return NewGetEndpointIDLogOK().WithPayload(ep.Status.GetModel())
}
// getEndpointIDHealthz is the handler for GET /endpoint/{id}/healthz requests.
type getEndpointIDHealthz struct {
	d *Daemon
}

// NewGetEndpointIDHealthzHandler returns a GetEndpointIDHealthzHandler bound
// to the daemon d.
func NewGetEndpointIDHealthzHandler(d *Daemon) GetEndpointIDHealthzHandler {
	return &getEndpointIDHealthz{d: d}
}

// Handle returns the health model of the endpoint identified by params.ID.
// A lookup error is reported with GetEndpointIDHealthzInvalidCode; an unknown
// endpoint yields a not-found response.
func (h *getEndpointIDHealthz) Handle(params GetEndpointIDHealthzParams) middleware.Responder {
	// The message names the healthz route so these requests can be told apart
	// from GET /endpoint/{id}/log requests in the debug log.
	log.WithField(logfields.EndpointID, params.ID).Debug("GET /endpoint/{id}/healthz request")

	ep, err := endpointmanager.Lookup(params.ID)
	if err != nil {
		return api.Error(GetEndpointIDHealthzInvalidCode, err)
	}
	if ep == nil {
		return NewGetEndpointIDHealthzNotFound()
	}
	return NewGetEndpointIDHealthzOK().WithPayload(ep.GetHealthModel())
}
// checkLabels runs both label sets through labels.FilterLabels and keeps the
// first result of each. ok is false, and both returned sets are nil, when
// nothing is left in either set after filtering.
func checkLabels(add, del labels.Labels) (addLabels, delLabels labels.Labels, ok bool) {
	keptAdd, _ := labels.FilterLabels(add)
	keptDel, _ := labels.FilterLabels(del)

	if len(keptAdd)+len(keptDel) == 0 {
		return nil, nil, false
	}
	return keptAdd, keptDel, true
}
// modifyEndpointIdentityLabelsFromAPI adds and deletes the given labels on the
// endpoint with the given ID, after checking that the endpoint may be modified
// by an API call.
// Both `add` and `del` are first filtered with the valid label prefixes; if
// nothing remains in either set the call is a no-op returning (0, nil).
// Reserved labels may be neither added nor deleted.
// The `add` labels take precedence over `del` labels: a label present in both
// will exist in the endpoint's labels afterwards.
// Returns an HTTP response code and an error msg (or nil on success).
func (d *Daemon) modifyEndpointIdentityLabelsFromAPI(id string, add, del labels.Labels) (int, error) {
	toAdd, toDel, changes := checkLabels(add, del)
	if !changes {
		return 0, nil
	}

	if reserved := toAdd.FindReserved(); reserved != nil {
		return PatchEndpointIDLabelsUpdateFailedCode, fmt.Errorf("Not allowed to add reserved labels: %s", reserved)
	}
	if reserved := toDel.FindReserved(); reserved != nil {
		return PatchEndpointIDLabelsUpdateFailedCode, fmt.Errorf("Not allowed to delete reserved labels: %s", reserved)
	}

	ep, err := endpointmanager.Lookup(id)
	switch {
	case err != nil:
		return PatchEndpointIDInvalidCode, err
	case ep == nil:
		return PatchEndpointIDLabelsNotFoundCode, fmt.Errorf("Endpoint ID %s not found", id)
	}

	if err = endpoint.APICanModify(ep); err != nil {
		return PatchEndpointIDInvalidCode, err
	}

	if modErr := ep.ModifyIdentityLabels(toAdd, toDel); modErr != nil {
		return PatchEndpointIDLabelsNotFoundCode, modErr
	}
	return PatchEndpointIDLabelsOKCode, nil
}
// putEndpointIDLabels is the handler for PATCH /endpoint/{id}/labels requests.
type putEndpointIDLabels struct {
	daemon *Daemon
}

// NewPatchEndpointIDLabelsHandler returns a PatchEndpointIDLabelsHandler bound
// to the daemon d.
func NewPatchEndpointIDLabelsHandler(d *Daemon) PatchEndpointIDLabelsHandler {
	handler := putEndpointIDLabels{daemon: d}
	return &handler
}

// Handle replaces the user labels of the endpoint identified by params.ID
// with those in params.Configuration.User. The requested set is split, under
// the endpoint's read lock, into labels to add and labels to delete; the lock
// is released before the modification is applied.
func (h *putEndpointIDLabels) Handle(params PatchEndpointIDLabelsParams) middleware.Responder {
	log.WithField(logfields.Params, logfields.Repr(params)).Debug("PATCH /endpoint/{id}/labels request")

	requested := labels.NewLabelsFromModel(params.Configuration.User)

	ep, err := endpointmanager.Lookup(params.ID)
	if err != nil {
		return api.Error(PutEndpointIDInvalidCode, err)
	}
	if ep == nil {
		return NewPatchEndpointIDLabelsNotFound()
	}

	if lockErr := ep.RLockAlive(); lockErr != nil {
		return api.Error(PutEndpointIDInvalidCode, lockErr)
	}
	toAdd, toDel := ep.OpLabels.SplitUserLabelChanges(requested)
	ep.RUnlock()

	if code, modErr := h.daemon.modifyEndpointIdentityLabelsFromAPI(params.ID, toAdd, toDel); modErr != nil {
		return api.Error(code, modErr)
	}
	return NewPatchEndpointIDLabelsOK()
}