Skip to content
This repository has been archived by the owner on Jan 4, 2023. It is now read-only.

Commit

Permalink
fix user traffic bug
Browse files Browse the repository at this point in the history
  • Loading branch information
hossinasaadi committed Nov 19, 2022
1 parent 7fc1597 commit 603f061
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 77 deletions.
18 changes: 5 additions & 13 deletions web/job/xray_traffic_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,7 @@ func (j *XrayTrafficJob) Run() {
return
}

// get Client Traffic

clientTraffics, err := j.xrayService.GetXrayClientTraffic()
if err != nil {
logger.Warning("get xray client traffic failed:", err)
return
}
err = j.inboundService.AddClientTraffic(clientTraffics)
if err != nil {
logger.Warning("add client traffic failed:", err)
}

traffics, err := j.xrayService.GetXrayTraffic()
traffics, clientTraffics, err := j.xrayService.GetXrayTraffic()
if err != nil {
logger.Warning("get xray traffic failed:", err)
return
Expand All @@ -41,6 +29,10 @@ func (j *XrayTrafficJob) Run() {
logger.Warning("add traffic failed:", err)
}

err = j.inboundService.AddClientTraffic(clientTraffics)
if err != nil {
logger.Warning("add client traffic failed:", err)
}


}
2 changes: 1 addition & 1 deletion web/service/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *InboundService) AddClientTraffic(traffics []*xray.ClientTraffic) (err e
err := db.Model(model.Inbound{}).Where("settings like ?", "%" + traffic.Email + "%").First(inbound).Error
traffic.InboundId = inbound.Id
if err != nil {
logger.Warning("AddClientTraffic find model ", err)
logger.Warning("AddClientTraffic find model ", err, traffic.Email)
continue
}
// get settings clients
Expand Down
10 changes: 2 additions & 8 deletions web/service/xray.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,12 @@ func (s *XrayService) GetXrayConfig() (*xray.Config, error) {
return xrayConfig, nil
}

func (s *XrayService) GetXrayTraffic() ([]*xray.Traffic, error) {
func (s *XrayService) GetXrayTraffic() ([]*xray.Traffic, []*xray.ClientTraffic, error) {
if !s.IsXrayRunning() {
return nil, errors.New("xray is not running")
return nil, nil, errors.New("xray is not running")
}
return p.GetTraffic(true)
}
func (s *XrayService) GetXrayClientTraffic() ([]*xray.ClientTraffic, error) {
if !s.IsXrayRunning() {
return nil, errors.New("xray is not running")
}
return p.GetClientTraffic(false)
}

func (s *XrayService) RestartXray(isForce bool) error {
lock.Lock()
Expand Down
90 changes: 35 additions & 55 deletions xray/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,13 @@ func (p *process) Stop() error {
return p.cmd.Process.Kill()
}

func (p *process) GetTraffic(reset bool) ([]*Traffic, error) {
func (p *process) GetTraffic(reset bool) ([]*Traffic, []*ClientTraffic, error) {
if p.apiPort == 0 {
return nil, common.NewError("xray api port wrong:", p.apiPort)
return nil, nil, common.NewError("xray api port wrong:", p.apiPort)
}
conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%v", p.apiPort), grpc.WithInsecure())
if err != nil {
return nil, err
return nil, nil, err
}
defer conn.Close()

Expand All @@ -248,13 +248,43 @@ func (p *process) GetTraffic(reset bool) ([]*Traffic, error) {
}
resp, err := client.QueryStats(ctx, request)
if err != nil {
return nil, err
return nil, nil, err
}
tagTrafficMap := map[string]*Traffic{}
emailTrafficMap := map[string]*ClientTraffic{}

clientTraffics := make([]*ClientTraffic, 0)
traffics := make([]*Traffic, 0)
for _, stat := range resp.GetStat() {
matchs := trafficRegex.FindStringSubmatch(stat.Name)
if len(matchs) < 3 {

matchs := ClientTrafficRegex.FindStringSubmatch(stat.Name)
if len(matchs) < 3 {
continue
}else {

isUser := matchs[1] == "user"
email := matchs[2]
isDown := matchs[3] == "downlink"
if ! isUser {
continue
}
traffic, ok := emailTrafficMap[email]
if !ok {
traffic = &ClientTraffic{
Email: email,
}
emailTrafficMap[email] = traffic
clientTraffics = append(clientTraffics, traffic)
}
if isDown {
traffic.Down = stat.Value
} else {
traffic.Up = stat.Value
}

}
continue
}
isInbound := matchs[1] == "inbound"
Expand All @@ -279,55 +309,5 @@ func (p *process) GetTraffic(reset bool) ([]*Traffic, error) {
}
}

return traffics, nil
}
func (p *process) GetClientTraffic(reset bool) ([]*ClientTraffic, error) {
if p.apiPort == 0 {
return nil, common.NewError("xray api port wrong:", p.apiPort)
}
conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%v", p.apiPort), grpc.WithInsecure())
if err != nil {
return nil, err
}
defer conn.Close()

client := statsservice.NewStatsServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
request := &statsservice.QueryStatsRequest{
Reset_: reset,
}
resp, err := client.QueryStats(ctx, request)
if err != nil {
return nil, err
}
emailTrafficMap := map[string]*ClientTraffic{}
traffics := make([]*ClientTraffic, 0)
for _, stat := range resp.GetStat() {
matchs := ClientTrafficRegex.FindStringSubmatch(stat.Name)
if len(matchs) < 3 {
continue
}
isUser := matchs[1] == "user"
email := matchs[2]
isDown := matchs[3] == "downlink"
if ! isUser {
continue
}
traffic, ok := emailTrafficMap[email]
if !ok {
traffic = &ClientTraffic{
Email: email,
}
emailTrafficMap[email] = traffic
traffics = append(traffics, traffic)
}
if isDown {
traffic.Down = stat.Value
} else {
traffic.Up = stat.Value
}
}

return traffics, nil
return traffics, clientTraffics, nil
}

0 comments on commit 603f061

Please sign in to comment.