Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/salesforce: Bug fixes and improvements #41015

Merged
merged 7 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 64 additions & 36 deletions x-pack/filebeat/input/salesforce/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (s *salesforceInput) Setup(env v2.Context, src inputcursor.Source, cursor *
// and based on the configuration, it will run the different methods -- EventLogFile
// or Object to collect events at defined intervals.
func (s *salesforceInput) run() error {
s.log.Info("Starting Salesforce input run")
if s.srcConfig.EventMonitoringMethod.EventLogFile.isEnabled() {
err := s.RunEventLogFile()
if err != nil {
Expand Down Expand Up @@ -160,17 +161,22 @@ func (s *salesforceInput) run() error {
case <-s.ctx.Done():
return s.isError(s.ctx.Err())
case <-eventLogFileTicker.C:
s.log.Info("Running EventLogFile collection")
if err := s.RunEventLogFile(); err != nil {
s.log.Errorf("Problem running EventLogFile collection: %s", err)
} else {
s.log.Info("EventLogFile collection completed successfully")
}
case <-objectMethodTicker.C:
s.log.Info("Running Object collection")
if err := s.RunObject(); err != nil {
s.log.Errorf("Problem running Object collection: %s", err)
} else {
s.log.Info("Object collection completed successfully")
}
}
}
}
shmsr marked this conversation as resolved.
Show resolved Hide resolved

func (s *salesforceInput) isError(err error) error {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
s.log.Infof("input stopped because context was cancelled with: %v", err)
Expand All @@ -181,15 +187,18 @@ func (s *salesforceInput) isError(err error) error {
}

func (s *salesforceInput) SetupSFClientConnection() (*soql.Resource, error) {
s.log.Info("Setting up Salesforce client connection")
if s.sfdcConfig == nil {
return nil, errors.New("internal error: salesforce configuration is not set properly")
}

// Open creates a session using the configuration.
session, err := session.Open(*s.sfdcConfig)
if err != nil {
s.log.Errorf("Failed to open Salesforce session: %v", err)
kush-elastic marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}
s.log.Info("Salesforce session opened successfully")

// Set clientSession for re-use.
s.clientSession = session
Expand All @@ -209,8 +218,6 @@ func (s *salesforceInput) FormQueryWithCursor(queryConfig *QueryConfig, cursor m
return nil, err
}

s.log.Infof("Salesforce query: %s", qr)

return &querier{Query: qr}, err
}

Expand All @@ -222,7 +229,7 @@ func isZero[T comparable](v T) bool {

// RunObject runs the Object method of the Event Monitoring API to collect events.
func (s *salesforceInput) RunObject() error {
s.log.Debugf("scrape object(s) every %s", s.srcConfig.EventMonitoringMethod.Object.Interval)
s.log.Infof("Running Object collection with interval: %s", s.srcConfig.EventMonitoringMethod.Object.Interval)

var cursor mapstr.M
if !(isZero(s.cursor.Object.FirstEventTime) && isZero(s.cursor.Object.LastEventTime)) {
Expand All @@ -241,6 +248,8 @@ func (s *salesforceInput) RunObject() error {
return fmt.Errorf("error forming query based on cursor: %w", err)
}

s.log.Infof("Query formed: %s", query.Query)

res, err := s.soqlr.Query(query, false)
if err != nil {
return err
Expand Down Expand Up @@ -282,15 +291,15 @@ func (s *salesforceInput) RunObject() error {
return err
}
}
s.log.Debugf("Total events: %d", totalEvents)
s.log.Infof("Total events: %d", totalEvents)

return nil
}

// RunEventLogFile runs the EventLogFile method of the Event Monitoring API to
// collect events.
func (s *salesforceInput) RunEventLogFile() error {
s.log.Debugf("scrape eventLogFile(s) every %s", s.srcConfig.EventMonitoringMethod.EventLogFile.Interval)
s.log.Infof("Running EventLogFile collection with interval: %s", s.srcConfig.EventMonitoringMethod.EventLogFile.Interval)

var cursor mapstr.M
if !(isZero(s.cursor.EventLogFile.FirstEventTime) && isZero(s.cursor.EventLogFile.LastEventTime)) {
Expand All @@ -309,6 +318,8 @@ func (s *salesforceInput) RunEventLogFile() error {
return fmt.Errorf("error forming query based on cursor: %w", err)
}

s.log.Infof("Query formed: %s", query.Query)
kush-elastic marked this conversation as resolved.
Show resolved Hide resolved

res, err := s.soqlr.Query(query, false)
if err != nil {
return err
Expand All @@ -324,9 +335,14 @@ func (s *salesforceInput) RunEventLogFile() error {
totalEvents, firstEvent := 0, true
for res.TotalSize() > 0 {
for _, rec := range res.Records() {
req, err := http.NewRequestWithContext(s.ctx, http.MethodGet, s.config.URL+rec.Record().Fields()["LogFile"].(string), nil)
logfile, ok := rec.Record().Fields()["LogFile"].(string)
if !ok {
return fmt.Errorf("LogFile field not found or not a string in Salesforce event log file: %v", rec.Record().Fields())
}

req, err := http.NewRequestWithContext(s.ctx, http.MethodGet, s.config.URL+logfile, nil)
if err != nil {
return err
return fmt.Errorf("error creating request for log file: %w", err)
}

s.clientSession.AuthorizationHeader(req)
Expand All @@ -341,19 +357,23 @@ func (s *salesforceInput) RunEventLogFile() error {

resp, err := s.sfdcConfig.Client.Do(req)
if err != nil {
return err
return fmt.Errorf("error fetching log file: %w", err)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
devamanv marked this conversation as resolved.
Show resolved Hide resolved
return err
return fmt.Errorf("unexpected status code %d for log file", resp.StatusCode)
}
Comment on lines +363 to 366
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most important change: only responses with a 200 status code should be allowed to proceed.


body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return fmt.Errorf("error reading log file body: %w", err)
}

recs, err := decodeAsCSV(body)
recs, err := s.decodeAsCSV(body)
if err != nil {
return err
return fmt.Errorf("error decoding CSV: %w", err)
}

if timestamp, ok := rec.Record().Fields()[s.config.EventMonitoringMethod.EventLogFile.Cursor.Field].(string); ok {
Expand All @@ -366,12 +386,11 @@ func (s *salesforceInput) RunEventLogFile() error {
for _, val := range recs {
jsonStrEvent, err := json.Marshal(val)
if err != nil {
return err
return fmt.Errorf("error json marshaling event: %w", err)
}

err = publishEvent(s.publisher, s.cursor, jsonStrEvent, "EventLogFile")
if err != nil {
return err
if err := publishEvent(s.publisher, s.cursor, jsonStrEvent, "EventLogFile"); err != nil {
return fmt.Errorf("error publishing event: %w", err)
}
totalEvents++
}
Expand All @@ -384,10 +403,10 @@ func (s *salesforceInput) RunEventLogFile() error {

res, err = res.Next()
if err != nil {
return err
return fmt.Errorf("error getting next page: %w", err)
}
}
s.log.Debugf("Total events: %d", totalEvents)
s.log.Infof("Total events processed: %d", totalEvents)

return nil
}
Expand All @@ -405,6 +424,7 @@ func (s *salesforceInput) getSFDCConfig(cfg *config) (*sfdc.Configuration, error

switch {
case cfg.Auth.OAuth2.JWTBearerFlow != nil && cfg.Auth.OAuth2.JWTBearerFlow.isEnabled():
s.log.Info("Using JWT Bearer Flow for authentication")
pemBytes, err := os.ReadFile(cfg.Auth.OAuth2.JWTBearerFlow.ClientKeyPath)
if err != nil {
return nil, fmt.Errorf("problem with client key path for JWT auth: %w", err)
Expand All @@ -428,6 +448,7 @@ func (s *salesforceInput) getSFDCConfig(cfg *config) (*sfdc.Configuration, error
}

case cfg.Auth.OAuth2.UserPasswordFlow != nil && cfg.Auth.OAuth2.UserPasswordFlow.isEnabled():
s.log.Info("Using User Password Flow for authentication")
passCreds := credentials.PasswordCredentials{
URL: cfg.Auth.OAuth2.UserPasswordFlow.TokenURL,
Username: cfg.Auth.OAuth2.UserPasswordFlow.Username,
Expand Down Expand Up @@ -533,21 +554,29 @@ type textContextError struct {
body []byte
}

// decodeAsCSV decodes p as a headed CSV document into dst.
func decodeAsCSV(p []byte) ([]map[string]string, error) {
// decodeAsCSV decodes the provided byte slice as a CSV and returns a slice of
// maps, where each map represents a row in the CSV with the header fields as
// keys and the row values as values.
func (s *salesforceInput) decodeAsCSV(p []byte) ([]map[string]string, error) {
r := csv.NewReader(bytes.NewReader(p))

// To share the backing array for performance.
r.ReuseRecord = true

// Lazy quotes are enabled so that a bare quote in an unquoted field, or a
// non-doubled quote in a quoted field, does not fail the whole parse. This
// makes CSV handling more tolerant of malformed input.
// NOTE(shmsr): We have not faced any issue with LazyQuotes == false, but I
// think we should keep it enabled to avoid any issues in the future.
r.LazyQuotes = true

// Header row is always expected, otherwise we can't map values to keys in
// the event.
header, err := r.Read()
if err != nil {
if err == io.EOF { //nolint:errorlint // csv.Reader never wraps io.EOF.
if errors.Is(err, io.EOF) {
return nil, nil
}
return nil, err
return nil, fmt.Errorf("failed to read CSV header: %w", err)
}

// As buffer reuse is enabled, copying header is important.
Expand All @@ -561,22 +590,21 @@ func decodeAsCSV(p []byte) ([]map[string]string, error) {
// so that future records must have the same field count.
// So, if len(header) != len(event), the Read will return an error and hence
// we need not put an explicit check.
event, err := r.Read()
for ; err == nil; event, err = r.Read() {
for {
record, err := r.Read()
if err != nil {
continue
}
o := make(map[string]string, len(header))
for i, h := range header {
o[h] = event[i]
if errors.Is(err, io.EOF) {
break
}
s.log.Errorf("failed to read CSV record: %v\n%s", err, p)
return nil, textContextError{error: fmt.Errorf("failed to read CSV record: %w for: %v", err, record), body: p}
}
results = append(results, o)
}

if err != nil {
if err != io.EOF { //nolint:errorlint // csv.Reader never wraps io.EOF.
return nil, textContextError{error: err, body: p}
event := make(map[string]string, len(header))
devamanv marked this conversation as resolved.
Show resolved Hide resolved
for i, h := range header {
event[h] = record[i]
}
results = append(results, event)
}

return results, nil
Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/salesforce/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,9 @@ func TestDecodeAsCSV(t *testing.T) {
"Login","20231218054831.655","4u6LyuMrDvb_G-l1cJIQk-","00D5j00000DgAYG","0055j00000AT6I1","1219","127","/services/oauth2/token","","bY5Wfv8t/Ith7WVE","Standard","","1051271151","i","Go-http-client/1.1","","9998.0","[email protected]","TLSv1.2","ECDHE-RSA-AES256-GCM-SHA384","","","2023-12-18T05:48:31.655Z","0055j00000AT6I1AAL","Salesforce.com IP","","LOGIN_NO_ERROR","103.108.207.58"
"Login","20231218054832.003","4u6LyuHSDv8LLVl1cJOqGV","00D5j00000DgAYG","0055j00000AT6I1","1277","104","/services/oauth2/token","","u60el7VqW8CSSKcW","Standard","","674857427","i","Go-http-client/1.1","","9998.0","[email protected]","TLSv1.2","ECDHE-RSA-AES256-GCM-SHA384","","","2023-12-18T05:48:32.003Z","0055j00000AT6I1AAL","103.108.207.58","","LOGIN_NO_ERROR","103.108.207.58"`

mp, err := decodeAsCSV([]byte(sampleELF))
s := &salesforceInput{log: logp.NewLogger("salesforceInput")}

mp, err := s.decodeAsCSV([]byte(sampleELF))
assert.NoError(t, err)

wantNumOfEvents := 2
Expand Down
Loading