Skip to content

Commit

Permalink
fix: error conn event log (#1856)
Browse files Browse the repository at this point in the history
Signed-off-by: 逆流而上 <1666888816@qq.com>
  • Loading branch information
DokiDoki1103 committed Jan 19, 2024
1 parent e5722ab commit 3e3fa43
Showing 1 changed file with 29 additions and 55 deletions.
84 changes: 29 additions & 55 deletions event/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package event

import (
"fmt"
"github.com/goodrain/rainbond/pkg/gogo"
"io"
"os"
"strings"
Expand Down Expand Up @@ -63,13 +64,7 @@ type manager struct {

var defaultManager Manager

const (
//REQUESTTIMEOUT time out
REQUESTTIMEOUT = 1000 * time.Millisecond
//MAXRETRIES 重试
MAXRETRIES = 3 // Before we abandon
buffersize = 1000
)
const buffersize = 1000

// NewManager 创建manager
func NewManager(conf EventConfig) error {
Expand Down Expand Up @@ -107,64 +102,43 @@ func CloseManager() {
func (m *manager) Start() error {
m.lock.Lock()
defer m.lock.Unlock()
for i := 0; i < len(m.eventServer); i++ {
h := handle{
cacheChan: make(chan []byte, buffersize),
stop: make(chan struct{}),
server: m.eventServer[i],
manager: m,
ctx: m.ctx,
}
m.handles[m.eventServer[i]] = h
go h.HandleLog()
if len(m.eventServer) == 0 {
logrus.Errorf("event log server is empty , plase set it in config file.")
return nil
}
//if m.dis != nil {
// m.dis.AddProject("event_log_event_grpc", m)
//}
go m.GC()
return nil
}
defaultServer := m.eventServer[0]

// UpdateEndpoints -
func (m *manager) UpdateEndpoints(endpoints ...*config.Endpoint) {
m.lock.Lock()
defer m.lock.Unlock()
if endpoints == nil || len(endpoints) < 1 {
return
}
//清空不可用节点信息,以服务发现为主
m.abnormalServer = make(map[string]string)
//增加新节点
var new = make(map[string]string)
for _, end := range endpoints {
new[end.URL] = end.URL
if _, ok := m.handles[end.URL]; !ok {
err := gogo.Go(func(ctx context.Context) error {
for {
h := handle{
cacheChan: make(chan []byte, buffersize),
stop: make(chan struct{}),
server: end.URL,
server: defaultServer,
manager: m,
ctx: m.ctx,
}
m.handles[end.URL] = h
logrus.Infof("Add event server endpoint,%s", end.URL)
go h.HandleLog()
}
}
//删除旧节点
for k := range m.handles {
if _, ok := new[k]; !ok {
delete(m.handles, k)
logrus.Infof("Remove event server endpoint,%s", k)
m.handles[defaultServer] = h
err := h.HandleLog()
if err != nil {
time.Sleep(time.Second * 10)
logrus.Warnf("event log server %s connect error: %v. auto retry after 10 seconds ", defaultServer, err)
continue
}
return nil
}
})

if err != nil {
logrus.Errorf("event log server %s connect error, %v", defaultServer, err)
return err
}
var eventServer []string
for k := range new {
eventServer = append(eventServer, k)
}
m.eventServer = eventServer
m.config.EventLogServers = eventServer
logrus.Debugf("update event handle core success,handle core count:%d, event server count:%d", len(m.handles), len(m.eventServer))

go m.GC()
return nil
}

// UpdateEndpoints - 不需要去更新节点信息
func (m *manager) UpdateEndpoints(endpoints ...*config.Endpoint) {
}

// Error -
Expand Down

0 comments on commit 3e3fa43

Please sign in to comment.