Skip to content

Commit

Permalink
Merge pull request #658 from sylr/parse-no-lock
Browse files Browse the repository at this point in the history
Only lock fieldmap once during message parsing
  • Loading branch information
ackleymi authored Aug 9, 2024
2 parents f8a53b0 + af66cc8 commit 6e564c1
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 10 deletions.
58 changes: 55 additions & 3 deletions field_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,20 @@ func (m FieldMap) GetField(tag Tag, parser FieldValueReader) MessageRejectError
return nil
}

// GetField parses of a field with Tag tag. Returned reject may indicate the field is not present, or the field value is invalid.
func (m FieldMap) getFieldNoLock(tag Tag, parser FieldValueReader) MessageRejectError {
f, ok := m.tagLookup[tag]
if !ok {
return ConditionallyRequiredFieldMissing(tag)
}

if err := parser.Read(f[0].value); err != nil {
return IncorrectDataFormatForValue(tag)
}

return nil
}

// GetBytes is a zero-copy GetField wrapper for []bytes fields.
func (m FieldMap) GetBytes(tag Tag) ([]byte, MessageRejectError) {
m.rwLock.RLock()
Expand All @@ -128,6 +142,16 @@ func (m FieldMap) GetBytes(tag Tag) ([]byte, MessageRejectError) {
return f[0].value, nil
}

// getBytesNoLock is a lock free zero-copy GetField wrapper for []bytes fields.
func (m FieldMap) getBytesNoLock(tag Tag) ([]byte, MessageRejectError) {
f, ok := m.tagLookup[tag]
if !ok {
return nil, ConditionallyRequiredFieldMissing(tag)
}

return f[0].value, nil
}

// GetBool is a GetField wrapper for bool fields.
func (m FieldMap) GetBool(tag Tag) (bool, MessageRejectError) {
var val FIXBoolean
Expand All @@ -152,6 +176,21 @@ func (m FieldMap) GetInt(tag Tag) (int, MessageRejectError) {
return int(val), err
}

// GetInt is a lock free GetField wrapper for int fields.
func (m FieldMap) getIntNoLock(tag Tag) (int, MessageRejectError) {
bytes, err := m.getBytesNoLock(tag)
if err != nil {
return 0, err
}

var val FIXInt
if val.Read(bytes) != nil {
err = IncorrectDataFormatForValue(tag)
}

return int(val), err
}

// GetTime is a GetField wrapper for utc timestamp fields.
func (m FieldMap) GetTime(tag Tag) (t time.Time, err MessageRejectError) {
m.rwLock.RLock()
Expand Down Expand Up @@ -179,6 +218,15 @@ func (m FieldMap) GetString(tag Tag) (string, MessageRejectError) {
return string(val), nil
}

// GetString is a GetField wrapper for string fields.
func (m FieldMap) getStringNoLock(tag Tag) (string, MessageRejectError) {
var val FIXString
if err := m.getFieldNoLock(tag, &val); err != nil {
return "", err
}
return string(val), nil
}

// GetGroup is a Get function specific to Group Fields.
func (m FieldMap) GetGroup(parser FieldGroupReader) MessageRejectError {
m.rwLock.RLock()
Expand Down Expand Up @@ -246,6 +294,13 @@ func (m *FieldMap) Clear() {
}
}

func (m *FieldMap) clearNoLock() {
m.tags = m.tags[0:0]
for k := range m.tagLookup {
delete(m.tagLookup, k)
}
}

// CopyInto overwrites the given FieldMap with this one.
func (m *FieldMap) CopyInto(to *FieldMap) {
m.rwLock.RLock()
Expand All @@ -263,9 +318,6 @@ func (m *FieldMap) CopyInto(to *FieldMap) {
}

func (m *FieldMap) add(f field) {
m.rwLock.Lock()
defer m.rwLock.Unlock()

t := fieldTag(f)
if _, ok := m.tagLookup[t]; !ok {
m.tags = append(m.tags, t)
Expand Down
25 changes: 18 additions & 7 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,17 @@ func ParseMessageWithDataDictionary(

// doParsing executes the message parsing process.
func doParsing(mp *msgParser) (err error) {
mp.msg.Header.rwLock.Lock()
defer mp.msg.Header.rwLock.Unlock()
mp.msg.Body.rwLock.Lock()
defer mp.msg.Body.rwLock.Unlock()
mp.msg.Trailer.rwLock.Lock()
defer mp.msg.Trailer.rwLock.Unlock()

// Initialize for parsing.
mp.msg.Header.Clear()
mp.msg.Body.Clear()
mp.msg.Trailer.Clear()
mp.msg.Header.clearNoLock()
mp.msg.Body.clearNoLock()
mp.msg.Trailer.clearNoLock()

// Allocate expected message fields in one chunk.
fieldCount := bytes.Count(mp.rawBytes, []byte{'\001'})
Expand Down Expand Up @@ -262,7 +269,7 @@ func doParsing(mp *msgParser) (err error) {
}

if mp.parsedFieldBytes.tag == tagXMLDataLen {
xmlDataLen, _ = mp.msg.Header.GetInt(tagXMLDataLen)
xmlDataLen, _ = mp.msg.Header.getIntNoLock(tagXMLDataLen)
}
mp.fieldIndex++
}
Expand All @@ -287,7 +294,7 @@ func doParsing(mp *msgParser) (err error) {
}
}

bodyLength, err := mp.msg.Header.GetInt(tagBodyLength)
bodyLength, err := mp.msg.Header.getIntNoLock(tagBodyLength)
if err != nil {
err = parseError{OrigError: err.Error()}
} else if length != bodyLength && !xmlDataMsg {
Expand Down Expand Up @@ -368,7 +375,7 @@ func parseGroup(mp *msgParser, tags []Tag) {
// tags slice will contain multiple tags if the tag in question is found while processing a group already.
func isNumInGroupField(msg *Message, tags []Tag, appDataDictionary *datadictionary.DataDictionary) bool {
if appDataDictionary != nil {
msgt, err := msg.MsgType()
msgt, err := msg.msgTypeNoLock()
if err != nil {
return false
}
Expand Down Expand Up @@ -401,7 +408,7 @@ func isNumInGroupField(msg *Message, tags []Tag, appDataDictionary *datadictiona
// tags slice will contain multiple tags if the tag in question is found while processing a group already.
func getGroupFields(msg *Message, tags []Tag, appDataDictionary *datadictionary.DataDictionary) (fields []*datadictionary.FieldDef) {
if appDataDictionary != nil {
msgt, err := msg.MsgType()
msgt, err := msg.msgTypeNoLock()
if err != nil {
return
}
Expand Down Expand Up @@ -471,6 +478,10 @@ func (m *Message) MsgType() (string, MessageRejectError) {
return m.Header.GetString(tagMsgType)
}

func (m *Message) msgTypeNoLock() (string, MessageRejectError) {
return m.Header.getStringNoLock(tagMsgType)
}

// IsMsgTypeOf returns true if the Header contains MsgType (tag 35) field and its value is the specified one.
func (m *Message) IsMsgTypeOf(msgType string) bool {
if v, err := m.MsgType(); err == nil {
Expand Down

0 comments on commit 6e564c1

Please sign in to comment.