/
notify.go
111 lines (98 loc) · 2.74 KB
/
notify.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// Package notify provides notification functionality.
package notify
import (
"context"
"fmt"
"sync"
"sync/atomic"
log "github.com/go-pkgz/lgr"
"github.com/umputun/remark/backend/app/store"
)
// Service delivers notifications to multiple destinations
type Service struct {
dataService Store
destinations []Destination
queue chan request
closed uint32 // non-zero means closed. uses uint instead of bool for atomic
ctx context.Context
cancel context.CancelFunc
}
// Destination defines interface for a given destination service, like telegram, email and so on
type Destination interface {
fmt.Stringer
Send(ctx context.Context, req request) error
}
// Store defines the minimal interface accessing stored commens used by notifier
type Store interface {
Get(locator store.Locator, id string) (store.Comment, error)
}
type request struct {
comment store.Comment
parent store.Comment
}
const defaultQueueSize = 100
const uiNav = "#remark42__comment-"
// NewService makes notification service routing comments to all destinations.
func NewService(dataService Store, size int, destinations ...Destination) *Service {
if size <= 0 {
size = defaultQueueSize
}
ctx, cancel := context.WithCancel(context.Background())
res := Service{
dataService: dataService,
queue: make(chan request, size),
destinations: destinations,
ctx: ctx,
cancel: cancel,
}
if len(destinations) > 0 {
go res.do()
}
log.Printf("[INFO] create notifier service, queue size=%d, destinations=%d", size, len(destinations))
return &res
}
// Submit comment to internal channel if not busy, drop if can't send
func (s *Service) Submit(comment store.Comment) {
if len(s.destinations) == 0 || atomic.LoadUint32(&s.closed) != 0 {
return
}
parentComment := store.Comment{}
if s.dataService != nil {
if p, err := s.dataService.Get(comment.Locator, comment.ParentID); err == nil {
parentComment = p
}
}
select {
case s.queue <- request{comment: comment, parent: parentComment}:
default:
log.Printf("[WARN] can't send comment notification to queue, %+v", comment)
}
}
// Close queue channel and wait for completion
func (s *Service) Close() {
if s.queue != nil {
log.Print("[DEBUG] close notifier")
close(s.queue)
s.cancel()
<-s.ctx.Done()
}
atomic.StoreUint32(&s.closed, 1)
}
func (s *Service) do() {
for c := range s.queue {
var wg sync.WaitGroup
wg.Add(len(s.destinations))
for _, dest := range s.destinations {
go func(d Destination) {
if err := d.Send(s.ctx, c); err != nil {
log.Printf("[WARN] failed to send to %s, %s", d, err)
}
wg.Done()
}(dest)
}
wg.Wait()
}
log.Print("[WARN] terminated notifier")
}
// NopService is do-nothing notifier, without destinations
var NopService = &Service{}