-
Notifications
You must be signed in to change notification settings - Fork 0
/
users.py
124 lines (98 loc) · 3.44 KB
/
users.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Project :k8s-python
@File :users.py
@Author :septemberhx
@Date :2022/3/19
@Description:
"""
import datetime
import json
import os.path
import random
import threading
from time import sleep
from typing import List
from flask import Flask, request
import requests
from logger import get_logger
logger = get_logger('users')
app = Flask(__name__)
DATA_DIR = './data/phy_data'
def read_users(user_data_path):
with open(user_data_path) as f:
users = json.load(f)
return users
def request(user_id, func_obj):
# todo: 同步请求,而非异步!
gateway = 'http:https://localhost:8081/gateway/request'
data = {
'svcId': f'service{func_obj["svcIndex"]}',
'patternUrl': func_obj['patternUrl'],
'userId': user_id,
'timestamp': datetime.datetime.now().timestamp() * 1000,
'callbackUrl': 'http:https://localhost:8082/callback',
'params': {
'status': 'Success',
'valueMap': {}
}
}
t1 = datetime.datetime.now().timestamp()
logger.debug(f'==> {user_id}|{data["svcId"]}|{data["patternUrl"]}|{t1}')
response = requests.post(gateway, json=json.dumps(data))
t2 = datetime.datetime.now().timestamp()
logger.debug(f'<== {user_id}|{data["svcId"]}|{data["patternUrl"]}|{t2}')
if response.status_code == 200:
status = response.json()['status']
logger.debug(f'{user_id}|{data["svcId"]}|{data["patternUrl"]}|{status}|{t2 - t1}')
else:
logger.debug(f'{user_id}|{data["svcId"]}|{data["patternUrl"]}|Fail|{t2 - t1}')
class MyThread(threading.Thread):
def __init__(self, thread_id, name, delay, user_id, user_data, func_map):
threading.Thread.__init__(self)
self.threadID = thread_id
self.name = name
self.delay = delay
self.user_id = user_id
self.user_data = user_data
self.func_map = func_map
self._running = True
def run(self):
print("开始模拟:" + self.name)
self.simulate_user()
print("退出模拟:" + self.name)
def simulate_user(self):
i = 1
func_i = 0
while self._running:
func_i = func_i % len(self.func_list)
request(f'{self.user_id}_{i}', self.func_map[self.func_list[func_i]])
sleep(random.randint(0, 5))
def stop(self):
self._running = False
thread_list = [] # type: List[MyThread]
def start_simulate(users, func_map):
for user_id in users:
thread = MyThread(user_id, f'Thread-{user_id}', 0, user_id, users[user_id], func_map)
thread.start()
thread_list.append(thread)
@app.route('/simulate/start', methods=['POST'])
def start():
if request.method == 'POST':
data_json = json.loads(request.data.decode('utf-8'))
data_index = data_json['index']
group = data_json['group']
user_data_path = os.path.join(DATA_DIR, group, f'users_{data_index}.json')
users = read_users(user_data_path)
with open(os.path.join(DATA_DIR, 'share', 'func_objs.json'), 'r') as f:
func_objs = json.load(f, object_hook=lambda d: {int(k) if k.lstrip('-').isdigit() else k: v for k, v in
d.items()})
start_simulate(users, func_objs)
@app.route('/simulate/start', methods=['POST'])
def end():
for thread in thread_list:
thread.stop()
thread_list.clear()
if __name__ == '__main__':
app.run('0.0.0.0', port=8082)