Skip to content

Commit

Permalink
make operation parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
shibukazu committed Aug 23, 2024
1 parent e8089ce commit ec82657
Showing 1 changed file with 70 additions and 53 deletions.
123 changes: 70 additions & 53 deletions go/pkg/server/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,10 @@ func (g *Gateway) forwardCheckRequestMiddleware(next http.Handler) http.Handler
if err == nil {
dslFound = true
}
// TODO: 各処理を並列化する

ch := make(chan []interface{})
errCh := make(chan error)
numForwarded := 0
for _, validation := range validations {
validation, ok := validation.(map[string]interface{})
if !ok {
Expand All @@ -202,69 +205,83 @@ func (g *Gateway) forwardCheckRequestMiddleware(next http.Handler) http.Handler
}

if isForwardNeed {
// Find the slave node that can handle validation ID
slaveNode, err := g.slaveManager.FindSlave(id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

var client *http.Client
if slaveNode.TLSEnabled {
transport := &http.Transport{
TLSClientConfig: &tls.Config{},
numForwarded++
go func(id string, ch chan []interface{}) {
// Find the slave node that can handle validation ID
slaveNode, err := g.slaveManager.FindSlave(id)
if err != nil {
errCh <- err
return
}
client = &http.Client{Transport: transport}
} else {
client = &http.Client{}
}
client.Timeout = 5 * time.Second

reqBody := map[string]interface{}{
"validations": []interface{}{validation},
}
body, err := json.Marshal(reqBody)
if err != nil {
http.Error(w, failure.Translate(err, appError.ErrValidateServiceForwardFailed).Error(), http.StatusInternalServerError)
return
}
req, err := http.NewRequest("POST", slaveNode.Addr+"/v1/check", bytes.NewBuffer(body))
if err != nil {
http.Error(w, failure.Translate(err, appError.ErrValidateServiceForwardFailed).Error(), http.StatusInternalServerError)
return
}
req.Header.Set("Content-Type", "application/json")
var client *http.Client
if slaveNode.TLSEnabled {
transport := &http.Transport{
TLSClientConfig: &tls.Config{},
}
client = &http.Client{Transport: transport}
} else {
client = &http.Client{}
}
client.Timeout = 5 * time.Second

resp, err := client.Do(req)
if err != nil {
http.Error(w, failure.Translate(err, appError.ErrValidateServiceForwardFailed).Error(), http.StatusInternalServerError)
return
}
defer resp.Body.Close()
reqBody := map[string]interface{}{
"validations": []interface{}{validation},
}
body, err := json.Marshal(reqBody)
if err != nil {
errCh <- failure.Translate(err, appError.ErrValidateServiceForwardFailed)
return
}
req, err := http.NewRequest("POST", slaveNode.Addr+"/v1/check", bytes.NewBuffer(body))
if err != nil {
errCh <- failure.Translate(err, appError.ErrValidateServiceForwardFailed)
return
}
req.Header.Set("Content-Type", "application/json")

if resp.StatusCode != http.StatusOK {
http.Error(w, failure.New(appError.ErrValidateServiceForwardFailed, failure.Messagef("Failed to forward the validate request to slave: %d", resp.StatusCode)).Error(), http.StatusInternalServerError)
return
}
resp, err := client.Do(req)
if err != nil {
errCh <- failure.Translate(err, appError.ErrValidateServiceForwardFailed)
return
}
defer resp.Body.Close()

var respBody map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil {
http.Error(w, failure.Translate(err, appError.ErrValidateServiceForwardFailed).Error(), http.StatusInternalServerError)
return
}
results, ok := respBody["results"].([]interface{})
if !ok {
http.Error(w, failure.New(appError.ErrValidateServiceForwardFailed, failure.Messagef("results field is invalid")).Error(), http.StatusInternalServerError)
return
}
validationResults = append(validationResults, results...)
if resp.StatusCode != http.StatusOK {
errCh <- failure.New(appError.ErrValidateServiceForwardFailed, failure.Messagef("Failed to forward the validate request to slave: %d", resp.StatusCode))
return
}

g.logger.Info(fmt.Sprintf("⚽️ Request (id:%s) Forwarded to Slave %s", id, slaveNode.Id))
var respBody map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil {
errCh <- failure.Translate(err, appError.ErrValidateServiceForwardFailed)
return
}
results, ok := respBody["results"].([]interface{})
if !ok {
errCh <- failure.New(appError.ErrRequestParameterInvalid, failure.Messagef("results field is invalid"))
return
}
ch <- results
g.logger.Info(fmt.Sprintf("⚽️ Request (id:%s) Forwarded to Slave %s", id, slaveNode.Id))
}(id, ch)
} else {
modifiedRequestValidations = append(modifiedRequestValidations, validation)
}
}

for i := 0; i < numForwarded; i++ {
select {
case err := <-errCh:
http.Error(w, err.Error(), http.StatusInternalServerError)
return
case results := <-ch:
validationResults = append(validationResults, results...)
case <-time.After(30 * time.Second):
http.Error(w, failure.New(appError.ErrValidateServiceForwardFailed, failure.Message("Timeout")).Error(), http.StatusInternalServerError)
}
}

reqBody["validations"] = modifiedRequestValidations
modifiedReqBody, err := json.Marshal(reqBody)
if err != nil {
Expand Down

0 comments on commit ec82657

Please sign in to comment.