-
Notifications
You must be signed in to change notification settings - Fork 0
/
users.py
138 lines (111 loc) · 4.03 KB
/
users.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Project :k8s-python
@File :users.py
@Author :septemberhx
@Date :2022/3/19
@Description: Flask service that starts and stops threads simulating users who post requests to the gateway
"""
import datetime
import json
import os.path
import random
import threading
from time import sleep
from typing import List
from flask import Flask, request
import requests
from logger import get_logger
# Logger for this module, obtained from the project's ``logger`` helper.
logger = get_logger('users')
# Flask application exposing the /simulate/start and /simulate/stop endpoints.
app = Flask(__name__)
# Root directory of the JSON input files, relative to the working directory.
DATA_DIR = './data'
def read_users(user_data_path):
    """Load a JSON file, converting integer-like object keys to ``int``.

    JSON object keys are always strings. At every nesting level, a key made
    only of digits (optionally preceded by ``-``) is converted to ``int``;
    every other key is kept as a string.

    :param user_data_path: path of the JSON file to read.
    :return: the decoded JSON document.
    :raises OSError: if the file cannot be opened.
    :raises json.JSONDecodeError: if the file does not contain valid JSON.
    """
    def _maybe_int(key):
        # ``isdigit`` accepts some strings that ``int`` rejects (for example
        # '--3' once the dashes are stripped, or superscript digits). Keep
        # such keys as strings instead of aborting the whole load.
        if key.lstrip('-').isdigit():
            try:
                return int(key)
            except ValueError:
                pass
        return key

    def _convert_keys(obj):
        return {_maybe_int(k): v for k, v in obj.items()}

    # JSON text is UTF-8; do not depend on the locale's default encoding.
    with open(user_data_path, encoding='utf-8') as f:
        return json.load(f, object_hook=_convert_keys)
def send_request(user_id, func_obj):
    """Post one simulated user request to the gateway and log its latency.

    The call is synchronous: it blocks until the gateway answers. Exceptions
    raised by ``requests.post`` are not caught here and propagate to the
    caller.

    :param user_id: identifier sent to the gateway as ``userId``.
    :param func_obj: mapping describing the target function; the keys
        ``svcIndex``, ``patternUrl`` and ``index`` are read.
    :return: None. The outcome is only written to the module logger.
    """
    # TODO: synchronous request, not asynchronous!
    gateway = 'http://localhost:8081/gateway/request'
    svc_id = f'Service{func_obj["svcIndex"]}'
    pattern_url = func_obj['patternUrl']
    data = {
        'svcId': svc_id,
        'patternUrl': pattern_url,
        'userId': user_id,
        'timestamp': int(datetime.datetime.now().timestamp() * 1000),
        'callbackUrl': 'http://localhost:8082/callback',
        'params': {
            'status': 'Success',
            'valueMap': {
                'first': func_obj['index']
            }
        }
    }
    thread_name = threading.current_thread().name

    t1 = datetime.datetime.now().timestamp() * 1000  # ms
    logger.debug(f'==> {user_id}|{svc_id}|{pattern_url}|{t1}|{thread_name}')
    response = requests.post(gateway, json=data)
    t2 = datetime.datetime.now().timestamp() * 1000  # ms
    logger.debug(f'<== {user_id}|{svc_id}|{pattern_url}|{t2}|{thread_name}')

    # Extra diagnostics for one specific endpoint.
    if pattern_url == '/Service11/29':
        logger.debug(response)

    elapsed = t2 - t1
    if response.status_code != 200:
        logger.debug(f'{user_id}|{svc_id}|{pattern_url}|Fail|{elapsed}')
        return

    # Decode the body once instead of on every access.
    payload = response.json()
    if isinstance(payload, dict) and 'status' in payload:
        status = payload['status']
        logger.debug(f'{user_id}|{svc_id}|{pattern_url}|{status}|{elapsed}')
    else:
        # No status field: log the raw payload for inspection.
        logger.debug(payload)
class MyThread(threading.Thread):
    """Thread that keeps sending one simulated user's requests until stopped.

    :param thread_id: stored as ``threadID``.
    :param name: thread name, also used in the start/exit console messages.
    :param delay: stored as ``delay``; not read by this class.
    :param user_id: prefix of the user identifier sent with every request.
    :param user_data: sequence of keys into ``func_objs``; it is walked
        cyclically, one entry per request.
    :param func_objs: mapping from those keys to the function descriptions
        handed to ``send_request``.
    """

    def __init__(self, thread_id, name, delay, user_id, user_data, func_objs):
        super().__init__()
        self.threadID = thread_id
        self.name = name
        self.delay = delay
        self.user_id = user_id
        self.user_data = user_data
        self.func_objs = func_objs
        # Loop flag polled by simulate_user(); cleared by stop().
        self._running = True

    def run(self):
        """Thread entry point: run the simulation loop between two console messages."""
        print("开始模拟:" + self.name)
        self.simulate_user()
        print("退出模拟:" + self.name)

    def simulate_user(self):
        """Send requests in a loop, pausing 0-5 seconds after each, until stop() is called."""
        if not self.user_data:
            # Nothing to replay; the modulo below would divide by zero.
            return
        # NOTE(review): this counter is never incremented, so every request
        # is sent as '<user_id>_1' - confirm whether a per-request sequence
        # number was intended.
        request_seq = 1
        func_i = 0
        while self._running:
            # Wrap around so the user's function list repeats indefinitely.
            func_i = func_i % len(self.user_data)
            print(self.user_data)
            send_request(f'{self.user_id}_{request_seq}', self.func_objs[self.user_data[func_i]])
            func_i += 1
            sleep(random.randint(0, 5))

    def stop(self):
        """Ask the loop to exit; it is checked before the next request is sent."""
        self._running = False
# Simulation threads appended by start_simulate(); end() stops and clears them.
thread_list = []  # type: List[MyThread]
def start_simulate(users, func_objs):
    """Start one simulation thread per entry of ``users`` and register it.

    :param users: container iterated for user ids; ``users[user_id]`` is
        passed to the thread as that user's data.
    :param func_objs: function descriptions shared by all threads.
    :return: None. Started threads are appended to ``thread_list``.
    """
    for uid in users:
        worker = MyThread(
            thread_id=uid,
            name=f'Thread-{uid}',
            delay=0,
            user_id=uid,
            user_data=users[uid],
            func_objs=func_objs,
        )
        # Start first, then register, so only started threads are tracked.
        worker.start()
        thread_list.append(worker)
@app.route('/simulate/start', methods=['POST'])
def start():
    """Start the user simulation described by the JSON request body.

    The body must be a JSON object with the keys ``index``, ``group`` and
    ``node``. ``group`` and ``index`` select the users file under
    ``DATA_DIR/phy_data``; ``node`` selects which entry of that file is
    simulated.

    :return: the string ``'ok'`` once all threads have been started.
    """
    # The route only accepts POST, so no method check is needed here.
    data_json = json.loads(request.data.decode('utf-8'))
    data_index = data_json['index']
    group = data_json['group']
    node = data_json['node']

    # NOTE(review): ``group`` and ``index`` come straight from the request
    # body and are interpolated into a file path without validation -
    # confirm this endpoint is only reachable by trusted callers.
    user_data_path = os.path.join(DATA_DIR, 'phy_data', f'group0{group}', f'users_{data_index}.json')
    users = read_users(user_data_path)

    # func_objs.json needs the same integer-key conversion as the users
    # file, so reuse the shared loader instead of duplicating it.
    func_objs = read_users(os.path.join(DATA_DIR, 'share', 'func_objs.json'))

    print(users)
    start_simulate(users[node], func_objs)
    return 'ok'
@app.route('/simulate/stop', methods=['POST'])
def end():
    """Signal every registered simulation thread to stop and forget them all.

    :return: the string ``'ok'``.
    """
    for worker in thread_list:
        worker.stop()
    # Empty the registry in place so the module-level list object is kept.
    del thread_list[:]
    return 'ok'
if __name__ == '__main__':
    # Script entry point: serve the simulator API on all interfaces, port 8082.
    app.run(host='0.0.0.0', port=8082)