From 63646e5f5c917160ea4185d17dcdfeca2511169d Mon Sep 17 00:00:00 2001 From: Vivek Singh Date: Wed, 17 Mar 2021 13:33:58 +0530 Subject: [PATCH] Add async endpoint support This commit adds async endpoint support to cron connector which can be enabled using `asynchronous_invocation` environment variable. Fixes: #11 Signed-off-by: Vivek Singh --- main.go | 25 ++++++++++++++++++++----- main_test.go | 33 +++++++++++++++++++++++++++++++++ types/cron_function.go | 2 +- 3 files changed, 54 insertions(+), 6 deletions(-) diff --git a/main.go b/main.go index 081c7be..6346369 100644 --- a/main.go +++ b/main.go @@ -25,9 +25,11 @@ func main() { } sha, ver := version.GetReleaseInfo() - log.Printf("Version: %s\tCommit: %s", sha, ver) + log.Printf("Version: %s\tCommit: %s\n", sha, ver) + log.Printf("Gateway URL: %s", config.GatewayURL) + log.Printf("Async Invocation: %v", config.AsyncFunctionInvocation) - invoker := types.NewInvoker(config.GatewayURL, types.MakeClient(config.UpstreamTimeout), config.PrintResponse) + invoker := types.NewInvoker(gatewayRoute(config), types.MakeClient(config.UpstreamTimeout), config.PrintResponse) cronScheduler := cfunction.NewScheduler() topic := "cron-function" interval := time.Second * 10 @@ -40,6 +42,13 @@ func main() { } } +func gatewayRoute(config *types.ControllerConfig) string { + if config.AsyncFunctionInvocation { + return fmt.Sprintf("%s/%s", config.GatewayURL, "async-function") + } + return fmt.Sprintf("%s/%s", config.GatewayURL, "function") +} + func getControllerConfig() (*types.ControllerConfig, error) { gURL, ok := os.LookupEnv("gateway_url") @@ -47,10 +56,16 @@ func getControllerConfig() (*types.ControllerConfig, error) { return nil, fmt.Errorf("Gateway URL not set") } + asynchronousInvocation := false + if val, exists := os.LookupEnv("asynchronous_invocation"); exists { + asynchronousInvocation = (val == "1" || val == "true") + } + return &types.ControllerConfig{ - RebuildInterval: time.Millisecond * 1000, - GatewayURL: gURL, - PrintResponse: true, + RebuildInterval: time.Millisecond * 1000, + GatewayURL: gURL, + PrintResponse: true, + AsyncFunctionInvocation: asynchronousInvocation, }, nil } diff --git a/main_test.go b/main_test.go index 6a808a7..354a684 100644 --- a/main_test.go +++ b/main_test.go @@ -5,6 +5,7 @@ package main import ( "testing" + "github.com/openfaas/connector-sdk/types" cfunction "github.com/openfaas/cron-connector/types" ptypes "github.com/openfaas/faas-provider/types" ) @@ -68,3 +69,35 @@ func TestNamespaceFuncs(t *testing.T) { t.Error("function should be left as it is") } } + +func TestGatewayRoute_Async(t *testing.T) { + testscases := []struct { + GatewayURL string + ExpectedGatewayURL string + AsyncFunctionInvocation bool + }{ + { + GatewayURL: "http://localhost:8080", + AsyncFunctionInvocation: true, + ExpectedGatewayURL: "http://localhost:8080/async-function", + }, + { + GatewayURL: "http://localhost:8080", + AsyncFunctionInvocation: false, + ExpectedGatewayURL: "http://localhost:8080/function", + }, + } + + for _, test := range testscases { + config := &types.ControllerConfig{ + GatewayURL: test.GatewayURL, + AsyncFunctionInvocation: test.AsyncFunctionInvocation, + } + + val := gatewayRoute(config) + if val != test.ExpectedGatewayURL { + t.Errorf("expected: %s, got: %s", test.ExpectedGatewayURL, val) + } + } + +} diff --git a/types/cron_function.go b/types/cron_function.go index a7c38bd..83c25db 100644 --- a/types/cron_function.go +++ b/types/cron_function.go @@ -66,7 +66,7 @@ func ToCronFunction(f ptypes.FunctionStatus, namespace string, topic string) (Cr // InvokeFunction Invokes the cron function func (c CronFunction) InvokeFunction(i *types.Invoker) (*[]byte, error) { - gwURL := fmt.Sprintf("%s/function/%s.%s", i.GatewayURL, c.Name, c.Namespace) + gwURL := fmt.Sprintf("%s/%s.%s", i.GatewayURL, c.Name, c.Namespace) reader := bytes.NewReader(make([]byte, 0)) httpReq, _ := http.NewRequest(http.MethodPost, gwURL, reader)