Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor resultForDefaultIter #2674

Merged
merged 4 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,19 +552,23 @@ func (h *Handler) resultForDefaultIter(

eg, ctx := ctx.NewErrgroup()

var rowChan chan sql.Row

rowChan = make(chan sql.Row, 512)

pan2err := func() {
if recoveredPanic := recover(); recoveredPanic != nil {
returnErr = fmt.Errorf("handler caught panic: %v", recoveredPanic)
}
}

pollCtx, cancelF := ctx.NewSubContext()
eg.Go(func() error {
defer pan2err()
return h.pollForClosedConnection(pollCtx, c)
})

wg := sync.WaitGroup{}
wg.Add(2)

// Read rows off the row iterator and send them to the row channel.
var rowChan = make(chan sql.Row, 512)
eg.Go(func() error {
defer pan2err()
defer wg.Done()
Expand All @@ -590,12 +594,6 @@ func (h *Handler) resultForDefaultIter(
}
})

pollCtx, cancelF := ctx.NewSubContext()
eg.Go(func() error {
defer pan2err()
return h.pollForClosedConnection(pollCtx, c)
})

// Default waitTime is one minute if there is no timeout configured, in which case
// it will loop to iterate again unless the socket died by the OS timeout or other problems.
// If there is a timeout, it will be enforced to ensure that Vitess has a chance to
Expand All @@ -607,7 +605,7 @@ func (h *Handler) resultForDefaultIter(
timer := time.NewTimer(waitTime)
defer timer.Stop()

// reads rows from the channel, converts them to wire format,
// Reads rows from the channel, converts them to wire format,
// and calls |callback| to give them to vitess.
eg.Go(func() error {
defer pan2err()
Expand Down Expand Up @@ -679,7 +677,6 @@ func (h *Handler) resultForDefaultIter(
}
returnErr = err
}

return
}

Expand Down Expand Up @@ -906,18 +903,19 @@ func updateMaxUsedConnectionsStatusVariable() {

func rowToSQL(ctx *sql.Context, s sql.Schema, row sql.Row) ([]sqltypes.Value, error) {
o := make([]sqltypes.Value, len(row))
// need to make sure the schema is not null as some plan schema is defined as null (e.g. IfElseBlock)
if len(s) == 0 {
return o, nil
}
var err error
for i, v := range row {
if v == nil {
o[i] = sqltypes.NULL
continue
}
// need to make sure the schema is not null as some plan schema is defined as null (e.g. IfElseBlock)
if len(s) > 0 {
o[i], err = s[i].Type.SQL(ctx, nil, v)
if err != nil {
return nil, err
}
o[i], err = s[i].Type.SQL(ctx, nil, v)
if err != nil {
return nil, err
}
}

Expand Down
13 changes: 6 additions & 7 deletions sql/rowexec/rel_iters.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,25 +148,24 @@ func ProjectRow(
projections []sql.Expression,
row sql.Row,
) (sql.Row, error) {
var secondPass []int
var fields sql.Row
var fields = make(sql.Row, len(projections))
var secondPass = make([]int, 0, len(projections))
for i, expr := range projections {
// Default values that are expressions may reference other fields, thus they must evaluate after all other exprs.
// Also default expressions may not refer to other columns that come after them if they also have a default expr.
// This ensures that all columns referenced by expressions will have already been evaluated.
// Since literals do not reference other columns, they're evaluated on the first pass.
defaultVal, isDefaultVal := defaultValFromProjectExpr(expr)
if isDefaultVal && !defaultVal.IsLiteral() {
fields = append(fields, nil)
secondPass = append(secondPass, i)
continue
}
f, fErr := expr.Eval(ctx, row)
field, fErr := expr.Eval(ctx, row)
if fErr != nil {
return nil, fErr
}
f = normalizeNegativeZeros(f)
fields = append(fields, f)
field = normalizeNegativeZeros(field)
fields[i] = field
}
for _, index := range secondPass {
field, err := projections[index].Eval(ctx, fields)
Expand All @@ -176,7 +175,7 @@ func ProjectRow(
field = normalizeNegativeZeros(field)
fields[index] = field
}
return sql.NewRow(fields...), nil
return fields, nil
}

func defaultValFromProjectExpr(e sql.Expression) (*sql.ColumnDefaultValue, bool) {
Expand Down