Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
frikky committed Apr 8, 2024
2 parents 7987e05 + cf3e189 commit ead2f1e
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 33 deletions.
56 changes: 56 additions & 0 deletions db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3858,6 +3858,62 @@ func propagateOrg(org Org) error {
return nil
}

func propagateApp(appId string, delete bool) error {
if len(appId) == 0 {
return errors.New("no ID provided for app")
}

if delete {
log.Printf("[INFO] Deletion propagation is disabled right now.")
return nil
}

if len(propagateUrl) == 0 || len(propagateToken) == 0 {
return errors.New("no SHUFFLE_PROPAGATE_URL or SHUFFLE_PROPAGATE_TOKEN provided")
}
// SHUFFLE_GCE_LOCATION
gceRegion := os.Getenv("SHUFFLE_GCEPROJECT_LOCATION")

log.Printf("[INFO] Asking %s to propagate app %s", propagateUrl, appId)
data := map[string]string{"mode": "app", "appId": appId, "region": gceRegion}

reqBody, err := json.Marshal(data)
if err != nil {
log.Printf("[WARNING] Failed marshalling propagation data %s: %s", appId, err)
return err
}

req, err := http.NewRequest("POST", propagateUrl, bytes.NewBuffer(reqBody))
if err != nil {
log.Printf("[WARNING] Failed creating request for app %s: %s", appId, err)
return err
}

// Set headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", propagateToken)

// Send the request via a client
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Printf("[WARNING] Failed sending request for app %s: %s", appId, err)
return err
}

defer resp.Body.Close()

// Check the response
if resp.StatusCode != 200 {
log.Printf("[WARNING] Error in propagation: %s for app %s", resp.Status, appId)
return errors.New(fmt.Sprintf("bad statuscode: %d", resp.StatusCode))
}

log.Printf("[INFO] Propagation successful for app %s", appId)

return nil
}

func propagateUser(user User, delete bool) error {
if len(user.Id) == 0 {
return errors.New("no ID provided for user")
Expand Down
87 changes: 54 additions & 33 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -2228,7 +2228,6 @@ func HandleStopExecutions(resp http.ResponseWriter, request *http.Request) {
if project.Environment == "cloud" {
queueName = fmt.Sprintf("%s_%s", strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(env.Name, " ", "-"), "_", "-")), user.ActiveOrg.Id)
}


for i := 0; i < 10; i++ {
executionRequests, err := GetWorkflowQueue(ctx, queueName, maxAmount)
Expand Down Expand Up @@ -2272,7 +2271,7 @@ func HandleStopExecutions(resp http.ResponseWriter, request *http.Request) {
}

// Delete the index entirely
indexName := "workflowqueue-"+queueName
indexName := "workflowqueue-" + queueName
err = DeleteDbIndex(ctx, indexName)
if err != nil {
log.Printf("[ERROR] Failed deleting index %s: %s", indexName, err)
Expand Down Expand Up @@ -5365,7 +5364,7 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
}
}

// Handle app versions & upgrades
// Handle app versions & upgrades
for _, action := range workflow.Actions {
actionApp := strings.ToLower(strings.Replace(action.AppName, " ", "", -1))

Expand Down Expand Up @@ -7285,7 +7284,7 @@ func GetSpecificWorkflow(resp http.ResponseWriter, request *http.Request) {
return
}

// Handle app versions & upgrades
// Handle app versions & upgrades
for _, action := range workflow.Actions {
actionApp := strings.ToLower(strings.Replace(action.AppName, " ", "", -1))

Expand Down Expand Up @@ -7586,14 +7585,14 @@ func HandleDeleteUsersAccount(resp http.ResponseWriter, request *http.Request) {
var requestBody struct {
Password string `json:"password"`
}

if err := json.NewDecoder(request.Body).Decode(&requestBody); err != nil {
resp.WriteHeader(500)
resp.Write([]byte(`{"success": false, "reason": "Failed decoding request body"}`))
return
}
password := requestBody.Password

password := requestBody.Password

err = bcrypt.CompareHashAndPassword([]byte(foundUser.Password), []byte(password))
if err != nil {
Expand All @@ -7607,11 +7606,11 @@ func HandleDeleteUsersAccount(resp http.ResponseWriter, request *http.Request) {

// Overwrite incase the user is in the same org
// This could be a way to jump into someone elses organisation if the user already has the correct org name without correct name.
if (foundUser.ActiveOrg.Id == "" && foundUser.ActiveOrg.Name == userInfo.ActiveOrg.Name && len(foundUser.Orgs) == 0) {
if foundUser.ActiveOrg.Id == "" && foundUser.ActiveOrg.Name == userInfo.ActiveOrg.Name && len(foundUser.Orgs) == 0 {
foundUser.ActiveOrg.Id = string(userInfo.ActiveOrg.Id)
}

if (foundUser.SupportAccess) {
if foundUser.SupportAccess {
log.Printf("[AUDIT] Can't delete support user %s (%s)", userInfo.Username, userInfo.Id)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false, "reason": "Can't delete support user"}`))
Expand Down Expand Up @@ -7671,11 +7670,11 @@ func HandleDeleteUsersAccount(resp http.ResponseWriter, request *http.Request) {

org.Users = users
if len(org.Users) > 1 {
err = SetOrg(ctx, *org, org.Id)
if err != nil {
log.Printf("[WARNING] Failed updating org (delete user %s) %s: %s", foundUser.Username, org.Id, err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Removed their access but failed updating own user list"}`)))
err = SetOrg(ctx, *org, org.Id)
if err != nil {
log.Printf("[WARNING] Failed updating org (delete user %s) %s: %s", foundUser.Username, org.Id, err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Removed their access but failed updating own user list"}`)))
return
}
}
Expand Down Expand Up @@ -12879,6 +12878,29 @@ func ActivateWorkflowApp(resp http.ResponseWriter, request *http.Request) {
app = &selectedApp
} else {
log.Printf("[WARNING] Error getting app with ID %s (app config): %s", fileId, err)
if project.Environment == "cloud" && gceProject != "shuffler" {
_, err := HandleAlgoliaAppSearch(ctx, appName)
if err == nil {
// this means that the app exists. so, let's
// ask our propagator to proagate it further.
log.Printf("[INFO] Found app %s in algolia", appName)

go func() {
err = propagateApp(fileId, false)
if err != nil {
log.Printf("[WARNING] Error propagating app %s: %s", appName, err)
} else {
log.Printf("[INFO] Propagated app %s", appName)
}
}()

resp.WriteHeader(202)
resp.Write([]byte(`{"success": false, "reason": "Please try activation again in a few seconds!"}`))
return
} else {
log.Printf("[WARNING] Error getting app %s (algolia): %s", appName, err)
}
}
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false, "reason": "App doesn't exist"}`))
return
Expand Down Expand Up @@ -14550,13 +14572,13 @@ func PrepareSingleAction(ctx context.Context, user User, fileId string, body []b

newParams := []WorkflowAppActionParameter{}
/*
for _, param := range action.Parameters {
if param.Configuration && len(param.Value) == 0 {
continue
}
for _, param := range action.Parameters {
if param.Configuration && len(param.Value) == 0 {
continue
}

newParams = append(newParams, param)
}
newParams = append(newParams, param)
}
*/

// Auth is handled in PrepareWorkflowExec, so this may not be needed
Expand Down Expand Up @@ -14688,9 +14710,9 @@ func HandleRetValidation(ctx context.Context, workflowExecution WorkflowExecutio
}

// VERY short sleeptime here on purpose
maxSeconds := 15
maxSeconds := 15
if project.Environment != "cloud" {
maxSeconds = 60
maxSeconds = 60
}

sleeptime := 100
Expand Down Expand Up @@ -14729,7 +14751,7 @@ func HandleRetValidation(ctx context.Context, workflowExecution WorkflowExecutio
//log.Printf("Cnt: %d", cnt)
if cnt == (maxSeconds * (maxSeconds * 100 / sleeptime)) {

returnBody.Success = true
returnBody.Success = true

returnBody.Errors = []string{fmt.Sprintf("Polling timed out after %d seconds. Use the /api/v1/streams API with body `{\"execution_id\": \"%s\", \"authorization\": \"%s\"}` to get the latest results", maxSeconds, workflowExecution.ExecutionId, workflowExecution.Authorization)}

Expand Down Expand Up @@ -17371,10 +17393,10 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
// Adding so it can be used to fail the auth naturally with Outlook

/*
newAuth.Fields = append(newAuth.Fields, AuthenticationStore{
Key: "access_token",
Value: "FAILURE_REFRESH",
})
newAuth.Fields = append(newAuth.Fields, AuthenticationStore{
Key: "access_token",
Value: "FAILURE_REFRESH",
})
*/

// Commented out as we don't want to stop the app, but just continue with the old tokens
Expand Down Expand Up @@ -20962,10 +20984,10 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) {
client := GetExternalClient(streamUrl)

// AI fallback mechanism to handle missing fields
// This is in case some fields are not sent in properly
// This is in case some fields are not sent in properly
authorization := ""
optionalExecutionId := ""
if len(missingFields) > 0 {
if len(missingFields) > 0 {
formattedQueryFields := []string{}
for _, field := range value.Fields {
formattedQueryFields = append(formattedQueryFields, fmt.Sprintf("%s=%s", field.Key, field.Value))
Expand Down Expand Up @@ -21057,7 +21079,6 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) {
secondAction.LargeImage = newSecondAction.LargeImage
}


if value.SkipWorkflow {
log.Printf("[DEBUG] Skipping workflow generation, and instead attempting to directly run the action. This is only applicable IF the action is atomic (skip_workflow=true).")

Expand Down Expand Up @@ -21096,7 +21117,7 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) {

authorization = request.URL.Query().Get("authorization")
optionalExecutionId = request.URL.Query().Get("execution_id")
}
}

if len(value.OrgId) > 0 {
streamUrl = fmt.Sprintf("%s&org_id=%s", streamUrl, value.OrgId)
Expand Down Expand Up @@ -21153,11 +21174,11 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) {

// Unmarshal responseBody back to secondAction
// FIXME: add auth config to save translation files properly

//streamUrl = fmt.Sprintf("%s&execution_id=%s&authorization=%s", streamUrl, request.URL.Query().Get("execution_id"), request.URL.Query().Get("authorization"))

baseurlSplit := strings.Split(streamUrl, "/")
if len(baseurlSplit) > 2 {
if len(baseurlSplit) > 2 {
// Only grab from https -> start of path
streamUrl = strings.Join(baseurlSplit[0:3], "/")
}
Expand Down

0 comments on commit ead2f1e

Please sign in to comment.