Skip to content

Commit

Permalink
Merge pull request #984 from go-kivik/reduceTests
Browse files Browse the repository at this point in the history
Start adding tests specifically around the reduceRows method
  • Loading branch information
flimzy committed May 23, 2024
2 parents 90559ef + 263298b commit ffb8060
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 32 deletions.
18 changes: 10 additions & 8 deletions x/sqlite/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (d *db) performQuery(
vopts *viewOptions,
) (driver.Rows, error) {
if vopts.group {
return d.performGroupQuery(ctx, ddoc, view, vopts.update, vopts.groupLevel, vopts)
return d.performGroupQuery(ctx, ddoc, view, vopts)
}
for {
rev, err := d.updateIndex(ctx, ddoc, view, vopts.update)
Expand Down Expand Up @@ -202,7 +202,8 @@ func (d *db) performQuery(
}

if meta.reducible && (vopts.reduce == nil || *vopts.reduce) {
return d.reduceRows(results, meta.reduceFuncJS, false, 0, vopts)
ri := &reduceRowIter{results: results}
return d.reduceRows(ri, meta.reduceFuncJS, vopts)
}

// If the results are up to date, OR, we're in false/lazy update mode,
Expand All @@ -216,14 +217,14 @@ func (d *db) performQuery(
}
}

func (d *db) performGroupQuery(ctx context.Context, ddoc, view, update string, groupLevel uint64, vopts *viewOptions) (driver.Rows, error) {
func (d *db) performGroupQuery(ctx context.Context, ddoc, view string, vopts *viewOptions) (driver.Rows, error) {
var (
results *sql.Rows
reducible bool
reduceFuncJS *string
)
for {
rev, err := d.updateIndex(ctx, ddoc, view, update)
rev, err := d.updateIndex(ctx, ddoc, view, vopts.update)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -274,7 +275,7 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view, update string, g
)
`)

results, err = d.db.QueryContext(
results, err = d.db.QueryContext( //nolint:rowserrcheck // Err checked in iterator
ctx, query,
"_design/"+ddoc, rev.rev, rev.id, view, kivik.EndKeySuffix, true,
)
Expand All @@ -291,7 +292,7 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view, update string, g
// should never happen
return nil, errors.New("no rows returned")
}
if update != updateModeTrue {
if vopts.update != updateModeTrue {
break
}
var upToDate bool
Expand All @@ -300,7 +301,7 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view, update string, g
}
if !reducible {
field := "group"
if groupLevel > 0 {
if vopts.groupLevel > 0 {
field = "group_level"
}
return nil, &internal.Error{Status: http.StatusBadRequest, Message: field + " is invalid for map-only views"}
Expand All @@ -310,7 +311,8 @@ func (d *db) performGroupQuery(ctx context.Context, ddoc, view, update string, g
}
}

return d.reduceRows(results, reduceFuncJS, true, groupLevel, vopts)
ri := &reduceRowIter{results: results}
return d.reduceRows(ri, reduceFuncJS, vopts)
}

const batchSize = 100
Expand Down
75 changes: 51 additions & 24 deletions x/sqlite/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,53 +30,80 @@ import (
"github.com/go-kivik/kivik/x/sqlite/v4/internal"
)

func (d *db) reduceRows(results *sql.Rows, reduceFuncJS *string, group bool, groupLevel uint64, vopts *viewOptions) (driver.Rows, error) {
type reduceRowIter struct {
results *sql.Rows
}

type reduceRow struct {
ID string
Key string
Value *string
}

func (r *reduceRowIter) Next() (*reduceRow, error) {
if !r.results.Next() {
if err := r.results.Err(); err != nil {
return nil, err
}
return nil, io.EOF
}
var row reduceRow
if err := r.results.Scan(&row.ID, &row.Key, &row.Value, discard{}, discard{}, discard{}); err != nil {
return nil, err
}
return &row, nil
}

type reduceRows interface {
Next() (*reduceRow, error)
}

func (d *db) reduceRows(ri reduceRows, reduceFuncJS *string, vopts *viewOptions) (*reducedRows, error) {
reduceFn, err := d.reduceFunc(reduceFuncJS, d.logger)
if err != nil {
return nil, err
}
var (
intermediate = map[string][]interface{}{}

id, rowKey string
rowValue *string
)
intermediate := map[string][]interface{}{}

for results.Next() {
if err := results.Scan(&id, &rowKey, &rowValue, discard{}, discard{}, discard{}); err != nil {
for {
row, err := ri.Next()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}

var key, value interface{}
_ = json.Unmarshal([]byte(rowKey), &key)
if rowValue != nil {
_ = json.Unmarshal([]byte(*rowValue), &value)
_ = json.Unmarshal([]byte(row.Key), &key)
if row.Value != nil {
_ = json.Unmarshal([]byte(*row.Value), &value)
}
rv, err := reduceFn([][2]interface{}{{id, key}}, []interface{}{value}, false)
rv, err := reduceFn([][2]interface{}{{row.ID, key}}, []interface{}{value}, false)
if err != nil {
return nil, err
}
// group is handled below
if groupLevel > 0 {
if vopts.groupLevel > 0 {
var unkey []interface{}
_ = json.Unmarshal([]byte(rowKey), &unkey)
if len(unkey) > int(groupLevel) {
newKey, _ := json.Marshal(unkey[:groupLevel])
rowKey = string(newKey)
_ = json.Unmarshal([]byte(row.Key), &unkey)
if len(unkey) > int(vopts.groupLevel) {
newKey, _ := json.Marshal(unkey[:vopts.groupLevel])
row.Key = string(newKey)
}
}
intermediate[rowKey] = append(intermediate[rowKey], rv)
}

if err := results.Err(); err != nil {
return nil, err
intermediate[row.Key] = append(intermediate[row.Key], rv)
}

// group_level is handled above
if !group {
if !vopts.group {
var values []interface{}
for _, v := range intermediate {
values = append(values, v...)
}
if len(values) == 0 {
return &reducedRows{}, nil
}
rv, err := reduceFn(nil, values, true)
if err != nil {
return nil, err
Expand Down
187 changes: 187 additions & 0 deletions x/sqlite/reduce_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

//go:build !js
// +build !js

package sqlite

import (
"io"
"testing"

"github.com/google/go-cmp/cmp"
"gitlab.com/flimzy/testy"

"github.com/go-kivik/kivik/v4"
)

type testReduceRows []*reduceRow

var _ reduceRows = &testReduceRows{}

func (r *testReduceRows) Next() (*reduceRow, error) {
if len(*r) == 0 {
return nil, io.EOF
}
row := (*r)[0]
*r = (*r)[1:]
return row, nil
}

func Test_reduceRows(t *testing.T) {
t.Parallel()

type test struct {
rows reduceRows
reduceFuncJS string
vopts *viewOptions
want []reducedRow
wantErr string
wantErrStatus int
}

tests := testy.NewTable()
tests.Add("no rows", test{
rows: &testReduceRows{},
want: []reducedRow{},
})
tests.Add("one row", test{
rows: &testReduceRows{
{ID: "foo", Key: "foo", Value: &[]string{"1"}[0]},
},
reduceFuncJS: `_sum`,
want: []reducedRow{
{
Key: "null",
Value: "1",
},
},
})
tests.Add("two rows", test{
rows: &testReduceRows{
{ID: "foo", Key: "foo", Value: &[]string{"1"}[0]},
{ID: "bar", Key: "bar", Value: &[]string{"1"}[0]},
},
reduceFuncJS: `_sum`,
want: []reducedRow{
{
Key: "null",
Value: "2",
},
},
})
tests.Add("group=true", test{
rows: &testReduceRows{
{ID: "foo", Key: "foo", Value: &[]string{"1"}[0]},
{ID: "bar", Key: "bar", Value: &[]string{"1"}[0]},
},
reduceFuncJS: `_sum`,
vopts: &viewOptions{
group: true,
},
want: []reducedRow{
{
Key: "foo",
Value: "1",
},
{
Key: "bar",
Value: "1",
},
},
})
tests.Add("group_level=1", test{
rows: &testReduceRows{
{ID: "foo", Key: `["a","b"]`, Value: &[]string{"1"}[0]},
{ID: "bar", Key: `["a","b"]`, Value: &[]string{"1"}[0]},
},
reduceFuncJS: `_sum`,
vopts: &viewOptions{
group: true,
groupLevel: 1,
},
want: []reducedRow{
{
Key: `["a"]`,
Value: "2",
},
},
})

/* TODO:
- group_level=0 vs group_level = max
*/

tests.Run(t, func(t *testing.T, tt test) {
t.Parallel()

d := newDB(t)
var reduceFuncJS *string
if tt.reduceFuncJS != "" {
reduceFuncJS = &tt.reduceFuncJS
}
vopts := &viewOptions{
sorted: true,
}
if tt.vopts != nil {
vopts = tt.vopts
}
got, err := d.DB.(*db).reduceRows(tt.rows, reduceFuncJS, vopts)
if !testy.ErrorMatches(tt.wantErr, err) {
t.Errorf("Unexpected error: %s", err)
}
if status := kivik.HTTPStatus(err); status != tt.wantErrStatus {
t.Errorf("Unexpected status: %d", status)
}
if err != nil {
return
}
checkReducedRows(t, tt.want, got)
})
}

type reducedRow struct {
ID string
Rev string
Key string
Value string
Doc string
Error string
}

func checkReducedRows(t *testing.T, want []reducedRow, got *reducedRows) {
t.Helper()
g := make([]reducedRow, 0, len(*got))
for _, row := range *got {
newRow := reducedRow{
ID: row.ID,
Rev: row.Rev,
Key: string(row.Key),
}
if row.Value != nil {
v, _ := io.ReadAll(row.Value)
newRow.Value = string(v)
}
if row.Doc != nil {
d, _ := io.ReadAll(row.Doc)
newRow.Doc = string(d)
}
if row.Error != nil {
newRow.Error = row.Error.Error()
}
g = append(g, newRow)
}
if d := cmp.Diff(want, g); d != "" {
t.Errorf("Unexpected reduced rows:\n%s", d)
}
}

0 comments on commit ffb8060

Please sign in to comment.