diff --git a/go/pkg/server/gateway.go b/go/pkg/server/gateway.go index e97c1a4..4e89b4c 100644 --- a/go/pkg/server/gateway.go +++ b/go/pkg/server/gateway.go @@ -177,7 +177,10 @@ func (g *Gateway) forwardCheckRequestMiddleware(next http.Handler) http.Handler if err == nil { dslFound = true } - // TODO: 各処理を並列化する + + ch := make(chan []interface{}) + errCh := make(chan error) + numForwarded := 0 for _, validation := range validations { validation, ok := validation.(map[string]interface{}) if !ok { @@ -202,69 +205,83 @@ func (g *Gateway) forwardCheckRequestMiddleware(next http.Handler) http.Handler } if isForwardNeed { - // Find the slave node that can handle validation ID - slaveNode, err := g.slaveManager.FindSlave(id) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - var client *http.Client - if slaveNode.TLSEnabled { - transport := &http.Transport{ - TLSClientConfig: &tls.Config{}, + numForwarded++ + go func(id string, ch chan []interface{}) { + // Find the slave node that can handle validation ID + slaveNode, err := g.slaveManager.FindSlave(id) + if err != nil { + errCh <- err + return } - client = &http.Client{Transport: transport} - } else { - client = &http.Client{} - } - client.Timeout = 5 * time.Second - reqBody := map[string]interface{}{ - "validations": []interface{}{validation}, - } - body, err := json.Marshal(reqBody) - if err != nil { - http.Error(w, failure.Translate(err, appError.ErrValidateServiceForwardFailed).Error(), http.StatusInternalServerError) - return - } - req, err := http.NewRequest("POST", slaveNode.Addr+"/v1/check", bytes.NewBuffer(body)) - if err != nil { - http.Error(w, failure.Translate(err, appError.ErrValidateServiceForwardFailed).Error(), http.StatusInternalServerError) - return - } - req.Header.Set("Content-Type", "application/json") + var client *http.Client + if slaveNode.TLSEnabled { + transport := &http.Transport{ + TLSClientConfig: &tls.Config{}, + } + client = &http.Client{Transport: transport} + } else { + client = &http.Client{} + } + client.Timeout = 5 * time.Second - resp, err := client.Do(req) - if err != nil { - http.Error(w, failure.Translate(err, appError.ErrValidateServiceForwardFailed).Error(), http.StatusInternalServerError) - return - } - defer resp.Body.Close() + reqBody := map[string]interface{}{ + "validations": []interface{}{validation}, + } + body, err := json.Marshal(reqBody) + if err != nil { + errCh <- failure.Translate(err, appError.ErrValidateServiceForwardFailed) + return + } + req, err := http.NewRequest("POST", slaveNode.Addr+"/v1/check", bytes.NewBuffer(body)) + if err != nil { + errCh <- failure.Translate(err, appError.ErrValidateServiceForwardFailed) + return + } + req.Header.Set("Content-Type", "application/json") - if resp.StatusCode != http.StatusOK { - http.Error(w, failure.New(appError.ErrValidateServiceForwardFailed, failure.Messagef("Failed to forward the validate request to slave: %d", resp.StatusCode)).Error(), http.StatusInternalServerError) - return - } + resp, err := client.Do(req) + if err != nil { + errCh <- failure.Translate(err, appError.ErrValidateServiceForwardFailed) + return + } + defer resp.Body.Close() - var respBody map[string]interface{} - if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil { - http.Error(w, failure.Translate(err, appError.ErrValidateServiceForwardFailed).Error(), http.StatusInternalServerError) - return - } - results, ok := respBody["results"].([]interface{}) - if !ok { - http.Error(w, failure.New(appError.ErrValidateServiceForwardFailed, failure.Messagef("results field is invalid")).Error(), http.StatusInternalServerError) - return - } - validationResults = append(validationResults, results...) + if resp.StatusCode != http.StatusOK { + errCh <- failure.New(appError.ErrValidateServiceForwardFailed, failure.Messagef("Failed to forward the validate request to slave: %d", resp.StatusCode)) + return + } - g.logger.Info(fmt.Sprintf("⚽️ Request (id:%s) Forwarded to Slave %s", id, slaveNode.Id)) + var respBody map[string]interface{} + if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil { + errCh <- failure.Translate(err, appError.ErrValidateServiceForwardFailed) + return + } + results, ok := respBody["results"].([]interface{}) + if !ok { + errCh <- failure.New(appError.ErrRequestParameterInvalid, failure.Messagef("results field is invalid")) + return + } + ch <- results + g.logger.Info(fmt.Sprintf("⚽️ Request (id:%s) Forwarded to Slave %s", id, slaveNode.Id)) + }(id, ch) } else { modifiedRequestValidations = append(modifiedRequestValidations, validation) } } + for i := 0; i < numForwarded; i++ { + select { + case err := <-errCh: + http.Error(w, err.Error(), http.StatusInternalServerError) + return + case results := <-ch: + validationResults = append(validationResults, results...) + case <-time.After(30 * time.Second): + http.Error(w, failure.New(appError.ErrValidateServiceForwardFailed, failure.Message("Timeout")).Error(), http.StatusInternalServerError) + } + } + reqBody["validations"] = modifiedRequestValidations modifiedReqBody, err := json.Marshal(reqBody) if err != nil {