This repository has been archived by the owner on Jun 1, 2023. It is now read-only.
/
stream.go
153 lines (141 loc) · 3.89 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package logic
import (
"fmt"
log "github.com/sirupsen/logrus"
"github.com/suzuki-shunsuke/go-graylog"
"github.com/suzuki-shunsuke/go-graylog/validator"
)
// HasStream returns whether the stream exists.
func (lgc *Logic) HasStream(id string) (bool, error) {
return lgc.store.HasStream(id)
}
// GetStream returns a stream.
func (lgc *Logic) GetStream(id string) (*graylog.Stream, int, error) {
if err := ValidateObjectID(id); err != nil {
// unfortunately graylog returns not 400 but 404.
return nil, 404, err
}
stream, err := lgc.store.GetStream(id)
if err != nil {
return nil, 500, err
}
if stream == nil {
return nil, 404, fmt.Errorf("no stream found with id <%s>", id)
}
return stream, 200, nil
}
// AddStream adds a stream to the Server.
func (lgc *Logic) AddStream(stream *graylog.Stream) (int, error) {
if err := validator.CreateValidator.Struct(stream); err != nil {
return 400, err
}
// check index set existence
is, sc, err := lgc.GetIndexSet(stream.IndexSetID)
if err != nil {
LogWE(sc, lgc.Logger().WithFields(log.Fields{
"error": err, "index_set_id": stream.IndexSetID, "status_code": sc,
}), "failed to get an index set")
return sc, err
}
if !is.Writable {
return 400, fmt.Errorf("assigned index set must be writable")
}
if err := lgc.store.AddStream(stream); err != nil {
return 500, err
}
return 200, nil
}
// UpdateStream updates a stream at the Server.
func (lgc *Logic) UpdateStream(prms *graylog.StreamUpdateParams) (*graylog.Stream, int, error) {
if prms == nil {
return nil, 400, fmt.Errorf("stream is nil")
}
if err := validator.UpdateValidator.Struct(prms); err != nil {
return nil, 400, err
}
stream, sc, err := lgc.GetStream(prms.ID)
if err != nil {
LogWE(sc, lgc.Logger().WithFields(log.Fields{
"error": err, "id": prms.ID, "status_code": sc,
}), "failed to get a stream")
return nil, sc, err
}
if stream.IsDefault {
return nil, 400, fmt.Errorf("the default stream cannot be edited")
}
// check index set existence
if prms.IndexSetID != "" {
is, sc, err := lgc.GetIndexSet(prms.IndexSetID)
if err != nil {
LogWE(sc, lgc.Logger().WithFields(log.Fields{
"error": err, "index_set_id": prms.IndexSetID, "status_code": sc,
}), "failed to get an index set")
return nil, sc, err
}
if !is.Writable {
return nil, 400, fmt.Errorf("assigned index set must be writable")
}
}
s, err := lgc.store.UpdateStream(prms)
if err != nil {
return nil, 500, err
}
return s, 200, nil
}
// DeleteStream deletes a stream from the Server.
func (lgc *Logic) DeleteStream(id string) (int, error) {
ok, err := lgc.HasStream(id)
if err != nil {
lgc.Logger().WithFields(log.Fields{
"error": err, "id": id,
}).Error("lgc.HasStream() is failure")
return 500, err
}
if !ok {
return 404, fmt.Errorf("no stream found with id <%s>", id)
}
if err := lgc.store.DeleteStream(id); err != nil {
return 500, err
}
return 200, nil
}
// GetStreams returns a list of all streams.
func (lgc *Logic) GetStreams() ([]graylog.Stream, int, int, error) {
streams, total, err := lgc.store.GetStreams()
if err != nil {
return nil, 0, 500, err
}
return streams, total, 200, nil
}
// GetEnabledStreams returns all enabled streams.
func (lgc *Logic) GetEnabledStreams() ([]graylog.Stream, int, int, error) {
streams, total, err := lgc.store.GetEnabledStreams()
if err != nil {
return nil, 0, 500, err
}
return streams, total, 200, nil
}
// PauseStream pauses a stream.
func (lgc *Logic) PauseStream(id string) (int, error) {
ok, err := lgc.HasStream(id)
if err != nil {
return 500, err
}
if !ok {
return 404, fmt.Errorf("no stream found with id <%s>", id)
}
// TODO pause
return 200, nil
}
// ResumeStream resumes a stream.
func (lgc *Logic) ResumeStream(id string) (int, error) {
ok, err := lgc.HasStream(id)
if err != nil {
return 500, err
}
if !ok {
return 404, fmt.Errorf("no stream found with id <%s>", id)
}
// TODO resume
return 200, nil
}