diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4c49bd7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.env diff --git a/.travis.yml b/.travis.yml index 89ae482..719367c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,10 @@ language: go go: - - 1.1 + - 1.2 + - tip install: - go get github.com/BotBotMe/botbot-bot -script: go test github.com/BotBotMe/botbot-bot +script: go test github.com/BotBotMe/botbot-bot/... diff --git a/README.md b/README.md index 7efa624..50735bd 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ [![Build Status](https://travis-ci.org/BotBotMe/botbot-bot.png)](https://travis-ci.org/BotBotMe/botbot-bot) -The bot used in botbot.me. To install: +The bot used in botbot.me is a Go (1.2) program. To install: go get github.com/BotBotMe/botbot-bot diff --git a/botbot.go b/botbot.go index 93c8c15..07fc36c 100644 --- a/botbot.go +++ b/botbot.go @@ -72,14 +72,14 @@ func (self *BotBot) listen(queueName string) { } func (self *BotBot) mainLoop() { - - go self.recordUserCounts() + // TODO (yml) comment out self.recordUserCounts because I think it is + // leaking postgres connection. + //go self.recordUserCounts() var busCommand string var args string for { select { - case serverLine, ok := <-self.fromServer: if !ok { // Channel is closed, we're offline. Stop. @@ -120,6 +120,9 @@ func (self *BotBot) mainLoop() { // - WRITE : Send message to server // - REFRESH: Reload plugin configuration func (self *BotBot) handleCommand(cmd string, args string) { + if glog.V(2) { + glog.Infoln("HandleCommand:", cmd) + } switch cmd { case "WRITE": parts := strings.SplitN(args, " ", 3) diff --git a/certs/README.md b/certs/README.md new file mode 100644 index 0000000..443a8e6 --- /dev/null +++ b/certs/README.md @@ -0,0 +1,12 @@ +The certs included in this repo has been with `generate_cert` from the GO +standard library. + +``` +generate_cert -ca=true -duration=8760h0m0s -host="127.0.0.1" -start-date="Jan 1 15:04:05 2014" +``` + +The generate_cert can be build like this + +``` +go build $GOROOT/src/pkg/crypto/tls/generate_cert.go +``` diff --git a/certs/cert.pem b/certs/cert.pem new file mode 100644 index 0000000..8971977 --- /dev/null +++ b/certs/cert.pem @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC8zCCAd2gAwIBAgIQLlTIDFFPizKZzJKC6j/k4jALBgkqhkiG9w0BAQswEjEQ +MA4GA1UEChMHQWNtZSBDbzAeFw0xNDAxMDExNTA0MDVaFw0xNTAxMDExNTA0MDVa +MBIxEDAOBgNVBAoTB0FjbWUgQ28wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK +AoIBAQDMVw4BGI1GsAdye8rqiSUS/gOI1oGpdbHO/B99xxXib9VvqFx3pT4WkQdd +OqAni7X4K4Td/3MBQ/K+Wy982wxR3pCHCWRxZ6iDQi/TPMRqP8hSufTyRd/QA+bj +UEXUh4Orq3VthIq0i4TjML9rC14pJy/ldb80clHLoVdx2uz3t52k0VZS1QMwLLpW +l3khpjO7ICUsdfFda+oMbroQPlgSEg69QcBLmwTuxJ7Y2PRVK6nWU/2+fGKQQXGT +PA9jU/zHt/7voUA9I1zXM1s7e1ExRbD4MFlaLwUo0qlbLL0dsIFURiJh3fBvgum/ +T7NQRvEJQQFx25S6/zyxn1VwKcVbAgMBAAGjSTBHMA4GA1UdDwEB/wQEAwIApDAT +BgNVHSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MA8GA1UdEQQIMAaH +BH8AAAEwCwYJKoZIhvcNAQELA4IBAQA8YZJmhOwIYEhUzObwQQzcJY3ZrUN7AiEc +sQnFaqOmzrtP0SDjWPlMITXeHV84Zw5+taIv1OQ3IyG9z+BPgIAkHQYVuVILrYOU +nVGs9bxdcrhKLwKVXEUoRsa+FY3NG8r7HTO+jYveXoXBjqA3NT7CmlJ3QtJJiCD/ +5S5u1KwfAZQ7xcrigBxTyRSpR/m0APGDGqrf5mKUcQVkGF9RIEDWOuCoqr1He9GZ +ba6HNYeEQrsdooazi9xbR2bk6F0Wa2Pt4YpS5Zy3KVuhxwbo43houwifhjF23MM/ +4DmoVl1copKHf/dMSsiDbGDeF2g/Z6Q2X790ijDOnxkOEbpjyuHP +-----END CERTIFICATE----- diff --git a/certs/key.pem b/certs/key.pem new file mode 100644 index 0000000..3ee2821 --- /dev/null +++ b/certs/key.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAzFcOARiNRrAHcnvK6oklEv4DiNaBqXWxzvwffccV4m/Vb6hc +d6U+FpEHXTqgJ4u1+CuE3f9zAUPyvlsvfNsMUd6QhwlkcWeog0Iv0zzEaj/IUrn0 +8kXf0APm41BF1IeDq6t1bYSKtIuE4zC/awteKScv5XW/NHJRy6FXcdrs97edpNFW +UtUDMCy6Vpd5IaYzuyAlLHXxXWvqDG66ED5YEhIOvUHAS5sE7sSe2Nj0VSup1lP9 +vnxikEFxkzwPY1P8x7f+76FAPSNc1zNbO3tRMUWw+DBZWi8FKNKpWyy9HbCBVEYi +Yd3wb4Lpv0+zUEbxCUEBcduUuv88sZ9VcCnFWwIDAQABAoIBAQDGY07JaZjgJE9W +qzGa+4PvKIct93Tznb0ABHBeoUdyDLngKkl8MAZTyoKn1R9gxZfqVfYtwrFZbSrj +/YvhYJpZcghH0qqeH9HPfhcIs1rO/RX0m25hOc3OG7uyvmDNsrP3nAPqTGCOYVan +bNCrkOHeeplZ4jO9K7CWTxKjC7Y92V1EL1IW1H3NU7z+6gA/r5JRUhrSljR37sja +RIc3kBPqfo+qlwTSQ8m9A8dKVQro6Lj8xzSUxwZbDRV83GJDQS0UWDRZEbxO/6Oq +xNp6LEeFjjgI4CpKtPogGkOrcrOTRlZt4Y0bFaEYmKjsMepUIBmf7rbT/C9dfJKc +76YzTnnBAoGBANCBB43/jJuOyvwVM+n7mn0/dRBhnioe0ffONu0EXKx42JFUZ2hf +nnFbFQDjQU5EJDyWmz7gRSbcWXOBdrLwhLfqgOs4NBO9IYfw8tCjFfvLieo1cC1g +k++qfxQWSGlLOqLHmH89a2Zo+n6E2bVHZeEPLtx9BaKsDu/JjtFcn8bpAoGBAPrj +M9RvY0tpPcfmoBch3LYseGLPonbVJLOea2xOyQRgUuzSD8rfcoKL9HrmLGV1jP7b +dUamOle+1gOIjMN/NDTULnbU3of9Jpg7sM4QN58Ae2VVheJiP7vQ3mQMVGQFNjZn +PqkHsX8SrSufjJPXXZpyEB8eoncEOHrEXCSY98ejAoGAUKTXSzDALZp5IC8DOUi7 +ZB4bJQ7coeXxyCrWS64YOg0zlmJS7KevHKDh6sn7BE6OxXBq81LC7FjlsCsuwd8N +NiVZX6EJY2kPgwjKoa+yXQVIfnHUxXhJP7VuGVLVBwS11Sfl77DNzXplCHJR0i+f +VAEPWvBwMY7i6BPJTWjJ06ECgYB+PuoL0ekzP0f5WVHhkgaQHRyPDFCCX4rSX7tK +ivKYIyhXRvTvbzUd//MMDNr2SwqiDCidXnQpRzxGFuZOUaohk0u2PDOmVnqfTBoR +9xMmDYYKXdi7LBlKFzZxPcsRcZXwFk/vmpF0kh9VvjltA4h0eUUng7DZFuSc0kZT +nC3C5QKBgQCfg1NASP4WsANzaLABzfDKbQ23pxb9eqBWGVESEPhzUoqiTZ0rSDl4 +5oY7Dx9RGIJxPQWoG3nxjAK3GnlArUEJf4S/YMFmOX0naXp4HW4RxODPNoxnaQG0 +NfDb7xAuQNgY2F5T9dK02EPJ0tamsJbvHxVZLm1eiU2HPcK1ZurRfw== +-----END RSA PRIVATE KEY----- diff --git a/common/common.go b/common/common.go index f69d08d..478218e 100644 --- a/common/common.go +++ b/common/common.go @@ -2,6 +2,7 @@ package common import ( "io" + "strings" ) // Interface all bots must implement @@ -15,11 +16,23 @@ type ChatBot interface { // Configuration for a 'chatbot', which is what clients pay for type BotConfig struct { - Id int - Config map[string]string + Id int + Config map[string]string + Channels []*Channel +} + +// Configuration for a channel +type Channel struct { + Id int + Name string + Pwd string + Fingerprint string +} + +func (cc *Channel) Credential() string { + return strings.TrimSpace(cc.Name + " " + cc.Pwd) +} - // Normally this array is names of the channels to join - // but if the channel has a password, the string is " " - // That means we can pass the string straight to JOIN and it works - Channels []string +func (cc *Channel) String() string { + return cc.Name } diff --git a/common/mock.go b/common/mock.go index 343e606..93bfc5f 100644 --- a/common/mock.go +++ b/common/mock.go @@ -2,15 +2,65 @@ package common import ( "bufio" - "net" + "crypto/tls" + "flag" + "log" + "sync" "testing" + "time" + + "github.com/golang/glog" ) +// SetGlogFlags walk around a glog issue and force it to log to stderr. +// It need to be called at the beginning of each test. +func SetGlogFlags() { + flag.Set("alsologtostderr", "true") + flag.Set("V", "3") +} + +// MockSocket is a dummy implementation of ReadWriteCloser +type MockSocket struct { + sync.RWMutex + Counter chan bool + Receiver chan string +} + +func (sock MockSocket) Write(data []byte) (int, error) { + glog.V(3).Infoln("[Debug]: Starting MockSocket.Write of:", string(data)) + if sock.Counter != nil { + sock.Counter <- true + } + if sock.Receiver != nil { + sock.Receiver <- string(data) + } + + return len(data), nil +} + +func (sock MockSocket) Read(into []byte) (int, error) { + sock.RLock() + defer sock.RUnlock() + time.Sleep(time.Second) // Prevent busy loop + return 0, nil +} + +func (sock MockSocket) Close() error { + if sock.Receiver != nil { + close(sock.Receiver) + } + if sock.Counter != nil { + close(sock.Counter) + } + return nil +} + /* * Mock IRC server */ type MockIRCServer struct { + sync.RWMutex Port string Message string Got []string @@ -24,44 +74,63 @@ func NewMockIRCServer(msg, port string) *MockIRCServer { } } -func (self *MockIRCServer) Run(t *testing.T) { +func (srv *MockIRCServer) GotLength() int { + srv.RLock() + defer srv.RUnlock() + return len(srv.Got) +} - listener, err := net.Listen("tcp", ":"+self.Port) +func (srv *MockIRCServer) Run(t *testing.T) { + // Use the certs generated with generate_certs + cert, err := tls.LoadX509KeyPair("certs/cert.pem", "certs/key.pem") if err != nil { - t.Error("Error starting mock server on "+self.Port, err) - return + log.Fatalf("server: loadkeys: %s", err) } - - // Accept a single connection - conn, lerr := listener.Accept() - if lerr != nil { - t.Error("Error on IRC server on Accept. ", err) + config := tls.Config{Certificates: []tls.Certificate{cert}} + listener, err := tls.Listen("tcp", "127.0.0.1:"+srv.Port, &config) + if err != nil { + t.Error("Error starting mock server on "+srv.Port, err) + return } - // First message triggers BotBot to send USER and NICK messages - conn.Write([]byte(":hybrid7.debian.local NOTICE AUTH :*** Looking up your hostname...\n")) - - // Ask for NickServ auth, and pretend we got it - conn.Write([]byte(":NickServ!NickServ@services. NOTICE graham_king :This nickname is registered. Please choose a different nickname, or identify via /msg NickServ identify \n")) - conn.Write([]byte(":NickServ!NickServ@services. NOTICE graham_king :You are now identified for graham_king.\n")) + for { + conn, lerr := listener.Accept() + // If create a new connection throw the old data away + // This can happen if a client trys to connect with tls + // Got will store the handshake data. The cient will try + // connect with a plaintext connect after the tls fails. + srv.Lock() + srv.Got = make([]string, 0) + srv.Unlock() - conn.Write([]byte(":wolfe.freenode.net 001 graham_king :Welcome to the freenode Internet Relay Chat Network graham_king\n")) + if lerr != nil { + t.Error("Error on IRC server on Accept. ", err) + } - // This should get sent to plugins - conn.Write([]byte(":yml!~yml@li148-151.members.linode.com PRIVMSG #unit :" + self.Message + "\n")) - //conn.Write([]byte("test: " + self.Message + "\n")) + // First message triggers BotBot to send USER and NICK messages + conn.Write([]byte(":hybrid7.debian.local NOTICE AUTH :*** Looking up your hostname...\n")) + // Ask for NickServ auth, and pretend we got it + conn.Write([]byte(":NickServ!NickServ@services. NOTICE graham_king :This nickname is registered. Please choose a different nickname, or identify via /msg NickServ identify \n")) + conn.Write([]byte(":NickServ!NickServ@services. NOTICE graham_king :You are now identified for graham_king.\n")) + conn.Write([]byte(":wolfe.freenode.net 001 graham_king :Welcome to the freenode Internet Relay Chat Network graham_king\n")) + // This should get sent to plugins + conn.Write([]byte(":yml!~yml@li148-151.members.linode.com PRIVMSG #unit :" + srv.Message + "\n")) + conn.Write([]byte("test: " + srv.Message + "\n")) - var derr error - var data []byte + var derr error + var data []byte - bufRead := bufio.NewReader(conn) - for { - data, derr = bufRead.ReadBytes('\n') - if derr != nil { - // Client closed connection - break + bufRead := bufio.NewReader(conn) + for { + data, derr = bufRead.ReadBytes('\n') + if derr != nil { + // Client closed connection + break + } + srv.Lock() + srv.Got = append(srv.Got, string(data)) + srv.Unlock() } - self.Got = append(self.Got, string(data)) } } diff --git a/common/queue.go b/common/queue.go index 736cb07..c64558f 100644 --- a/common/queue.go +++ b/common/queue.go @@ -4,6 +4,9 @@ import ( "net" "net/url" "os" + "strconv" + "strings" + "sync" "time" "github.com/golang/glog" @@ -38,6 +41,7 @@ type Queue interface { // Simplistic Queue implementation used by the test suite type MockQueue struct { + sync.RWMutex Got map[string][]string ReadChannel chan string } @@ -49,23 +53,29 @@ func NewMockQueue() *MockQueue { } } -func (self *MockQueue) Publish(queue string, message []byte) error { - self.Got[queue] = append(self.Got[queue], string(message)) +func (mq *MockQueue) Publish(queue string, message []byte) error { + mq.Lock() + defer mq.Unlock() + mq.Got[queue] = append(mq.Got[queue], string(message)) return nil } -func (self *MockQueue) Rpush(key string, val []byte) error { - self.Got[key] = append(self.Got[key], string(val)) +func (mq *MockQueue) Rpush(key string, val []byte) error { + mq.Lock() + defer mq.Unlock() + mq.Got[key] = append(mq.Got[key], string(val)) return nil } -func (self *MockQueue) Blpop(keys []string, timeoutsecs uint) (*string, []byte, error) { - val := <-self.ReadChannel +func (mq *MockQueue) Blpop(keys []string, timeoutsecs uint) (*string, []byte, error) { + val := <-mq.ReadChannel return &keys[0], []byte(val), nil } -func (self *MockQueue) Llen(key string) (int, error) { - return len(self.Got), nil +func (mq *MockQueue) Llen(key string) (int, error) { + mq.RLock() + defer mq.RUnlock() + return len(mq.Got), nil } func (self *MockQueue) Ltrim(key string, start int, end int) error { @@ -86,15 +96,21 @@ type RedisQueue struct { } func NewRedisQueue() Queue { - redisUrlString := os.Getenv("QUEUE_URL") + redisUrlString := os.Getenv("REDIS_PLUGIN_QUEUE_URL") if redisUrlString == "" { - glog.Fatal("QUEUE_URL cannot be empty.\nexport QUEUE_URL=redis://host:port/db_number") + glog.Fatal("REDIS_PLUGIN_QUEUE_URL cannot be empty.\nexport REDIS_PLUGIN_QUEUE_URL=redis://host:port/db_number") } redisUrl, err := url.Parse(redisUrlString) if err != nil { glog.Fatal("Could not read Redis string", err) } - redisQueue := goredis.Client{Addr: redisUrl.Host} + + redisDb, err := strconv.Atoi(strings.TrimLeft(redisUrl.Path, "/")) + if err != nil { + glog.Fatal("Could not read Redis path", err) + } + + redisQueue := goredis.Client{Addr: redisUrl.Host, Db: redisDb} s := RedisQueue{queue: &redisQueue} s.waitForRedis() return &s diff --git a/common/storage.go b/common/storage.go index 19ea13f..2e005cd 100644 --- a/common/storage.go +++ b/common/storage.go @@ -5,8 +5,8 @@ import ( "os" "time" - "github.com/bmizerany/pq" "github.com/golang/glog" + "github.com/lib/pq" ) // Storage. Wraps the database @@ -30,8 +30,8 @@ func NewMockStorage(serverPort string) Storage { "nick": "test", "password": "testxyz", "server": "127.0.0.1:" + serverPort} - channels := make([]string, 0) - channels = append(channels, "#unit") + channels := make([]*Channel, 0) + channels = append(channels, &Channel{Id: 1, Name: "#unit", Fingerprint: "5876HKJGYUT"}) botConf := &BotConfig{Id: 1, Config: conf, Channels: channels} return &MockStorage{botConfs: []*BotConfig{botConf}} @@ -66,11 +66,17 @@ func NewPostgresStorage() *PostgresStorage { if err != nil { glog.Fatal("Could not read database string", err) } - db, err := sql.Open("postgres", dataSource+" sslmode=disable") + db, err := sql.Open("postgres", dataSource+" sslmode=disable fallback_application_name=bot") if err != nil { glog.Fatal("Could not connect to database.", err) } + // The following 2 lines mitigate the leak of postgresql connection leak + // explicitly setting a maximum number of postgresql connections + db.SetMaxOpenConns(10) + // explicitly setting a maximum number of Idle postgresql connections + db.SetMaxIdleConns(2) + return &PostgresStorage{db} } @@ -81,46 +87,60 @@ func (self *PostgresStorage) BotConfig() []*BotConfig { configs := make([]*BotConfig, 0) - sql := "SELECT id, server, server_password, nick, password, real_name FROM bots_chatbot WHERE is_active=true" + sql := "SELECT id, server, server_password, nick, password, real_name, server_identifier FROM bots_chatbot WHERE is_active=true" rows, err = self.db.Query(sql) if err != nil { glog.Fatal("Error running: ", sql, " ", err) } + defer rows.Close() var chatbotId int - var server, server_password, nick, password, real_name []byte + var server, server_password, nick, password, real_name, server_identifier []byte for rows.Next() { - rows.Scan(&chatbotId, &server, &server_password, &nick, &password, &real_name) + rows.Scan( + &chatbotId, &server, &server_password, &nick, &password, + &real_name, &server_identifier) confMap := map[string]string{ - "server": string(server), - "server_password": string(server_password), - "nick": string(nick), - "password": string(password), - "realname": string(real_name), + "server": string(server), + "server_password": string(server_password), + "nick": string(nick), + "password": string(password), + "realname": string(real_name), + "server_identifier": string(server_identifier), } config := &BotConfig{ Id: chatbotId, Config: confMap, - Channels: make([]string, 0), + Channels: make([]*Channel, 0), } configs = append(configs, config) + glog.Infoln("config.Id:", config.Id) + } + channelStmt, err := self.db.Prepare("SELECT id, name, password, fingerprint FROM bots_channel WHERE is_active=true and chatbot_id=$1") + if err != nil { + glog.Fatal("[Error] Error while preparing the statements to retrieve the channel:", err) } + defer channelStmt.Close() + for i := range configs { config := configs[i] - rows, err = self.db.Query("SELECT id, name, password FROM bots_channel WHERE is_active=true and chatbot_id=$1", config.Id) + rows, err = channelStmt.Query(config.Id) if err != nil { glog.Fatal("Error running:", err) } + defer rows.Close() + var channelId int - var channelName string - var channelPwd string + var channelName, channelPwd, channelFingerprint string for rows.Next() { - rows.Scan(&channelId, &channelName, &channelPwd) - config.Channels = append(config.Channels, channelName+" "+channelPwd) + rows.Scan(&channelId, &channelName, &channelPwd, &channelFingerprint) + config.Channels = append(config.Channels, + &Channel{Id: channelId, Name: channelName, + Pwd: channelPwd, Fingerprint: channelFingerprint}) } glog.Infoln("config.Channel:", config.Channels) } @@ -187,6 +207,7 @@ func (self *PostgresStorage) channelId(name string) (int, error) { if err != nil { return -1, err } + defer rows.Close() rows.Next() rows.Scan(&channelId) diff --git a/main.go b/main.go index 854de4d..9bf049f 100644 --- a/main.go +++ b/main.go @@ -1,13 +1,18 @@ package main import ( + _ "expvar" "flag" + "log" + "net/http" "os" "os/signal" "syscall" "github.com/BotBotMe/botbot-bot/common" "github.com/golang/glog" + _ "net/http/pprof" + ) const ( @@ -32,6 +37,9 @@ func main() { // Start the main loop go botbot.mainLoop() + // Start and http server to serve the stats from expvar + log.Fatal(http.ListenAndServe(":3030", nil)) + // Trap stop signal (Ctrl-C, kill) to exit kill := make(chan os.Signal) signal.Notify(kill, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM) diff --git a/main_test.go b/main_test.go index df0c0d8..a5131a2 100644 --- a/main_test.go +++ b/main_test.go @@ -1,10 +1,12 @@ package main import ( - "github.com/BotBotMe/botbot-bot/common" "strings" "testing" "time" + + "github.com/BotBotMe/botbot-bot/common" + "github.com/golang/glog" ) const ( @@ -12,9 +14,20 @@ const ( TEST_MSG = "q: Something new" ) +func GetQueueLength(queue *common.MockQueue) int { + queue.RLock() + q := queue.Got["q"] + queue.RUnlock() + return len(q) +} + +// TODO (yml) this test is broken because ircBot establish first a tls conn +// we need to find a better way to handle this. + // A serious integration test for BotBot. // This covers BotBot, the IRC code, and the dispatcher. func TestBotBotIRC(t *testing.T) { + common.SetGlogFlags() // Create a mock storage with configuration in it storage := common.NewMockStorage(SERVER_PORT) @@ -27,23 +40,41 @@ func TestBotBotIRC(t *testing.T) { go server.Run(t) // Run BotBot + time.Sleep(time.Second) // Sleep of one second to avoid the 5s backoff botbot := NewBotBot(storage, queue) go botbot.listen("testcmds") go botbot.mainLoop() - waitForServer(server, 5) + waitForServer(server, 4) + // this sleep allow us to keep the answer in the right order + time.Sleep(time.Second) // Test sending a reply - should probably be separate test queue.ReadChannel <- "WRITE 1 #unit I am a plugin response" waitForServer(server, 6) - checkContains(queue.Got["q"], TEST_MSG, t) + tries := 0 + queue.RLock() + q := queue.Got["q"] + queue.RUnlock() + + for len(q) < 4 && tries < 4 { + queue.RLock() + q = queue.Got["q"] + queue.RUnlock() + + glog.V(4).Infoln("[Debug] queue.Got[\"q\"]", len(q), "/", 4, q) + time.Sleep(time.Second) + tries++ + } + checkContains(q, TEST_MSG, t) // Check IRC server expectations - if len(server.Got) != 6 { - t.Fatal("Expected exactly 6 IRC messages from the bot. Got ", len(server.Got)) + if server.GotLength() != 6 { + t.Fatal("Expected exactly 6 IRC messages from the bot. Got ", server.GotLength()) } + glog.Infoln("[Debug] server.Got", server.Got) expect := []string{"PING", "USER", "NICK", "NickServ", "JOIN", "PRIVMSG"} for i := 0; i < 5; i++ { if !strings.Contains(string(server.Got[i]), expect[i]) { @@ -55,27 +86,37 @@ func TestBotBotIRC(t *testing.T) { botbot.shutdown() - tries := 0 - for len(queue.Got["q"]) < 4 && tries < 20 { - time.Sleep(50 * time.Millisecond) + tries = 0 + val := 5 + for len(q) < val && tries < val { + queue.RLock() + q = queue.Got["q"] + queue.RUnlock() + glog.V(4).Infoln("[Debug] queue.Got[\"q\"]", len(q), "/", val, q) + time.Sleep(time.Second) tries++ } + queue.RLock() checkContains(queue.Got["q"], "SHUTDOWN", t) + queue.RUnlock() } // Block until len(target.Get) is at least val, or timeout func waitForServer(target *common.MockIRCServer, val int) { - tries := 0 - for len(target.Got) < val && tries < 30 { - time.Sleep(200 * time.Millisecond) + for target.GotLength() < val && tries < val*3 { + time.Sleep(time.Millisecond * 500) + glog.V(4).Infoln("[Debug] val", target.GotLength(), "/", val, " target.Got:", target.Got) tries++ } + glog.Infoln("[Debug] waitForServer val", target.GotLength(), "/", val, " target.Got:", target.Got) + } // Check that "val" is in one of the strings in "arr". t.Error if not. func checkContains(arr []string, val string, t *testing.T) { + glog.Infoln("[Debug] checkContains", val, "in", arr) isFound := false for _, item := range arr { diff --git a/network/irc/irc.go b/network/irc/irc.go index 60552a9..617df2a 100644 --- a/network/irc/irc.go +++ b/network/irc/irc.go @@ -1,7 +1,7 @@ // IRC server connection // // Connecting to an IRC server goes like this: -// 1. Connect to the socket. Wait for a response (anything will do). +// 1. Connect to the conn. Wait for a response (anything will do). // 2. Send USER and NICK. Wait for a response (anything). // 2.5 If we have a password, wait for NickServ to ask for it, and to confirm authentication // 3. JOIN channels @@ -10,14 +10,15 @@ package irc import ( "bufio" - "bytes" "crypto/tls" "crypto/x509" + "expvar" + "fmt" "io" "net" "sort" - "strconv" "strings" + "sync" "time" "unicode/utf8" @@ -28,155 +29,312 @@ import ( ) const ( - VERSION = "botbotme v0.2" + // VERSION of the botbot-bot + VERSION = "botbot v0.3.0" + // RPL_WHOISCHANNELS IRC command code from the spec + RPL_WHOISCHANNELS = "319" +) + +type chatBotStats struct { + sync.RWMutex + m map[string]*expvar.Map +} + +func (s chatBotStats) GetOrCreate(identifier string) (*expvar.Map, bool) { + s.RLock() + chatbotStats, ok := s.m[identifier] + s.RUnlock() + if !ok { + chatbotStats = expvar.NewMap(identifier) + s.Lock() + s.m[identifier] = chatbotStats + s.Unlock() + } + return chatbotStats, ok +} + +var ( + // BotStats hold the references to the expvar.Map for each ircBot instance + BotStats = chatBotStats{m: make(map[string]*expvar.Map)} ) type ircBot struct { + sync.RWMutex id int address string - socket io.ReadWriteCloser nick string realname string password string serverPass string - fromServer chan *line.Line - channels []string - isRunning bool + serverIdentifier string + rateLimit time.Duration // Duration used to rate limit send + channels []*common.Channel isConnecting bool isAuthenticating bool + isClosed bool sendQueue chan []byte - monitorChan chan string + fromServer chan *line.Line + pingResponse chan struct{} + closing chan struct{} } +// NewBot create an irc instance of ChatBot func NewBot(config *common.BotConfig, fromServer chan *line.Line) common.ChatBot { - // realname is set to config["realname"] or config["nick"] realname := config.Config["realname"] if realname == "" { realname = config.Config["nick"] } + // Initialize the bot. chatbot := &ircBot{ - id: config.Id, - address: config.Config["server"], - nick: config.Config["nick"], - realname: realname, - password: config.Config["password"], // NickServ password - serverPass: config.Config["server_password"], // PASS password - fromServer: fromServer, - channels: config.Channels, - monitorChan: make(chan string), - isRunning: true, - } - - chatbot.init() - + id: config.Id, + address: config.Config["server"], + nick: config.Config["nick"], + realname: realname, + password: config.Config["password"], // NickServ password + serverPass: config.Config["server_password"], // PASS password + serverIdentifier: config.Config["server_identifier"], + rateLimit: time.Second, + fromServer: fromServer, + channels: config.Channels, + pingResponse: make(chan struct{}, 10), // HACK: This is to avoid the current deadlock + sendQueue: make(chan []byte, 256), + closing: make(chan struct{}), + } + + chatbotStats, ok := BotStats.GetOrCreate(chatbot.serverIdentifier) + + // Initialize the counter for the exported variable + if !ok { + chatbotStats.Add("channels", 0) + chatbotStats.Add("messages", 0) + chatbotStats.Add("received_messages", 0) + chatbotStats.Add("ping", 0) + chatbotStats.Add("pong", 0) + chatbotStats.Add("missed_ping", 0) + chatbotStats.Add("restart", 0) + chatbotStats.Add("reply_whoischannels", 0) + } + + conn := chatbot.connect() + chatbot.init(conn) return chatbot } -func (self *ircBot) GetUser() string { - return self.nick -} -// Monitor that we are still connected to the IRC server -// should run in go-routine -func (self *ircBot) monitor() { - for self.isRunning { - select { - case <-self.monitorChan: - case <-time.After(time.Minute * 10): - glog.Infoln("IRC monitoring KO ; Trying to reconnect.") - self.Close() - self.Connect() - } - } +// GetUser returns the bot.nick +func (bot *ircBot) GetUser() string { + bot.RLock() + defer bot.RUnlock() + return bot.nick } -// Ping the server every 5 min to keep the connection open -func (self *ircBot) pinger() { - i := 0 - for self.isRunning { - <-time.After(time.Minute * 5) - i = i + 1 - self.SendRaw("PING " + strconv.Itoa(i)) - } +// IsRunning the isRunning field +func (bot *ircBot) IsRunning() bool { + bot.RLock() + defer bot.RUnlock() + return !bot.isClosed } -// Connect to the IRC server and start listener -func (self *ircBot) init() { - - self.isConnecting = true - self.isAuthenticating = false +// GetStats returns the expvar.Map for this bot +func (bot *ircBot) GetStats() *expvar.Map { + stats, _ := BotStats.GetOrCreate(bot.serverIdentifier) + return stats +} - self.Connect() +// String returns the string representation of the bot +func (bot *ircBot) String() string { + bot.RLock() + defer bot.RUnlock() + return fmt.Sprintf("%s on %s", bot.nick, bot.address) +} - // Listen for incoming messages in background thread - go self.listen() +// listenSendMonitor is the main goroutine of the ircBot it listens to the conn +// send response to irc via the conn and it check that the conn is healthy if +// it is not it try to reconnect. +func (bot *ircBot) listenSendMonitor(quit chan struct{}, receive chan string, conn io.ReadWriteCloser) { + var pingTimeout <-chan time.Time + reconnect := make(chan struct{}) + // TODO maxPongWithoutMessage should probably be a field of ircBot + maxPingWithoutResponse := 1 // put it back to 3 + maxPongWithoutMessage := 150 + pongCounter := 0 + missedPing := 0 + whoisTimerChan := time.After(time.Minute * 5) - // Monitor that we are still getting incoming messages in a background thread - go self.monitor() + botStats := bot.GetStats() + for { + select { + case <-quit: + return + case <-reconnect: + glog.Infoln("IRC monitoring KO shutting down", bot) + botStats.Add("restart", 1) + err := bot.Close() + if err != nil { + glog.Errorln("An error occured while Closing the bot", bot, ": ", err) + } + return + case <-whoisTimerChan: + bot.Whois() + whoisTimerChan = time.After(time.Minute * 5) + case <-time.After(time.Second * 60): + glog.Infoln("[Info] Ping the ircBot server", pongCounter, bot) + botStats.Add("ping", 1) + bot.SendRaw("PING 1") + // Activate the ping timeout case + pingTimeout = time.After(time.Second * 10) + case <-bot.pingResponse: + // deactivate the case waiting for a pingTimeout because we got a response + pingTimeout = nil + botStats.Add("pong", 1) + pongCounter++ + if glog.V(1) { + glog.Infoln("[Info] Pong from ircBot server", bot) + } + if pongCounter > maxPongWithoutMessage { + close(reconnect) + } + case <-pingTimeout: + // Deactivate the pingTimeout case + pingTimeout = nil + botStats.Add("missed_ping", 1) + missedPing++ + glog.Infoln("[Info] No pong from ircBot server", bot, "missed", missedPing) + if missedPing > maxPingWithoutResponse { + close(reconnect) + } + case content := <-receive: + theLine, err := parseLine(content) + if err == nil { + botStats.Add("received_messages", 1) + bot.RLock() + theLine.ChatBotId = bot.id + bot.RUnlock() + bot.act(theLine) + pongCounter = 0 + missedPing = 0 + // Deactivate the pingTimeout case + pingTimeout = nil - // Pinger that Ping the server every 5 min - go self.pinger() + } else { + glog.Errorln("Invalid line:", content) + } + // Rate limit to one message every tempo + // // https://github.com/BotBotMe/botbot-bot/issues/2 + case data := <-bot.sendQueue: + glog.V(3).Infoln(bot, " Pulled data from bot.sendQueue chan:", string(data)) + if glog.V(2) { + glog.Infoln("[RAW", bot, "] -->", string(data)) + } + _, err := conn.Write(data) + if err != nil { + glog.Errorln("Error writing to conn to", bot, ": ", err) + close(reconnect) + } + botStats.Add("messages", 1) + time.Sleep(bot.rateLimit) - // Listen for outgoing messages (rate limited) in background thread - if self.sendQueue == nil { - self.sendQueue = make(chan []byte, 256) + } } - go self.sender() +} - if self.serverPass != "" { - self.SendRaw("PASS " + self.serverPass) - } +// init initializes the conn to the ircServer and start all the gouroutines +// requires to run ircBot +func (bot *ircBot) init(conn io.ReadWriteCloser) { + glog.Infoln("Init bot", bot) - self.SendRaw("PING Bonjour") -} + quit := make(chan struct{}) + receive := make(chan string) -// Connect to the server -func (self *ircBot) Connect() { + go bot.readSocket(quit, receive, conn) - if self.socket != nil { - // socket already set, unit tests need this - return + // Listen for incoming messages in background thread + go bot.listenSendMonitor(quit, receive, conn) + + go func(bot *ircBot, conn io.Closer) { + for { + select { + case <-bot.closing: + err := conn.Close() + if err != nil { + glog.Errorln("An error occured while closing the conn of", bot, err) + } + close(quit) + return + } + } + }(bot, conn) + + bot.RLock() + if bot.serverPass != "" { + bot.SendRaw("PASS " + bot.serverPass) } + bot.RUnlock() - var socket net.Conn - var err error + bot.SendRaw("PING Bonjour") +} - for { - glog.Infoln("Connecting to IRC server: ", self.address) +// connect to the server. Here we keep trying every 10 seconds until we manage +// to Dial to the server. +func (bot *ircBot) connect() (conn io.ReadWriteCloser) { - socket, err = tls.Dial("tcp", self.address, nil) // Always try TLS first - if err == nil { - glog.Infoln("Connected: TLS secure") - break - } + var ( + err error + counter int + ) - glog.Infoln("Could not connect using TLS because: ", err) + connectTimeout := time.After(0) - _, ok := err.(x509.HostnameError) - if ok { - // Certificate might not match. This happens on irc.cloudfront.net - insecure := &tls.Config{InsecureSkipVerify: true} - socket, err = tls.Dial("tcp", self.address, insecure) + bot.Lock() + bot.isConnecting = true + bot.isAuthenticating = false + bot.Unlock() - if err == nil && isCertValid(socket.(*tls.Conn)) { - glog.Infoln("Connected: TLS with awkward certificate") - break - } - } + for { + select { + case <-connectTimeout: + counter++ + connectTimeout = nil + glog.Infoln("[Info] Connecting to IRC server: ", bot.address) + conn, err = tls.Dial("tcp", bot.address, nil) // Always try TLS first + if err == nil { + glog.Infoln("Connected: TLS secure") + return conn + } else if _, ok := err.(x509.HostnameError); ok { + glog.Errorln("Could not connect using TLS because: ", err) + // Certificate might not match. This happens on irc.cloudfront.net + insecure := &tls.Config{InsecureSkipVerify: true} + conn, err = tls.Dial("tcp", bot.address, insecure) + + if err == nil && isCertValid(conn.(*tls.Conn)) { + glog.Errorln("Connected: TLS with awkward certificate") + return conn + } + } else if _, ok := err.(x509.UnknownAuthorityError); ok { + glog.Errorln("x509.UnknownAuthorityError : ", err) + insecure := &tls.Config{InsecureSkipVerify: true} + conn, err = tls.Dial("tcp", bot.address, insecure) + if err == nil { + glog.Infoln("Connected: TLS with an x509.UnknownAuthorityError", err) + return conn + } + } else { + glog.Errorln("Could not establish a tls connection", err) - socket, err = net.Dial("tcp", self.address) + } - if err == nil { - glog.Infoln("Connected: Plain text insecure") - break + conn, err = net.Dial("tcp", bot.address) + if err == nil { + glog.Infoln("Connected: Plain text insecure") + return conn + } + // TODO (yml) At some point we might want to panic + delay := 5 * counter + glog.Infoln("IRC Connect error. Will attempt to re-connect. ", err, "in", delay, "seconds") + connectTimeout = time.After(time.Duration(delay) * time.Second) } - - glog.Infoln("IRC Connect error. Will attempt to re-connect. ", err) - time.Sleep(1 * time.Second) } - - self.socket = socket } /* Check that the TLS connection's certficate can be applied to this connection. @@ -193,12 +351,11 @@ func isCertValid(conn *tls.Conn) bool { // Cert has single name, the usual case return isIPMatch(cert.Subject.CommonName, connAddr) - } else { - // Cert has several valid names - for _, certname := range cert.DNSNames { - if isIPMatch(certname, connAddr) { - return true - } + } + // Cert has several valid names + for _, certname := range cert.DNSNames { + if isIPMatch(certname, connAddr) { + return true } } @@ -211,13 +368,13 @@ func isIPMatch(hostname string, connIP string) bool { addrs, err := net.LookupIP(hostname) if err != nil { - glog.Errorln("Error DNS lookup of "+hostname+": ", err) + glog.Errorln("Error DNS lookup of ", hostname, ": ", err) return false } for _, ip := range addrs { if ip.String() == connIP { - glog.Infoln("Accepting certificate anyway. " + hostname + " has same IP as connection") + glog.Infoln("Accepting certificate anyway. ", hostname, " has same IP as connection") return true } } @@ -225,217 +382,215 @@ func isIPMatch(hostname string, connIP string) bool { } // Update bot configuration. Called when webapp changes a chatbot's config. -func (self *ircBot) Update(config *common.BotConfig) { +func (bot *ircBot) Update(config *common.BotConfig) { - isNewServer := self.updateServer(config.Config) + isNewServer := bot.updateServer(config) if isNewServer { + glog.Infoln("[Info] the config is from a new server.") // If the server changed, we've already done nick and channel changes too return } + glog.Infoln("[Info] bot.Update -- It is not a new server.") - self.updateNick(config.Config["nick"], config.Config["password"]) - self.updateChannels(config.Channels) + bot.updateNick(config.Config["nick"], config.Config["password"]) + bot.updateChannels(config.Channels) } // Update the IRC server we're connected to -func (self *ircBot) updateServer(config map[string]string) bool { +func (bot *ircBot) updateServer(config *common.BotConfig) bool { - addr := config["server"] - if addr == self.address { + addr := config.Config["server"] + if addr == bot.address { return false } - glog.Infoln("Changing IRC server from ", self.address, " to ", addr) + glog.Infoln("[Info] Changing IRC server from ", bot.address, " to ", addr) - self.Close() - time.Sleep(1 * time.Second) // Wait for timeout to be sure listen has stopped + err := bot.Close() + if err != nil { + glog.Errorln("An error occured while Closing the bot", bot, ": ", err) + } - self.address = addr - self.nick = config["nick"] - self.password = config["password"] - self.channels = splitChannels(config["rooms"]) + bot.address = addr + bot.nick = config.Config["nick"] + bot.password = config.Config["password"] + bot.channels = config.Channels - self.init() + conn := bot.connect() + bot.init(conn) return true } // Update the nickname we're registered under, if needed -func (self *ircBot) updateNick(newNick, newPass string) { - if newNick == self.nick { +func (bot *ircBot) updateNick(newNick, newPass string) { + glog.Infoln("[Info] Starting bot.updateNick()") + + bot.RLock() + nick := bot.nick + bot.RUnlock() + if newNick == nick { + glog.Infoln("[Info] bot.updateNick() -- the nick has not changed so return") return } + glog.Infoln("[Info] bot.updateNick() -- set the new nick") - self.nick = newNick - self.password = newPass - self.setNick() + bot.Lock() + bot.nick = newNick + bot.password = newPass + bot.Unlock() + bot.setNick() } // Update the channels based on new configuration, leaving old ones and joining new ones -func (self *ircBot) updateChannels(newChannels []string) { +func (bot *ircBot) updateChannels(newChannels []*common.Channel) { + glog.Infoln("[Info] Starting bot.updateChannels") + bot.RLock() + channels := bot.channels + bot.RUnlock() + + glog.V(3).Infoln("[Debug] newChannels: ", newChannels, "bot.channels:", channels) - if isEqual(newChannels, self.channels) { + if isEqual(newChannels, channels) { + if glog.V(2) { + glog.Infoln("Channels comparison is equals for bot: ", bot.nick) + } return } + glog.Infoln("[Info] The channels the bot is connected to need to be updated") // PART old ones - for _, channel := range self.channels { + for _, channel := range channels { if !isIn(channel, newChannels) { - self.part(channel) + glog.Infoln("[Info] Parting new channel: ", channel.Credential()) + bot.part(channel.Credential()) } } // JOIN new ones for _, channel := range newChannels { - if !isIn(channel, self.channels) { - self.join(channel) + if !isIn(channel, channels) { + glog.Infoln("[Info] Joining new channel: ", channel.Credential()) + bot.join(channel.Credential()) } } - - self.channels = newChannels + bot.Lock() + bot.channels = newChannels + bot.Unlock() } // Join channels -func (self *ircBot) JoinAll() { - for _, channel := range self.channels { - self.join(channel) +func (bot *ircBot) JoinAll() { + for _, channel := range bot.channels { + bot.join(channel.Credential()) } } +// Whois is used to query information about the bot +func (bot *ircBot) Whois() { + bot.SendRaw("WHOIS " + bot.nick) +} + // Join an IRC channel -func (self *ircBot) join(channel string) { - self.SendRaw("JOIN " + channel) +func (bot *ircBot) join(channel string) { + bot.SendRaw("JOIN " + channel) } // Leave an IRC channel -func (self *ircBot) part(channel string) { - self.SendRaw("PART " + channel) +func (bot *ircBot) part(channel string) { + bot.SendRaw("PART " + channel) + botStats := bot.GetStats() + botStats.Add("channels", -1) } // Send a regular (non-system command) IRC message -func (self *ircBot) Send(channel, msg string) { +func (bot *ircBot) Send(channel, msg string) { fullmsg := "PRIVMSG " + channel + " :" + msg - self.SendRaw(fullmsg) + bot.SendRaw(fullmsg) } -// Send message down socket. Add \n at end first. -func (self *ircBot) SendRaw(msg string) { - self.sendQueue <- []byte(msg + "\n") +// Send message down conn. Add \n at end first. +func (bot *ircBot) SendRaw(msg string) { + bot.sendQueue <- []byte(msg + "\n") } // Tell the irc server who we are - we can't do anything until this is done. -func (self *ircBot) login() { +func (bot *ircBot) login() { - self.isAuthenticating = true + bot.isAuthenticating = true // We use the botname as the 'realname', because bot's don't have real names! - self.SendRaw("USER " + self.nick + " 0 * :" + self.realname) + bot.SendRaw("USER " + bot.nick + " 0 * :" + bot.realname) - self.setNick() + bot.setNick() } -// Tell the network our nick -func (self *ircBot) setNick() { - self.SendRaw("NICK " + self.nick) +// Tell the network our +func (bot *ircBot) setNick() { + bot.SendRaw("NICK " + bot.nick) } // Tell NickServ our password -func (self *ircBot) sendPassword() { - self.Send("NickServ", "identify "+self.password) -} - -// Actually really send message to the server. Implements rate limiting. -// Should run in go-routine. -func (self *ircBot) sender() { - - var data []byte - var twoSeconds = time.Second * 2 - var err error - - for self.isRunning { - - data = <-self.sendQueue - if glog.V(1) { - glog.Infoln("[RAW"+strconv.Itoa(self.id)+"] -->", string(data)) - } - - _, err = self.socket.Write(data) - if err != nil { - self.isRunning = false - glog.Errorln("Error writing to socket", err) - glog.Infoln("Stopping chatbot. Monitor can restart it.") - self.Close() - } - - // Rate limit to one message every 2 seconds - // https://github.com/BotBotMe/botbot-bot/issues/2 - time.Sleep(twoSeconds) - } +func (bot *ircBot) sendPassword() { + bot.Send("NickServ", "identify "+bot.password) } -// Listen for incoming messages. Parse them and put on channel. -// Should run in go-routine -func (self *ircBot) listen() { - - var contentData []byte - var content string - var err error +// Read from the conn +func (bot *ircBot) readSocket(quit chan struct{}, receive chan string, conn io.ReadWriteCloser) { - bufRead := bufio.NewReader(self.socket) - for self.isRunning { - contentData, err = bufRead.ReadBytes('\n') + bufRead := bufio.NewReader(conn) + for { + select { + case <-quit: + return + default: + contentData, err := bufRead.ReadBytes('\n') + if err != nil { + netErr, ok := err.(net.Error) + if ok && netErr.Timeout() == true { + continue + } else { + glog.Errorln("An Error occured while reading from conn ", err) + return + } + } - if err != nil { - netErr, ok := err.(net.Error) - if ok && netErr.Timeout() == true { + if len(contentData) == 0 { continue - - } else if !self.isRunning { - // Close() wants us to stop - return - - } else { - glog.Errorln("Lost IRC server connection. ", err) - self.Close() - return } - } - - if len(contentData) == 0 { - continue - } - - content = toUnicode(contentData) - if glog.V(1) { - glog.Infoln("[RAW" + strconv.Itoa(self.id) + "]" + content) - } - - theLine, err := parseLine(content) - if err == nil { - theLine.ChatBotId = self.id - self.act(theLine) - } else { - glog.Errorln("Invalid line:", content) + content := toUnicode(contentData) + if glog.V(2) { + glog.Infoln("[RAW", bot, "] <--", content) + } + receive <- content } - } } -func (self *ircBot) act(theLine *line.Line) { - - // Send the command on the monitorChan - self.monitorChan <- theLine.Command +func (bot *ircBot) act(theLine *line.Line) { + // Notify the monitor goroutine that we receive a PONG + if theLine.Command == "PONG" { + if glog.V(2) { + glog.Infoln("Sending the signal in bot.pingResponse") + } + bot.pingResponse <- struct{}{} + return + } + bot.RLock() + isConnecting := bot.isConnecting + bot.RUnlock() // As soon as we receive a message from the server, complete initiatization - if self.isConnecting { - self.isConnecting = false - self.login() + if isConnecting { + bot.Lock() + bot.isConnecting = false + bot.Unlock() + bot.login() return } // NickServ interactions - isNickServ := strings.Contains(theLine.User, "NickServ") // freenode, coldfront @@ -451,62 +606,77 @@ func (self *ircBot) act(theLine *line.Line) { if isNickServ { if isAskingForPW { - self.sendPassword() + bot.sendPassword() return } else if isConfirm { - self.isAuthenticating = false - self.JoinAll() + bot.Lock() + bot.isAuthenticating = false + bot.Unlock() + bot.JoinAll() return } } // After USER / NICK is accepted, join all the channels, // assuming we don't need to identify with NickServ - - if self.isAuthenticating && len(self.password) == 0 { - self.isAuthenticating = false - self.JoinAll() + bot.RLock() + shouldIdentify := bot.isAuthenticating && len(bot.password) == 0 + bot.RUnlock() + if shouldIdentify { + bot.Lock() + bot.isAuthenticating = false + bot.Unlock() + bot.JoinAll() return } if theLine.Command == "PING" { // Reply, and send message on to client - self.SendRaw("PONG " + theLine.Content) + bot.SendRaw("PONG " + theLine.Content) } else if theLine.Command == "VERSION" { versionMsg := "NOTICE " + theLine.User + " :\u0001VERSION " + VERSION + "\u0001\n" - self.SendRaw(versionMsg) + bot.SendRaw(versionMsg) + } else if theLine.Command == RPL_WHOISCHANNELS { + glog.Infoln("[Info] reply_whoischannels -- len:", + len(strings.Split(theLine.Content, " ")), "content:", theLine.Content) + botStats := bot.GetStats() + botStats.Add("reply_whoischannels", int64(len(strings.Split(theLine.Content, " ")))) } - self.fromServer <- theLine + bot.fromServer <- theLine } -func (self *ircBot) IsRunning() bool { - return self.isRunning -} - -func (self *ircBot) Close() error { - self.sendShutdown() - self.isRunning = false - return self.socket.Close() +// Close ircBot +func (bot *ircBot) Close() (err error) { + // Send a signal to all goroutine to return + glog.Infoln("[Info] Closing bot.") + bot.sendShutdown() + close(bot.closing) + bot.Lock() + bot.isClosed = true + bot.Unlock() + return err } // Send a non-standard SHUTDOWN message to the plugins // This allows them to know that this channel is offline -func (self *ircBot) sendShutdown() { - +func (bot *ircBot) sendShutdown() { + glog.Infoln("[Info] Logging Shutdown command in the channels monitored by:", bot) + bot.RLock() shutLine := &line.Line{ Command: "SHUTDOWN", Received: time.Now().UTC().Format(time.RFC3339Nano), - ChatBotId: self.id, - User: self.nick, + ChatBotId: bot.id, + User: bot.nick, Raw: "", Content: ""} - for _, channel := range self.channels { - shutLine.Channel = channel - self.fromServer <- shutLine + for _, channel := range bot.channels { + shutLine.Channel = channel.Credential() + bot.fromServer <- shutLine } + bot.RUnlock() } /* @@ -516,8 +686,7 @@ func (self *ircBot) sendShutdown() { // Split a string into sorted array of strings: // e.g. "#bob, #alice" becomes ["#alice", "#bob"] func splitChannels(rooms string) []string { - - var channels []string = make([]string, 0) + var channels = make([]string, 0) for _, s := range strings.Split(rooms, ",") { channels = append(channels, strings.TrimSpace(s)) } @@ -668,14 +837,36 @@ func toUnicode(data []byte) string { } // Are a and b equal? -func isEqual(a, b []string) bool { - ja := strings.Join(a, ",") - jb := strings.Join(b, ",") - return bytes.Equal([]byte(ja), []byte(jb)) +func isEqual(a, b []*common.Channel) (flag bool) { + if len(a) == len(b) { + for _, aCc := range a { + flag = false + for _, bCc := range b { + if aCc.Fingerprint == bCc.Fingerprint { + flag = true + break + } + } + if flag == false { + return flag + } + } + return true + } + return false } // Is a in b? container must be sorted -func isIn(a string, container []string) bool { - index := sort.SearchStrings(container, a) - return index < len(container) && container[index] == a +func isIn(a *common.Channel, channels []*common.Channel) (flag bool) { + flag = false + for _, cc := range channels { + if a.Fingerprint == cc.Fingerprint { + flag = true + break + } + } + if flag == false { + return flag + } + return true } diff --git a/network/irc/irc_test.go b/network/irc/irc_test.go index c163a80..7e548dd 100644 --- a/network/irc/irc_test.go +++ b/network/irc/irc_test.go @@ -8,13 +8,15 @@ import ( "github.com/BotBotMe/botbot-bot/common" "github.com/BotBotMe/botbot-bot/line" + "github.com/golang/glog" ) -const ( - NEW_CHANNEL = "#unitnew" +var ( + NEW_CHANNEL = common.Channel{Name: "#unitnew", Fingerprint: "new-channel-uuid"} ) func TestParseLine_welcome(t *testing.T) { + common.SetGlogFlags() line1 := ":barjavel.freenode.net 001 graham_king :Welcome to the freenode Internet Relay Chat Network graham_king" line, err := parseLine(line1) @@ -32,6 +34,7 @@ func TestParseLine_welcome(t *testing.T) { } func TestParseLine_privmsg(t *testing.T) { + common.SetGlogFlags() line1 := ":rnowak!~rnowak@q.ovron.com PRIVMSG #linode :totally" line, err := parseLine(line1) @@ -57,6 +60,7 @@ func TestParseLine_privmsg(t *testing.T) { } func TestParseLine_pm(t *testing.T) { + common.SetGlogFlags() line1 := ":graham_king!graham_kin@i.love.debian.org PRIVMSG botbotme :hello" line, err := parseLine(line1) @@ -77,6 +81,7 @@ func TestParseLine_pm(t *testing.T) { } func TestParseLine_list(t *testing.T) { + common.SetGlogFlags() line1 := ":oxygen.oftc.net 322 graham_king #linode 412 :Linode Community Support | http://www.linode.com/ | Linodes in Asia-Pacific! - http://bit.ly/ooBzhV" line, err := parseLine(line1) @@ -102,6 +107,7 @@ func TestParseLine_list(t *testing.T) { } func TestParseLine_quit(t *testing.T) { + common.SetGlogFlags() line1 := ":nicolaslara!~nicolasla@c83-250-0-151.bredband.comhem.se QUIT :" line, err := parseLine(line1) if err != nil { @@ -113,6 +119,7 @@ func TestParseLine_quit(t *testing.T) { } func TestParseLine_part(t *testing.T) { + common.SetGlogFlags() line1 := ":nicolaslara!~nicolasla@c83-250-0-151.bredband.comhem.se PART #lincolnloop-internal" line, err := parseLine(line1) if err != nil { @@ -127,6 +134,7 @@ func TestParseLine_part(t *testing.T) { } func TestParseLine_353(t *testing.T) { + common.SetGlogFlags() line1 := ":hybrid7.debian.local 353 botbot = #test :@botbot graham_king" line, err := parseLine(line1) if err != nil { @@ -143,50 +151,32 @@ func TestParseLine_353(t *testing.T) { } } -// Dummy implementation of ReadWriteCloser -type MockSocket struct { - received []string - counter chan bool -} - -func (self *MockSocket) Write(data []byte) (int, error) { - self.received = append(self.received, string(data)) - if self.counter != nil { - self.counter <- true - } - return len(data), nil -} - -func (self *MockSocket) Read(into []byte) (int, error) { - time.Sleep(time.Second) // Prevent busy loop - return 0, nil -} - -func (self *MockSocket) Close() error { - return nil -} - // Test sending messages too fast func TestFlood(t *testing.T) { + common.SetGlogFlags() NUM := 5 fromServer := make(chan *line.Line) receivedCounter := make(chan bool) - mockSocket := MockSocket{counter: receivedCounter} + mockSocket := common.MockSocket{Counter: receivedCounter} + channels := make([]*common.Channel, 1) + channels = append(channels, &common.Channel{Name: "test", Fingerprint: "uuid-string"}) chatbot := &ircBot{ - id: 99, - address: "localhost", - nick: "test", - realname: "Unit Test", - password: "test", - fromServer: fromServer, - channels: []string{"test"}, - socket: &mockSocket, - isRunning: true, - } - chatbot.init() + id: 99, + address: "fakehost", + nick: "test", + realname: "Unit Test", + password: "test", + serverIdentifier: "localhost.test", + rateLimit: time.Second, + fromServer: fromServer, + channels: channels, + pingResponse: make(chan struct{}, 10), // HACK: This is to avoid the current deadlock + sendQueue: make(chan []byte, 256), + } + chatbot.init(mockSocket) startTime := time.Now() @@ -196,14 +186,13 @@ func TestFlood(t *testing.T) { } // Wait for them to 'arrive' at the socket - for numGot := 0; numGot < NUM; numGot++ { + for numGot := 0; numGot <= NUM; numGot++ { <-receivedCounter } elapsed := int64(time.Since(startTime)) - // NUM messages should take at least ((NUM-1) / 4) seconds (max 4 msgs second) - expected := int64((NUM-1)/4) * int64(time.Second) + expected := int64((NUM-1)/4) * int64(chatbot.rateLimit) if elapsed < expected { t.Error("Flood prevention did not work") } @@ -212,62 +201,56 @@ func TestFlood(t *testing.T) { // Test joining additional channels func TestUpdate(t *testing.T) { + common.SetGlogFlags() + glog.Infoln("[DEBUG] starting TestUpdate") fromServer := make(chan *line.Line) - mockSocket := MockSocket{counter: nil} + receiver := make(chan string, 10) + mockSocket := common.MockSocket{Receiver: receiver} + channels := make([]*common.Channel, 0, 2) + channel := common.Channel{Name: "#test", Fingerprint: "uuid-string"} + channels = append(channels, &channel) chatbot := &ircBot{ - id: 99, - address: "localhost", - nick: "test", - realname: "Unit Test", - password: "test", - fromServer: fromServer, - channels: []string{"#test"}, - socket: &mockSocket, - sendQueue: make(chan []byte, 100), - isRunning: true, - } - // Rate limiting requires a go-routine to actually do the sending - go chatbot.sender() - + id: 99, + address: "localhost", + nick: "test", + realname: "Unit Test", + password: "test", + serverIdentifier: "localhost.test1", + fromServer: fromServer, + channels: channels, + rateLimit: time.Second, + pingResponse: make(chan struct{}, 10), // HACK: This is to avoid the current deadlock + sendQueue: make(chan []byte, 256), + } + chatbot.init(mockSocket) conf := map[string]string{ "nick": "test", "password": "testxyz", "server": "localhost"} - channels := []string{"#test", NEW_CHANNEL} + channels = append(channels, &NEW_CHANNEL) newConfig := &common.BotConfig{Id: 1, Config: conf, Channels: channels} + // TODO (yml) there is probably better than sleeping but we need to wait + // until chatbot is fully ready + time.Sleep(time.Second * 2) chatbot.Update(newConfig) - - // Wait a bit - for i := 0; i < 10; i++ { - time.Sleep(time.Second / 3) - if len(mockSocket.received) >= 1 { - break - } - } - - // Expect a JOIN of NEW_CHANNEL but NOT a JOIN on #test (because already in there) isFound := false - for _, cmd := range mockSocket.received { - - cmd = strings.TrimSpace(cmd) - - if cmd == "JOIN "+NEW_CHANNEL { + for received := range mockSocket.Receiver { + glog.Infoln("[DEBUG] received", received) + if strings.TrimSpace(received) == "JOIN "+NEW_CHANNEL.Credential() { isFound = true - break - } - - if cmd == "JOIN #test" { + close(mockSocket.Receiver) + } else if received == "JOIN #test" { t.Error("Should not rejoin channels already in, can cause flood") } } - if !isFound { - t.Error("Expected JOIN " + NEW_CHANNEL) + t.Error("Expected JOIN " + NEW_CHANNEL.Credential()) } } func TestToUnicodeUTF8(t *testing.T) { + common.SetGlogFlags() msg := "ελληνικά" result := toUnicode([]byte(msg)) if result != msg { @@ -276,6 +259,7 @@ func TestToUnicodeUTF8(t *testing.T) { } func TestToUnicodeLatin1(t *testing.T) { + common.SetGlogFlags() msg := "âôé" latin1_bytes := []byte{0xe2, 0xf4, 0xe9} result := toUnicode(latin1_bytes) @@ -285,6 +269,7 @@ func TestToUnicodeLatin1(t *testing.T) { } func TestSplitChannels(t *testing.T) { + common.SetGlogFlags() input := "#aone, #btwo, #cthree" result := splitChannels(input) if len(result) != 3 || result[2] != "#cthree" { diff --git a/network/network.go b/network/network.go index 0332769..3d346f0 100644 --- a/network/network.go +++ b/network/network.go @@ -2,6 +2,7 @@ package network import ( "sort" + "sync" "time" "github.com/golang/glog" @@ -12,6 +13,7 @@ import ( ) type NetworkManager struct { + sync.RWMutex chatbots map[int]common.ChatBot fromServer chan *line.Line storage common.Storage @@ -30,15 +32,23 @@ func NewNetworkManager(storage common.Storage, fromServer chan *line.Line) *Netw return netMan } +func (nm *NetworkManager) IsRunning() bool { + nm.RLock() + defer nm.RUnlock() + return nm.isRunning +} + // Get the User for a ChatbotId -func (self *NetworkManager) GetUserByChatbotId(id int) string { - return self.getChatbotById(id).GetUser() +func (nm *NetworkManager) GetUserByChatbotId(id int) string { + return nm.getChatbotById(id).GetUser() } // Connect to networks / start chatbots. Loads chatbot configuration from DB. -func (self *NetworkManager) RefreshChatbots() { - - botConfigs := self.storage.BotConfig() +func (nm *NetworkManager) RefreshChatbots() { + if glog.V(2) { + glog.Infoln("Entering in NetworkManager.RefreshChatbots") + } + botConfigs := nm.storage.BotConfig() var current common.ChatBot var id int @@ -49,50 +59,65 @@ func (self *NetworkManager) RefreshChatbots() { id = config.Id active = append(active, id) - current = self.chatbots[id] + nm.RLock() + current = nm.chatbots[id] + nm.RUnlock() if current == nil { // Create - self.chatbots[id] = self.Connect(config) + if glog.V(2) { + glog.Infoln("Connect the bot with the following config:", config) + } + nm.Lock() + nm.chatbots[id] = nm.Connect(config) + nm.Unlock() } else { // Update - self.chatbots[id].Update(config) + if glog.V(2) { + glog.Infoln("Update the bot with the following config:", config) + } + nm.chatbots[id].Update(config) } } // Stop old ones - active.Sort() numActive := len(active) - - for currId, _ := range self.chatbots { + nm.Lock() + for currId := range nm.chatbots { if active.Search(currId) == numActive { // if currId not in active: glog.Infoln("Stopping chatbot: ", currId) - self.chatbots[currId].Close() - delete(self.chatbots, currId) + nm.chatbots[currId].Close() + delete(nm.chatbots, currId) } } + nm.Unlock() + if glog.V(2) { + glog.Infoln("Exiting NetworkManager.RefreshChatbots") + } + } -func (self *NetworkManager) Connect(config *common.BotConfig) common.ChatBot { +func (nm *NetworkManager) Connect(config *common.BotConfig) common.ChatBot { - glog.Infoln("Creating chatbot: %+v\n", config) - return irc.NewBot(config, self.fromServer) + glog.Infoln("Creating chatbot as:,", config) + return irc.NewBot(config, nm.fromServer) } -func (self *NetworkManager) Send(chatbotId int, channel, msg string) { - self.chatbots[chatbotId].Send(channel, msg) +func (nm *NetworkManager) Send(chatbotId int, channel, msg string) { + nm.RLock() + nm.chatbots[chatbotId].Send(channel, msg) + nm.RUnlock() } // Check out chatbots are alive, recreating them if not. Run this in go-routine. -func (self *NetworkManager) MonitorChatbots() { - - for self.isRunning { - for id, bot := range self.chatbots { +func (nm *NetworkManager) MonitorChatbots() { + for nm.IsRunning() { + for id, bot := range nm.chatbots { if !bot.IsRunning() { - self.restart(id) + nm.restart(id) } } time.Sleep(1 * time.Second) @@ -100,12 +125,14 @@ func (self *NetworkManager) MonitorChatbots() { } // get a chatbot by id -func (self *NetworkManager) getChatbotById(id int) common.ChatBot { - return self.chatbots[id] +func (nm *NetworkManager) getChatbotById(id int) common.ChatBot { + nm.RLock() + defer nm.RUnlock() + return nm.chatbots[id] } // Restart a chatbot -func (self *NetworkManager) restart(botId int) { +func (nm *NetworkManager) restart(botId int) { glog.Infoln("Restarting bot ", botId) @@ -113,7 +140,7 @@ func (self *NetworkManager) restart(botId int) { // Find configuration for this bot - botConfigs := self.storage.BotConfig() + botConfigs := nm.storage.BotConfig() for _, botConf := range botConfigs { if botConf.Id == botId { config = botConf @@ -123,17 +150,24 @@ func (self *NetworkManager) restart(botId int) { if config == nil { glog.Infoln("Could not find configuration for bot ", botId, ". Bot will not run.") - delete(self.chatbots, botId) + delete(nm.chatbots, botId) return } - self.chatbots[botId] = self.Connect(config) + nm.Lock() + nm.chatbots[botId] = nm.Connect(config) + nm.Unlock() } // Stop all bots -func (self *NetworkManager) Shutdown() { - self.isRunning = false - for _, bot := range self.chatbots { - bot.Close() +func (nm *NetworkManager) Shutdown() { + nm.Lock() + nm.isRunning = false + for _, bot := range nm.chatbots { + err := bot.Close() + if err != nil { + glog.Errorln("An error occured while Closing the bot", bot, ": ", err) + } } + nm.Unlock() }