Skip to content

Commit

Permalink
rpc: Store and serve the event transaction ID
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio committed Jan 31, 2024
1 parent 433cd44 commit 876da11
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 14 deletions.
10 changes: 8 additions & 2 deletions cmd/soroban-rpc/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type event struct {
diagnosticEventXDR []byte
txIndex uint32
eventIndex uint32
txHash *xdr.Hash // intentionally stored as a pointer to save memory (amortized as soon as there are two events in a transaction)
}

func (e event) cursor(ledgerSeq uint32) Cursor {
Expand Down Expand Up @@ -90,13 +91,15 @@ type Range struct {
ClampEnd bool
}

type ScanFunction func(xdr.DiagnosticEvent, Cursor, int64, *xdr.Hash) bool

// Scan applies f on all the events occurring in the given range.
// The events are processed in sorted ascending Cursor order.
// If f returns false, the scan terminates early (f will not be applied on
// remaining events in the range). Note that a read lock is held for the
// entire duration of the Scan function so f should be written in a way
// to minimize latency.
func (m *MemoryStore) Scan(eventRange Range, f func(xdr.DiagnosticEvent, Cursor, int64) bool) (uint32, error) {
func (m *MemoryStore) Scan(eventRange Range, f ScanFunction) (uint32, error) {
startTime := time.Now()
m.lock.RLock()
defer m.lock.RUnlock()
Expand Down Expand Up @@ -126,7 +129,7 @@ func (m *MemoryStore) Scan(eventRange Range, f func(xdr.DiagnosticEvent, Cursor,
if err != nil {
return 0, err
}
if !f(diagnosticEvent, cur, timestamp) {
if !f(diagnosticEvent, cur, timestamp, event.txHash) {
return lastLedgerInWindow, nil
}
}
Expand Down Expand Up @@ -235,10 +238,12 @@ func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (
if !tx.Result.Successful() {
continue
}

txEvents, err := tx.GetDiagnosticEvents()
if err != nil {
return nil, err
}
txHash := tx.Result.TransactionHash
for index, e := range txEvents {
diagnosticEventXDR, err := e.MarshalBinary()
if err != nil {
Expand All @@ -248,6 +253,7 @@ func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (
diagnosticEventXDR: diagnosticEventXDR,
txIndex: tx.Index,
eventIndex: uint32(index),
txHash: &txHash,
})
}
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/soroban-rpc/internal/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func eventsAreEqual(t *testing.T, a, b []event) {

func TestScanRangeValidation(t *testing.T) {
m := NewMemoryStore(interfaces.MakeNoOpDeamon(), "unit-tests", 4)
assertNoCalls := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, timestamp int64) bool {
assertNoCalls := func(xdr.DiagnosticEvent, Cursor, int64, *xdr.Hash) bool {
t.Fatalf("unexpected call")
return true
}
Expand Down Expand Up @@ -362,14 +362,15 @@ func TestScan(t *testing.T) {
for _, input := range genEquivalentInputs(testCase.input) {
var events []event
iterateAll := true
f := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, ledgerCloseTimestamp int64) bool {
f := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, ledgerCloseTimestamp int64, hash *xdr.Hash) bool {
require.Equal(t, ledgerCloseTime(cursor.Ledger), ledgerCloseTimestamp)
diagnosticEventXDR, err := contractEvent.MarshalBinary()
require.NoError(t, err)
events = append(events, event{
diagnosticEventXDR: diagnosticEventXDR,
txIndex: cursor.Tx,
eventIndex: cursor.Event,
txHash: hash,
})
return iterateAll
}
Expand Down
12 changes: 8 additions & 4 deletions cmd/soroban-rpc/internal/methods/get_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type EventInfo struct {
Topic []string `json:"topic"`
Value string `json:"value"`
InSuccessfulContractCall bool `json:"inSuccessfulContractCall"`
TransactionHash string `json:"txHash"`
}

type GetEventsRequest struct {
Expand Down Expand Up @@ -299,7 +300,7 @@ type GetEventsResponse struct {
}

type eventScanner interface {
Scan(eventRange events.Range, f func(xdr.DiagnosticEvent, events.Cursor, int64) bool) (uint32, error)
Scan(eventRange events.Range, f events.ScanFunction) (uint32, error)
}

type eventsRPCHandler struct {
Expand Down Expand Up @@ -334,6 +335,7 @@ func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse
cursor events.Cursor
ledgerCloseTimestamp int64
event xdr.DiagnosticEvent
txHash *xdr.Hash
}
var found []entry
latestLedger, err := h.scanner.Scan(
Expand All @@ -343,9 +345,9 @@ func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse
End: events.MaxCursor,
ClampEnd: true,
},
func(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerCloseTimestamp int64) bool {
func(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerCloseTimestamp int64, txHash *xdr.Hash) bool {
if request.Matches(event) {
found = append(found, entry{cursor, ledgerCloseTimestamp, event})
found = append(found, entry{cursor, ledgerCloseTimestamp, event, txHash})
}
return uint(len(found)) < limit
},
Expand All @@ -363,6 +365,7 @@ func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse
entry.event,
entry.cursor,
time.Unix(entry.ledgerCloseTimestamp, 0).UTC().Format(time.RFC3339),
entry.txHash.HexString(),
)
if err != nil {
return GetEventsResponse{}, errors.Wrap(err, "could not parse event")
Expand All @@ -375,7 +378,7 @@ func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse
}, nil
}

func eventInfoForEvent(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerClosedAt string) (EventInfo, error) {
func eventInfoForEvent(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerClosedAt string, txHash string) (EventInfo, error) {
v0, ok := event.Event.Body.GetV0()
if !ok {
return EventInfo{}, errors.New("unknown event version")
Expand Down Expand Up @@ -411,6 +414,7 @@ func eventInfoForEvent(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerCl
Topic: topic,
Value: data,
InSuccessfulContractCall: event.InSuccessfulContractCall,
TransactionHash: txHash,
}
if event.Event.ContractId != nil {
info.ContractID = strkey.MustEncode(strkey.VersionByteContract, (*event.Event.ContractId)[:])
Expand Down
24 changes: 18 additions & 6 deletions cmd/soroban-rpc/internal/methods/get_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,8 @@ func TestGetEvents(t *testing.T) {
),
))
}
assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)))
ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)
assert.NoError(t, store.IngestEvents(ledgerCloseMeta))

handler := eventsRPCHandler{
scanner: store,
Expand Down Expand Up @@ -626,6 +627,7 @@ func TestGetEvents(t *testing.T) {
Topic: []string{value},
Value: value,
InSuccessfulContractCall: true,
TransactionHash: ledgerCloseMeta.TransactionHash(i).HexString(),
})
}
assert.Equal(t, GetEventsResponse{expected, 1}, results)
Expand Down Expand Up @@ -699,7 +701,8 @@ func TestGetEvents(t *testing.T) {
),
))
}
assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)))
ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)
assert.NoError(t, store.IngestEvents(ledgerCloseMeta))

number := xdr.Uint64(4)
handler := eventsRPCHandler{
Expand Down Expand Up @@ -738,6 +741,7 @@ func TestGetEvents(t *testing.T) {
Topic: []string{counterXdr, value},
Value: value,
InSuccessfulContractCall: true,
TransactionHash: ledgerCloseMeta.TransactionHash(4).HexString(),
},
}
assert.Equal(t, GetEventsResponse{expected, 1}, results)
Expand Down Expand Up @@ -792,7 +796,8 @@ func TestGetEvents(t *testing.T) {
),
),
}
assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)))
ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)
assert.NoError(t, store.IngestEvents(ledgerCloseMeta))

handler := eventsRPCHandler{
scanner: store,
Expand Down Expand Up @@ -832,6 +837,7 @@ func TestGetEvents(t *testing.T) {
Topic: []string{counterXdr, value},
Value: value,
InSuccessfulContractCall: true,
TransactionHash: ledgerCloseMeta.TransactionHash(3).HexString(),
},
}
assert.Equal(t, GetEventsResponse{expected, 1}, results)
Expand Down Expand Up @@ -865,7 +871,8 @@ func TestGetEvents(t *testing.T) {
),
),
}
assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)))
ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)
assert.NoError(t, store.IngestEvents(ledgerCloseMeta))

handler := eventsRPCHandler{
scanner: store,
Expand All @@ -892,6 +899,7 @@ func TestGetEvents(t *testing.T) {
Topic: []string{counterXdr},
Value: counterXdr,
InSuccessfulContractCall: true,
TransactionHash: ledgerCloseMeta.TransactionHash(0).HexString(),
},
}
assert.Equal(t, GetEventsResponse{expected, 1}, results)
Expand All @@ -913,7 +921,8 @@ func TestGetEvents(t *testing.T) {
),
))
}
assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)))
ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)
assert.NoError(t, store.IngestEvents(ledgerCloseMeta))

handler := eventsRPCHandler{
scanner: store,
Expand Down Expand Up @@ -947,6 +956,7 @@ func TestGetEvents(t *testing.T) {
Topic: []string{value},
Value: value,
InSuccessfulContractCall: true,
TransactionHash: ledgerCloseMeta.TransactionHash(i).HexString(),
})
}
assert.Equal(t, GetEventsResponse{expected, 1}, results)
Expand Down Expand Up @@ -996,7 +1006,8 @@ func TestGetEvents(t *testing.T) {
),
),
}
assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(5, now.Unix(), txMeta...)))
ledgerCloseMeta := ledgerCloseMetaWithEvents(5, now.Unix(), txMeta...)
assert.NoError(t, store.IngestEvents(ledgerCloseMeta))

id := &events.Cursor{Ledger: 5, Tx: 1, Op: 0, Event: 0}
handler := eventsRPCHandler{
Expand Down Expand Up @@ -1031,6 +1042,7 @@ func TestGetEvents(t *testing.T) {
Topic: []string{counterXdr},
Value: expectedXdr,
InSuccessfulContractCall: true,
TransactionHash: ledgerCloseMeta.TransactionHash(i).HexString(),
})
}
assert.Equal(t, GetEventsResponse{expected, 5}, results)
Expand Down

0 comments on commit 876da11

Please sign in to comment.