Skip to content

Commit

Permalink
get rid of protocol parsing by relying on OpenSMTPD-framework package
Browse files Browse the repository at this point in the history
  • Loading branch information
poolpOrg committed May 1, 2024
1 parent e90a249 commit 20baf73
Showing 1 changed file with 79 additions and 210 deletions.
289 changes: 79 additions & 210 deletions filter-senderscore.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (

"log"
"time"

"github.com/poolpOrg/OpenSMTPD-framework/filter"
)

var blockBelow *int
Expand All @@ -35,17 +37,10 @@ var junkBelow *int
var slowFactor *int
var scoreHeader *bool
var allowlistFile *string
var testMode *bool
var allowlist = make(map[string]bool)
var allowlistMasks = make(map[int]bool)

var version string

var outputChannel chan string

type session struct {
id string

category int8
score int8

Expand All @@ -55,68 +50,30 @@ type session struct {

var sessions = make(map[string]*session)

var reporters = map[string]func(string, string, []string){
"link-connect": linkConnect,
"link-disconnect": linkDisconnect,
}

var filters = map[string]func(string, string, []string){
"connect": filterConnect,

"helo": delayedAnswer,
"ehlo": delayedAnswer,
"starttls": delayedAnswer,
"auth": delayedAnswer,
"mail-from": delayedAnswer,
"rcpt-to": delayedAnswer,
"data": delayedAnswer,
"data-line": dataline,
"commit": delayedAnswer,

"quit": delayedAnswer,
}

func linkConnect(phase string, sessionId string, params []string) {
if len(params) != 4 {
log.Fatal("invalid input, shouldn't happen")
}

func linkConnectCb(timestamp time.Time, sessionId string, rdns string, fcrdns string, src net.Addr, dest net.Addr) {
s := &session{}
s.first_line = true
s.score = -1
sessions[sessionId] = s

addr := net.ParseIP(strings.Split(params[2], ":")[0])
if addr == nil || strings.Contains(addr.String(), ":") {
return
}

defer func(addr net.IP, s *session) {
fmt.Fprintf(os.Stderr, "link-connect addr=%s score=%d\n", addr, s.score)
}(addr, s)

for maskOnes := range allowlistMasks {
mask := net.CIDRMask(maskOnes, 32)
maskedAddr := addr.Mask(mask).String()
query := fmt.Sprintf("%s/%d", maskedAddr, maskOnes)
if allowlist[query] {
fmt.Fprintf(os.Stderr, "IP address %s matches allowlisted subnet %s\n", addr, query)
s.score = 100
return
if addr, ok := src.(*net.TCPAddr); ok {
defer func(laddr net.IP, s *session) {
fmt.Fprintf(os.Stderr, "link-connect addr=%s score=%d\n", laddr, s.score)
}(addr.IP, s)

for maskOnes := range allowlistMasks {
mask := net.CIDRMask(maskOnes, 32)
maskedAddr := addr.IP.Mask(mask).String()
query := fmt.Sprintf("%s/%d", maskedAddr, maskOnes)
if allowlist[query] {
fmt.Fprintf(os.Stderr, "IP address %s matches allowlisted subnet %s\n", addr, query)
s.score = 100
return
}
}
}

atoms := strings.Split(addr.String(), ".")
atoms := strings.Split(addr.String(), ".")

if *testMode {
// if test mode is enabled, the Sender Score DNS query is
// skipped and the score is derived directly from the
// connecting IP address; IP addresses ending with 255 can be
// used to simulate missing Sender Score DNS entries
if atoms[3] == "255" {
return
}
} else {
addrs, _ := net.LookupIP(fmt.Sprintf("%s.%s.%s.%s.score.senderscore.com",
atoms[3], atoms[2], atoms[1], atoms[0]))

Expand All @@ -126,19 +83,16 @@ func linkConnect(phase string, sessionId string, params []string) {

resolved := addrs[0].String()
atoms = strings.Split(resolved, ".")
}

category, _ := strconv.ParseInt(atoms[2], 10, 8)
score, _ := strconv.ParseInt(atoms[3], 10, 8)
category, _ := strconv.ParseInt(atoms[2], 10, 8)
score, _ := strconv.ParseInt(atoms[3], 10, 8)

s.category = int8(category)
s.score = int8(score)
s.category = int8(category)
s.score = int8(score)
}
}

func linkDisconnect(phase string, sessionId string, params []string) {
if len(params) != 0 {
log.Fatal("invalid input, shouldn't happen")
}
func linkDisconnectCb(timestamp time.Time, sessionId string) {
delete(sessions, sessionId)
}

Expand All @@ -150,7 +104,7 @@ func getSession(sessionId string) *session {
return s
}

func filterConnect(phase string, sessionId string, params []string) {
func filterConnectCb(timestamp time.Time, sessionId string, rdns string, fcrdns string, src net.Addr, dest net.Addr) filter.Response {
s := getSession(sessionId)

if *slowFactor > 0 && s.score >= 0 {
Expand All @@ -160,126 +114,42 @@ func filterConnect(phase string, sessionId string, params []string) {
s.delay = 0
}

if s.score != -1 && s.score < int8(*blockBelow) && *blockPhase == "connect" {
delayedDisconnect(sessionId, params)
} else if s.score != -1 && s.score < int8(*junkBelow) {
delayedJunk(sessionId, params)
} else {
delayedProceed(sessionId, params)
}
}

func produceOutput(msgType string, sessionId string, token string, format string, a ...interface{}) {
var out string

tokens := strings.Split(version, ".")
hiver, _ := strconv.Atoi(tokens[0])
lover, _ := strconv.Atoi(tokens[1])
if hiver == 0 && lover < 5 {
out = msgType + "|" + token + "|" + sessionId
} else {
out = msgType + "|" + sessionId + "|" + token
if s.delay > 0 {
time.Sleep(time.Duration(s.delay) * time.Millisecond)
}
out += "|" + fmt.Sprintf(format, a...)

if *testMode {
fmt.Println(out)
if s.score != -1 && s.score < int8(*blockBelow) && *blockPhase == "connect" {
return filter.Disconnect("550 your IP reputation is too low for this MX")
} else if s.score != -1 && s.score < int8(*junkBelow) {
return filter.Junk()
} else {
outputChannel <- out
return filter.Proceed()
}
}

func dataline(phase string, sessionId string, params []string) {
func datalineCb(timestamp time.Time, sessionId string, line string) []string {
ret := []string{}
s := getSession(sessionId)
token := params[0]
line := strings.Join(params[1:], "|")

if s.first_line == true {
if s.first_line {
if s.score != -1 && *scoreHeader {
produceOutput("filter-dataline", sessionId, token, "X-SenderScore: %d", s.score)
ret = append(ret, fmt.Sprintf("X-SenderScore: %d", s.score))
}
s.first_line = false
}

produceOutput("filter-dataline", sessionId, token, "%s", line)
}

func delayedAnswer(phase string, sessionId string, params []string) {
s := getSession(sessionId)

if s.score != -1 && s.score < int8(*blockBelow) && *blockPhase == phase {
delayedDisconnect(sessionId, params)
return
}

delayedProceed(sessionId, params)
}

func delayedJunk(sessionId string, params []string) {
s := getSession(sessionId)
token := params[0]
if *testMode {
waitThenAction(sessionId, token, s.delay, "junk")
} else {
go waitThenAction(sessionId, token, s.delay, "junk")
}
}

func delayedProceed(sessionId string, params []string) {
s := getSession(sessionId)
token := params[0]
if *testMode {
waitThenAction(sessionId, token, s.delay, "proceed")
} else {
go waitThenAction(sessionId, token, s.delay, "proceed")
}
ret = append(ret, line)
return ret
}

func delayedDisconnect(sessionId string, params []string) {
func delayedResponse(sessionId string, phase string) filter.Response {
s := getSession(sessionId)
token := params[0]
if *testMode {
waitThenAction(sessionId, token, s.delay, "disconnect|550 your IP reputation is too low for this MX")
} else {
go waitThenAction(sessionId, token, s.delay, "disconnect|550 your IP reputation is too low for this MX")
}
}

func waitThenAction(sessionId string, token string, delay int, format string, a ...interface{}) {
if delay > 0 {
time.Sleep(time.Duration(delay) * time.Millisecond)
}
produceOutput("filter-result", sessionId, token, format, a...)
}

func filterInit() {
for k := range reporters {
fmt.Printf("register|report|smtp-in|%s\n", k)
}
for k := range filters {
fmt.Printf("register|filter|smtp-in|%s\n", k)
}
fmt.Println("register|ready")
}

func trigger(currentSlice map[string]func(string, string, []string), atoms []string) {
if handler, ok := currentSlice[atoms[4]]; ok {
handler(atoms[4], atoms[5], atoms[6:])
} else {
log.Fatalf("invalid phase: %s", atoms[4])
if s.delay > 0 {
time.Sleep(time.Duration(s.delay) * time.Millisecond)
}
}

func skipConfig(scanner *bufio.Scanner) {
for {
if !scanner.Scan() {
os.Exit(0)
}
line := scanner.Text()
if line == "config|ready" {
return
}
if s.score != -1 && s.score < int8(*blockBelow) && *blockPhase == phase {
return filter.Disconnect("550 your IP reputation is too low for this MX")
}
return filter.Proceed()
}

func validatePhase(phase string) {
Expand Down Expand Up @@ -341,46 +211,45 @@ func main() {
slowFactor = flag.Int("slowFactor", -1, "delay factor to apply to sessions")
scoreHeader = flag.Bool("scoreHeader", false, "add X-SenderScore header")
allowlistFile = flag.String("allowlist", "", "file containing a list of IP addresses or subnets in CIDR notation to allowlist, one per line")
testMode = flag.Bool("testMode", false, "skip all DNS queries, process all requests sequentially, only for debugging purposes")

flag.Parse()

validatePhase(*blockPhase)
loadAllowlists()

scanner := bufio.NewScanner(os.Stdin)
skipConfig(scanner)
filterInit()

if !*testMode {
outputChannel = make(chan string)
go func() {
for line := range outputChannel {
fmt.Println(line)
}
}()
}

for {
if !scanner.Scan() {
os.Exit(0)
}

line := scanner.Text()
atoms := strings.Split(line, "|")
if len(atoms) < 6 {
log.Fatalf("missing atoms: %s", line)
}

version = atoms[1]

switch atoms[0] {
case "report":
trigger(reporters, atoms)
case "filter":
trigger(filters, atoms)
default:
log.Fatalf("invalid stream: %s", atoms[0])
}
}
filter.Init()

// reporting callbacks
filter.SMTP_IN.OnLinkConnect(linkConnectCb)
filter.SMTP_IN.OnLinkDisconnect(linkDisconnectCb)

// filtering callbacks
filter.SMTP_IN.ConnectRequest(filterConnectCb)
filter.SMTP_IN.HeloRequest(func(timestamp time.Time, sessionId string, helo string) filter.Response {
return delayedResponse(sessionId, "helo")
})
filter.SMTP_IN.EhloRequest(func(timestamp time.Time, sessionId string, helo string) filter.Response {
return delayedResponse(sessionId, "ehlo")
})
filter.SMTP_IN.StartTLSRequest(func(timestamp time.Time, sessionId string, tlsString string) filter.Response {
return delayedResponse(sessionId, "starttls")
})
filter.SMTP_IN.AuthRequest(func(timestamp time.Time, sessionId string, method string) filter.Response {
return delayedResponse(sessionId, "auth")
})
filter.SMTP_IN.MailFromRequest(func(timestamp time.Time, sessionId string, from string) filter.Response {
return delayedResponse(sessionId, "mail-from")
})
filter.SMTP_IN.MailFromRequest(func(timestamp time.Time, sessionId string, to string) filter.Response {
return delayedResponse(sessionId, "rcpt-to")
})
filter.SMTP_IN.DataRequest(func(timestamp time.Time, sessionId string) filter.Response {
return delayedResponse(sessionId, "data")
})
filter.SMTP_IN.CommitRequest(func(timestamp time.Time, sessionId string) filter.Response {
return delayedResponse(sessionId, "commit")
})
filter.SMTP_IN.DataLineRequest(datalineCb)

filter.Dispatch()
}

0 comments on commit 20baf73

Please sign in to comment.