Skip to content

Commit

Permalink
Add async endpoint support
Browse files Browse the repository at this point in the history
This commit adds async endpoint support to cron connector which can be enabled
using `asynchronous_invocation` environment variable.

Fixes: #11
Signed-off-by: Vivek Singh <[email protected]>
  • Loading branch information
viveksyngh authored and alexellis committed Mar 24, 2021
1 parent 549c1dc commit 63646e5
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 6 deletions.
25 changes: 20 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ func main() {
}

sha, ver := version.GetReleaseInfo()
log.Printf("Version: %s\tCommit: %s", sha, ver)
log.Printf("Version: %s\tCommit: %s\n", sha, ver)
log.Printf("Gateway URL: %s", config.GatewayURL)
log.Printf("Async Invocation: %v", config.AsyncFunctionInvocation)

invoker := types.NewInvoker(config.GatewayURL, types.MakeClient(config.UpstreamTimeout), config.PrintResponse)
invoker := types.NewInvoker(gatewayRoute(config), types.MakeClient(config.UpstreamTimeout), config.PrintResponse)
cronScheduler := cfunction.NewScheduler()
topic := "cron-function"
interval := time.Second * 10
Expand All @@ -40,17 +42,30 @@ func main() {
}
}

func gatewayRoute(config *types.ControllerConfig) string {
if config.AsyncFunctionInvocation {
return fmt.Sprintf("%s/%s", config.GatewayURL, "async-function")
}
return fmt.Sprintf("%s/%s", config.GatewayURL, "function")
}

func getControllerConfig() (*types.ControllerConfig, error) {
gURL, ok := os.LookupEnv("gateway_url")

if !ok {
return nil, fmt.Errorf("Gateway URL not set")
}

asynchronousInvocation := false
if val, exists := os.LookupEnv("asynchronous_invocation"); exists {
asynchronousInvocation = (val == "1" || val == "true")
}

return &types.ControllerConfig{
RebuildInterval: time.Millisecond * 1000,
GatewayURL: gURL,
PrintResponse: true,
RebuildInterval: time.Millisecond * 1000,
GatewayURL: gURL,
PrintResponse: true,
AsyncFunctionInvocation: asynchronousInvocation,
}, nil
}

Expand Down
33 changes: 33 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package main
import (
"testing"

"github.com/openfaas/connector-sdk/types"
cfunction "github.com/openfaas/cron-connector/types"
ptypes "github.com/openfaas/faas-provider/types"
)
Expand Down Expand Up @@ -68,3 +69,35 @@ func TestNamespaceFuncs(t *testing.T) {
t.Error("function should be left as it is")
}
}

func TestGatewayRoute_Async(t *testing.T) {
testscases := []struct {
GatewayURL string
ExpectedGatewayURL string
AsyncFunctionInvocation bool
}{
{
GatewayURL: "http://localhost:8080",
AsyncFunctionInvocation: true,
ExpectedGatewayURL: "http://localhost:8080/async-function",
},
{
GatewayURL: "http://localhost:8080",
AsyncFunctionInvocation: false,
ExpectedGatewayURL: "http://localhost:8080/function",
},
}

for _, test := range testscases {
config := &types.ControllerConfig{
GatewayURL: test.GatewayURL,
AsyncFunctionInvocation: test.AsyncFunctionInvocation,
}

val := gatewayRoute(config)
if val != test.ExpectedGatewayURL {
t.Errorf("expected: %s, got: %s", test.ExpectedGatewayURL, val)
}
}

}
2 changes: 1 addition & 1 deletion types/cron_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func ToCronFunction(f ptypes.FunctionStatus, namespace string, topic string) (Cr

// InvokeFunction Invokes the cron function
func (c CronFunction) InvokeFunction(i *types.Invoker) (*[]byte, error) {
gwURL := fmt.Sprintf("%s/function/%s.%s", i.GatewayURL, c.Name, c.Namespace)
gwURL := fmt.Sprintf("%s/%s.%s", i.GatewayURL, c.Name, c.Namespace)
reader := bytes.NewReader(make([]byte, 0))
httpReq, _ := http.NewRequest(http.MethodPost, gwURL, reader)

Expand Down

0 comments on commit 63646e5

Please sign in to comment.