Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EventSource support #11235

Merged
merged 17 commits into from
May 7, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add event-stream support
If the browser supports EventSource switch to use this instead of
polling notifications.

Signed-off-by: Andrew Thornton <[email protected]>
  • Loading branch information
zeripath committed Apr 28, 2020
commit f5bee5915a401ab1e9aff4c9cd6808c41e3fe03c
5 changes: 4 additions & 1 deletion custom/conf/app.ini.sample
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,15 @@ DESCRIPTION = Gitea (Git with a cup of tea) is a painless self-hosted Git servic
KEYWORDS = go,git,self-hosted,gitea

[ui.notification]
; Control how often notification is queried to update the notification
; Control how often the notification endpoint is polled to update the notification
; The timeout will increase to MAX_TIMEOUT in TIMEOUT_STEPs if the notification count is unchanged
; Set MIN_TIMEOUT to 0 to turn off
MIN_TIMEOUT = 10s
MAX_TIMEOUT = 60s
TIMEOUT_STEP = 10s
; This setting determines how often the db is queried to get the latest notification counts.
; If the browser client has EventSource the EventSource will be used in preference to polling notification.
zeripath marked this conversation as resolved.
Show resolved Hide resolved
EVENT_SOURCE_UPDATE_TIME = 10s

[markdown]
; Render soft line breaks as hard line breaks, which means a single newline character between
Expand Down
3 changes: 2 additions & 1 deletion docs/content/doc/advanced/config-cheat-sheet.en-us.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,10 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`.

### UI - Notification (`ui.notification`)

- `MIN_TIMEOUT`: **10s**: These options control how often notification is queried to update the notification count. On page load the notification count will be checked after `MIN_TIMEOUT`. The timeout will increase to `MAX_TIMEOUT` by `TIMEOUT_STEP` if the notification count is unchanged. Set MIN_TIMEOUT to 0 to turn off.
- `MIN_TIMEOUT`: **10s**: These options control how often notification endpoint is polled to update the notification count. On page load the notification count will be checked after `MIN_TIMEOUT`. The timeout will increase to `MAX_TIMEOUT` by `TIMEOUT_STEP` if the notification count is unchanged. Set MIN_TIMEOUT to 0 to turn off.
- `MAX_TIMEOUT`: **60s**.
- `TIMEOUT_STEP`: **10s**.
- `EVENT_SOURCE_UPDATE_TIME`: **10s**: This setting determines how often the database is queried to update notification counts. If the browser client has `EventSource` - the `EventSource` will be used in preference to polling the notification endpoint.
zeripath marked this conversation as resolved.
Show resolved Hide resolved


## Markdown (`markdown`)
Expand Down
37 changes: 37 additions & 0 deletions models/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package models
import (
"fmt"
"path"
"strconv"
techknowlogick marked this conversation as resolved.
Show resolved Hide resolved

"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
Expand Down Expand Up @@ -718,6 +719,42 @@ func getNotificationCount(e Engine, user *User, status NotificationStatus) (coun
return
}

// UserIDCount is a simple coalition of UserID and Count
type UserIDCount struct {
UserID int64
Count int64
}

// GetUIDsAndNotificationCounts between the two provided times
func GetUIDsAndNotificationCounts(since, until timeutil.TimeStamp) ([]UserIDCount, error) {
sql := `SELECT user_id, count(*) AS count FROM notification ` +
`WHERE user_id IN (SELECT user_id FROM notification WHERE updated_unix >= ? AND ` +
`updated_unix < ?) AND status = ? GROUP BY user_id`

res, err := x.Query(sql, since, until, NotificationStatusUnread)
if err != nil {
return nil, err
}

if len(res) == 0 {
return []UserIDCount{}, nil
}

uidCounts := make([]UserIDCount, len(res))
for i, result := range res {
uid, err := strconv.ParseInt(string(result["user_id"]), 10, 64)
zeripath marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
count, err := strconv.ParseInt(string(result["count"]), 10, 64)
if err != nil {
return nil, err
}
uidCounts[i] = UserIDCount{uid, count}
}
return uidCounts, err
}

func setNotificationStatusReadIfUnread(e Engine, userID, issueID int64) error {
notification, err := getIssueNotification(e, userID, issueID)
// ignore if not exists
Expand Down
116 changes: 116 additions & 0 deletions modules/eventsource/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package eventsource

import (
"bytes"
"encoding/json"
"fmt"
"io"
"strings"
"time"
)

func wrapNewlines(w io.Writer, prefix []byte, value []byte) (sum int64, err error) {
if len(value) == 0 {
return
}
n := 0
guillep2k marked this conversation as resolved.
Show resolved Hide resolved
last := 0
for j := bytes.IndexByte(value, '\n'); j > -1; j = bytes.IndexByte(value[last:], '\n') {
n, err = w.Write(prefix)
sum += int64(n)
zeripath marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return
}
n, err = w.Write(value[last : last+j+1])
sum += int64(n)
zeripath marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return
}
last += j + 1
}
n, err = w.Write(prefix)
sum += int64(n)
zeripath marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return
}
n, err = w.Write(value[last:])
sum += int64(n)
zeripath marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return
}
n, err = w.Write([]byte("\n"))
sum += int64(n)
zeripath marked this conversation as resolved.
Show resolved Hide resolved
return
}

// Event is an eventsource event, not all fields need to be set
type Event struct {
// Name represents the value of the event: tag in the stream
Name string
// Data is either JSONified []byte or interface{} that can be JSONd
Data interface{}
// ID represents the ID of an event
ID string
// Retry tells the receiver only to attempt to reconnect to the source after this time
Retry time.Duration
}

// WriteTo writes data to w until there's no more data to write or when an error occurs.
// The return value n is the number of bytes written. Any error encountered during the write is also returned.
func (e *Event) WriteTo(w io.Writer) (int64, error) {
sum := int64(0)
nint := 0
n, err := wrapNewlines(w, []byte("event: "), []byte(e.Name))
zeripath marked this conversation as resolved.
Show resolved Hide resolved
sum += int64(n)
zeripath marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return sum, err
}

if e.Data != nil {
var data []byte
switch v := e.Data.(type) {
case []byte:
data = v
case string:
data = []byte(v)
default:
var err error
data, err = json.Marshal(e.Data)
if err != nil {
return sum, err
}
}
n, err := wrapNewlines(w, []byte("data: "), data)
sum += int64(n)
zeripath marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return sum, err
}

}

n, err = wrapNewlines(w, []byte("id: "), []byte(e.ID))
sum += int64(n)
zeripath marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return sum, err
}

if e.Retry != 0 {
nint, err = fmt.Fprintf(w, "retry: %d\n", int64(e.Retry/time.Millisecond))
sum += int64(nint)
}

nint, err = w.Write([]byte("\n"))
sum += int64(nint)

return sum, err
}

func (e *Event) String() string {
buf := new(strings.Builder)
_, _ = e.WriteTo(buf)
return buf.String()
}
54 changes: 54 additions & 0 deletions modules/eventsource/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package eventsource

import (
"bytes"
"testing"
)

func Test_wrapNewlines(t *testing.T) {
tests := []struct {
name string
prefix string
value string
output string
}{
{
"check no new lines",
"prefix: ",
"value",
"prefix: value\n",
},
{
"check simple newline",
"prefix: ",
"value1\nvalue2",
"prefix: value1\nprefix: value2\n",
},
{
"check pathological newlines",
"p: ",
"\n1\n\n2\n3\n",
"p: \np: 1\np: \np: 2\np: 3\np: \n",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &bytes.Buffer{}
gotSum, err := wrapNewlines(w, []byte(tt.prefix), []byte(tt.value))
if err != nil {
t.Errorf("wrapNewlines() error = %v", err)
return
}
if gotSum != int64(len(tt.output)) {
t.Errorf("wrapNewlines() = %v, want %v", gotSum, int64(len(tt.output)))
}
if gotW := w.String(); gotW != tt.output {
t.Errorf("wrapNewlines() = %v, want %v", gotW, tt.output)
}
})
}
}
83 changes: 83 additions & 0 deletions modules/eventsource/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package eventsource

import (
"sync"
)

// Manager manages the eventsource Messengers
type Manager struct {
mutex sync.Mutex

messengers map[int64]*Messenger
}

var manager *Manager

func init() {
manager = &Manager{
messengers: make(map[int64]*Messenger),
}
}

// GetManager returns a Manager and initializes one as singleton if there's none yet
func GetManager() *Manager {
return manager
}

// Register message channel
func (m *Manager) Register(uid int64) <-chan *Event {
m.mutex.Lock()
messenger, ok := m.messengers[uid]
if !ok {
messenger = NewMessenger(uid)
m.messengers[uid] = messenger
}
m.mutex.Unlock()
return messenger.Register()
}

// Unregister message channel
func (m *Manager) Unregister(uid int64, channel <-chan *Event) {
m.mutex.Lock()
zeripath marked this conversation as resolved.
Show resolved Hide resolved
messenger, ok := m.messengers[uid]
guillep2k marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
messenger = NewMessenger(uid)
zeripath marked this conversation as resolved.
Show resolved Hide resolved
m.messengers[uid] = messenger
zeripath marked this conversation as resolved.
Show resolved Hide resolved
}
m.mutex.Unlock()
zeripath marked this conversation as resolved.
Show resolved Hide resolved
messenger.Unregister(channel)
zeripath marked this conversation as resolved.
Show resolved Hide resolved
}

// UnregisterAll message channels
func (m *Manager) UnregisterAll() {
m.mutex.Lock()
zeripath marked this conversation as resolved.
Show resolved Hide resolved
for _, messenger := range m.messengers {
messenger.UnregisterAll()
}
m.messengers = map[int64]*Messenger{}
m.mutex.Unlock()
zeripath marked this conversation as resolved.
Show resolved Hide resolved
}

// SendMessage sends a message to a particular user
func (m *Manager) SendMessage(uid int64, message *Event) {
m.mutex.Lock()
messenger, ok := m.messengers[uid]
m.mutex.Unlock()
if ok {
messenger.SendMessage(message)
}
}

// SendMessageBlocking sends a message to a particular user
func (m *Manager) SendMessageBlocking(uid int64, message *Event) {
m.mutex.Lock()
messenger, ok := m.messengers[uid]
m.mutex.Unlock()
if ok {
messenger.SendMessageBlocking(message)
}
}
50 changes: 50 additions & 0 deletions modules/eventsource/manager_run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package eventsource

import (
"context"
"time"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
)

// Init starts this eventsource
func (m *Manager) Init() {
go graceful.GetManager().RunWithShutdownContext(m.Run)
}

// Run runs the manager within a provided context
func (m *Manager) Run(ctx context.Context) {
then := timeutil.TimeStampNow()
zeripath marked this conversation as resolved.
Show resolved Hide resolved
timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime)
loop:
for {
select {
case <-ctx.Done():
timer.Stop()
break loop
case <-timer.C:
now := timeutil.TimeStampNow()
zeripath marked this conversation as resolved.
Show resolved Hide resolved

uidCounts, err := models.GetUIDsAndNotificationCounts(then, now)
if err != nil {
log.Error("Unable to get UIDcounts: %v", err)
}
for _, uidCount := range uidCounts {
m.SendMessage(uidCount.UserID, &Event{
Name: "notification-count",
Data: uidCount,
})
}
then = now
}
}
m.UnregisterAll()
}
Loading