Merge pull request #996 from go-kivik/firstlastkey
Allow reducing in batches
flimzy authored Jun 16, 2024
2 parents 709d2a4 + 19583c7 commit c1dc743
Showing 4 changed files with 153 additions and 92 deletions.
x/sqlite/query.go: 6 changes (3 additions & 3 deletions)

@@ -406,14 +406,14 @@ func (d *db) reduce(ctx context.Context, seq int, ddoc, name, rev string, result
 	}
 	callback := func(depth uint, rows []reduce.Row) {
 		for _, row := range rows {
-			key, _ := json.Marshal(row.Key)
+			key, _ := json.Marshal(row.FirstKey)
 			var value []byte
 			if row.Value != nil {
 				value, _ = json.Marshal(row.Value)
 			}
-			if _, err = stmt.ExecContext(ctx, seq, depth, key, row.First, key, row.Last, value); err != nil {
+			if _, err = stmt.ExecContext(ctx, seq, depth, key, row.FirstPK, key, row.LastPK, value); err != nil {
 				d.logger.Printf("Failed to insert reduce result [%v, %v, %v, %v, %v, %v, %v]: %s",
-					seq, depth, key, row.First, key, row.Last, value,
+					seq, depth, key, row.FirstPK, key, row.LastPK, value,
 					err)
 			}
 		}
x/sqlite/reduce.go: 6 changes (3 additions & 3 deletions)

@@ -33,18 +33,18 @@ func (r *reduceRowIter) ReduceNext(row *reduce.Row) error {
 	}
 	var key, value *[]byte
 	err := r.results.Scan(
-		&row.ID, &key, &value, &row.First, &row.Last, discard{},
+		&row.ID, &key, &value, &row.FirstPK, &row.LastPK, discard{},
 		discard{}, discard{}, discard{}, discard{}, discard{}, discard{}, discard{},
 	)
 	if err != nil {
 		return err
 	}
 	if key != nil {
-		if err = json.Unmarshal(*key, &row.Key); err != nil {
+		if err = json.Unmarshal(*key, &row.FirstKey); err != nil {
 			return err
 		}
 	} else {
-		row.Key = nil
+		row.FirstKey = nil
 	}
 	if value != nil {
 		if err = json.Unmarshal(*value, &row.Value); err != nil {
x/sqlite/reduce/reduce.go: 99 changes (66 additions & 33 deletions)

@@ -32,16 +32,29 @@ type Reducer interface {

 // Row represents a single row of data to be reduced, or the result of a
 // reduction. Key and Value are expected to represent JSON serializable data,
-// and passing non-serializable data may result in a panic. ID is only used for
-// input rows as returned by a map function. It is always empty for output rows.
+// and passing non-serializable data may result in a panic.
 type Row struct {
-	// First and Last reference the key's primary key, and are used to
-	// disambiguate rows with the same key. For map inputs, they should be
-	// the same. For reduced inputs, they represent a range of keys.
-	First int
-	Last  int
-	ID    string
-	Key   any
+	// ID is the document ID. It is only populated for input rows. It is always
+	// empty for output rows.
+	ID string
+
+	// TargetKey is the key of a map row, or the key of a reduced row. It is
+	// used in output for grouping.
+	TargetKey any
+
+	// FirstKey represents the key of a map row, or the first key of a reduced
+	// row. It is used for grouping and caching.
+	FirstKey any
+	// FirstPK disambiguates multiple identical keys.
+	FirstPK int
+
+	// LastKey is the last key of a reduced row. It is only populated for
+	// reduced rows. LastKey and LastPK may be omitted for map rows. If omitted,
+	// FirstKey and FirstPK are used.
+	LastKey any
+	// LastPK disambiguates multiple identical keys.
+	LastPK int
+
 	Value any
 }

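The new range fields replace the old First/Last pair. To make their semantics concrete, here is a minimal stand-alone sketch (not part of this commit; the struct is copied locally so the example compiles on its own) of how a map input row and a previously reduced row would populate them:

package main

import "fmt"

// Row mirrors the reduce.Row shape introduced above, for illustration only.
type Row struct {
	ID        string
	TargetKey any
	FirstKey  any
	FirstPK   int
	LastKey   any
	LastPK    int
	Value     any
}

func main() {
	// A map-emitted input row: ID is set, FirstKey/FirstPK identify the single
	// key, and LastKey/LastPK are omitted, so FirstKey/FirstPK stand in.
	mapRow := Row{ID: "doc1", FirstKey: "a", FirstPK: 1, Value: 1}

	// A previously reduced input row: ID is empty, and the First*/Last* pairs
	// describe the range of keys the reduction already covers.
	reducedRow := Row{
		FirstKey: "a", FirstPK: 1,
		LastKey: "a", LastPK: 7,
		Value: 7,
	}

	fmt.Printf("map row: %+v\nreduced row: %+v\n", mapRow, reducedRow)
}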
@@ -66,7 +79,7 @@ func (r *Rows) Next(row *driver.Row) error {
 	}
 	thisRow := (*r)[0]
 	*r = (*r)[1:]
-	row.Key, _ = json.Marshal(thisRow.Key)
+	row.Key, _ = json.Marshal(thisRow.TargetKey)
 	value, _ := json.Marshal(thisRow.Value)
 	row.Value = bytes.NewReader(value)
 	return nil
@@ -96,6 +109,8 @@ type Func func(keys [][2]interface{}, values []interface{}, rereduce bool) ([]in
 // reduce call. It can be used to cache intermediate results.
 type Callback func(depth uint, rows []Row)
 
+const defaultBatchSize = 1000
+
 // Reduce calls fn on rows, and returns the results. The input must be in
 // key-sorted order, and may contain both previously reduced rows, and map
 // output rows. cb, if not nil, is called with the results of every
@@ -109,16 +124,23 @@ type Callback func(depth uint, rows []Row)
 // 0: No grouping, same as group=false
 // 1+: Group by the first N elements of the key, same as group_level=N
 func Reduce(rows Reducer, javascript string, logger *log.Logger, groupLevel int, cb Callback) (*Rows, error) {
+	return reduceWithBatchSize(rows, javascript, logger, groupLevel, cb, defaultBatchSize)
+}
+
+func reduceWithBatchSize(rows Reducer, javascript string, logger *log.Logger, groupLevel int, cb Callback, batchSize int) (*Rows, error) {
 	fn, err := ParseFunc(javascript, logger)
 	if err != nil {
 		return nil, err
 	}
-	return reduce(rows, fn, groupLevel, cb)
+	return reduce(rows, fn, groupLevel, batchSize, cb)
 }
 
-func reduce(rows Reducer, fn Func, groupLevel int, cb Callback) (*Rows, error) {
+func reduce(rows Reducer, fn Func, groupLevel int, batchSize int, cb Callback) (*Rows, error) {
 	out := make(Rows, 0, 1)
-	var first, last int
+	var (
+		firstKey, lastKey any
+		firstPK, lastPK   int
+	)
 
 	callReduce := func(keys [][2]interface{}, values []interface{}, rereduce bool, key any) error {
 		if len(keys) == 0 {
@@ -127,10 +149,12 @@ func reduce(rows Reducer, fn Func, groupLevel int, cb Callback) (*Rows, error) {
 		if len(keys) == 1 && rereduce {
 			// Nothing to rereduce if we have only a single input--just pass it through
 			out = append(out, Row{
-				Key:   key,
-				Value: values[0],
-				First: first,
-				Last:  last,
+				TargetKey: key,
+				FirstKey:  firstKey,
+				FirstPK:   firstPK,
+				LastKey:   lastKey,
+				LastPK:    lastPK,
+				Value:     values[0],
 			})
 			return nil
 		}
@@ -141,15 +165,17 @@ func reduce(rows Reducer, fn Func, groupLevel int, cb Callback) (*Rows, error) {
 		rows := make([]Row, 0, len(results))
 		for _, result := range results {
 			row := Row{
-				Value: result,
-				First: first,
-				Last:  last,
+				FirstKey: firstKey,
+				FirstPK:  firstPK,
+				LastKey:  lastKey,
+				LastPK:   lastPK,
+				Value:    result,
 			}
 			if keyLen(key) > 0 {
-				row.Key = key
+				row.TargetKey = key
 			}
 			rows = append(rows, row)
-			first, last = 0, 0
+			firstKey, firstPK, lastKey, lastPK = nil, 0, nil, 0
 		}
 		if cb != nil {
 			var depth uint
@@ -182,8 +208,9 @@ func reduce(rows Reducer, fn Func, groupLevel int, cb Callback) (*Rows, error) {
 		}
 
 		switch {
-		case (groupLevel == 0 && rereduce != (row.ID == "")) ||
-			(targetKey != nil && (!reflect.DeepEqual(targetKey, truncateKey(row.Key, groupLevel)) || rereduce != (row.ID == ""))):
+		case (len(keys) >= batchSize) ||
+			(groupLevel == 0 && rereduce != (row.ID == "")) ||
+			(targetKey != nil && (!reflect.DeepEqual(targetKey, truncateKey(row.FirstKey, groupLevel)) || rereduce != (row.ID == ""))):
 			if err := callReduce(keys, values, rereduce, targetKey); err != nil {
 				return nil, err
 			}
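The len(keys) >= batchSize clause added here is the heart of the change: a batch now flushes through callReduce as soon as it fills, in addition to the existing triggers (a map/rereduce mode switch when not grouping, or a group-key boundary). A hypothetical distillation of the flush test, with truncateKey simplified to identity and the row's mode passed as a boolean:

package main

import (
	"fmt"
	"reflect"
)

// shouldFlush distills the switch condition above: flush when the batch is
// full, or at a map/rereduce transition, or at a group-key boundary. It is a
// sketch, not code from this commit.
func shouldFlush(nKeys, batchSize int, targetKey, rowKey any, rereduce, rowIsRereduce bool, groupLevel int) bool {
	return nKeys >= batchSize ||
		(groupLevel == 0 && rereduce != rowIsRereduce) ||
		(targetKey != nil && (!reflect.DeepEqual(targetKey, rowKey) || rereduce != rowIsRereduce))
}

func main() {
	// A full batch flushes even though the key has not changed.
	fmt.Println(shouldFlush(1000, 1000, "a", "a", false, false, 1)) // true
	// A key change at groupLevel >= 1 flushes a partial batch.
	fmt.Println(shouldFlush(3, 1000, "a", "b", false, false, 1)) // true
	// Same key, same mode, room left: keep accumulating.
	fmt.Println(shouldFlush(3, 1000, "a", "a", false, false, 1)) // false
}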
@@ -192,16 +219,22 @@ func reduce(rows Reducer, fn Func, groupLevel int, cb Callback) (*Rows, error) {
 			values = values[:0]
 			fallthrough
 		case targetKey == nil:
-			targetKey = truncateKey(row.Key, groupLevel)
+			targetKey = truncateKey(row.FirstKey, groupLevel)
 			rereduce = row.ID == ""
 		}
 
-		if first == 0 {
-			first = row.First
+		if firstPK == 0 {
+			firstKey = row.FirstKey
+			firstPK = row.FirstPK
 		}
+		lastKey = row.LastKey
+		lastPK = row.LastPK
+		if lastPK == 0 {
+			lastKey = row.FirstKey
+			lastPK = row.FirstPK
+		}
-		last = row.Last
 
-		keys = append(keys, [2]interface{}{row.Key, row.ID})
+		keys = append(keys, [2]interface{}{row.FirstKey, row.ID})
 		values = append(values, row.Value)
 	}
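This block maintains the key range each pending batch covers: the first row fixes firstKey/firstPK, every row advances lastKey/lastPK, and map rows (which carry no Last* values) fall back to their First* values. A stand-alone sketch of that accumulation for a single batch, with a local Row type that is not part of this commit:

package main

import "fmt"

// Row mirrors reduce.Row's range-tracking fields (illustration only).
type Row struct {
	FirstKey any
	FirstPK  int
	LastKey  any
	LastPK   int
}

// accumulate mirrors the tracking above: the first row in a batch fixes
// firstKey/firstPK, and every row advances lastKey/lastPK, falling back to
// its FirstKey/FirstPK when it is a map row with no Last* values set.
func accumulate(batch []Row) (firstKey any, firstPK int, lastKey any, lastPK int) {
	for _, row := range batch {
		if firstPK == 0 {
			firstKey, firstPK = row.FirstKey, row.FirstPK
		}
		lastKey, lastPK = row.LastKey, row.LastPK
		if lastPK == 0 {
			lastKey, lastPK = row.FirstKey, row.FirstPK
		}
	}
	return firstKey, firstPK, lastKey, lastPK
}

func main() {
	// Two map rows, then a previously reduced row covering PKs 5 through 9.
	batch := []Row{
		{FirstKey: "a", FirstPK: 1},
		{FirstKey: "a", FirstPK: 2},
		{FirstKey: "a", FirstPK: 5, LastKey: "a", LastPK: 9},
	}
	fmt.Println(accumulate(batch)) // a 1 a 9
}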

@@ -216,11 +249,11 @@ func reduce(rows Reducer, fn Func, groupLevel int, cb Callback) (*Rows, error) {

 	// If we received mixed map/reduce inputs, then we may need to re-reduce
 	// the output before returning.
-	lastKey := truncateKey(out[0].Key, groupLevel)
+	finalKey := truncateKey(out[0].FirstKey, groupLevel)
 	for i := 1; i < len(out); i++ {
-		key := truncateKey(out[i].Key, groupLevel)
-		if reflect.DeepEqual(lastKey, key) {
-			return reduce(&out, fn, groupLevel, cb)
+		key := truncateKey(out[i].FirstKey, groupLevel)
+		if reflect.DeepEqual(finalKey, key) {
+			return reduce(&out, fn, groupLevel, batchSize, cb)
 		}
 	}

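End to end, the exported API is unchanged; batching stays internal to reduce. The following usage sketch is illustrative only: the import path is inferred from the repository layout, *reduce.Rows is assumed to satisfy the Reducer input (as the re-reduce call above suggests), and ParseFunc is assumed to accept a plain CouchDB-style JavaScript reduce function:

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/go-kivik/kivik/v4/x/sqlite/reduce" // import path assumed
)

func main() {
	logger := log.New(os.Stderr, "", log.LstdFlags)

	// Key-sorted map output, in the new Row shape.
	input := reduce.Rows{
		{ID: "a", FirstKey: "x", FirstPK: 1, Value: 1},
		{ID: "b", FirstKey: "x", FirstPK: 2, Value: 1},
		{ID: "c", FirstKey: "y", FirstPK: 3, Value: 1},
	}

	// The callback sees every intermediate reduction; this is where a cache
	// layer (as in x/sqlite/query.go above) can persist rows by depth.
	cb := func(depth uint, rows []reduce.Row) {
		logger.Printf("cached %d row(s) at depth %d", len(rows), depth)
	}

	// A count reduction in plain JavaScript (assumed supported by ParseFunc).
	js := `function(keys, values, rereduce) {
		if (!rereduce) {
			return values.length;
		}
		var total = 0;
		for (var i = 0; i < values.length; i++) {
			total += values[i];
		}
		return total;
	}`

	out, err := reduce.Reduce(&input, js, logger, 1, cb) // group_level=1
	if err != nil {
		log.Fatal(err)
	}
	for _, row := range *out {
		fmt.Printf("key=%v count=%v\n", row.TargetKey, row.Value)
	}
}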