diff --git a/.circleci/config.yml b/.circleci/config.yml deleted file mode 100644 index 2986dc5..0000000 --- a/.circleci/config.yml +++ /dev/null @@ -1,51 +0,0 @@ -version: 2.1 -jobs: - test: - docker: - - image: cimg/go:1.16.7 - - image: lbry/lbrynet-tv:0.101.2 - steps: - - checkout - - restore_cache: - keys: - - go-mod-cache-v4-{{ checksum "go.sum" }} - - run: make test_circleci - - run: git reset --hard HEAD - - save_cache: - key: go-mod-cache-v4-{{ checksum "go.sum" }} - paths: - - "/go/pkg/mod" - release: - docker: - - image: cimg/go:1.16.7 - steps: - - checkout - - restore_cache: - keys: - - go-mod-cache-v4-{{ checksum "go.sum" }} - - setup_remote_docker - - run: - name: Build release image - command: | - make linux - make image - - run: - name: Publish Docker image to Docker Hub - command: | - echo "$DOCKERHUB_PASS" | docker login -u "$DOCKERHUB_USERNAME" --password-stdin - make publish_image - - -workflows: - version: 2 - integrate: - jobs: - - test - - release: - requires: - - test - filters: - branches: - only: ["master"] - tags: - only: /v[0-9]+(\.[0-9]+)*(-.*)*/ diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml index a0a9700..be647d2 100644 --- a/.github/workflows/pipeline.yml +++ b/.github/workflows/pipeline.yml @@ -24,7 +24,8 @@ jobs: - name: Check running containers run: docker ps -a - - uses: actions/setup-go@v3 + - name: Set up Go 1.20 + uses: actions/setup-go@v3 with: go-version: '1.20.x' id: go diff --git a/cmd/main.go b/cmd/main.go index bd29ed1..d5e4d4d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,7 +2,7 @@ package cmd import ( "fmt" - "io/ioutil" + "io" "net/http" "os" "time" @@ -260,7 +260,7 @@ func initPubkey() { if err != nil { l.Fatal(err) } - rawKey, err := ioutil.ReadAll(r.Body) + rawKey, err := io.ReadAll(r.Body) if err != nil { l.Fatal(err) } diff --git a/firewall/firewall.go b/firewall/firewall.go new file mode 100644 index 0000000..593ea68 --- /dev/null +++ b/firewall/firewall.go @@ -0,0 +1,54 @@ +package firewall + +import ( + "errors" + "sync" + "time" + + "github.com/bluele/gcache" +) + +var WindowSize = 120 * time.Second + +const MaxStringsPerIp = 6 + +var resourcesForIPCache = gcache.New(1000).Simple().Build() +var whitelist = map[string]bool{ + "51.210.0.171": true, +} + +func CheckAndRateLimitIp(ip string, endpoint string) (bool, int) { + if ip == "" { + return false, 0 + } + if whitelist[ip] { + return false, 0 + } + resources, err := resourcesForIPCache.Get(ip) + if errors.Is(err, gcache.KeyNotFoundError) { + tokensMap := &sync.Map{} + tokensMap.Store(endpoint, time.Now()) + err := resourcesForIPCache.SetWithExpire(ip, tokensMap, WindowSize*10) + if err != nil { + return false, 1 + } + return false, 1 + } + tokensForIP, _ := resources.(*sync.Map) + currentTime := time.Now() + tokensForIP.Store(endpoint, currentTime) + resourcesCount := 0 + flagged := false + tokensForIP.Range(func(k, v interface{}) bool { + if currentTime.Sub(v.(time.Time)) > WindowSize { + tokensForIP.Delete(k) + return true + } + resourcesCount++ + if !flagged && resourcesCount > MaxStringsPerIp { + flagged = true + } + return true + }) + return flagged, resourcesCount +} diff --git a/firewall/firewall_test.go b/firewall/firewall_test.go new file mode 100644 index 0000000..8ab09f4 --- /dev/null +++ b/firewall/firewall_test.go @@ -0,0 +1,31 @@ +package firewall + +import ( + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestCheckIPAccess(t *testing.T) { + ip := "192.168.0.1" + endpoint := "/api/v1/example" + WindowSize = 7 * time.Second + // Test the first five accesses for an IP don't exceed the limit + for i := 1; i <= 6; i++ { + result, _ := CheckAndRateLimitIp(ip, endpoint+strconv.Itoa(i)) + assert.False(t, result, "Expected result to be false, got %v for endpoint %s", result, endpoint+strconv.Itoa(i)) + } + + // Test the sixth access for an IP exceeds the limit + result, _ := CheckAndRateLimitIp(ip, endpoint+"7") + assert.True(t, result, "Expected result to be true, got %v for endpoint %s", result, endpoint+"7") + + // Wait for the window size to elapse + time.Sleep(WindowSize) + + // Test the access for an IP after the window size elapses doesn't exceed the limit + result, _ = CheckAndRateLimitIp(ip, endpoint+"7") + assert.False(t, result, "Expected result to be false, got %v for endpoint %s", result, endpoint+"7") +} diff --git a/init.sql b/init.sql new file mode 100644 index 0000000..0e6151e --- /dev/null +++ b/init.sql @@ -0,0 +1,14 @@ +use godycdn; +CREATE TABLE `object` +( + `id` bigint unsigned NOT NULL AUTO_INCREMENT, + `hash` char(64) COLLATE utf8_unicode_ci NOT NULL, + `is_stored` tinyint(1) NOT NULL DEFAULT '0', + `length` bigint unsigned DEFAULT NULL, + `last_accessed_at` timestamp NULL DEFAULT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `id` (`id`), + UNIQUE KEY `hash_idx` (`hash`), + KEY `last_accessed_idx` (`last_accessed_at`), + KEY `is_stored_idx` (`is_stored`) +); \ No newline at end of file diff --git a/internal/iapi/api.go b/internal/iapi/api.go index c4db1c7..bd89a00 100644 --- a/internal/iapi/api.go +++ b/internal/iapi/api.go @@ -2,7 +2,7 @@ package iapi import ( "encoding/json" - "io/ioutil" + "io" "net/http" "time" @@ -43,7 +43,7 @@ func GetBlockedContent() (map[string]bool, error) { if res.StatusCode != 200 { return nil, errors.Err("unexpected status code %d", res.StatusCode) } - body, err := ioutil.ReadAll(res.Body) + body, err := io.ReadAll(res.Body) if err != nil { return nil, errors.Err(err) } diff --git a/pkg/paid/handlers_test.go b/pkg/paid/handlers_test.go index 15b13ba..4e372e8 100644 --- a/pkg/paid/handlers_test.go +++ b/pkg/paid/handlers_test.go @@ -1,7 +1,7 @@ package paid import ( - "io/ioutil" + "io" "net/http" "net/http/httptest" "testing" @@ -18,7 +18,7 @@ func TestHandlePublicKeyRequest(t *testing.T) { HandlePublicKeyRequest(rr, r) response := rr.Result() - key, err := ioutil.ReadAll(response.Body) + key, err := io.ReadAll(response.Body) require.NoError(t, err) assert.Equal(t, http.StatusOK, response.StatusCode) assert.NotZero(t, key) diff --git a/player/http_handlers.go b/player/http_handlers.go index 6630976..a6a9433 100644 --- a/player/http_handlers.go +++ b/player/http_handlers.go @@ -1,6 +1,7 @@ package player import ( + "encoding/json" "errors" "fmt" "net/http" @@ -11,6 +12,7 @@ import ( "strings" "time" + "github.com/OdyseeTeam/player-server/firewall" "github.com/OdyseeTeam/player-server/internal/iapi" "github.com/OdyseeTeam/player-server/internal/metrics" "github.com/OdyseeTeam/player-server/pkg/app" @@ -19,6 +21,7 @@ import ( "github.com/getsentry/sentry-go" "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" ) // SpeechPrefix is root level prefix for speech URLs. @@ -61,11 +64,61 @@ func NewRequestHandler(p *Player) *RequestHandler { } var bannedIPs = map[string]bool{ - "96.76.237.222": true, - "45.47.236.87": true, - "154.53.32.121": true, + "96.76.237.222": true, + "45.47.236.87": true, + "154.53.32.121": true, + "51.222.12.22": true, + "65.108.133.222": true, + "135.181.178.92": true, + "65.21.95.58": true, + "80.129.211.95": true, + "5.161.101.49": true, + "199.217.105.250": true, + "3.237.164.90": true, + "3.237.210.152": true, + "3.237.165.26": true, + "3.237.221.248": true, + "37.120.159.165": true, + "199.217.105.243": true, + "79.137.105.150": true, + "89.187.177.54": true, + "5.161.108.85": true, + "5.252.23.106": true, + "207.244.91.166": true, + "198.98.52.25": true, + "207.244.91.131": true, + "175.182.108.229": true, + "107.181.206.145": true, } +var allowedReferrers = map[string]bool{ + "https://piped.kavin.rocks/": true, + "https://piped.video/": true, + "https://www.gstatic.com/": true, + "http://localhost:9090/": true, +} +var allowedTldReferrers = map[string]bool{ + "odysee.com": true, + "odysee.tv": true, +} +var allowedOrigins = map[string]bool{ + "https://odysee.com": true, + "https://neko.odysee.tv": true, + "https://salt.odysee.tv": true, + "https://kp.odysee.tv": true, + "https://inf.odysee.tv": true, + "https://stashu.odysee.tv": true, + "https://www.gstatic.com": true, + "https://odysee.ap.ngrok.io": true, +} +var allowedUserAgents = []string{ + "LBRY/", + "Roku/", +} +var allowedSpecialHeaders = map[string]bool{"x-cf-lb-monitor": true} + +var allowedXRequestedWith = "com.odysee.app" + // Handle is responsible for all HTTP media delivery via player module. func (h *RequestHandler) Handle(c *gin.Context) { addCSPHeaders(c) @@ -74,8 +127,9 @@ func (h *RequestHandler) Handle(c *gin.Context) { if c.Request.Method == http.MethodHead { c.Header("Cache-Control", "no-store, No-cache") } - var uri string + var uri string + var isSpeech bool if strings.HasPrefix(c.Request.URL.String(), SpeechPrefix) { // Speech stuff uri = c.Request.URL.String()[len(SpeechPrefix):] @@ -87,6 +141,7 @@ func (h *RequestHandler) Handle(c *gin.Context) { c.AbortWithStatus(http.StatusNotFound) return } + isSpeech = true // Speech stuff over } else if c.Param("claim_name") != "" { uri = c.Param("claim_name") + "#" + c.Param("claim_id") @@ -98,14 +153,69 @@ func (h *RequestHandler) Handle(c *gin.Context) { } } - //this is here temporarily due to abuse. a better solution will be found - forwardedFor := c.GetHeader("X-Forwarded-For") - if forwardedFor != "" { - if bannedIPs[strings.TrimSpace(strings.Split(forwardedFor, ",")[0])] { - c.AbortWithStatus(http.StatusTooManyRequests) - return + magicTimestamp, exists := c.GetQuery("magic") + magicPass := false + if exists && magicTimestamp != "" { + unixT, err := strconv.Atoi(magicTimestamp) + if err == nil { + genesisTime := time.Unix(int64(unixT), 0) + if time.Since(genesisTime) < 5*time.Minute { + magicPass = true + } } } + + flagged := true + for header, v := range c.Request.Header { + hasHeaderToCheck := header != "User-Agent" && header != "Referer" && header != "Origin" && header != "X-Requested-With" + if hasHeaderToCheck && !allowedSpecialHeaders[strings.ToLower(header)] { + continue + } + if strings.ToLower(header) == "origin" && allowedOrigins[v[0]] { + flagged = false + break + } + if strings.ToLower(header) == "referer" { + if allowedReferrers[v[0]] { + flagged = false + break + } + //check if the referrer is from an allowed tld (weak check) + for tld := range allowedTldReferrers { + if strings.Contains(v[0], tld) { + flagged = false + break + } + } + } + + if strings.ToLower(header) == "user-agent" { + for _, ua := range allowedUserAgents { + if strings.HasPrefix(v[0], ua) { + flagged = false + break + } + } + } + if allowedSpecialHeaders[strings.ToLower(header)] { + flagged = false + break + } + if strings.ToLower(header) == "x-requested-with" && v[0] == allowedXRequestedWith { + flagged = false + break + } + } + //if the request is flagged and the magic pass is not set then we will not serve the request + //with an exception for /v3/ endpoints for now + flagged = !magicPass && flagged && !strings.HasPrefix(c.Request.URL.String(), "/api/v3/") + + //this is here temporarily due to abuse. a better solution will be found + ip := c.ClientIP() + if bannedIPs[ip] { + c.AbortWithStatus(http.StatusTooManyRequests) + return + } if strings.Contains(c.Request.URL.String(), "Katmovie18") { c.String(http.StatusForbidden, "this content cannot be accessed") return @@ -120,7 +230,17 @@ func (h *RequestHandler) Handle(c *gin.Context) { } } isDownload, _ := strconv.ParseBool(c.Query(paramDownload)) - if isDownload && !h.player.options.downloadsEnabled { + + if isDownload { + // log all headers for download requests + //encode headers in a json string + headers, err := json.MarshalIndent(c.Request.Header, "", " ") + if err == nil { + logrus.Infof("download request for %s with IP %s and headers: %+v", uri, ip, string(headers)) + } + } + //don't allow downloads if either flagged or disabled + if isDownload && (!h.player.options.downloadsEnabled || flagged) { c.String(http.StatusForbidden, "downloads are currently disabled") return } @@ -137,11 +257,28 @@ func (h *RequestHandler) Handle(c *gin.Context) { c.String(http.StatusForbidden, "this content cannot be accessed") return } + abusiveIP, abuseCount := firewall.CheckAndRateLimitIp(ip, stream.ClaimID) + if abusiveIP { + Logger.Warnf("IP %s is abusing resources (count: %d): %s - %s", ip, abuseCount, stream.ClaimID, stream.claim.Name) + if abuseCount > 10 { + c.String(http.StatusTooManyRequests, "Try again later") + return + } + } + if isDownload && abuseCount > 2 { + c.String(http.StatusTooManyRequests, "Try again later") + return + } + err = h.player.VerifyAccess(stream, c) if err != nil { processStreamError("access", uri, c.Writer, c.Request, err) return } + if flagged && !isSpeech { + c.String(http.StatusUnauthorized, "this content cannot be accessed at the moment") + return + } if !isDownload && fitForTranscoder(c, stream) && h.player.tclient != nil { tcPath := h.player.tclient.GetPlaybackPath(c.Param("claim_id"), stream.hash) @@ -194,6 +331,13 @@ func (h *RequestHandler) HandleTranscodedFragment(c *gin.Context) { addPoweredByHeaders(c) metrics.StreamsRunning.WithLabelValues(metrics.StreamTranscoded).Inc() defer metrics.StreamsRunning.WithLabelValues(metrics.StreamTranscoded).Dec() + blocked, err := iapi.GetBlockedContent() + if err == nil { + if blocked[uri] { + c.String(http.StatusForbidden, "this content cannot be accessed") + return + } + } size, err := h.player.tclient.PlayFragment(uri, c.Param("sd_hash"), c.Param("fragment"), c.Writer, c.Request) if err != nil { writeErrorResponse(c.Writer, http.StatusNotFound, err.Error()) diff --git a/player/http_handlers_test.go b/player/http_handlers_test.go index 3928518..9297a7a 100644 --- a/player/http_handlers_test.go +++ b/player/http_handlers_test.go @@ -30,6 +30,7 @@ func makeRequest(t *testing.T, router *gin.Engine, method, uri string, rng *rang r, err := http.NewRequest(method, uri, nil) require.NoError(t, err) + r.Header.Add("Referer", "https://odysee.com") if rng != nil { if rng.start == 0 { r.Header.Add("Range", fmt.Sprintf("bytes=0-%v", rng.end)) @@ -320,6 +321,7 @@ func Test_fitForTranscoder(t *testing.T) { InstallPlayerRoutes(e, p) r, _ = http.NewRequest(http.MethodGet, "https://cdn.lbryplayer.xyz/api/v4/streams/free/claimname/abc/sdhash", nil) + r.Header.Add("referer", "https://odysee.com/") c.Request = r e.HandleContext(c)