Skip to content

Commit

Permalink
Merge pull request #636 from ackleymi/repeating-grps
Browse files Browse the repository at this point in the history
Maintain repeating group field order when parsing messages
  • Loading branch information
ackleymi authored May 30, 2024
2 parents fbe0cd7 + 3517c8b commit 2da4180
Show file tree
Hide file tree
Showing 5 changed files with 471 additions and 59 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ linters-install:
lint: linters-install
golangci-lint run

# An easy way to run the linter without going through the install process -
# docker run -t --rm -v $(pwd):/app -w /app golangci/golangci-lint:v1.57.2 golangci-lint run -v
# See https://golangci-lint.run/welcome/install/ for more details.

# ---------------------------------------------------------------
# Targets related to running acceptance tests -

Expand Down
6 changes: 5 additions & 1 deletion in_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,11 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
nextSeqNum := seqNum
msg := NewMessage()
for _, msgBytes := range msgs {
_ = ParseMessageWithDataDictionary(msg, bytes.NewBuffer(msgBytes), session.transportDataDictionary, session.appDataDictionary)
err = ParseMessageWithDataDictionary(msg, bytes.NewBuffer(msgBytes), session.transportDataDictionary, session.appDataDictionary)
if err != nil {
session.log.OnEventf("Resend Msg Parse Error: %v, %v", err.Error(), bytes.NewBuffer(msgBytes).String())
return // We cant continue with a message that cant be parsed correctly.
}
msgType, _ := msg.Header.GetBytes(tagMsgType)
sentMessageSeqNum, _ := msg.Header.GetInt(tagMsgSeqNum)

Expand Down
282 changes: 224 additions & 58 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ import (
// Header is first section of a FIX Message.
type Header struct{ FieldMap }

// msgparser contains message parsing vars needed to parse a string into a message.
type msgParser struct {
msg *Message
transportDataDictionary *datadictionary.DataDictionary
appDataDictionary *datadictionary.DataDictionary
rawBytes []byte
fieldIndex int
parsedFieldBytes *TagValue
trailerBytes []byte
foundBody bool
foundTrailer bool
}

// in the message header, the first 3 tags in the message header must be 8,9,35.
func headerFieldOrdering(i, j Tag) bool {
var ordering = func(t Tag) uint32 {
Expand Down Expand Up @@ -152,124 +165,134 @@ func ParseMessageWithDataDictionary(
msg *Message,
rawMessage *bytes.Buffer,
transportDataDictionary *datadictionary.DataDictionary,
_ *datadictionary.DataDictionary,
appDataDictionary *datadictionary.DataDictionary,
) (err error) {
msg.Header.Clear()
msg.Body.Clear()
msg.Trailer.Clear()
msg.rawMessage = rawMessage
// Create msgparser before we go any further.
mp := &msgParser{
msg: msg,
transportDataDictionary: transportDataDictionary,
appDataDictionary: appDataDictionary,
}
mp.msg.rawMessage = rawMessage
mp.rawBytes = rawMessage.Bytes()

rawBytes := rawMessage.Bytes()
return doParsing(mp)
}

// Allocate fields in one chunk.
// doParsing executes the message parsing process.
func doParsing(mp *msgParser) (err error) {
// Initialize for parsing.
mp.msg.Header.Clear()
mp.msg.Body.Clear()
mp.msg.Trailer.Clear()

// Allocate expected message fields in one chunk.
fieldCount := 0
for _, b := range rawBytes {
for _, b := range mp.rawBytes {
if b == '\001' {
fieldCount++
}
}

if fieldCount == 0 {
return parseError{OrigError: fmt.Sprintf("No Fields detected in %s", string(rawBytes))}
return parseError{OrigError: fmt.Sprintf("No Fields detected in %s", string(mp.rawBytes))}
}

if cap(msg.fields) < fieldCount {
msg.fields = make([]TagValue, fieldCount)
if cap(mp.msg.fields) < fieldCount {
mp.msg.fields = make([]TagValue, fieldCount)
} else {
msg.fields = msg.fields[0:fieldCount]
mp.msg.fields = mp.msg.fields[0:fieldCount]
}

fieldIndex := 0

// Message must start with begin string, body length, msg type.
if rawBytes, err = extractSpecificField(&msg.fields[fieldIndex], tagBeginString, rawBytes); err != nil {
// Get begin string.
if mp.rawBytes, err = extractSpecificField(&mp.msg.fields[mp.fieldIndex], tagBeginString, mp.rawBytes); err != nil {
return
}
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])

msg.Header.add(msg.fields[fieldIndex : fieldIndex+1])
fieldIndex++

parsedFieldBytes := &msg.fields[fieldIndex]
if rawBytes, err = extractSpecificField(parsedFieldBytes, tagBodyLength, rawBytes); err != nil {
// Get body length.
mp.fieldIndex++
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
if mp.rawBytes, err = extractSpecificField(mp.parsedFieldBytes, tagBodyLength, mp.rawBytes); err != nil {
return
}
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])

msg.Header.add(msg.fields[fieldIndex : fieldIndex+1])
fieldIndex++

parsedFieldBytes = &msg.fields[fieldIndex]
if rawBytes, err = extractSpecificField(parsedFieldBytes, tagMsgType, rawBytes); err != nil {
// Get msg type.
mp.fieldIndex++
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
if mp.rawBytes, err = extractSpecificField(mp.parsedFieldBytes, tagMsgType, mp.rawBytes); err != nil {
return
}
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])

// Start parsing.
mp.fieldIndex++
xmlDataLen := 0
xmlDataMsg := false

msg.Header.add(msg.fields[fieldIndex : fieldIndex+1])
fieldIndex++

trailerBytes := []byte{}
foundBody := false
foundTrailer := false
mp.trailerBytes = []byte{}
mp.foundBody = false
mp.foundTrailer = false
for {
parsedFieldBytes = &msg.fields[fieldIndex]
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
if xmlDataLen > 0 {
rawBytes, err = extractXMLDataField(parsedFieldBytes, rawBytes, xmlDataLen)
mp.rawBytes, err = extractXMLDataField(mp.parsedFieldBytes, mp.rawBytes, xmlDataLen)
xmlDataLen = 0
xmlDataMsg = true
} else {
rawBytes, err = extractField(parsedFieldBytes, rawBytes)
mp.rawBytes, err = extractField(mp.parsedFieldBytes, mp.rawBytes)
}
if err != nil {
return
}

switch {
case isHeaderField(parsedFieldBytes.tag, transportDataDictionary):
msg.Header.add(msg.fields[fieldIndex : fieldIndex+1])
case isTrailerField(parsedFieldBytes.tag, transportDataDictionary):
msg.Trailer.add(msg.fields[fieldIndex : fieldIndex+1])
foundTrailer = true
case isHeaderField(mp.parsedFieldBytes.tag, mp.transportDataDictionary):
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
case isTrailerField(mp.parsedFieldBytes.tag, mp.transportDataDictionary):
mp.msg.Trailer.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
mp.foundTrailer = true
case isNumInGroupField(mp.msg, []Tag{mp.parsedFieldBytes.tag}, mp.appDataDictionary):
parseGroup(mp, []Tag{mp.parsedFieldBytes.tag})
default:
foundBody = true
trailerBytes = rawBytes
msg.Body.add(msg.fields[fieldIndex : fieldIndex+1])
mp.foundBody = true
mp.trailerBytes = mp.rawBytes
mp.msg.Body.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
}
if parsedFieldBytes.tag == tagCheckSum {
if mp.parsedFieldBytes.tag == tagCheckSum {
break
}

if !foundBody {
msg.bodyBytes = rawBytes
if !mp.foundBody {
mp.msg.bodyBytes = mp.rawBytes
}

if parsedFieldBytes.tag == tagXMLDataLen {
xmlDataLen, _ = msg.Header.GetInt(tagXMLDataLen)
if mp.parsedFieldBytes.tag == tagXMLDataLen {
xmlDataLen, _ = mp.msg.Header.GetInt(tagXMLDataLen)
}
fieldIndex++
mp.fieldIndex++
}

// This will happen if there are no fields in the body
if foundTrailer && !foundBody {
trailerBytes = rawBytes
msg.bodyBytes = nil
if mp.foundTrailer && !mp.foundBody {
mp.trailerBytes = mp.rawBytes
mp.msg.bodyBytes = nil
}

// Body length would only be larger than trailer if fields out of order.
if len(msg.bodyBytes) > len(trailerBytes) {
msg.bodyBytes = msg.bodyBytes[:len(msg.bodyBytes)-len(trailerBytes)]
if len(mp.msg.bodyBytes) > len(mp.trailerBytes) {
mp.msg.bodyBytes = mp.msg.bodyBytes[:len(mp.msg.bodyBytes)-len(mp.trailerBytes)]
}

length := 0
for _, field := range msg.fields {
for _, field := range mp.msg.fields {
switch field.tag {
case tagBeginString, tagBodyLength, tagCheckSum: // Tags do not contribute to length.
default:
length += field.length()
}
}

bodyLength, err := msg.Header.GetInt(tagBodyLength)
bodyLength, err := mp.msg.Header.GetInt(tagBodyLength)
if err != nil {
err = parseError{OrigError: err.Error()}
} else if length != bodyLength && !xmlDataMsg {
Expand All @@ -279,6 +302,149 @@ func ParseMessageWithDataDictionary(
return
}

// parseGroup iterates through a repeating group to maintain correct order of those fields.
func parseGroup(mp *msgParser, tags []Tag) {
mp.foundBody = true
dm := mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1]
fields := getGroupFields(mp.msg, tags, mp.appDataDictionary)

for {
mp.fieldIndex++
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
mp.rawBytes, _ = extractField(mp.parsedFieldBytes, mp.rawBytes)
mp.trailerBytes = mp.rawBytes

// Is this field a member for the group.
if isGroupMember(mp.parsedFieldBytes.tag, fields) {
// Is this field a nested repeating group.
if isNumInGroupField(mp.msg, append(tags, mp.parsedFieldBytes.tag), mp.appDataDictionary) {
dm = append(dm, *mp.parsedFieldBytes)
tags = append(tags, mp.parsedFieldBytes.tag)
fields = getGroupFields(mp.msg, tags, mp.appDataDictionary)
continue
}
// Add the field member to the group.
dm = append(dm, *mp.parsedFieldBytes)
} else if isHeaderField(mp.parsedFieldBytes.tag, mp.transportDataDictionary) {
// Found a header tag for some reason..
mp.msg.Body.add(dm)
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
break
} else if isTrailerField(mp.parsedFieldBytes.tag, mp.transportDataDictionary) {
// Found the trailer at the end of the message.
mp.msg.Body.add(dm)
mp.msg.Trailer.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
mp.foundTrailer = true
break
} else {
// Found a body field outside the group.
searchTags := []Tag{mp.parsedFieldBytes.tag}
// Is this a new group not inside the existing group.
if isNumInGroupField(mp.msg, searchTags, mp.appDataDictionary) {
// Add the current repeating group.
mp.msg.Body.add(dm)
// Cycle again with the new group.
dm = mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1]
fields = getGroupFields(mp.msg, searchTags, mp.appDataDictionary)
continue
}
if len(tags) > 1 {
searchTags = tags[:len(tags)-1]
}
// Did this tag occur after a nested group and belongs to the parent group.
if isNumInGroupField(mp.msg, searchTags, mp.appDataDictionary) {
// Add the field member to the group.
dm = append(dm, *mp.parsedFieldBytes)
// Continue parsing the parent group.
fields = getGroupFields(mp.msg, searchTags, mp.appDataDictionary)
continue
}
// Add the repeating group.
mp.msg.Body.add(dm)
// Add the next body field.
mp.msg.Body.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])

break
}
}
}

// isNumInGroupField evaluates if this tag is the start of a repeating group.
// tags slice will contain multiple tags if the tag in question is found while processing a group already.
func isNumInGroupField(msg *Message, tags []Tag, appDataDictionary *datadictionary.DataDictionary) bool {
if appDataDictionary != nil {
msgt, err := msg.MsgType()
if err != nil {
return false
}
mm, ok := appDataDictionary.Messages[msgt]
if ok {
fields := mm.Fields
for idx, tag := range tags {
fd, ok := fields[int(tag)]
if ok {
if idx == len(tags)-1 {
if len(fd.Fields) > 0 {
return true
}
} else {
// Map nested fields.
newFields := make(map[int]*datadictionary.FieldDef)
for _, ff := range fd.Fields {
newFields[ff.Tag()] = ff
}
fields = newFields
}
}
}
}
}
return false
}

// getGroupFields gets the relevant fields for parsing a repeating group if this tag is the start of a repeating group.
// tags slice will contain multiple tags if the tag in question is found while processing a group already.
func getGroupFields(msg *Message, tags []Tag, appDataDictionary *datadictionary.DataDictionary) (fields []*datadictionary.FieldDef) {
if appDataDictionary != nil {
msgt, err := msg.MsgType()
if err != nil {
return
}
mm, ok := appDataDictionary.Messages[msgt]
if ok {
fields := mm.Fields
for idx, tag := range tags {
fd, ok := fields[int(tag)]
if ok {
if idx == len(tags)-1 {
if len(fd.Fields) > 0 {
return fd.Fields
}
} else {
// Map nested fields.
newFields := make(map[int]*datadictionary.FieldDef)
for _, ff := range fd.Fields {
newFields[ff.Tag()] = ff
}
fields = newFields
}
}
}
}
}
return
}

// isGroupMember evaluates if this tag belongs to a repeating group.
func isGroupMember(tag Tag, fields []*datadictionary.FieldDef) bool {
for _, f := range fields {
if f.Tag() == int(tag) {
return true
}
}
return false
}

func isHeaderField(tag Tag, dataDict *datadictionary.DataDictionary) bool {
if tag.IsHeader() {
return true
Expand Down
Loading

0 comments on commit 2da4180

Please sign in to comment.