Skip to content

Commit

Permalink
Merge pull request #970 from go-kivik/mergQueryMethods2
Browse files Browse the repository at this point in the history
Improve views support
  • Loading branch information
flimzy committed May 14, 2024
2 parents 8b40051 + f55db2c commit 2d75f7c
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 74 deletions.
98 changes: 73 additions & 25 deletions x/sqlite/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (d *db) Query(ctx context.Context, ddoc, view string, options driver.Option
return nil, err
}

results, err := d.performQuery(ctx, ddoc, view, update, reduce, group, includeDocs, groupLevel, limit, skip)
results, err := d.performQuery(ctx, ddoc, view, update, reduce, group, includeDocs, conflicts, groupLevel, limit, skip)
if err != nil {
return nil, err
}
Expand All @@ -104,7 +104,7 @@ func (d *db) performQuery(
ctx context.Context,
ddoc, view, update string,
reduce *bool,
group, includeDocs bool,
group, includeDocs, conflicts bool,
groupLevel uint64,
limit, skip int64,
) (driver.Rows, error) {
Expand Down Expand Up @@ -133,6 +133,19 @@ func (d *db) performQuery(
AND rev_id = $3
AND func_type = 'reduce'
AND func_name = $4
),
leaves AS (
SELECT
rev.id,
rev.rev,
rev.rev_id,
doc.doc,
doc.deleted
FROM {{ .Revs }} AS rev
LEFT JOIN {{ .Revs }} AS child ON child.id = rev.id AND rev.rev = child.parent_rev AND rev.rev_id = child.parent_rev_id
JOIN {{ .Docs }} AS doc ON rev.id = doc.id AND rev.rev = doc.rev AND rev.rev_id = doc.rev_id
WHERE child.id IS NULL
AND NOT doc.deleted
)
SELECT
Expand Down Expand Up @@ -177,11 +190,13 @@ func (d *db) performQuery(
map.value,
IIF($7, docs.rev || '-' || docs.rev_id, "") AS rev,
IIF($7, docs.doc, NULL) AS doc,
"" AS conflicts
IIF($8, GROUP_CONCAT(conflicts.rev || '-' || conflicts.rev_id, ','), NULL) AS conflicts
FROM {{ .Map }} AS map
JOIN reduce
JOIN {{ .Docs }} AS docs ON map.id = docs.id AND map.rev = docs.rev AND map.rev_id = docs.rev_id
LEFT JOIN leaves AS conflicts ON conflicts.id = map.id AND NOT (map.rev = conflicts.rev AND map.rev_id = conflicts.rev_id)
WHERE $6 == FALSE OR NOT reduce.reducable
GROUP BY map.id, map.key, map.value, map.rev, map.rev_id
ORDER BY key
LIMIT %[1]d OFFSET %[2]d
)
Expand All @@ -190,7 +205,7 @@ func (d *db) performQuery(
results, err = d.db.QueryContext( //nolint:rowserrcheck // Err checked in Next
ctx, query,
"_design/"+ddoc, rev.rev, rev.id, view, kivik.EndKeySuffix, reduce,
includeDocs,
includeDocs, conflicts,
)
switch {
case errIsNoSuchTable(err):
Expand Down Expand Up @@ -365,6 +380,18 @@ func (d *db) updateIndex(ctx context.Context, ddoc, view, mode string) (revision
}

query := d.query(`
WITH leaves AS (
SELECT
rev.id AS id,
rev.rev AS rev,
rev.rev_id AS rev_id,
doc.doc,
doc.deleted
FROM {{ .Revs }} AS rev
LEFT JOIN {{ .Revs }} AS child ON child.id = rev.id AND rev.rev = child.parent_rev AND rev.rev_id = child.parent_rev_id
JOIN {{ .Docs }} AS doc ON rev.id = doc.id AND rev.rev = doc.rev AND rev.rev_id = doc.rev_id
WHERE child.id IS NULL
)
SELECT
CASE WHEN row_number = 1 THEN seq END AS seq,
CASE WHEN row_number = 1 THEN id END AS id,
Expand All @@ -379,27 +406,48 @@ func (d *db) updateIndex(ctx context.Context, ddoc, view, mode string) (revision
rev_pos
FROM (
SELECT
doc.seq AS seq,
rev.id AS id,
rev.rev || '-' || rev.rev_id AS rev,
doc.doc AS doc,
doc.deleted AS deleted,
SUM(CASE WHEN bridge.pk IS NOT NULL THEN 1 ELSE 0 END) OVER (PARTITION BY rev.id, rev.rev, rev.rev_id) AS attachment_count,
ROW_NUMBER() OVER (PARTITION BY rev.id, rev.rev, rev.rev_id) AS row_number,
att.filename,
att.content_type,
att.length,
att.digest,
att.rev_pos
FROM {{ .Revs }} AS rev
LEFT JOIN {{ .Revs }} AS child ON rev.id = child.id AND rev.rev = child.parent_rev AND rev.rev_id = child.parent_rev_id
JOIN {{ .Docs }} AS doc ON rev.id = doc.id AND rev.rev = doc.rev AND rev.rev_id = doc.rev_id
LEFT JOIN {{ .AttachmentsBridge }} AS bridge ON doc.id = bridge.id AND doc.rev = bridge.rev AND doc.rev_id = bridge.rev_id
LEFT JOIN {{ .Attachments }} AS att ON bridge.pk = att.pk
WHERE rev.id NOT LIKE '_local/%'
AND child.id IS NULL
AND doc.seq > $1
ORDER BY doc.seq
seq.seq AS seq,
doc.id AS id,
doc.rev || '-' || doc.rev_id AS rev,
seq.doc AS doc,
seq.deleted AS deleted,
doc.attachment_count,
doc.row_number,
doc.filename,
doc.content_type,
doc.length,
doc.digest,
doc.rev_pos
FROM {{ .Docs }} AS seq
LEFT JOIN (
SELECT
rev.id,
rev.rev,
rev.rev_id,
SUM(CASE WHEN bridge.pk IS NOT NULL THEN 1 ELSE 0 END) OVER (PARTITION BY rev.id, rev.rev, rev.rev_id) AS attachment_count,
ROW_NUMBER() OVER (PARTITION BY rev.id, rev.rev, rev.rev_id) AS row_number,
att.filename,
att.content_type,
att.length,
att.digest,
att.rev_pos
FROM (
SELECT
id AS id,
rev AS rev,
rev_id AS rev_id,
IIF($1, doc, NULL) AS doc,
deleted AS deleted, -- TODO:remove this?
ROW_NUMBER() OVER (PARTITION BY id ORDER BY rev DESC, rev_id DESC) AS rank
FROM leaves
) AS rev
LEFT JOIN {{ .AttachmentsBridge }} AS bridge ON rev.id = bridge.id AND rev.rev = bridge.rev AND rev.rev_id = bridge.rev_id
LEFT JOIN {{ .Attachments }} AS att ON bridge.pk = att.pk
WHERE rev.rank = 1
) AS doc ON seq.id = doc.id AND seq.rev = doc.rev AND seq.rev_id = doc.rev_id
WHERE doc.id NOT LIKE '_local/%'
AND seq.seq > $1
ORDER BY seq.seq
)
`)
docs, err := d.db.QueryContext(ctx, query, lastSeq)
Expand Down
71 changes: 70 additions & 1 deletion x/sqlite/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1289,6 +1289,76 @@ func TestDBQuery(t *testing.T) {
},
}
})
tests.Add("views should only include winning rev in case of conflict", func(t *testing.T) interface{} {
d := newDB(t)
_ = d.tPut("_design/foo", map[string]interface{}{
"views": map[string]interface{}{
"bar": map[string]string{
"map": `function(doc) {
if (doc.key) {
emit(doc.key, doc.value);
}
}`,
},
},
})
_ = d.tPut("a", map[string]interface{}{
"key": "a",
"value": 1,
"_rev": "1-abc",
}, kivik.Param("new_edits", false))
_ = d.tPut("a", map[string]interface{}{
"key": "a",
"value": 2,
"_rev": "1-xyz",
}, kivik.Param("new_edits", false))

return test{
db: d,
ddoc: "_design/foo",
view: "_view/bar",
want: []rowResult{
{ID: "a", Key: `"a"`, Value: "2"},
},
}
})
tests.Add("conflicts=true", func(t *testing.T) interface{} {
d := newDB(t)
_ = d.tPut("_design/foo", map[string]interface{}{
"views": map[string]interface{}{
"bar": map[string]string{
"map": `function(doc) {
if (doc.key) {
emit(doc.key, doc.value);
}
}`,
},
},
})
_ = d.tPut("a", map[string]interface{}{
"key": "a",
"value": 1,
"_rev": "1-abc",
}, kivik.Param("new_edits", false))
_ = d.tPut("a", map[string]interface{}{
"key": "a",
"value": 2,
"_rev": "1-xyz",
}, kivik.Param("new_edits", false))

return test{
db: d,
ddoc: "_design/foo",
view: "_view/bar",
options: kivik.Params(map[string]interface{}{
"include_docs": true,
"conflicts": true,
}),
want: []rowResult{
{ID: "a", Key: `"a"`, Value: "2", Doc: `{"_id":"a","_rev":"1-xyz","key":"a","value":2,"_conflicts":["1-abc"]}`},
},
}
})
/*
TODO:
- _stats
Expand All @@ -1309,7 +1379,6 @@ func TestDBQuery(t *testing.T) {
- group behavior
- _stats (https://docs.couchdb.org/en/stable/ddocs/ddocs.html#stats)
- Options:
- conflicts
- descending
- endkey
- end_key
Expand Down
13 changes: 0 additions & 13 deletions x/sqlite/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,6 @@ var schema = []string{
FOREIGN KEY (id, rev, rev_id) REFERENCES {{ .Docs }} (id, rev, rev_id) ON DELETE CASCADE,
UNIQUE (id, rev, rev_id, pk)
)`,
`CREATE VIEW {{ .Leaves }} AS
SELECT
doc.seq AS seq,
rev.id AS id,
rev.rev AS rev,
rev.rev_id AS rev_id,
doc.doc AS doc,
doc.deleted AS deleted
FROM {{ .Revs }} AS rev
LEFT JOIN {{ .Revs }} AS child ON rev.id = child.id AND rev.rev = child.parent_rev AND rev.rev_id = child.parent_rev_id
JOIN {{ .Docs }} AS doc ON rev.id = doc.id AND rev.rev = doc.rev AND rev.rev_id = doc.rev_id
WHERE child.id IS NULL
`,
/*
The .Design table is used to store design documents. The schema is as follows:
- id: The document ID.
Expand Down
5 changes: 0 additions & 5 deletions x/sqlite/templ.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ func (t *tmplFuncs) AttachmentsBridge() string {
return strconv.Quote(t.db.name + "_attachments_bridge")
}

func (t *tmplFuncs) Leaves() string {
return strconv.Quote(t.db.name + "_leaves")
}

func (t *tmplFuncs) Design() string {
return strconv.Quote(t.db.name + "_design")
}
Expand Down Expand Up @@ -101,7 +97,6 @@ func (t *tmplFuncs) IndexMap() string {
// {{ .Revs }} -> db.name + "_revs"
// {{ .Attachments }} -> db.name + "_attachments"
// {{ .AttachmentsBridge }} -> db.name + "_attachments_bridge"
// {{ .Leaves }} -> db.name + "_leaves"
// {{ .Design }} -> db.name + "_design"
func (d *db) query(format string) string {
var buf bytes.Buffer
Expand Down
61 changes: 31 additions & 30 deletions x/sqlite/views.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (d *db) queryBuiltinView(
limit, skip int64,
includeDocs, descending, inclusiveEnd, conflicts bool,
) (driver.Rows, error) {
args := []interface{}{includeDocs}
args := []interface{}{includeDocs, conflicts}

where := []string{"rev.rank = 1"}
switch view {
Expand All @@ -90,34 +90,37 @@ func (d *db) queryBuiltinView(
}

query := fmt.Sprintf(d.query(`
WITH RankedRevisions AS (
WITH leaves AS (
SELECT
id AS id,
rev AS rev,
rev_id AS rev_id,
IIF($1, doc, NULL) AS doc,
deleted AS deleted, -- TODO:remove this?
ROW_NUMBER() OVER (PARTITION BY id ORDER BY rev DESC, rev_id DESC) AS rank
FROM {{ .Leaves }} AS rev
WHERE NOT deleted
rev.id,
rev.rev,
rev.rev_id,
doc.doc,
doc.deleted
FROM {{ .Revs }} AS rev
LEFT JOIN {{ .Revs }} AS child ON child.id = rev.id AND rev.rev = child.parent_rev AND rev.rev_id = child.parent_rev_id
JOIN {{ .Docs }} AS doc ON rev.id = doc.id AND rev.rev = doc.rev AND rev.rev_id = doc.rev_id
WHERE child.id IS NULL
AND NOT doc.deleted
)
SELECT
rev.id AS id,
rev.id AS key,
'{"value":{"rev":"' || rev.rev || '-' || rev.rev_id || '"}}' AS value,
rev.rev || '-' || rev.rev_id AS rev,
rev.doc AS doc,
GROUP_CONCAT(conflicts.rev || '-' || conflicts.rev_id, ',') AS conflicts
FROM RankedRevisions AS rev
LEFT JOIN (
SELECT rev.id, rev.rev, rev.rev_id
FROM {{ .Revs }} AS rev
LEFT JOIN {{ .Revs }} AS child
ON child.id = rev.id
AND rev.rev = child.parent_rev
AND rev.rev_id = child.parent_rev_id
WHERE child.id IS NULL
) AS conflicts ON conflicts.id = rev.id AND NOT (rev.rev = conflicts.rev AND rev.rev_id = conflicts.rev_id)
IIF($2, GROUP_CONCAT(conflicts.rev || '-' || conflicts.rev_id, ','), NULL) AS conflicts
FROM (
SELECT
id AS id,
rev AS rev,
rev_id AS rev_id,
IIF($1, doc, NULL) AS doc,
deleted AS deleted, -- TODO:remove this?
ROW_NUMBER() OVER (PARTITION BY id ORDER BY rev DESC, rev_id DESC) AS rank
FROM leaves
) AS rev
LEFT JOIN leaves AS conflicts ON conflicts.id = rev.id AND NOT (rev.rev = conflicts.rev AND rev.rev_id = conflicts.rev_id)
WHERE %[2]s
GROUP BY rev.id, rev.rev, rev.rev_id
ORDER BY id %[1]s
Expand All @@ -129,10 +132,9 @@ func (d *db) queryBuiltinView(
}

return &rows{
ctx: ctx,
db: d,
rows: results,
conflicts: conflicts,
ctx: ctx,
db: d,
rows: results,
}, nil
}

Expand All @@ -144,10 +146,9 @@ func descendingToDirection(descending bool) string {
}

type rows struct {
ctx context.Context
db *db
rows *sql.Rows
conflicts bool
ctx context.Context
db *db
rows *sql.Rows
}

var _ driver.Rows = (*rows)(nil)
Expand Down Expand Up @@ -187,7 +188,7 @@ func (r *rows) Next(row *driver.Row) error {
Rev: rev,
Doc: doc,
}
if r.conflicts {
if conflicts != nil {
toMerge.Conflicts = strings.Split(*conflicts, ",")
}
row.Doc = toMerge.toReader()
Expand Down

0 comments on commit 2d75f7c

Please sign in to comment.