Skip to content

Commit

Permalink
Refactor target storage. Prevent target duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
warmans committed Jul 28, 2021
1 parent 49206d0 commit 9e27a2c
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 61 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
/.tarballs
/bin
/VERSION
/*.iml
/*.iml
.cache
26 changes: 26 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,32 @@ build:
echo ">> building binaries"
go build -o ${BIN_DIR}/prometheus-aggregate-exporter -ldflags "-X main.Version=${GIT_TAG}" ./cmd


# Manual Testing
#----------------------------------------------------------------------
.PHONY: test.run-fixture-server
test.run-fixture-server:
cd fixture; go run serve.go

.PHONY: test.run
test.run: build
./bin/prometheus-aggregate-exporter \
-targets="t1=http://localhost:3000/histogram.txt,t2=http://localhost:3000/histogram-2.txt" \
-server.bind=":8080" \
-verbose=true \
-targets.dynamic.registration=true \
-targets.cache.path=".cache"

.PHONY: test.scrape
test.scrape:
curl localhost:8080/metrics

test.unregister:
curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d "name=t1&address=localhost:3000/histogram.txt" localhost:8080/unregister

test.register:
curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d "name=t1&address=localhost:3000/histogram.txt" localhost:8080/register

# Packaging
#-----------------------------------------------------------------------

Expand Down
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,22 @@ And you'll have Docker compile the binary and make it available under the image

Alternatively the image is available though docker-hub: https://hub.docker.com/r/warmans/prometheus-aggregate-exporter

### Manual Testing

You can run the exporter against some static fixture files by running the following make targets
in separate terminals.

```shell
$ make test.run-fixture-server
$ make test.run
```

then to view the `/metrics` page:

```shell
$ make test.scrape
```

### Example Usage
```
./bin/prometheus-aggregate-exporter \
Expand Down
177 changes: 117 additions & 60 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"os"
"strings"
"sync"

"crypto/tls"
"fmt"
Expand Down Expand Up @@ -41,8 +42,8 @@ var (
targetScrapeTimeout *int
targets *string
insecureSkipVerifyFlag *bool
cacheFilePath *string
dynamicRegistration *bool
cacheFilePath *string
dynamicRegistration *bool
)

func init() {
Expand Down Expand Up @@ -82,17 +83,23 @@ func main() {

if len(config.Targets) < 1 {
if *dynamicRegistration {
log.Print("WARN: no targets configured, using registration only")
log.Print("No initial targets configured, using registration only")
} else {
log.Fatal("No targets configured and dynamic registration is disabled")
log.Fatal("FATAL: No initial targets configured and dynamic registration is disabled.")
}
}

var cacheFile string
if *dynamicRegistration {
log.Println("Dynamic target registration enabled")
if *cacheFilePath != "" {
config.Targets = appendCachedTargets(config.Targets, *cacheFilePath)
log.Printf("Using targets cache file %s\n", *cacheFilePath)
cacheFile = *cacheFilePath
}
} else {
if *cacheFilePath != "" {
// cache makes no sense if dynamic registration is not enabled.
log.Printf("WARN: Dynamic registration is disabled but cache file path was given. Cache will be ignored.")
}
}

Expand All @@ -104,6 +111,8 @@ func main() {

aggregator := &Aggregator{HTTP: &http.Client{Timeout: time.Duration(config.Timeout) * time.Millisecond}}

targets := NewTargets(config.Targets, cacheFile)

mux := http.NewServeMux()
mux.HandleFunc("/metrics", func(rw http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
Expand All @@ -114,13 +123,14 @@ func main() {
}
if t := r.Form.Get("t"); t != "" {
targetKey, err := strconv.Atoi(t)
if err != nil || len(config.Targets)-1 < targetKey {
targetList := targets.Targets()
if err != nil || len(targetList)-1 < targetKey {
http.Error(rw, "Bad Request", http.StatusBadRequest)
return
}
aggregator.Aggregate([]string{config.Targets[targetKey]}, rw)
aggregator.Aggregate([]string{targetList[targetKey]}, rw)
} else {
aggregator.Aggregate(config.Targets, rw)
aggregator.Aggregate(targets.Targets(), rw)
}
})
mux.HandleFunc("/alive", func(rw http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -148,11 +158,7 @@ func main() {
}

uri := schema + "://" + address
target := name + "=" + uri
config.Targets = append(config.Targets, target)
if *cacheFilePath != "" {
saveTargets(config.Targets, *cacheFilePath)
}
targets.AddTarget(name + "=" + uri)
log.Printf("Registered target %s with name %s\n", uri, name)
})
mux.HandleFunc("/unregister", func(rw http.ResponseWriter, r *http.Request) {
Expand All @@ -175,17 +181,13 @@ func main() {
}

uri := schema + "://" + address
target := name + "=" + uri
config.Targets = removeTarget(config.Targets, target)
if *cacheFilePath != "" {
saveTargets(config.Targets, *cacheFilePath)
}
targets.RemoveTarget(name + "=" + uri)
log.Printf("Unregistered target %s with name %s\n", uri, name)
})
}

log.Printf("Starting server on %s with targets:\n", config.Server.Bind)
for _, t := range config.Targets {
for _, t := range targets.Targets() {
log.Printf(" - %s\n", t)
}

Expand All @@ -212,67 +214,114 @@ func main() {

}

type Result struct {
URL string
Name string
SecondsTaken float64
MetricFamily map[string]*io_prometheus_client.MetricFamily
Error error
func NewTargets(initialTargets []string, cachePath string) *Targets {
t := &Targets{
cachePath: cachePath,
targets: make(map[string]struct{}),
lock: sync.RWMutex{},
}
t.tryLoadCache()
for _, v := range initialTargets {
t.AddTarget(v)
}
return t
}

type Aggregator struct {
HTTP *http.Client
type Targets struct {
cachePath string
targets map[string]struct{}
lock sync.RWMutex
}

func indexOf(element string, data []string) int {
for k, v := range data {
if element == v {
return k
}
func (t *Targets) AddTarget(target string) {
target = strings.TrimSpace(target)
if target == "" {
return
}
return -1 //not found.
t.lock.Lock()
defer func() {
t.lock.Unlock()
t.updateCache()
}()
t.targets[target] = struct{}{}
}

func removeTarget(targets []string, target string) []string {
index := indexOf(target, targets)
if index == -1 {
log.Printf("There is no currently registered target %s", target)
return targets
}
targets[index] = targets[len(targets)-1]
// We do not need to put s[i] at the end, as it will be discarded anyway
return targets[:len(targets)-1]
func (t *Targets) RemoveTarget(target string) {
target = strings.TrimSpace(target)
t.lock.Lock()
defer func() {
t.lock.Unlock()
t.updateCache()
}()
delete(t.targets, target)
}

func appendCachedTargets(targets []string, cacheFilePath string) []string {
targetsFromFile, err := readLines(cacheFilePath)
result := targets
if err == nil {
for i := range targetsFromFile {
target := targetsFromFile[i]
if indexOf(target, result) == -1 {
result = append(result, target)
log.Printf("Recovered target %s from cache file\n", target)
}
}
func (t *Targets) Targets() []string {
t.lock.RLock()
defer t.lock.RUnlock()

ts := []string{}
for k := range t.targets {
ts = append(ts, k)
}

return result
return ts
}

func saveTargets(targets []string, cacheFilePath string){
err := writeLines(targets, cacheFilePath)
func (t *Targets) updateCache() {
if t.cachePath == "" {
return
}
err := writeLines(t.Targets(), t.cachePath)
if err != nil {
log.Fatal("Error while saving targets cache")
}
}

func (t *Targets) tryLoadCache() {
if t.cachePath == "" {
return
}
targetsFromFile, err := readLines(t.cachePath)
if err == nil {
for _, v := range targetsFromFile {
t.AddTarget(v)
log.Printf("Recovered target %s from cache file\n", v)
}
} else {
log.Printf("Failed to load cache: %s\n", err.Error())
}
}

type Result struct {
URL string
Name string
SecondsTaken float64
MetricFamily map[string]*io_prometheus_client.MetricFamily
Error error
}

type Aggregator struct {
HTTP *http.Client
}

func readLines(path string) ([]string, error) {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return []string{}, nil
} else {
return nil, err
}
}
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
defer func() {
if err := file.Close(); err != nil {
log.Printf("WARN: failed to close cache file after reading")
}
}()

var lines []string
scanner := bufio.NewScanner(file)
Expand All @@ -288,11 +337,17 @@ func writeLines(lines []string, path string) error {
if err != nil {
return err
}
defer file.Close()
defer func() {
if err := file.Close(); err != nil {
log.Printf("WARN: failed to close cache file after writing")
}
}()

w := bufio.NewWriter(file)
for _, line := range lines {
fmt.Fprintln(w, line)
if _, err := fmt.Fprintln(w, line); err != nil {
return err
}
}
return w.Flush()
}
Expand Down Expand Up @@ -346,7 +401,9 @@ func (f *Aggregator) Aggregate(targets []string, output io.Writer) {

encoder := expfmt.NewEncoder(output, expfmt.FmtText)
for _, f := range allFamilies {
encoder.Encode(f)
if err := encoder.Encode(f); err != nil {
log.Printf("Failed to encode familty: %s", err.Error())
}
}

}(len(targets), resultChan)
Expand Down

0 comments on commit 9e27a2c

Please sign in to comment.