Skip to content

Commit

Permalink
Merge pull request #2479 from influxdata/fix-influx-writes
Browse files Browse the repository at this point in the history
fix: influx gzip writes
  • Loading branch information
docmerlin authored Feb 12, 2021
2 parents dae7c8c + 6d92a1b commit 120d771
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 45 deletions.
56 changes: 40 additions & 16 deletions influxdb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
Expand Down Expand Up @@ -325,7 +324,7 @@ func (c *HTTPClient) Ping(ctx context.Context) (time.Duration, string, error) {
}

func (c *HTTPClient) Write(bp BatchPoints) error {
b := bytes.Buffer{}
b := &bytes.Buffer{}
precision := bp.Precision()

u := c.url()
Expand All @@ -337,28 +336,26 @@ func (c *HTTPClient) Write(bp BatchPoints) error {
v.Set("consistency", bp.WriteConsistency())
u.RawQuery = v.Encode()

reqBody := io.Reader(&b)

if c.compression == "gzip" {
var err error
reqBody, err = CompressWithGzip(reqBody, gzip.DefaultCompression)
if err != nil {
return err
bodyWriteCloser := gzip.NewWriter(b)
for _, p := range bp.Points() {
if _, err := bodyWriteCloser.Write(p.BytesWithLineFeed(precision)); err != nil {
return err
}
}

}

for _, p := range bp.Points() {
if _, err := b.Write(p.Bytes(precision)); err != nil {
if err := bodyWriteCloser.Close(); err != nil {
return err
}

if err := b.WriteByte('\n'); err != nil {
return err
} else {
for _, p := range bp.Points() {
if _, err := b.Write(p.BytesWithLineFeed(precision)); err != nil {
return err
}
}
}

req, err := http.NewRequest("POST", u.String(), reqBody)
req, err := http.NewRequest("POST", u.String(), b)
if err != nil {
return err
}
Expand Down Expand Up @@ -571,6 +568,33 @@ func (p Point) Bytes(precision string) []byte {
return bytes
}

func (p Point) BytesWithLineFeed(precision string) []byte {
key := imodels.MakeKey([]byte(p.Name), imodels.NewTags(p.Tags))
fields := imodels.Fields(p.Fields).MarshalBinary()
kl := len(key)
fl := len(fields)
var bytes []byte

if p.Time.IsZero() {
bytes = make([]byte, fl+kl+2)
copy(bytes, key)
bytes[kl] = ' '
copy(bytes[kl+1:], fields)
} else {
timeStr := strconv.FormatInt(p.Time.UnixNano()/imodels.GetPrecisionMultiplier(precision), 10)
tl := len(timeStr)
bytes = make([]byte, fl+kl+tl+3)
copy(bytes, key)
bytes[kl] = ' '
copy(bytes[kl+1:], fields)
bytes[kl+fl+1] = ' '
copy(bytes[kl+fl+2:], []byte(timeStr))
}
bytes[len(bytes)-1] = '\n'
return bytes

}

// Simple type to create github.com/influxdata/kapacitor/influxdb clients.
type ClientCreator struct{}

Expand Down
54 changes: 54 additions & 0 deletions influxdb/client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package influxdb

import (
"bytes"
"compress/gzip"
"encoding/json"
"io/ioutil"
Expand Down Expand Up @@ -196,6 +197,59 @@ func TestClient_Write(t *testing.T) {
}
}

func TestClient_WriteLarge(t *testing.T) {
expected := &bytes.Buffer{}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var data Response
uncompressedBody, err := gzip.NewReader(r.Body)
if err != nil {
t.Error(err)
}
bod, err := ioutil.ReadAll(uncompressedBody)
if err != nil {
t.Error(err)
}
if r.Header.Get("Content-Encoding") != "gzip" {
t.Errorf("expected gzip Content-Encoding but got %s", r.Header.Get("Content-Encoding"))
}
if string(bod) != expected.String() {
t.Errorf("unexpected send, expected '%s', got '%s'", expected, string(bod))
}
w.WriteHeader(http.StatusNoContent)
_ = json.NewEncoder(w).Encode(data)
}))
defer ts.Close()

config := Config{URLs: []string{ts.URL}}
c, _ := NewHTTPClient(config)

bp, err := NewBatchPoints(BatchPointsConfig{})
if err != nil {
t.Errorf("unexpected error: %s", err)
}

startT := time.Date(1999, 11, 9, 0, 0, 0, 3, time.UTC)
for i := 0; i < 1000000; i++ {
pt := Point{
Name: "testpt",
Tags: map[string]string{"tag1": "tag1"},
Fields: map[string]interface{}{"value": 1},
Time: startT.Add(time.Second * time.Duration(i)),
}
bp.AddPoint(pt)
expected.Write(pt.BytesWithLineFeed(bp.Precision()))
}

if err != nil {
t.Errorf("unexpected error. expected %v, actual %v", nil, err)
}

err = c.Write(bp)
if err != nil {
t.Errorf("unexpected error. expected %v, actual %v", nil, err)
}
}

func TestClient_Write_noCompression(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var data Response
Expand Down
29 changes: 0 additions & 29 deletions influxdb/gzip.go

This file was deleted.

5 changes: 5 additions & 0 deletions influxdb/point_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,10 @@ func TestPoint_Bytes(t *testing.T) {
t.Errorf("%s: Bytes() mismatch:\n actual: %v\n exp: %v",
test.name, got, exp)
}
if got, exp := string(p.BytesWithLineFeed(test.precision)), test.exp+"\n"; got != exp {
t.Errorf("%s: Bytes() mismatch:\n actual: %v\n exp: %v",
test.name, got, exp)
}

}
}

0 comments on commit 120d771

Please sign in to comment.