Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use default http client for requests for emissions collector #17

Merged
merged 1 commit into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions pkg/collector/emissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package collector

import (
"context"
"net/http"
"strings"
"sync"
"time"
Expand All @@ -21,7 +20,6 @@ const emissionsCollectorSubsystem = "emissions"

type emissionsCollector struct {
logger log.Logger
client http.Client
ctx context.Context
emissionSources emissions.EmissionSources
emissionsMetricDesc *prometheus.Desc
Expand Down Expand Up @@ -56,9 +54,6 @@ func convertISO2ToISO3(countryCodeISO2 string) string {

// NewEmissionsCollector returns a new Collector exposing emission factor metrics.
func NewEmissionsCollector(logger log.Logger) (Collector, error) {
// Start a new HTTP client
client := http.Client{}

// Ensure country code is in upper case
*countryCodeAlpha2 = strings.ToUpper(*countryCodeAlpha2)

Expand All @@ -78,15 +73,14 @@ func NewEmissionsCollector(logger log.Logger) (Collector, error) {
)

// Create a new instance of EmissionCollector
emissionSources, err := newEmissionSources(ctx, &client, logger)
emissionSources, err := newEmissionSources(ctx, logger)
if err != nil {
level.Error(logger).Log("msg", "Failed to create new EmissionCollector", "err", err)
return nil, err
}

return &emissionsCollector{
logger: logger,
client: client,
ctx: ctx,
emissionSources: *emissionSources,
emissionsMetricDesc: emissionsMetricDesc,
Expand Down
16 changes: 9 additions & 7 deletions pkg/emissions/emaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ const (

type emapsSource struct {
logger log.Logger
client Client
ctx context.Context
apiToken string
cacheDuration int64
lastRequestTime int64
lastEmissionFactor float64
fetch func(apiToken string, ctx context.Context, client Client, logger log.Logger) (float64, error)
fetch func(apiToken string, ctx context.Context, logger log.Logger) (float64, error)
}

func init() {
Expand All @@ -40,7 +39,7 @@ func init() {
}

// NewEMapsSource returns a new Source that returns emission factor from electricity maps data
func NewEMapsSource(ctx context.Context, client Client, logger log.Logger) (Source, error) {
func NewEMapsSource(ctx context.Context, logger log.Logger) (Source, error) {
var eMapsAPIToken string
// Check if EMAPS_API_TOKEN is set
if token, present := os.LookupEnv("EMAPS_API_TOKEN"); present {
Expand All @@ -52,7 +51,6 @@ func NewEMapsSource(ctx context.Context, client Client, logger log.Logger) (Sour
return &emapsSource{
logger: logger,
ctx: ctx,
client: client,
apiToken: eMapsAPIToken,
cacheDuration: 1800,
lastRequestTime: time.Now().Unix(),
Expand All @@ -68,7 +66,7 @@ func NewEMapsSource(ctx context.Context, client Client, logger log.Logger) (Sour
// scrape intervals.
func (s *emapsSource) Update() (float64, error) {
if time.Now().Unix()-s.lastRequestTime > s.cacheDuration || s.lastEmissionFactor == -1 {
currentEmissionFactor, err := s.fetch(s.apiToken, s.ctx, s.client, s.logger)
currentEmissionFactor, err := s.fetch(s.apiToken, s.ctx, s.logger)
if err != nil {
level.Warn(s.logger).Log("msg", "Failed to retrieve emission factor from Electricity maps source", "err", err)

Expand All @@ -92,14 +90,18 @@ func (s *emapsSource) Update() (float64, error) {
}

// Make request to Electricity maps API
func makeEMapsAPIRequest(apiToken string, ctx context.Context, client Client, logger log.Logger) (float64, error) {
func makeEMapsAPIRequest(apiToken string, ctx context.Context, logger log.Logger) (float64, error) {
// Retrieve context values
contextValues := ctx.Value(ContextKey{}).(ContextValues)

params := url.Values{}
params.Add("zone", contextValues.CountryCodeAlpha2)
queryString := params.Encode()

// Create a context with timeout to ensure we dont have deadlocks
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodGet,
fmt.Sprintf(eMapAPIBaseUrlPath, eMapAPIBaseUrl, queryString), nil,
)
Expand All @@ -111,7 +113,7 @@ func makeEMapsAPIRequest(apiToken string, ctx context.Context, client Client, lo
// Add token to auth header
req.Header.Add("auth-token", apiToken)

resp, err := client.Do(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
level.Error(logger).Log("msg", "Failed to make HTTP request for Electricity Maps source", "err", err)
return float64(-1), err
Expand Down
4 changes: 2 additions & 2 deletions pkg/emissions/emaps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ var (
emapsIdx = 0
)

func mockEMapsAPIRequest(token string, ctx context.Context, client Client, logger log.Logger) (float64, error) {
func mockEMapsAPIRequest(token string, ctx context.Context, logger log.Logger) (float64, error) {
emapsIdx++
return expectedEMapsFactor[emapsIdx-1], nil
}

func mockEMapsAPIFailRequest(token string, ctx context.Context, client Client, logger log.Logger) (float64, error) {
func mockEMapsAPIFailRequest(token string, ctx context.Context, logger log.Logger) (float64, error) {
return float64(-1), fmt.Errorf("Failed API request")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/emissions/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func init() {
}

// NewGlobalSource returns a new Source that returns a constant global average emission factor
func NewGlobalSource(ctx context.Context, client Client, logger log.Logger) (Source, error) {
func NewGlobalSource(ctx context.Context, logger log.Logger) (Source, error) {
return &globalSource{
logger: logger,
}, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/emissions/owid.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func init() {
}

// NewOWIDSource returns a new Source that returns emission factor from OWID data
func NewOWIDSource(ctx context.Context, client Client, logger log.Logger) (Source, error) {
func NewOWIDSource(ctx context.Context, logger log.Logger) (Source, error) {
// Retrieve context values
contextValues := ctx.Value(ContextKey{}).(ContextValues)
level.Info(logger).Log("msg", "Emission factor from OWID data will be reported.")
Expand Down
16 changes: 9 additions & 7 deletions pkg/emissions/rte.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ const (

type rteSource struct {
logger log.Logger
client Client
ctx context.Context
cacheDuration int64
lastRequestTime int64
lastEmissionFactor float64
fetch func(ctx context.Context, client Client, logger log.Logger) (float64, error)
fetch func(ctx context.Context, logger log.Logger) (float64, error)
}

func init() {
Expand All @@ -38,15 +37,14 @@ func init() {
}

// NewRTESource returns a new Source that returns emission factor from RTE eCO2 mix
func NewRTESource(ctx context.Context, client Client, logger log.Logger) (Source, error) {
func NewRTESource(ctx context.Context, logger log.Logger) (Source, error) {
// Check if country is FR and if not return
if ctx.Value(ContextKey{}).(ContextValues).CountryCodeAlpha2 != "FR" {
return nil, fmt.Errorf("RTE eCO2 data is only available for France")
}
level.Info(logger).Log("msg", "Emission factor from RTE eCO2 mix will be reported.")
return &rteSource{
logger: logger,
client: client,
ctx: ctx,
cacheDuration: 1800,
lastRequestTime: time.Now().Unix(),
Expand All @@ -62,7 +60,7 @@ func NewRTESource(ctx context.Context, client Client, logger log.Logger) (Source
// scrape intervals.
func (s *rteSource) Update() (float64, error) {
if time.Now().Unix()-s.lastRequestTime > s.cacheDuration || s.lastEmissionFactor == -1 {
currentEmissionFactor, err := s.fetch(s.ctx, s.client, s.logger)
currentEmissionFactor, err := s.fetch(s.ctx, s.logger)
if err != nil {
level.Warn(s.logger).Log("msg", "Failed to retrieve emission factor from RTE source", "err", err)

Expand All @@ -86,7 +84,7 @@ func (s *rteSource) Update() (float64, error) {
}

// Make request to Opendatasoft API
func makeRTEAPIRequest(ctx context.Context, client Client, logger log.Logger) (float64, error) {
func makeRTEAPIRequest(ctx context.Context, logger log.Logger) (float64, error) {
// Make query string
params := url.Values{}
params.Add("dataset", "eco2mix-national-tr")
Expand All @@ -104,6 +102,10 @@ func makeRTEAPIRequest(ctx context.Context, client Client, logger log.Logger) (f
)
queryString := params.Encode()

// Create a context with timeout to ensure we dont have deadlocks
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodGet,
fmt.Sprintf(opendatasoftAPIPath, opendatasoftAPIBaseUrl, queryString), nil,
)
Expand All @@ -112,7 +114,7 @@ func makeRTEAPIRequest(ctx context.Context, client Client, logger log.Logger) (f
return float64(-1), err
}

resp, err := client.Do(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
level.Error(logger).Log("msg", "Failed to make HTTP request for RTE source", "err", err)
return float64(-1), err
Expand Down
4 changes: 2 additions & 2 deletions pkg/emissions/rte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ var (
rteIdx = 0
)

func mockRTEAPIRequest(ctx context.Context, client Client, logger log.Logger) (float64, error) {
func mockRTEAPIRequest(ctx context.Context, logger log.Logger) (float64, error) {
rteIdx++
return expectedRTEFactor[rteIdx-1], nil
}

func mockRTEAPIFailRequest(ctx context.Context, client Client, logger log.Logger) (float64, error) {
func mockRTEAPIFailRequest(ctx context.Context, logger log.Logger) (float64, error) {
return float64(-1), fmt.Errorf("Failed API request")
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/emissions/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var dataDir embed.FS
var (
CountryCodes CountryCode
emissionsLock = sync.RWMutex{}
factories = make(map[string]func(ctx context.Context, client Client, logger log.Logger) (Source, error))
factories = make(map[string]func(ctx context.Context, logger log.Logger) (Source, error))
)

func init() {
Expand All @@ -35,17 +35,17 @@ func init() {
// Register emission factor source
func RegisterSource(
source string,
factory func(ctx context.Context, client Client, logger log.Logger) (Source, error)) {
factory func(ctx context.Context, logger log.Logger) (Source, error)) {
factories[source] = factory
}

// NewEmissionSources creates a new EmissionSources
func NewEmissionSources(ctx context.Context, client Client, logger log.Logger) (*EmissionSources, error) {
func NewEmissionSources(ctx context.Context, logger log.Logger) (*EmissionSources, error) {
sources := make(map[string]Source)

// Loop over factories and create new instances
for key, factory := range factories {
source, err := factory(ctx, client, log.With(logger, "source", key))
source, err := factory(ctx, log.With(logger, "source", key))
if err != nil {
level.Error(logger).Log("msg", "Failed to create data source", "source", key, "err", err)
continue
Expand Down