-
Notifications
You must be signed in to change notification settings - Fork 0
/
update.go
183 lines (151 loc) · 3.9 KB
/
update.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package nrupdate
import (
"context"
"database/sql"
"net/http"
"os"
"github.com/kentik/ktranslate"
"github.com/kentik/ktranslate/pkg/eggs/baseserver"
"github.com/kentik/ktranslate/pkg/eggs/logger"
"github.com/kentik/ktranslate/pkg/kt"
"github.com/judwhite/go-svc"
_ "github.com/lib/pq" // Load postgres driver
)
// Names of the environment variables that Run reads to obtain the Postgres
// data source names.
const (
// PG_RW_CON is the environment variable holding the DSN used for the
// read-write handle (the one updateNRAlerts executes its UPDATE on).
PG_RW_CON = "PG_CONNECTION_RW"
// PG_RO_CON is the environment variable holding the DSN used for the
// handle that updateConfigFile runs its SELECT on.
PG_RO_CON = "PG_CONNECTION"
)
// NRUpdate is a one-shot job that points ksynth devices at a target host in
// Postgres and then regenerates the credential sections of a ktranslate
// config from the database (see Run).
type NRUpdate struct {
// Embedded logger; provides Infof and friends directly on NRUpdate.
logger.ContextL
// pgdb is opened in Run from the PG_RO_CON DSN and used for queries.
pgdb *sql.DB
// pgdbRW is opened in Run from the PG_RW_CON DSN and used for updates.
pgdbRW *sql.DB
// targetHost is appended to "127.0.0.1:9456," to form the device_alert
// value written by updateNRAlerts.
targetHost string
// cfg is the config whose credentials are replaced by updateConfigFile
// and which Run persists via SaveConfig.
cfg *ktranslate.Config
}
// NewNRUpdate builds an NRUpdate that logs through log, targets targetHost
// and edits cfg. No database connections are opened here; that happens in
// Run. The returned error is always nil.
func NewNRUpdate(targetHost string, cfg *ktranslate.Config, log logger.ContextL) (*NRUpdate, error) {
	updater := &NRUpdate{ContextL: log}
	updater.targetHost = targetHost
	updater.cfg = cfg
	return updater, nil
}
// GetStatus implements the baseserver.Service interface.
// It reports a fixed "OK" status regardless of the updater's state.
func (kc *NRUpdate) GetStatus() []byte {
	const status = "OK"
	return []byte(status)
}
// RunHealthCheck implements the baseserver.Service interface.
// It is intentionally a no-op: neither ctx nor result is touched.
func (kc *NRUpdate) RunHealthCheck(ctx context.Context, result *baseserver.HealthCheckResult) {
}
// HttpInfo implements the baseserver.Service interface.
// It is a no-op: nothing is written to w.
func (kc *NRUpdate) HttpInfo(w http.ResponseWriter, r *http.Request) {}
// Run performs one complete update pass:
//
//  1. open and verify the Postgres handles named by the PG_RO_CON and
//     PG_RW_CON environment variables,
//  2. update device_alert for ksynth devices (updateNRAlerts),
//  3. rebuild the credential sections of the config (updateConfigFile),
//  4. persist the config with SaveConfig.
//
// Both database handles are closed when Run returns. The first error
// encountered aborts the pass and is returned.
func (kc *NRUpdate) Run(ctx context.Context) error {
	defer kc.cleanup()

	// sql.Open may only validate its arguments without dialing the server,
	// so each handle is pinged before we claim to be connected. The handle is
	// stored on kc first so the deferred cleanup closes it if the ping fails.
	ro, err := sql.Open("postgres", os.Getenv(PG_RO_CON))
	if err != nil {
		return err
	}
	kc.pgdb = ro
	if err := ro.PingContext(ctx); err != nil {
		return err
	}
	kc.Infof("Connected to PG")

	rw, err := sql.Open("postgres", os.Getenv(PG_RW_CON))
	if err != nil {
		return err
	}
	kc.pgdbRW = rw
	if err := rw.PingContext(ctx); err != nil {
		return err
	}
	kc.Infof("Connected to PG RW")

	// First update the device_alert field to send flow to the target host for any missing devices.
	if err := kc.updateNRAlerts(ctx); err != nil {
		return err
	}

	// Then update our config file with the latest info.
	if err := kc.updateConfigFile(ctx); err != nil {
		return err
	}

	// Finally, write out the new config.
	return kc.cfg.SaveConfig()
}
// Init, Start and Stop are needed in case we are running under windows
// (they take/return the shapes used by the go-svc package imported above).
//
// Init performs no setup; env is ignored and the result is always nil.
func (kc *NRUpdate) Init(env svc.Environment) error {
return nil
}
// Start launches Run on a background goroutine with a fresh background
// context and returns immediately with a nil error.
//
// Because the caller cannot observe Run's result, a failure is logged here
// rather than silently discarded.
func (kc *NRUpdate) Start() error {
	go func() {
		if err := kc.Run(context.Background()); err != nil {
			kc.Errorf("NR update run failed: %v", err)
		}
	}()
	return nil
}
// Stop releases the database handles by delegating to cleanup and returns
// whatever cleanup returns.
func (kc *NRUpdate) Stop() error {
return kc.cleanup()
}
// cleanup closes whichever database handles have been opened. Handles that
// were never opened (nil) are skipped, so it is safe to call before Run has
// connected.
//
// Both handles are always closed; if closing fails, the first error seen is
// returned instead of being dropped.
func (kc *NRUpdate) cleanup() error {
	var firstErr error
	if kc.pgdb != nil {
		if err := kc.pgdb.Close(); err != nil {
			firstErr = err
		}
	}
	if kc.pgdbRW != nil {
		// Keep the earlier error if there was one, but still close this handle.
		if err := kc.pgdbRW.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// updateNRAlerts sets device_alert to "127.0.0.1:9456,<targetHost>" (and
// bumps edate) on every mn_device row named 'ksynth' whose current
// device_alert differs, restricted to companies that have an 'nr_api_key'
// entry in company_kvs and a company_status of 'V'.
//
// It returns the error from the UPDATE, if any. A failure to read the
// affected-row count is only logged, since the update itself has already
// been applied by then.
func (kc *NRUpdate) updateNRAlerts(ctx context.Context) error {
	// The same value is both the new setting ($1) and the "already set"
	// filter ($2).
	alert := "127.0.0.1:9456," + kc.targetHost

	// NOTE(review): `device_alert <> $2` does not match rows whose
	// device_alert is NULL; if the column is nullable those rows are skipped.
	// Confirm against the schema.
	res, err := kc.pgdbRW.ExecContext(ctx, `
update
mn_device set
edate=now(),
device_alert = $1
where
device_name = 'ksynth'
and device_alert <> $2
and company_id in (
select id from mn_company
where exist(company_kvs, 'nr_api_key') and company_status = 'V'
)
`, alert, alert)
	if err != nil {
		return err
	}

	count, err := res.RowsAffected()
	if err != nil {
		// The count is informational only; do not fail the run over it.
		kc.Warnf("Updated alert devices, but could not read the affected row count: %v", err)
		return nil
	}
	kc.Infof("Updated %d alert devices", count)
	return nil
}
// updateConfigFile rebuilds the credential sections of the in-memory config
// from Postgres. It selects, for every company with an 'nr_api_key' entry in
// company_kvs and status 'V', the users whose email matches
// 'ksynth-owners+%@kentik.com', and from each row collects:
//
//   - a Kentik credential (user email + the user's 'api_token' kv), and
//   - a New Relic credential (the company's 'nr_api_key' and
//     'nr_account_id' kvs), keyed by company id.
//
// On success cfg.KentikCreds and cfg.NewRelicMultiSink.CredMap are replaced
// wholesale. Any query, scan or iteration error is returned and the config
// is left untouched. Nothing is written to disk here.
func (kc *NRUpdate) updateConfigFile(ctx context.Context) error {
	const credQuery = `
select
a.id,
company_kvs->'nr_api_key' as api_key,
company_kvs->'nr_account_id' as account_id,
user_email,
user_kvs->'api_token' as kentik_api
from
mn_company as a
join
mn_user as b
on (a.id = b.company_id)
where
exist(company_kvs, 'nr_api_key')
and company_status = 'V'
and user_email like 'ksynth-owners+%@kentik.com'
order by a.id
`

	rows, err := kc.pgdb.QueryContext(ctx, credQuery)
	if err != nil {
		return err
	}
	defer rows.Close()

	// Accumulate into fresh collections so the config is only swapped once
	// every row has been read successfully.
	kentikCreds := []ktranslate.KentikCred{}
	nrByCompany := map[int]ktranslate.NRCred{}
	for rows.Next() {
		var (
			cid      kt.Cid
			kentik   ktranslate.KentikCred
			newRelic ktranslate.NRCred
		)
		// Destination order must mirror the select list above.
		scanErr := rows.Scan(&cid, &newRelic.NRApiToken, &newRelic.NRAccount, &kentik.APIEmail, &kentik.APIToken)
		if scanErr != nil {
			return scanErr
		}
		kentikCreds = append(kentikCreds, kentik)
		// A later row for the same company overwrites the earlier entry.
		nrByCompany[int(cid)] = newRelic
	}
	if iterErr := rows.Err(); iterErr != nil {
		return iterErr
	}

	kc.Infof("Found %d creds to map", len(kentikCreds))
	kc.cfg.KentikCreds = kentikCreds
	kc.cfg.NewRelicMultiSink.CredMap = nrByCompany
	return nil
}