Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue indexer queue redis support #6218

Merged
merged 8 commits into from
Apr 8, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
finished indexer redis queue
  • Loading branch information
lunny committed Apr 2, 2019
commit 4459d28cc0c1dbc7887aee20ec288123fcb0b152
2 changes: 2 additions & 0 deletions custom/conf/app.ini.sample
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ ISSUE_INDEXER_QUEUE_TYPE = levelqueue
; When ISSUE_INDEXER_QUEUE_TYPE is levelqueue, this will be the queue will be saved path,
; default is indexers/issues.queue
ISSUE_INDEXER_QUEUE_DIR = indexers/issues.queue
; When `ISSUE_INDEXER_QUEUE_TYPE` is `redis`, this will store the redis connection string.
ISSUE_INDEXER_QUEUE_CONN_STR = "addrs=127.0.0.1:6379 db=0"
; Batch queue number, default is 20
ISSUE_INDEXER_QUEUE_BATCH_NUMBER = 20

Expand Down
7 changes: 4 additions & 3 deletions docs/content/doc/advanced/config-cheat-sheet.en-us.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,10 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`.

- `ISSUE_INDEXER_TYPE`: **bleve**: Issue indexer type, currently support: bleve or db, if it's db, below issue indexer item will be invalid.
- `ISSUE_INDEXER_PATH`: **indexers/issues.bleve**: Index file used for issue search.
- `ISSUE_INDEXER_QUEUE_TYPE`: **levelqueue**: Issue indexer queue, currently support: channel or levelqueue
- `ISSUE_INDEXER_QUEUE_DIR`: **indexers/issues.queue**: When ISSUE_INDEXER_QUEUE_TYPE is levelqueue, this will be the queue will be saved path
- `ISSUE_INDEXER_QUEUE_BATCH_NUMBER`: **20**: Batch queue number
- `ISSUE_INDEXER_QUEUE_TYPE`: **levelqueue**: Issue indexer queue, currently support: `channel`, `levelqueue`, `redis`.
lunny marked this conversation as resolved.
Show resolved Hide resolved
- `ISSUE_INDEXER_QUEUE_DIR`: **indexers/issues.queue**: When `ISSUE_INDEXER_QUEUE_TYPE` is `levelqueue`, this will be the queue will be saved path.
- `ISSUE_INDEXER_QUEUE_CONN_STR`: **addrs=127.0.0.1:6379 db=0**: When `ISSUE_INDEXER_QUEUE_TYPE` is `redis`, this will store the redis connection string.
- `ISSUE_INDEXER_QUEUE_BATCH_NUMBER`: **20**: Batch queue number.

- `REPO_INDEXER_ENABLED`: **false**: Enables code search (uses a lot of disk space, about 6 times more than the repository size).
- `REPO_INDEXER_PATH`: **indexers/repos.bleve**: Index file used for code search.
Expand Down
4 changes: 2 additions & 2 deletions docs/content/doc/advanced/config-cheat-sheet.zh-cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ menu:
- `PATH`: Tidb 或者 SQLite3 数据文件存放路径。
- `LOG_SQL`: **true**: 显示生成的SQL,默认为真。


## Indexer (`indexer`)

- `ISSUE_INDEXER_TYPE`: **bleve**: 工单索引类型,当前支持 `bleve` 或 `db`,当为 `db` 时其它工单索引项可不用设置。
- `ISSUE_INDEXER_PATH`: **indexers/issues.bleve**: 工单索引文件存放路径,当索引类型为 `bleve` 时有效。
- `ISSUE_INDEXER_QUEUE_TYPE`: **levelqueue**: 工单索引队列类型,当前支持 `channel` 或 `levelqueue`。
- `ISSUE_INDEXER_QUEUE_TYPE`: **levelqueue**: 工单索引队列类型,当前支持 `channel`, `levelqueue` 或 `redis`。
- `ISSUE_INDEXER_QUEUE_DIR`: **indexers/issues.queue**: 当 `ISSUE_INDEXER_QUEUE_TYPE` 为 `levelqueue` 时,保存索引队列的磁盘路径。
- `ISSUE_INDEXER_QUEUE_CONN_STR`: **addrs=127.0.0.1:6379 db=0**: 当 `ISSUE_INDEXER_QUEUE_TYPE` 为 `redis` 时,保存Redis队列的连接字符串。
- `ISSUE_INDEXER_QUEUE_BATCH_NUMBER`: **20**: 队列处理中批量提交数量。

- `REPO_INDEXER_ENABLED`: **false**: 是否启用代码搜索(启用后会占用比较大的磁盘空间)。
Expand Down
35 changes: 22 additions & 13 deletions modules/indexer/issues/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ type Indexer interface {
}

var (
// issueIndexerUpdateQueue queue of issue ids to be updated
issueIndexerUpdateQueue Queue
issueIndexer Indexer
// issueIndexerQueue queue of issue ids to be updated
issueIndexerQueue Queue
issueIndexer Indexer
)

// InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until
Expand All @@ -72,27 +72,36 @@ func InitIssueIndexer(syncReindex bool) error {
}

if dummyQueue {
issueIndexerUpdateQueue = &DummyQueue{}
issueIndexerQueue = &DummyQueue{}
return nil
}

var err error
switch setting.Indexer.IssueIndexerQueueType {
switch setting.Indexer.IssueQueueType {
case setting.LevelQueueType:
issueIndexerUpdateQueue, err = NewLevelQueue(
issueIndexerQueue, err = NewLevelQueue(
issueIndexer,
setting.Indexer.IssueIndexerQueueDir,
setting.Indexer.IssueIndexerQueueBatchNumber)
setting.Indexer.IssueQueueDir,
setting.Indexer.IssueQueueBatchNumber)
if err != nil {
return err
}
case setting.ChannelQueueType:
issueIndexerUpdateQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueIndexerQueueBatchNumber)
issueIndexerQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueQueueBatchNumber)
case setting.RedisQueueType:
addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr)
if err != nil {
return err
}
issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, issueIndexer, setting.Indexer.IssueQueueBatchNumber)
if err != nil {
return err
}
default:
return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueIndexerQueueType)
return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueQueueType)
}

go issueIndexerUpdateQueue.Run()
go issueIndexerQueue.Run()

if populate {
if syncReindex {
Expand Down Expand Up @@ -152,7 +161,7 @@ func UpdateIssueIndexer(issue *models.Issue) {
comments = append(comments, comment.Content)
}
}
issueIndexerUpdateQueue.Push(&IndexerData{
issueIndexerQueue.Push(&IndexerData{
ID: issue.ID,
RepoID: issue.RepoID,
Title: issue.Title,
Expand All @@ -174,7 +183,7 @@ func DeleteRepoIssueIndexer(repo *models.Repository) {
return
}

issueIndexerUpdateQueue.Push(&IndexerData{
issueIndexerQueue.Push(&IndexerData{
IDs: ids,
IsDelete: true,
})
Expand Down
15 changes: 10 additions & 5 deletions modules/indexer/issues/queue_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var (
type redisClient interface {
RPush(key string, args ...interface{}) *redis.IntCmd
LPop(key string) *redis.StringCmd
Ping() *redis.StatusCmd
}

// RedisQueue redis queue
Expand Down Expand Up @@ -75,6 +76,9 @@ func NewRedisQueue(addrs string, password string, dbIdx int, indexer Indexer, ba
Addrs: dbs,
})
}
if err := queue.client.Ping().Err(); err != nil {
return nil, err
}
return &queue, nil
}

Expand All @@ -83,8 +87,9 @@ func (r *RedisQueue) Run() error {
var datas = make([]*IndexerData, 0, r.batchNumber)
for {
bs, err := r.client.LPop(r.queueName).Bytes()
if err != nil {
log.Error(4, "LPop", err)
if err != nil && err != redis.Nil {
log.Error(4, "LPop faile: %v", err)
time.Sleep(time.Millisecond * 100)
continue
}

Expand Down Expand Up @@ -120,12 +125,12 @@ func (r *RedisQueue) Run() error {
log.Error(4, "indexer.Delete: %v", err)
}
}
time.Sleep(time.Millisecond * 10)
time.Sleep(time.Millisecond * 100)
continue
}

datas = append(datas, &data)
time.Sleep(time.Millisecond * 10)
time.Sleep(time.Millisecond * 100)
}
return nil
}
Expand All @@ -136,5 +141,5 @@ func (r *RedisQueue) Push(data *IndexerData) error {
if err != nil {
return err
}
return r.client.RPush(r.queueName, bs, 10*time.Second).Err()
return r.client.RPush(r.queueName, bs).Err()
}
38 changes: 21 additions & 17 deletions modules/setting/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,29 @@ import (
const (
LevelQueueType = "levelqueue"
ChannelQueueType = "channel"
RedisQueueType = "redis"
)

var (
// Indexer settings
Indexer = struct {
IssueType string
IssuePath string
RepoIndexerEnabled bool
RepoPath string
UpdateQueueLength int
MaxIndexerFileSize int64
IssueIndexerQueueType string
IssueIndexerQueueDir string
IssueIndexerQueueBatchNumber int
IssueType string
IssuePath string
RepoIndexerEnabled bool
RepoPath string
UpdateQueueLength int
MaxIndexerFileSize int64
IssueQueueType string
IssueQueueDir string
IssueQueueConnStr string
IssueQueueBatchNumber int
}{
IssueType: "bleve",
IssuePath: "indexers/issues.bleve",
IssueIndexerQueueType: LevelQueueType,
IssueIndexerQueueDir: "indexers/issues.queue",
IssueIndexerQueueBatchNumber: 20,
IssueType: "bleve",
IssuePath: "indexers/issues.bleve",
IssueQueueType: LevelQueueType,
IssueQueueDir: "indexers/issues.queue",
IssueQueueConnStr: "",
IssueQueueBatchNumber: 20,
}
)

Expand All @@ -50,7 +53,8 @@ func newIndexerService() {
}
Indexer.UpdateQueueLength = sec.Key("UPDATE_BUFFER_LEN").MustInt(20)
Indexer.MaxIndexerFileSize = sec.Key("MAX_FILE_SIZE").MustInt64(1024 * 1024)
Indexer.IssueIndexerQueueType = sec.Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(LevelQueueType)
Indexer.IssueIndexerQueueDir = sec.Key("ISSUE_INDEXER_QUEUE_DIR").MustString(path.Join(AppDataPath, "indexers/issues.queue"))
Indexer.IssueIndexerQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20)
Indexer.IssueQueueType = sec.Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(LevelQueueType)
Indexer.IssueQueueDir = sec.Key("ISSUE_INDEXER_QUEUE_DIR").MustString(path.Join(AppDataPath, "indexers/issues.queue"))
Indexer.IssueQueueConnStr = sec.Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString(path.Join(AppDataPath, ""))
Indexer.IssueQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20)
}