Skip to content

Commit

Permalink
Exposes some processing functions in the http module
Browse files Browse the repository at this point in the history
  • Loading branch information
gordallott authored and lukasmalkmus committed Dec 9, 2021
1 parent 8ad0f96 commit b48cd93
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
6 changes: 3 additions & 3 deletions http/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,16 @@ func (m *Multiplexer) multiplex(req *http.Request) error {
}

var (
ingestReq *pushRequest
ingestReq *PushRequest
err error
)

typ := req.Header.Get("Content-Type")
switch typ {
case "application/json":
ingestReq, err = decodeJSONPushRequest(req.Body)
ingestReq, err = DecodeJSONPushRequest(req.Body)
case "application/x-protobuf":
ingestReq, err = decodeProtoPushRequest(req.Body)
ingestReq, err = DecodeProtoPushRequest(req.Body)
default:
err = fmt.Errorf("unsupported Content-Type %v", typ)
}
Expand Down
24 changes: 12 additions & 12 deletions http/unmarshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
pb "github.com/grafana/loki/pkg/logproto"
)

type stream struct {
type Stream struct {
Entries []pb.Entry
Labels loghttp.LabelSet
}

type pushRequest struct {
Streams []stream
type PushRequest struct {
Streams []Stream
}

func convertLabelsString(str string) (map[string]string, error) {
Expand All @@ -32,7 +32,7 @@ func convertLabelsString(str string) (map[string]string, error) {
return labels, nil
}

func decodeProtoPushRequest(r io.Reader) (*pushRequest, error) {
func DecodeProtoPushRequest(r io.Reader) (*PushRequest, error) {
var req pb.PushRequest

b, err := ioutil.ReadAll(r)
Expand All @@ -47,8 +47,8 @@ func decodeProtoPushRequest(r io.Reader) (*pushRequest, error) {
return nil, err
}

ret := &pushRequest{
Streams: make([]stream, len(req.Streams)),
ret := &PushRequest{
Streams: make([]Stream, len(req.Streams)),
}

for i, s := range req.Streams {
Expand All @@ -57,7 +57,7 @@ func decodeProtoPushRequest(r io.Reader) (*pushRequest, error) {
return nil, err
}

ret.Streams[i] = stream{
ret.Streams[i] = Stream{
Labels: loghttp.LabelSet(labels),
Entries: s.Entries,
}
Expand All @@ -66,14 +66,14 @@ func decodeProtoPushRequest(r io.Reader) (*pushRequest, error) {
return ret, nil
}

func decodeJSONPushRequest(b io.Reader) (*pushRequest, error) {
func DecodeJSONPushRequest(b io.Reader) (*PushRequest, error) {
var req loghttp.PushRequest
if err := json.NewDecoder(b).Decode(&req); err != nil {
return nil, err
}

ret := &pushRequest{
Streams: make([]stream, len(req.Streams)),
ret := &PushRequest{
Streams: make([]Stream, len(req.Streams)),
}

for i, s := range req.Streams {
Expand All @@ -83,8 +83,8 @@ func decodeJSONPushRequest(b io.Reader) (*pushRequest, error) {
return ret, nil
}

func newStream(s *loghttp.Stream) stream {
ret := stream{
func newStream(s *loghttp.Stream) Stream {
ret := Stream{
Entries: make([]pb.Entry, len(s.Entries)),
Labels: s.Labels,
}
Expand Down

0 comments on commit b48cd93

Please sign in to comment.