Skip to content

Commit

Permalink
x-pack/filebeat/input/salesforce: Bug fixes and improvements (#41015)
Browse files Browse the repository at this point in the history
(cherry picked from commit 9ae6ed1)
  • Loading branch information
shmsr authored and mergify[bot] committed Oct 1, 2024
1 parent 803e672 commit 3a10594
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Ensure netflow custom field configuration is applied. {issue}40735[40735] {pull}40730[40730]
- Fix replace processor handling of zero string replacement validation. {pull}40751[40751]
- Fix long filepaths in diagnostics exceeding max path limits on Windows. {pull}40909[40909]
- Fix a bug in Salesforce input to only handle responses with 200 status code. {pull}41015[41015]


*Heartbeat*
Expand Down
100 changes: 64 additions & 36 deletions x-pack/filebeat/input/salesforce/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (s *salesforceInput) Setup(env v2.Context, src inputcursor.Source, cursor *
// and based on the configuration, it will run the different methods -- EventLogFile
// or Object to collect events at defined intervals.
func (s *salesforceInput) run() error {
s.log.Info("Starting Salesforce input run")
if s.srcConfig.EventMonitoringMethod.EventLogFile.isEnabled() {
err := s.RunEventLogFile()
if err != nil {
Expand Down Expand Up @@ -160,12 +161,18 @@ func (s *salesforceInput) run() error {
case <-s.ctx.Done():
return s.isError(s.ctx.Err())
case <-eventLogFileTicker.C:
s.log.Info("Running EventLogFile collection")
if err := s.RunEventLogFile(); err != nil {
s.log.Errorf("Problem running EventLogFile collection: %s", err)
} else {
s.log.Info("EventLogFile collection completed successfully")
}
case <-objectMethodTicker.C:
s.log.Info("Running Object collection")
if err := s.RunObject(); err != nil {
s.log.Errorf("Problem running Object collection: %s", err)
} else {
s.log.Info("Object collection completed successfully")
}
}
}
Expand All @@ -181,15 +188,17 @@ func (s *salesforceInput) isError(err error) error {
}

func (s *salesforceInput) SetupSFClientConnection() (*soql.Resource, error) {
s.log.Info("Setting up Salesforce client connection")
if s.sfdcConfig == nil {
return nil, errors.New("internal error: salesforce configuration is not set properly")
}

// Open creates a session using the configuration.
session, err := session.Open(*s.sfdcConfig)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to open salesforce connection: %w", err)
}
s.log.Info("Salesforce session opened successfully")

// Set clientSession for re-use.
s.clientSession = session
Expand All @@ -209,8 +218,6 @@ func (s *salesforceInput) FormQueryWithCursor(queryConfig *QueryConfig, cursor m
return nil, err
}

s.log.Infof("Salesforce query: %s", qr)

return &querier{Query: qr}, err
}

Expand All @@ -222,7 +229,7 @@ func isZero[T comparable](v T) bool {

// RunObject runs the Object method of the Event Monitoring API to collect events.
func (s *salesforceInput) RunObject() error {
s.log.Debugf("scrape object(s) every %s", s.srcConfig.EventMonitoringMethod.Object.Interval)
s.log.Infof("Running Object collection with interval: %s", s.srcConfig.EventMonitoringMethod.Object.Interval)

var cursor mapstr.M
if !(isZero(s.cursor.Object.FirstEventTime) && isZero(s.cursor.Object.LastEventTime)) {
Expand All @@ -241,6 +248,8 @@ func (s *salesforceInput) RunObject() error {
return fmt.Errorf("error forming query based on cursor: %w", err)
}

s.log.Infof("Query formed: %s", query.Query)

res, err := s.soqlr.Query(query, false)
if err != nil {
return err
Expand Down Expand Up @@ -282,15 +291,15 @@ func (s *salesforceInput) RunObject() error {
return err
}
}
s.log.Debugf("Total events: %d", totalEvents)
s.log.Infof("Total events: %d", totalEvents)

return nil
}

// RunEventLogFile runs the EventLogFile method of the Event Monitoring API to
// collect events.
func (s *salesforceInput) RunEventLogFile() error {
s.log.Debugf("scrape eventLogFile(s) every %s", s.srcConfig.EventMonitoringMethod.EventLogFile.Interval)
s.log.Infof("Running EventLogFile collection with interval: %s", s.srcConfig.EventMonitoringMethod.EventLogFile.Interval)

var cursor mapstr.M
if !(isZero(s.cursor.EventLogFile.FirstEventTime) && isZero(s.cursor.EventLogFile.LastEventTime)) {
Expand All @@ -309,6 +318,8 @@ func (s *salesforceInput) RunEventLogFile() error {
return fmt.Errorf("error forming query based on cursor: %w", err)
}

s.log.Infof("Query formed: %s", query.Query)

res, err := s.soqlr.Query(query, false)
if err != nil {
return err
Expand All @@ -324,9 +335,14 @@ func (s *salesforceInput) RunEventLogFile() error {
totalEvents, firstEvent := 0, true
for res.TotalSize() > 0 {
for _, rec := range res.Records() {
req, err := http.NewRequestWithContext(s.ctx, http.MethodGet, s.config.URL+rec.Record().Fields()["LogFile"].(string), nil)
logfile, ok := rec.Record().Fields()["LogFile"].(string)
if !ok {
return fmt.Errorf("LogFile field not found or not a string in Salesforce event log file: %v", rec.Record().Fields())
}

req, err := http.NewRequestWithContext(s.ctx, http.MethodGet, s.config.URL+logfile, nil)
if err != nil {
return err
return fmt.Errorf("error creating request for log file: %w", err)
}

s.clientSession.AuthorizationHeader(req)
Expand All @@ -341,19 +357,23 @@ func (s *salesforceInput) RunEventLogFile() error {

resp, err := s.sfdcConfig.Client.Do(req)
if err != nil {
return err
return fmt.Errorf("error fetching log file: %w", err)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
return err
return fmt.Errorf("unexpected status code %d for log file", resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return fmt.Errorf("error reading log file body: %w", err)
}

recs, err := decodeAsCSV(body)
recs, err := s.decodeAsCSV(body)
if err != nil {
return err
return fmt.Errorf("error decoding CSV: %w", err)
}

if timestamp, ok := rec.Record().Fields()[s.config.EventMonitoringMethod.EventLogFile.Cursor.Field].(string); ok {
Expand All @@ -366,12 +386,11 @@ func (s *salesforceInput) RunEventLogFile() error {
for _, val := range recs {
jsonStrEvent, err := json.Marshal(val)
if err != nil {
return err
return fmt.Errorf("error json marshaling event: %w", err)
}

err = publishEvent(s.publisher, s.cursor, jsonStrEvent, "EventLogFile")
if err != nil {
return err
if err := publishEvent(s.publisher, s.cursor, jsonStrEvent, "EventLogFile"); err != nil {
return fmt.Errorf("error publishing event: %w", err)
}
totalEvents++
}
Expand All @@ -384,10 +403,10 @@ func (s *salesforceInput) RunEventLogFile() error {

res, err = res.Next()
if err != nil {
return err
return fmt.Errorf("error getting next page: %w", err)
}
}
s.log.Debugf("Total events: %d", totalEvents)
s.log.Infof("Total events processed: %d", totalEvents)

return nil
}
Expand All @@ -405,6 +424,7 @@ func (s *salesforceInput) getSFDCConfig(cfg *config) (*sfdc.Configuration, error

switch {
case cfg.Auth.OAuth2.JWTBearerFlow != nil && cfg.Auth.OAuth2.JWTBearerFlow.isEnabled():
s.log.Info("Using JWT Bearer Flow for authentication")
pemBytes, err := os.ReadFile(cfg.Auth.OAuth2.JWTBearerFlow.ClientKeyPath)
if err != nil {
return nil, fmt.Errorf("problem with client key path for JWT auth: %w", err)
Expand All @@ -428,6 +448,7 @@ func (s *salesforceInput) getSFDCConfig(cfg *config) (*sfdc.Configuration, error
}

case cfg.Auth.OAuth2.UserPasswordFlow != nil && cfg.Auth.OAuth2.UserPasswordFlow.isEnabled():
s.log.Info("Using User Password Flow for authentication")
passCreds := credentials.PasswordCredentials{
URL: cfg.Auth.OAuth2.UserPasswordFlow.TokenURL,
Username: cfg.Auth.OAuth2.UserPasswordFlow.Username,
Expand Down Expand Up @@ -533,21 +554,29 @@ type textContextError struct {
body []byte
}

// decodeAsCSV decodes p as a headed CSV document into dst.
func decodeAsCSV(p []byte) ([]map[string]string, error) {
// decodeAsCSV decodes the provided byte slice as a CSV and returns a slice of
// maps, where each map represents a row in the CSV with the header fields as
// keys and the row values as values.
func (s *salesforceInput) decodeAsCSV(p []byte) ([]map[string]string, error) {
r := csv.NewReader(bytes.NewReader(p))

// To share the backing array for performance.
r.ReuseRecord = true

// Lazy quotes are enabled so that a bare quote inside an unquoted field, or a
// non-doubled quote inside a quoted field, does not fail the parse.
// NOTE(shmsr): We have not seen any issue with LazyQuotes == false, but
// keeping it enabled guards against malformed quoting in the future.
r.LazyQuotes = true

// Header row is always expected, otherwise we can't map values to keys in
// the event.
header, err := r.Read()
if err != nil {
if err == io.EOF { //nolint:errorlint // csv.Reader never wraps io.EOF.
if errors.Is(err, io.EOF) {
return nil, nil
}
return nil, err
return nil, fmt.Errorf("failed to read CSV header: %w", err)
}

// As buffer reuse is enabled, copying header is important.
Expand All @@ -561,22 +590,21 @@ func decodeAsCSV(p []byte) ([]map[string]string, error) {
// so that future records must have the same field count.
// So, if len(header) != len(event), the Read will return an error and hence
// we need not put an explicit check.
event, err := r.Read()
for ; err == nil; event, err = r.Read() {
for {
record, err := r.Read()
if err != nil {
continue
}
o := make(map[string]string, len(header))
for i, h := range header {
o[h] = event[i]
if errors.Is(err, io.EOF) {
break
}
s.log.Errorf("failed to read CSV record: %v\n%s", err, p)
return nil, textContextError{error: fmt.Errorf("failed to read CSV record: %w for: %v", err, record), body: p}
}
results = append(results, o)
}

if err != nil {
if err != io.EOF { //nolint:errorlint // csv.Reader never wraps io.EOF.
return nil, textContextError{error: err, body: p}
event := make(map[string]string, len(header))
for i, h := range header {
event[h] = record[i]
}
results = append(results, event)
}

return results, nil
Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/salesforce/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,9 @@ func TestDecodeAsCSV(t *testing.T) {
"Login","20231218054831.655","4u6LyuMrDvb_G-l1cJIQk-","00D5j00000DgAYG","0055j00000AT6I1","1219","127","/services/oauth2/token","","bY5Wfv8t/Ith7WVE","Standard","","1051271151","i","Go-http-client/1.1","","9998.0","[email protected]","TLSv1.2","ECDHE-RSA-AES256-GCM-SHA384","","","2023-12-18T05:48:31.655Z","0055j00000AT6I1AAL","Salesforce.com IP","","LOGIN_NO_ERROR","103.108.207.58"
"Login","20231218054832.003","4u6LyuHSDv8LLVl1cJOqGV","00D5j00000DgAYG","0055j00000AT6I1","1277","104","/services/oauth2/token","","u60el7VqW8CSSKcW","Standard","","674857427","i","Go-http-client/1.1","","9998.0","[email protected]","TLSv1.2","ECDHE-RSA-AES256-GCM-SHA384","","","2023-12-18T05:48:32.003Z","0055j00000AT6I1AAL","103.108.207.58","","LOGIN_NO_ERROR","103.108.207.58"`

mp, err := decodeAsCSV([]byte(sampleELF))
s := &salesforceInput{log: logp.NewLogger("salesforceInput")}

mp, err := s.decodeAsCSV([]byte(sampleELF))
assert.NoError(t, err)

wantNumOfEvents := 2
Expand Down

0 comments on commit 3a10594

Please sign in to comment.