/
listener.go
320 lines (283 loc) · 11.1 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
// Copyright 2016-2019 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ipcache
import (
"context"
"fmt"
"net"
"os"
"sync"
"time"
"github.com/cilium/cilium/pkg/bpf"
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/logging"
"github.com/cilium/cilium/pkg/logging/logfields"
ipcacheMap "github.com/cilium/cilium/pkg/maps/ipcache"
"github.com/cilium/cilium/pkg/node"
"github.com/cilium/cilium/pkg/option"
"github.com/cilium/cilium/pkg/source"
"github.com/sirupsen/logrus"
)
// log is the package-level logger used throughout this file. It is derived
// from logging.DefaultLogger with the logfields.LogSubsys field set to
// "datapath-ipcache".
var log = logging.DefaultLogger.WithField(logfields.LogSubsys, "datapath-ipcache")
// datapath is an interface to the datapath implementation, used to apply
// changes that are made within this module.
type datapath interface {
// TriggerReloadWithoutCompile requests a reload of the datapath; 'reason'
// is a free-form description of why the reload was requested. In this
// file it is called by garbageCollect after the ipcache map has been
// replaced on the filesystem, and the returned WaitGroup is waited on by
// the GC controller.
// NOTE(review): presumably the WaitGroup completes once the reload has
// finished - confirm against the implementation.
TriggerReloadWithoutCompile(reason string) (*sync.WaitGroup, error)
}
// BPFListener implements the ipcache.IPIdentityMappingBPFListener
// interface with an IPCache store that is backed by BPF maps.
//
// One listener is shared between callers of OnIPIdentityCacheChange() and the
// controller launched from OnIPIdentityCacheGC(). However, the listener is not
// updated after initialization, so no locking is provided for access.
type BPFListener struct {
// bpfMap is the BPF map that this listener will update when events are
// received from the IPCache.
bpfMap *ipcacheMap.Map
// datapath allows this listener to trigger BPF program regeneration.
datapath datapath
}
// newListener builds a BPFListener that writes to the BPF map m and uses d to
// trigger datapath reloads.
func newListener(m *ipcacheMap.Map, d datapath) *BPFListener {
	listener := new(BPFListener)
	listener.bpfMap = m
	listener.datapath = d
	return listener
}
// NewListener returns a new listener that pushes IPCache entries into the
// package-wide ipcacheMap.IPCache BPF map, using d to trigger datapath
// reloads.
func NewListener(d datapath) *BPFListener {
	return &BPFListener{
		bpfMap:   ipcacheMap.IPCache,
		datapath: d,
	}
}
// OnIPIdentityCacheChange is invoked for every state change in the IPCache
// (pkg/ipcache) and mirrors that change into the listener's BPF map.
// TODO (FIXME): GH-3161.
//
// An ipcache.Upsert writes the entry for 'cidr' with the new identity and
// encryption key; an ipcache.Delete removes the entry for 'cidr'. Any other
// modification type is logged as unsupported. Failures to touch the BPF map
// are logged as warnings and are not reported back to the caller.
//
// 'oldHostIP' and 'oldID' are ignored here, because in the BPF maps an update
// for the IP->ID mapping replaces any existing contents; knowledge of the old
// pair is not required to upsert the new pair.
func (l *BPFListener) OnIPIdentityCacheChange(modType ipcache.CacheModification, cidr net.IPNet,
	oldHostIP, newHostIP net.IP, oldID *identity.NumericIdentity, newID identity.NumericIdentity, encryptKey uint8) {

	// Only pay for the per-event fields when debug logging is enabled.
	entryLog := log
	if option.Config.Debug {
		entryLog = log.WithFields(logrus.Fields{
			logfields.IPAddr:       cidr,
			logfields.Identity:     newID,
			logfields.Modification: modType,
		})
	}
	entryLog.Debug("Daemon notified of IP-Identity cache state change")

	// TODO - see if we can factor this into an interface under something like
	// pkg/datapath instead of in the daemon directly so that the code is more
	// logically located.

	// Update BPF Maps.
	mapKey := ipcacheMap.NewKey(cidr.IP, cidr.Mask)

	if modType == ipcache.Delete {
		if err := l.bpfMap.Delete(&mapKey); err != nil {
			entryLog.WithError(err).WithFields(logrus.Fields{
				"key":                  mapKey.String(),
				logfields.IPAddr:       cidr,
				logfields.Identity:     newID,
				logfields.Modification: modType,
			}).Warning("unable to delete from bpf map")
		}
		return
	}

	if modType != ipcache.Upsert {
		entryLog.Warning("cache modification type not supported")
		return
	}

	info := ipcacheMap.RemoteEndpointInfo{
		SecurityIdentity: uint32(newID),
		Key:              encryptKey,
	}

	// If the hostIP is specified and it doesn't point to the local host,
	// then the ipcache should be populated with the hostIP so that this
	// traffic can be guided to a tunnel endpoint destination. Host IPs
	// without an IPv4 form leave the tunnel endpoint at its zero value.
	if newHostIP != nil {
		localIP := node.GetExternalIPv4()
		hostIP4 := newHostIP.To4()
		if hostIP4 != nil && !hostIP4.Equal(localIP) {
			copy(info.TunnelEndpoint[:], hostIP4)
		}
	}

	if err := l.bpfMap.Update(&mapKey, &info); err != nil {
		entryLog.WithError(err).WithFields(logrus.Fields{
			"key":                  mapKey.String(),
			"value":                info.String(),
			logfields.IPAddr:       cidr,
			logfields.Identity:     newID,
			logfields.Modification: modType,
		}).Warning("unable to update bpf map")
	}
}
// updateStaleEntriesFunction returns a DumpCallback that records, in the
// supplied "keysToRemove" map, entries that exist in the BPF map but do not
// exist in the in-memory ipcache. Recorded keys are deep copies, indexed by
// the key's string form.
//
// Must be called while holding ipcache.IPIdentityCache.Lock for reading.
func updateStaleEntriesFunction(keysToRemove map[string]*ipcacheMap.Key) bpf.DumpCallback {
	return func(key bpf.MapKey, _ bpf.MapValue) {
		mapKey := key.(*ipcacheMap.Key)
		prefix := mapKey.String()

		// Use the RLocked lookup variant: the read lock is already held by
		// this goroutine, so it must not be taken again here.
		cached, found := ipcache.IPIdentityCache.LookupByPrefixRLocked(prefix)
		if found {
			return
		}

		// NOTE(review): on a miss, 'cached' is whatever the lookup returns
		// alongside false. If that is the zero value, its Source would match
		// neither case below and no key would ever be recorded - confirm
		// against the ipcache implementation.
		if cached.Source == source.KVStore || cached.Source == source.Local {
			// Cannot delete from map during callback because DumpWithCallback
			// RLocks the map; remember the key for later removal instead.
			keysToRemove[prefix] = mapKey.DeepCopy()
		}
	}
}
// handleMapShuffleFailure attempts to undo a failed map shuffle by renaming
// the map named 'src' (the backup, when called from this file) back to the
// path of the map named 'dst' (the realized map). Both names are resolved to
// filesystem paths with bpf.MapPath.
//
// No error is returned: if the rename cannot be performed, a warning carrying
// the destination path is logged and the maps are left where they are.
func handleMapShuffleFailure(src, dst string) {
	srcPath := bpf.MapPath(src)
	dstPath := bpf.MapPath(dst)

	err := os.Rename(srcPath, dstPath)
	if err == nil {
		return
	}

	// The message is a plain constant with no formatting verbs, so the
	// non-formatting Warning call is the appropriate one.
	log.WithError(err).WithFields(logrus.Fields{
		logfields.BPFMapPath: dstPath,
	}).Warning("Unable to recover during error renaming map paths")
}
// shuffleMaps rotates the ipcache maps on the filesystem: the map named
// 'realized' is renamed to 'backup', then the map named 'pending' is renamed
// to 'realized'. All names are resolved to paths with bpf.MapPath.
//
// A missing 'realized' map is tolerated during the first step. If the second
// step fails, an attempt is made to move 'backup' back to 'realized' before
// the error is returned.
func shuffleMaps(realized, backup, pending string) error {
	var (
		livePath   = bpf.MapPath(realized)
		oldPath    = bpf.MapPath(backup)
		stagedPath = bpf.MapPath(pending)
	)

	// Step 1: set the current map aside. "Does not exist" only means there
	// is nothing to back up, so it is not treated as a failure.
	err := os.Rename(livePath, oldPath)
	if err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("Unable to back up existing ipcache: %s", err)
	}

	// Step 2: promote the pending map to the realized location.
	err = os.Rename(stagedPath, livePath)
	if err == nil {
		return nil
	}

	// Promotion failed: try to restore the backup before reporting.
	handleMapShuffleFailure(backup, realized)
	return fmt.Errorf("Unable to shift ipcache into new location: %s", err)
}
// garbageCollect implements GC of the ipcache map in one of two ways:
//
// On Linux 4.9, 4.10 or 4.16 and later:
// Periodically sweep through every element in the BPF map and check it
// against the in-memory copy of the map. If it doesn't exist in memory,
// delete the entry.
// On Linux 4.11 to 4.15:
// Create a brand new map, populate it with all of the IPCache entries from
// the in-memory cache, delete the old map, and trigger regeneration of all
// BPF programs so that they pick up the new map.
//
// The strategy is chosen at runtime by ipcacheMap.SupportsDelete(): the sweep
// is used when it returns true, the map replacement otherwise.
//
// The ctx parameter is currently not used by this function.
//
// Returns an error if garbage collection failed to occur. The returned
// WaitGroup is non-nil only on the map-replacement path, where it is the one
// handed back by TriggerReloadWithoutCompile(); the sweep path returns
// (nil, nil) on success.
func (l *BPFListener) garbageCollect(ctx context.Context) (*sync.WaitGroup, error) {
log.Debug("Running garbage collection for BPF IPCache")
if ipcacheMap.SupportsDelete() {
// Since controllers run asynchronously, need to make sure
// IPIdentityCache is not being updated concurrently while we
// do GC;
// The read lock is held for the whole sweep, including the deletions
// below, and released by the deferred RUnlock on every return path.
ipcache.IPIdentityCache.RLock()
defer ipcache.IPIdentityCache.RUnlock()
// Stale keys are collected first and deleted afterwards; the dump
// callback itself does not delete from the map.
keysToRemove := map[string]*ipcacheMap.Key{}
if err := l.bpfMap.DumpWithCallback(updateStaleEntriesFunction(keysToRemove)); err != nil {
return nil, fmt.Errorf("error dumping ipcache BPF map: %s", err)
}
// Remove all keys which are not in in-memory cache from BPF map
// for consistency.
// The first failed deletion aborts the sweep; keys not yet processed
// are left in the BPF map.
for _, k := range keysToRemove {
log.WithFields(logrus.Fields{logfields.BPFMapKey: k}).
Debug("deleting from ipcache BPF map")
if err := l.bpfMap.Delete(k); err != nil {
return nil, fmt.Errorf("error deleting key %s from ipcache BPF map: %s", k, err)
}
}
} else {
// Since controllers run asynchronously, need to make sure
// IPIdentityCache is not being updated concurrently while we
// do GC;
// Unlike the sweep path, the lock is released manually here: it has to
// be dropped before the datapath reload is triggered further down, so
// every early return below calls RUnlock() explicitly.
ipcache.IPIdentityCache.RLock()
// Populate the map at the new path
pendingMapName := fmt.Sprintf("%s_pending", ipcacheMap.Name)
pendingMap := ipcacheMap.NewMap(pendingMapName)
if _, err := pendingMap.OpenOrCreate(); err != nil {
ipcache.IPIdentityCache.RUnlock()
return nil, fmt.Errorf("Unable to create %s map: %s", pendingMapName, err)
}
// A temporary listener bound to the pending map receives a dump of the
// in-memory cache, filling the new map through the regular
// OnIPIdentityCacheChange path.
pendingListener := newListener(pendingMap, l.datapath)
ipcache.IPIdentityCache.DumpToListenerLocked(pendingListener)
// A failure to close the pending map is only logged; GC continues.
err := pendingMap.Close()
if err != nil {
log.WithError(err).WithField("map-name", pendingMapName).Warning("unable to close map")
}
// Move the maps around on the filesystem so that BPF reload
// will pick up the new paths without requiring recompilation.
backupMapName := fmt.Sprintf("%s_old", ipcacheMap.Name)
if err := shuffleMaps(ipcacheMap.Name, backupMapName, pendingMapName); err != nil {
ipcache.IPIdentityCache.RUnlock()
return nil, err
}
// Reopen the ipcache map so that new writes and reads will use
// the new map
// If reopening fails, try to move the backup map back into place
// before returning the error.
if err := ipcacheMap.Reopen(); err != nil {
handleMapShuffleFailure(backupMapName, ipcacheMap.Name)
ipcache.IPIdentityCache.RUnlock()
return nil, err
}
// Unlock the ipcache as in order for
// TriggerReloadWithoutCompile() to succeed, other endpoint
// regenerations which are blocking on the ipcache lock may
// need to succeed first (#11946)
ipcache.IPIdentityCache.RUnlock()
wg, err := l.datapath.TriggerReloadWithoutCompile("datapath ipcache")
if err != nil {
// We can't really undo the map rename again as ipcache
// operations had already been permitted so the backup
// map is potentially outdated. Fail hard to restart
// the agent so we reconstruct the ipcache from
// scratch.
log.WithError(err).Fatal("Endpoint datapath reload triggered by ipcache GC failed. Inconsistent state.")
}
// Best-effort removal of the backup map; the error is deliberately
// ignored.
_ = os.RemoveAll(bpf.MapPath(backupMapName))
return wg, nil
}
// Only reached from the sweep path: there is no reload to wait for.
return nil, nil
}
// OnIPIdentityCacheGC registers the "ipcache-bpf-garbage-collection"
// controller, which synchronizes the BPF IPCache map with the in-memory
// IP-Identity cache every five minutes.
func (l *BPFListener) OnIPIdentityCacheGC() {
	// This controller ensures that the in-memory IP-identity cache is in-sync
	// with the BPF map on disk. These can get out of sync if the cilium-agent
	// is offline for some time, as the maps persist on the BPF filesystem.
	// In the case that there is some loss of event history in the key-value
	// store (e.g., compaction in etcd), we cannot rely upon the key-value store
	// fully to give us the history of all events. As such, periodically check
	// for inconsistencies in the data-path with that in the agent to ensure
	// consistent state.
	runGC := func(ctx context.Context) error {
		reload, err := l.garbageCollect(ctx)
		switch {
		case err != nil:
			return err
		case reload != nil:
			// Block until the reload triggered by the GC has completed.
			reload.Wait()
		}
		return nil
	}

	params := controller.ControllerParams{
		DoFunc:      runGC,
		RunInterval: 5 * time.Minute,
	}
	controller.NewManager().UpdateController("ipcache-bpf-garbage-collection", params)
}