Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check missed events before start #5289

Merged
merged 8 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/server/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ type ListAttestedNodesResponse struct {

type ListAttestedNodesEventsRequest struct {
GreaterThanEventID uint
LessThanEventID uint
}

type AttestedNodeEvent struct {
Expand All @@ -180,8 +181,7 @@ type AttestedNodeEvent struct {
}

type ListAttestedNodesEventsResponse struct {
Events []AttestedNodeEvent
FirstEventID uint
Events []AttestedNodeEvent
}

type ListBundlesRequest struct {
Expand Down Expand Up @@ -226,6 +226,7 @@ type ListRegistrationEntriesResponse struct {

type ListRegistrationEntriesEventsRequest struct {
GreaterThanEventID uint
LessThanEventID uint
}

type RegistrationEntryEvent struct {
Expand All @@ -234,8 +235,7 @@ type RegistrationEntryEvent struct {
}

type ListRegistrationEntriesEventsResponse struct {
Events []RegistrationEntryEvent
FirstEventID uint
Events []RegistrationEntryEvent
}

type ListFederationRelationshipsRequest struct {
Expand Down
58 changes: 48 additions & 10 deletions pkg/server/datastore/sqlstore/sqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1719,8 +1719,20 @@ func createAttestedNodeEvent(tx *gorm.DB, event *datastore.AttestedNodeEvent) er

func listAttestedNodesEvents(tx *gorm.DB, req *datastore.ListAttestedNodesEventsRequest) (*datastore.ListAttestedNodesEventsResponse, error) {
var events []AttestedNodeEvent
if err := tx.Find(&events, "id > ?", req.GreaterThanEventID).Order("id asc").Error; err != nil {
return nil, sqlError.Wrap(err)

if req.GreaterThanEventID != 0 || req.LessThanEventID != 0 {
query, id, err := buildListEventsQueryString(req.GreaterThanEventID, req.LessThanEventID)
if err != nil {
return nil, sqlError.Wrap(err)
}

if err := tx.Find(&events, query.String(), id).Order("id asc").Error; err != nil {
return nil, sqlError.Wrap(err)
}
} else {
if err := tx.Find(&events).Order("id asc").Error; err != nil {
return nil, sqlError.Wrap(err)
}
}

resp := &datastore.ListAttestedNodesEventsResponse{
Expand All @@ -1730,9 +1742,6 @@ func listAttestedNodesEvents(tx *gorm.DB, req *datastore.ListAttestedNodesEvents
resp.Events[i].EventID = event.ID
resp.Events[i].SpiffeID = event.SpiffeID
}
if len(events) > 0 {
resp.FirstEventID = events[0].ID
}

return resp, nil
}
Expand Down Expand Up @@ -4096,8 +4105,20 @@ func deleteRegistrationEntryEvent(tx *gorm.DB, eventID uint) error {

func listRegistrationEntriesEvents(tx *gorm.DB, req *datastore.ListRegistrationEntriesEventsRequest) (*datastore.ListRegistrationEntriesEventsResponse, error) {
var events []RegisteredEntryEvent
if err := tx.Find(&events, "id > ?", req.GreaterThanEventID).Order("id asc").Error; err != nil {
return nil, sqlError.Wrap(err)

if req.GreaterThanEventID != 0 || req.LessThanEventID != 0 {
query, id, err := buildListEventsQueryString(req.GreaterThanEventID, req.LessThanEventID)
if err != nil {
return nil, sqlError.Wrap(err)
}

if err := tx.Find(&events, query.String(), id).Order("id asc").Error; err != nil {
return nil, sqlError.Wrap(err)
}
} else {
if err := tx.Find(&events).Order("id asc").Error; err != nil {
return nil, sqlError.Wrap(err)
}
}

resp := &datastore.ListRegistrationEntriesEventsResponse{
Expand All @@ -4107,9 +4128,6 @@ func listRegistrationEntriesEvents(tx *gorm.DB, req *datastore.ListRegistrationE
resp.Events[i].EventID = event.ID
resp.Events[i].EntryID = event.EntryID
}
if len(events) > 0 {
resp.FirstEventID = events[0].ID
}

return resp, nil
}
Expand All @@ -4122,6 +4140,26 @@ func pruneRegistrationEntriesEvents(tx *gorm.DB, olderThan time.Duration) error
return nil
}

func buildListEventsQueryString(greaterThanEventID, lessThanEventID uint) (*strings.Builder, uint, error) {
if greaterThanEventID != 0 && lessThanEventID != 0 {
return nil, 0, errors.New("can't set both greater and less than event id")
}

var id uint
query := new(strings.Builder)
query.WriteString("id ")
if greaterThanEventID != 0 {
query.WriteString("> ?")
id = greaterThanEventID
}
if lessThanEventID != 0 {
query.WriteString("< ?")
id = lessThanEventID
}

return query, id, nil
}

func createJoinToken(tx *gorm.DB, token *datastore.JoinToken) error {
t := JoinToken{
Token: token.Token,
Expand Down
86 changes: 69 additions & 17 deletions pkg/server/datastore/sqlstore/sqlstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1539,36 +1539,62 @@ func (s *PluginSuite) TestListAttestedNodesEvents() {
tests := []struct {
name string
greaterThanEventID uint
lessThanEventID uint
expectedEvents []datastore.AttestedNodeEvent
expectedFirstEventID uint
expectedLastEventID uint
expectedErr string
}{
{
name: "All Events",
greaterThanEventID: 0,
expectedFirstEventID: 1,
expectedLastEventID: uint(len(expectedEvents)),
expectedEvents: expectedEvents,
},
{
name: "Half of the Events",
name: "Greater than half of the Events",
greaterThanEventID: uint(len(expectedEvents) / 2),
expectedFirstEventID: uint(len(expectedEvents)/2) + 1,
expectedLastEventID: uint(len(expectedEvents)),
expectedEvents: expectedEvents[len(expectedEvents)/2:],
},
{
name: "None of the Events",
greaterThanEventID: uint(len(expectedEvents)),
expectedFirstEventID: 0,
expectedEvents: []datastore.AttestedNodeEvent{},
name: "Less than half of the Events",
lessThanEventID: uint(len(expectedEvents) / 2),
expectedFirstEventID: 1,
expectedLastEventID: uint(len(expectedEvents)/2) - 1,
expectedEvents: expectedEvents[:len(expectedEvents)/2-1],
},
{
name: "Greater than largest Event ID",
greaterThanEventID: uint(len(expectedEvents)),
expectedEvents: []datastore.AttestedNodeEvent{},
},
{
name: "Setting both greater and less than",
greaterThanEventID: 1,
lessThanEventID: 1,
expectedErr: "rpc error: code = Unknown desc = datastore-sql: can't set both greater and less than event id",
},
}
for _, test := range tests {
s.T().Run(test.name, func(t *testing.T) {
resp, err := s.ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{
GreaterThanEventID: test.greaterThanEventID,
LessThanEventID: test.lessThanEventID,
})
s.Assert().NoError(err)
s.Assert().Equal(test.expectedFirstEventID, resp.FirstEventID)
s.Assert().Equal(test.expectedEvents, resp.Events)
if test.expectedErr != "" {
require.EqualError(t, err, test.expectedErr)
return
}
s.Require().NoError(err)

s.Require().Equal(test.expectedEvents, resp.Events)
if len(resp.Events) > 0 {
s.Require().Equal(test.expectedFirstEventID, resp.Events[0].EventID)
s.Require().Equal(test.expectedLastEventID, resp.Events[len(resp.Events)-1].EventID)
}
})
}
}
Expand Down Expand Up @@ -3998,36 +4024,62 @@ func (s *PluginSuite) TestListRegistrationEntriesEvents() {
tests := []struct {
name string
greaterThanEventID uint
lessThanEventID uint
expectedEvents []datastore.RegistrationEntryEvent
expectedFirstEventID uint
expectedLastEventID uint
expectedErr string
}{
{
name: "All Events",
greaterThanEventID: 0,
expectedFirstEventID: 1,
expectedLastEventID: uint(len(expectedEvents)),
expectedEvents: expectedEvents,
},
{
name: "Half of the Events",
greaterThanEventID: 2,
expectedFirstEventID: 3,
expectedEvents: expectedEvents[2:],
name: "Greater than half of the Events",
greaterThanEventID: uint(len(expectedEvents) / 2),
expectedFirstEventID: uint(len(expectedEvents)/2) + 1,
expectedLastEventID: uint(len(expectedEvents)),
expectedEvents: expectedEvents[len(expectedEvents)/2:],
},
{
name: "Less than half of the Events",
lessThanEventID: uint(len(expectedEvents) / 2),
expectedFirstEventID: 1,
expectedLastEventID: uint(len(expectedEvents)/2) - 1,
expectedEvents: expectedEvents[:len(expectedEvents)/2-1],
},
{
name: "Greater than largest Event ID",
greaterThanEventID: 4,
expectedEvents: []datastore.RegistrationEntryEvent{},
},
{
name: "None of the Events",
greaterThanEventID: 4,
expectedFirstEventID: 0,
expectedEvents: []datastore.RegistrationEntryEvent{},
name: "Setting both greater and less than",
greaterThanEventID: 1,
lessThanEventID: 1,
expectedErr: "rpc error: code = Unknown desc = datastore-sql: can't set both greater and less than event id",
},
}
for _, test := range tests {
s.T().Run(test.name, func(t *testing.T) {
resp, err = s.ds.ListRegistrationEntriesEvents(ctx, &datastore.ListRegistrationEntriesEventsRequest{
GreaterThanEventID: test.greaterThanEventID,
LessThanEventID: test.lessThanEventID,
})
if test.expectedErr != "" {
require.EqualError(t, err, test.expectedErr)
return
}
s.Require().NoError(err)
s.Require().Equal(test.expectedFirstEventID, resp.FirstEventID)

s.Require().Equal(test.expectedEvents, resp.Events)
if len(resp.Events) > 0 {
s.Require().Equal(test.expectedFirstEventID, resp.Events[0].EventID)
s.Require().Equal(test.expectedLastEventID, resp.Events[len(resp.Events)-1].EventID)
}
})
}
}
Expand Down
36 changes: 17 additions & 19 deletions pkg/server/endpoints/authorized_entryfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,37 +24,35 @@ type AuthorizedEntryFetcherWithEventsBasedCache struct {
log logrus.FieldLogger
ds datastore.DataStore

cacheReloadInterval time.Duration
pruneEventsOlderThan time.Duration
sqlTransactionTimeout time.Duration
cacheReloadInterval time.Duration
pruneEventsOlderThan time.Duration

registrationEntries eventsBasedCache
attestedNodes eventsBasedCache
}

type eventsBasedCache interface {
updateCache(ctx context.Context) error
pruneMissedEvents(sqlTransactionTimeout time.Duration)
pruneMissedEvents()
}

func NewAuthorizedEntryFetcherWithEventsBasedCache(ctx context.Context, log logrus.FieldLogger, clk clock.Clock, ds datastore.DataStore, cacheReloadInterval, pruneEventsOlderThan, sqlTransactionTimeout time.Duration) (*AuthorizedEntryFetcherWithEventsBasedCache, error) {
log.Info("Building event-based in-memory entry cache")
cache, registrationEntries, attestedNodes, err := buildCache(ctx, log, ds, clk)
cache, registrationEntries, attestedNodes, err := buildCache(ctx, log, ds, clk, sqlTransactionTimeout)
if err != nil {
return nil, err
}
log.Info("Completed building event-based in-memory entry cache")

return &AuthorizedEntryFetcherWithEventsBasedCache{
cache: cache,
clk: clk,
log: log,
ds: ds,
cacheReloadInterval: cacheReloadInterval,
pruneEventsOlderThan: pruneEventsOlderThan,
sqlTransactionTimeout: sqlTransactionTimeout,
registrationEntries: registrationEntries,
attestedNodes: attestedNodes,
cache: cache,
clk: clk,
log: log,
ds: ds,
cacheReloadInterval: cacheReloadInterval,
pruneEventsOlderThan: pruneEventsOlderThan,
registrationEntries: registrationEntries,
attestedNodes: attestedNodes,
}, nil
}

Expand Down Expand Up @@ -100,8 +98,8 @@ func (a *AuthorizedEntryFetcherWithEventsBasedCache) pruneEvents(ctx context.Con
pruneRegistrationEntriesEventsErr := a.ds.PruneRegistrationEntriesEvents(ctx, olderThan)
pruneAttestedNodesEventsErr := a.ds.PruneAttestedNodesEvents(ctx, olderThan)

a.registrationEntries.pruneMissedEvents(a.sqlTransactionTimeout)
a.attestedNodes.pruneMissedEvents(a.sqlTransactionTimeout)
a.registrationEntries.pruneMissedEvents()
a.attestedNodes.pruneMissedEvents()

return errors.Join(pruneRegistrationEntriesEventsErr, pruneAttestedNodesEventsErr)
}
Expand All @@ -113,15 +111,15 @@ func (a *AuthorizedEntryFetcherWithEventsBasedCache) updateCache(ctx context.Con
return errors.Join(updateRegistrationEntriesCacheErr, updateAttestedNodesCacheErr)
}

func buildCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock) (*authorizedentries.Cache, *registrationEntries, *attestedNodes, error) {
func buildCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, sqlTransactionTimeout time.Duration) (*authorizedentries.Cache, *registrationEntries, *attestedNodes, error) {
cache := authorizedentries.NewCache(clk)

registrationEntries, err := buildRegistrationEntriesCache(ctx, log, ds, clk, cache, buildCachePageSize)
registrationEntries, err := buildRegistrationEntriesCache(ctx, log, ds, clk, cache, buildCachePageSize, sqlTransactionTimeout)
if err != nil {
return nil, nil, nil, err
}

attestedNodes, err := buildAttestedNodesCache(ctx, log, ds, clk, cache)
attestedNodes, err := buildAttestedNodesCache(ctx, log, ds, clk, cache, sqlTransactionTimeout)
if err != nil {
return nil, nil, nil, err
}
Expand Down
Loading
Loading