Skip to content

Commit

Permalink
Merge pull request #936 from go-kivik/changesFeatures
Browse files Browse the repository at this point in the history
Longpoll changes feed with attachments
  • Loading branch information
flimzy committed Apr 15, 2024
2 parents 7e13000 + 93ec396 commit 31440b7
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 26 deletions.
2 changes: 1 addition & 1 deletion couchdb/chttp/chttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ func TestNetError(t *testing.T) {
return err
}(),
status: http.StatusBadGateway,
err: `Get "?http:https://127.0.0.1:\d+"?: context deadline exceeded`,
err: `(Get "?http:https://127.0.0.1:\d+"?: context deadline exceeded|dial tcp 127.0.0.1:\d+: i/o timeout)`,
},
{
name: "cannot resolve host",
Expand Down
50 changes: 30 additions & 20 deletions x/sqlite/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,19 +346,20 @@ func (d *db) Changes(ctx context.Context, options driver.Options) (driver.Change
return nil, err
}
if sinceNow && feed == feedLongpoll {
return d.newLongpollChanges(ctx, opts.includeDocs())
return d.newLongpollChanges(ctx, opts.includeDocs(), opts.attachments())
}

return d.newNormalChanges(ctx, opts, since, lastSeq, sinceNow, feed)
}

type longpollChanges struct {
stmt *sql.Stmt
since uint64
lastSeq string
ctx context.Context
cancel context.CancelFunc
changes <-chan longpollChange
stmt *sql.Stmt
since uint64
attachments bool
lastSeq string
ctx context.Context
cancel context.CancelFunc
changes <-chan longpollChange
}

type longpollChange struct {
Expand All @@ -368,14 +369,14 @@ type longpollChange struct {

var _ driver.Changes = (*longpollChanges)(nil)

func (d *db) newLongpollChanges(ctx context.Context, includeDocs bool) (*longpollChanges, error) {
func (d *db) newLongpollChanges(ctx context.Context, includeDocs, attachments bool) (*longpollChanges, error) {
if includeDocs {
return d.newLongpollChangesWithDocs(ctx)
return d.newLongpollChangesWithDocs(ctx, attachments)
}
return d.newLongpollChangesWithoutDocs(ctx)
}

func (d *db) newLongpollChangesWithDocs(ctx context.Context) (*longpollChanges, error) {
func (d *db) newLongpollChangesWithDocs(ctx context.Context, attachments bool) (*longpollChanges, error) {
since, err := d.lastSeq(ctx)
if err != nil {
return nil, err
Expand All @@ -392,7 +393,8 @@ func (d *db) newLongpollChangesWithDocs(ctx context.Context) (*longpollChanges,
content_type,
length,
digest,
rev_pos
rev_pos,
data
FROM (
SELECT
doc.id,
Expand All @@ -405,6 +407,7 @@ func (d *db) newLongpollChangesWithDocs(ctx context.Context) (*longpollChanges,
att.length,
att.digest,
att.rev_pos,
IIF($2, att.data, NULL) AS data,
ROW_NUMBER() OVER () AS row_number
FROM (
SELECT
Expand All @@ -430,11 +433,12 @@ func (d *db) newLongpollChangesWithDocs(ctx context.Context) (*longpollChanges,
ctx, cancel := context.WithCancel(ctx)
changes := make(chan longpollChange)
c := &longpollChanges{
stmt: stmt,
since: since,
ctx: ctx,
cancel: cancel,
changes: changes,
stmt: stmt,
since: since,
attachments: attachments,
ctx: ctx,
cancel: cancel,
changes: changes,
}

go c.watch(changes)
Expand All @@ -459,7 +463,8 @@ func (d *db) newLongpollChangesWithoutDocs(ctx context.Context) (*longpollChange
NULL AS content_type,
NULL AS length,
NULL AS digest,
NULL AS rev_pos
NULL AS rev_pos,
NULL AS data
FROM (
SELECT
id,
Expand Down Expand Up @@ -504,7 +509,7 @@ func (c *longpollChanges) watch(changes chan<- longpollChange) {
bo.MaxElapsedTime = 0

err := backoff.Retry(func() error {
rows, err := c.stmt.QueryContext(c.ctx, c.since)
rows, err := c.stmt.QueryContext(c.ctx, c.since, c.attachments)
if err != nil {
return backoff.Permanent(err)
}
Expand All @@ -522,11 +527,12 @@ func (c *longpollChanges) watch(changes chan<- longpollChange) {
length *int64
digest *md5sum
revPos *int
data *[]byte
)
for rows.Next() {
if err := rows.Scan(
&rowID, &rowSeq, &rowDeleted, &rowRev, &rowDoc,
&filename, &contentType, &length, &digest, &revPos,
&filename, &contentType, &length, &digest, &revPos, &data,
); err != nil {
return backoff.Permanent(err)
}
Expand All @@ -542,12 +548,16 @@ func (c *longpollChanges) watch(changes chan<- longpollChange) {
if atts == nil {
atts = map[string]*attachment{}
}
atts[*filename] = &attachment{
att := &attachment{
ContentType: *contentType,
Digest: *digest,
Length: *length,
RevPos: *revPos,
}
if data != nil {
att.Data, _ = json.Marshal(*data)
}
atts[*filename] = att
}
}
if err := rows.Err(); err != nil {
Expand Down
85 changes: 80 additions & 5 deletions x/sqlite/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,6 @@ func TestDBChanges(t *testing.T) {

/*
TODO:
- attachments for longpoll feed
- ETag should be based only on last sequence, I think
- Options
- doc_ids
Expand Down Expand Up @@ -727,6 +726,82 @@ loop:
}
}

func TestDBChanges_longpoll_include_docs_and_attachments(t *testing.T) {
t.Parallel()
db := newDB(t)

// First create a single document to seed the changes feed
rev := db.tPut("doc1", map[string]string{"foo": "bar"})

// Start the changes feed, with feed=longpoll&since=now to block until
// another change is made.
feed, err := db.Changes(context.Background(), kivik.Params(map[string]interface{}{
"feed": "longpoll",
"attachments": true,
"since": "now",
"include_docs": true,
}))
if err != nil {
t.Fatalf("Failed to start changes feed: %s", err)
}
t.Cleanup(func() {
_ = feed.Close()
})

var mu sync.Mutex
var rev2 string
// Make a change to the database after a short delay
go func() {
time.Sleep(100 * time.Millisecond)
mu.Lock()
rev2 = db.tPut("doc1", map[string]interface{}{
"_attachments": newAttachments().
add("text.txt", "boring text").
add("text2.txt", "more boring text"),
}, kivik.Rev(rev))
mu.Unlock()
}()

start := time.Now()
// Meanwhile, the changes feed should block until the change is made
// iterate over feed
var got []driver.Change

loop:
for {
change := driver.Change{}
err := feed.Next(&change)
switch err {
case io.EOF:
break loop
case nil:
// continue
default:
t.Fatalf("iteration failed: %s", err)
}
got = append(got, change)
}

if time.Since(start) < 100*time.Millisecond {
t.Errorf("Changes feed returned too quickly")
}

mu.Lock()
wantChanges := []driver.Change{
{
ID: "doc1",
Seq: "2",
Changes: driver.ChangedRevs{rev2},
Doc: []byte(`{"_id":"doc1","_rev":"` + rev2 + `","_attachments":{"text.txt":{"content_type":"text/plain","digest":"md5-OIJSy6hr5f32Yfxm8ex95w==","length":11,"revpos":2,"data":"Ym9yaW5nIHRleHQ="},"text2.txt":{"content_type":"text/plain","digest":"md5-JlqzqsA7DA4Lw2arCp9iXQ==","length":16,"revpos":2,"data":"bW9yZSBib3JpbmcgdGV4dA=="}}}`),
},
}
mu.Unlock()

if d := cmp.Diff(wantChanges, got); d != "" {
t.Errorf("Unexpected changes:\n%s", d)
}
}

func TestDBChanges_longpoll_include_docs_with_attachment_stubs(t *testing.T) {
t.Parallel()
db := newDB(t)
Expand Down Expand Up @@ -920,7 +995,7 @@ func Test_longpoll_changes_query(t *testing.T) {

d := newDB(t)

changes, err := d.DB.(*db).newLongpollChanges(context.Background(), true)
changes, err := d.DB.(*db).newLongpollChanges(context.Background(), true, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -952,7 +1027,7 @@ func Test_longpoll_changes_query(t *testing.T) {
var result row
if err := rows.Scan(
&result.ID, &result.Seq, &result.Deleted, &result.Rev, &result.Doc,
&result.Filename, discard{}, discard{}, discard{}, discard{},
&result.Filename, discard{}, discard{}, discard{}, discard{}, discard{},
); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -981,7 +1056,7 @@ func Test_longpoll_changes_query_without_docs(t *testing.T) {

d := newDB(t)

changes, err := d.DB.(*db).newLongpollChanges(context.Background(), false)
changes, err := d.DB.(*db).newLongpollChanges(context.Background(), false, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1013,7 +1088,7 @@ func Test_longpoll_changes_query_without_docs(t *testing.T) {
var result row
if err := rows.Scan(
&result.ID, &result.Seq, &result.Deleted, &result.Rev, &result.Doc,
&result.Filename, discard{}, discard{}, discard{}, discard{},
&result.Filename, discard{}, discard{}, discard{}, discard{}, discard{},
); err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 31440b7

Please sign in to comment.