Skip to content

Commit

Permalink
Revert "return errors during file ingest (#540)" (#594)
Browse files Browse the repository at this point in the history
This reverts commit 5cb2bdd.
  • Loading branch information
juggernot325 authored May 3, 2024
1 parent 356e6f5 commit ff337b1
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 114 deletions.
5 changes: 5 additions & 0 deletions cmd/api/src/api/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ import (
"github.com/unrolled/secure"
)

const (
	// defaultTimeout bounds how long any request may run before the
	// middleware times it out; thirty seconds unless overridden.
	defaultTimeout = 30 * time.Second
)

// Wrapper is an iterator for middleware function application that wraps around a http.Handler.
type Wrapper struct {
middleware []mux.MiddlewareFunc
Expand Down
13 changes: 13 additions & 0 deletions cmd/api/src/api/signature.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,16 @@ func SignRequestAtTime(hasher func() hash.Hash, id string, token string, datetim
// SignRequest signs request with the given token credentials using SHA-256,
// stamping it at the current wall-clock time. It is a convenience wrapper
// around SignRequestAtTime.
func SignRequest(tokenID, token string, request *http.Request) error {
	now := time.Now()
	return SignRequestAtTime(sha256.New, tokenID, token, now, request)
}

type readerDelegatedCloser struct {
source io.Reader
closer io.Closer
}

func (s readerDelegatedCloser) Read(p []byte) (n int, err error) {
return s.source.Read(p)
}

func (s readerDelegatedCloser) Close() error {
return s.closer.Close()
}
2 changes: 1 addition & 1 deletion cmd/api/src/api/v2/file_uploads.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (s Resources) ProcessFileUpload(response http.ResponseWriter, request *http
}

if !IsValidContentTypeForUpload(request.Header) {
api.WriteErrorResponse(request.Context(), api.BuildErrorResponse(http.StatusBadRequest, "Content type must be application/json or application/zip", request), response)
api.WriteErrorResponse(request.Context(), api.BuildErrorResponse(http.StatusBadRequest, fmt.Sprintf("Content type must be application/json or application/zip"), request), response)
} else if fileUploadJobID, err := strconv.Atoi(fileUploadJobIdString); err != nil {
api.WriteErrorResponse(request.Context(), api.BuildErrorResponse(http.StatusBadRequest, api.ErrorResponseDetailsIDMalformed, request), response)
} else if fileUploadJob, err := fileupload.GetFileUploadJobByID(request.Context(), s.DB, int64(fileUploadJobID)); err != nil {
Expand Down
53 changes: 15 additions & 38 deletions cmd/api/src/daemons/datapipe/decoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package datapipe
import (
"errors"
"github.com/specterops/bloodhound/dawgs/graph"
"github.com/specterops/bloodhound/dawgs/util"
"github.com/specterops/bloodhound/ein"
"github.com/specterops/bloodhound/log"
"io"
Expand All @@ -41,7 +40,6 @@ func decodeBasicData[T any](batch graph.Batch, reader io.ReadSeeker, conversionF
var (
count = 0
convertedData ConvertedData
errs = util.NewErrorCollector()
)

for decoder.More() {
Expand All @@ -58,22 +56,17 @@ func decodeBasicData[T any](batch graph.Batch, reader io.ReadSeeker, conversionF
}

if count == IngestCountThreshold {
if err = IngestBasicData(batch, convertedData); err != nil {
errs.Add(err)
}
IngestBasicData(batch, convertedData)
convertedData.Clear()
count = 0

}
}

if count > 0 {
if err = IngestBasicData(batch, convertedData); err != nil {
errs.Add(err)
}
IngestBasicData(batch, convertedData)
}

return errs.Combined()
return nil
}

func decodeGroupData(batch graph.Batch, reader io.ReadSeeker) error {
Expand All @@ -85,12 +78,11 @@ func decodeGroupData(batch graph.Batch, reader io.ReadSeeker) error {
var (
convertedData = ConvertedGroupData{}
count = 0
errs = util.NewErrorCollector()
)

for decoder.More() {
var group ein.Group
if err = decoder.Decode(&group); err != nil {
if err := decoder.Decode(&group); err != nil {
log.Errorf("Error decoding group object: %v", err)
if errors.Is(err, io.EOF) {
break
Expand All @@ -99,23 +91,18 @@ func decodeGroupData(batch graph.Batch, reader io.ReadSeeker) error {
count++
convertGroupData(group, &convertedData)
if count == IngestCountThreshold {
if err = IngestGroupData(batch, convertedData); err != nil {
errs.Add(err)
}

IngestGroupData(batch, convertedData)
convertedData.Clear()
count = 0
}
}
}

if count > 0 {
if err = IngestGroupData(batch, convertedData); err != nil {
errs.Add(err)
}
IngestGroupData(batch, convertedData)
}

return errs.Combined()
return nil
}

func decodeSessionData(batch graph.Batch, reader io.ReadSeeker) error {
Expand All @@ -127,11 +114,10 @@ func decodeSessionData(batch graph.Batch, reader io.ReadSeeker) error {
var (
convertedData = ConvertedSessionData{}
count = 0
errs = util.NewErrorCollector()
)
for decoder.More() {
var session ein.Session
if err = decoder.Decode(&session); err != nil {
if err := decoder.Decode(&session); err != nil {
log.Errorf("Error decoding session object: %v", err)
if errors.Is(err, io.EOF) {
break
Expand All @@ -140,22 +126,18 @@ func decodeSessionData(batch graph.Batch, reader io.ReadSeeker) error {
count++
convertSessionData(session, &convertedData)
if count == IngestCountThreshold {
if err = IngestSessions(batch, convertedData.SessionProps); err != nil {
errs.Add(err)
}
IngestSessions(batch, convertedData.SessionProps)
convertedData.Clear()
count = 0
}
}
}

if count > 0 {
if err = IngestSessions(batch, convertedData.SessionProps); err != nil {
errs.Add(err)
}
IngestSessions(batch, convertedData.SessionProps)
}

return errs.Combined()
return nil
}

func decodeAzureData(batch graph.Batch, reader io.ReadSeeker) error {
Expand All @@ -167,12 +149,11 @@ func decodeAzureData(batch graph.Batch, reader io.ReadSeeker) error {
var (
convertedData = ConvertedAzureData{}
count = 0
errs = util.NewErrorCollector()
)

for decoder.More() {
var data AzureBase
if err = decoder.Decode(&data); err != nil {
if err := decoder.Decode(&data); err != nil {
log.Errorf("Error decoding azure object: %v", err)
if errors.Is(err, io.EOF) {
break
Expand All @@ -182,20 +163,16 @@ func decodeAzureData(batch graph.Batch, reader io.ReadSeeker) error {
convert(data.Data, &convertedData)
count++
if count == IngestCountThreshold {
if err = IngestAzureData(batch, convertedData); err != nil {
errs.Add(err)
}
IngestAzureData(batch, convertedData)
convertedData.Clear()
count = 0
}
}
}

if count > 0 {
if err = IngestAzureData(batch, convertedData); err != nil {
errs.Add(err)
}
IngestAzureData(batch, convertedData)
}

return errs.Combined()
return nil
}
96 changes: 21 additions & 75 deletions cmd/api/src/daemons/datapipe/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package datapipe

import (
"fmt"
"github.com/specterops/bloodhound/dawgs/util"
"github.com/specterops/bloodhound/src/model/ingest"
"github.com/specterops/bloodhound/src/services/fileupload"
"io"
Expand All @@ -45,54 +44,21 @@ func ReadFileForIngest(batch graph.Batch, reader io.ReadSeeker, adcsEnabled bool
}
}

func IngestBasicData(batch graph.Batch, converted ConvertedData) error {
errs := util.NewErrorCollector()

if err := IngestNodes(batch, ad.Entity, converted.NodeProps); err != nil {
errs.Add(err)
}

if err := IngestRelationships(batch, ad.Entity, converted.RelProps); err != nil {
errs.Add(err)
}

return errs.Combined()
func IngestBasicData(batch graph.Batch, converted ConvertedData) {
IngestNodes(batch, ad.Entity, converted.NodeProps)
IngestRelationships(batch, ad.Entity, converted.RelProps)
}

func IngestGroupData(batch graph.Batch, converted ConvertedGroupData) error {
errs := util.NewErrorCollector()

if err := IngestNodes(batch, ad.Entity, converted.NodeProps); err != nil {
errs.Add(err)
}

if err := IngestRelationships(batch, ad.Entity, converted.RelProps); err != nil {
errs.Add(err)
}

if err := IngestDNRelationships(batch, converted.DistinguishedNameProps); err != nil {
errs.Add(err)
}

return errs.Combined()
func IngestGroupData(batch graph.Batch, converted ConvertedGroupData) {
IngestNodes(batch, ad.Entity, converted.NodeProps)
IngestRelationships(batch, ad.Entity, converted.RelProps)
IngestDNRelationships(batch, converted.DistinguishedNameProps)
}

func IngestAzureData(batch graph.Batch, converted ConvertedAzureData) error {
errs := util.NewErrorCollector()

if err := IngestNodes(batch, azure.Entity, converted.NodeProps); err != nil {
errs.Add(err)
}

if err := IngestNodes(batch, ad.Entity, converted.OnPremNodes); err != nil {
errs.Add(err)
}

if err := IngestRelationships(batch, azure.Entity, converted.RelProps); err != nil {
errs.Add(err)
}

return errs.Combined()
func IngestAzureData(batch graph.Batch, converted ConvertedAzureData) {
IngestNodes(batch, azure.Entity, converted.NodeProps)
IngestNodes(batch, ad.Entity, converted.OnPremNodes)
IngestRelationships(batch, azure.Entity, converted.RelProps)
}

func IngestWrapper(batch graph.Batch, reader io.ReadSeeker, meta ingest.Metadata, adcsEnabled bool) error {
Expand Down Expand Up @@ -175,19 +141,14 @@ func IngestNode(batch graph.Batch, nowUTC time.Time, identityKind graph.Kind, ne
})
}

func IngestNodes(batch graph.Batch, identityKind graph.Kind, nodes []ein.IngestibleNode) error {
var (
nowUTC = time.Now().UTC()
errs = util.NewErrorCollector()
)
func IngestNodes(batch graph.Batch, identityKind graph.Kind, nodes []ein.IngestibleNode) {
nowUTC := time.Now().UTC()

for _, next := range nodes {
if err := IngestNode(batch, nowUTC, identityKind, next); err != nil {
log.Errorf("Error ingesting node ID %s: %v", next.ObjectID, err)
errs.Add(err)
log.Errorf("Error ingesting node: %v", err)
}
}
return errs.Combined()
}

func IngestRelationship(batch graph.Batch, nowUTC time.Time, nodeIDKind graph.Kind, nextRel ein.IngestibleRelationship) error {
Expand Down Expand Up @@ -218,19 +179,14 @@ func IngestRelationship(batch graph.Batch, nowUTC time.Time, nodeIDKind graph.Ki
})
}

func IngestRelationships(batch graph.Batch, nodeIDKind graph.Kind, relationships []ein.IngestibleRelationship) error {
var (
nowUTC = time.Now().UTC()
errs = util.NewErrorCollector()
)
func IngestRelationships(batch graph.Batch, nodeIDKind graph.Kind, relationships []ein.IngestibleRelationship) {
nowUTC := time.Now().UTC()

for _, next := range relationships {
if err := IngestRelationship(batch, nowUTC, nodeIDKind, next); err != nil {
log.Errorf("Error ingesting relationship from %s to %s : %v", next.Source, next.Target, err)
errs.Add(err)
log.Errorf("Error ingesting relationship from basic data : %v ", err)
}
}
return errs.Combined()
}

func ingestDNRelationship(batch graph.Batch, nowUTC time.Time, nextRel ein.IngestibleRelationship) error {
Expand Down Expand Up @@ -261,19 +217,14 @@ func ingestDNRelationship(batch graph.Batch, nowUTC time.Time, nextRel ein.Inges
})
}

func IngestDNRelationships(batch graph.Batch, relationships []ein.IngestibleRelationship) error {
var (
nowUTC = time.Now().UTC()
errs = util.NewErrorCollector()
)
func IngestDNRelationships(batch graph.Batch, relationships []ein.IngestibleRelationship) {
nowUTC := time.Now().UTC()

for _, next := range relationships {
if err := ingestDNRelationship(batch, nowUTC, next); err != nil {
log.Errorf("Error ingesting relationship: %v", err)
errs.Add(err)
}
}
return errs.Combined()
}

func ingestSession(batch graph.Batch, nowUTC time.Time, nextSession ein.IngestibleSession) error {
Expand Down Expand Up @@ -306,17 +257,12 @@ func ingestSession(batch graph.Batch, nowUTC time.Time, nextSession ein.Ingestib
})
}

func IngestSessions(batch graph.Batch, sessions []ein.IngestibleSession) error {
var (
nowUTC = time.Now().UTC()
errs = util.NewErrorCollector()
)
func IngestSessions(batch graph.Batch, sessions []ein.IngestibleSession) {
nowUTC := time.Now().UTC()

for _, next := range sessions {
if err := ingestSession(batch, nowUTC, next); err != nil {
log.Errorf("Error ingesting sessions: %v", err)
errs.Add(err)
}
}
return errs.Combined()
}

0 comments on commit ff337b1

Please sign in to comment.