-
Notifications
You must be signed in to change notification settings - Fork 139
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix request spelling error and add socket resource release #1165
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,13 +63,20 @@ def _create_socket_server(path): | |
Args: | ||
path (str): a file path. | ||
""" | ||
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | ||
path_dir = os.path.dirname(path) | ||
os.makedirs(path_dir, exist_ok=True) | ||
if os.path.exists(path): | ||
os.unlink(path) | ||
server.bind(path) | ||
server.listen(0) | ||
server = None | ||
try: | ||
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | ||
path_dir = os.path.dirname(path) | ||
os.makedirs(path_dir, exist_ok=True) | ||
if os.path.exists(path): | ||
os.unlink(path) | ||
server.bind(path) | ||
server.listen(0) | ||
except OSError as e: | ||
logger.error(f"An error occurred while creating the socket server: {e}") | ||
if server: | ||
server.close() | ||
raise | ||
return server | ||
|
||
|
||
|
@@ -212,13 +219,13 @@ def _sync(self): | |
|
||
@retry_socket | ||
def _request(self, request: SocketRequest): | ||
"""Create a socket client to requet the shared object.""" | ||
"""Create a socket client to request the shared object.""" | ||
client = _create_socket_client(self._socket_file) | ||
message = pickle.dumps(request) | ||
_socket_send(client, message) | ||
recv_data = _socket_recv(client) | ||
rcv_data = _socket_recv(client) | ||
client.close() | ||
response: LockAcquireResponse = pickle.loads(recv_data) | ||
response: LockAcquireResponse = pickle.loads(rcv_data) | ||
return response | ||
|
||
|
||
|
@@ -242,26 +249,35 @@ def __init__(self, name="", create=False): | |
else: | ||
self._lock = None | ||
|
||
def deal_client_msg(self, connection): | ||
try: | ||
recv_data = _socket_recv(connection) | ||
msg: SocketRequest = pickle.loads(recv_data) | ||
if msg.method == "acquire": | ||
response = LockAcquireResponse() | ||
response.acquired = self.acquire(**msg.args) | ||
elif msg.method == "locked": | ||
response = LockedResponse() | ||
response.locked = self.locked() | ||
elif msg.method == "release": | ||
self.release() | ||
response.status = SUCCESS_CODE | ||
except Exception: | ||
response = SocketResponse() | ||
response.status = ERROR_CODE | ||
send_data = pickle.dumps(response) | ||
_socket_send(connection, send_data) | ||
|
||
def _sync(self): | ||
while True: | ||
connection, _ = self._server.accept() | ||
try: | ||
recv_data = _socket_recv(connection) | ||
msg: SocketRequest = pickle.loads(recv_data) | ||
if msg.method == "acquire": | ||
response = LockAcquireResponse() | ||
response.acquired = self.acquire(**msg.args) | ||
elif msg.method == "locked": | ||
response = LockedResponse() | ||
response.locked = self.locked() | ||
elif msg.method == "release": | ||
self.release() | ||
response.status = SUCCESS_CODE | ||
except Exception: | ||
response = SocketResponse() | ||
response.status = ERROR_CODE | ||
send_data = pickle.dumps(response) | ||
_socket_send(connection, send_data) | ||
connection, _ = self._server.accept() | ||
try: | ||
self.deal_client_msg(connection) | ||
finally: | ||
connection.close() | ||
except Exception as e: | ||
logger.error(f"An error in SharedLock occurred: {e}") | ||
|
||
def acquire(self, blocking=True): | ||
""" | ||
|
@@ -363,30 +379,40 @@ def __init__(self, name="", create=False, maxsize=1): | |
else: | ||
self._queue = None | ||
|
||
def deal_client_msg(self, connection): | ||
try: | ||
rcv_data = _socket_recv(connection) | ||
msg: SocketRequest = pickle.loads(rcv_data) | ||
response = SocketResponse() | ||
if msg.method == "put": | ||
self.put(**msg.args) | ||
elif msg.method == "get": | ||
response = QueueGetResponse() | ||
response.obj = self.get(**msg.args) | ||
elif msg.method == "qsize": | ||
response = QueueSizeResponse() | ||
response.size = self.qsize() | ||
elif msg.method == "empty": | ||
response = QueueEmptyResponse() | ||
response.empty = self.empty() | ||
response.status = SUCCESS_CODE | ||
except Exception: | ||
response = SocketResponse() | ||
response.status = ERROR_CODE | ||
|
||
message = pickle.dumps(response) | ||
_socket_send(connection, message) | ||
|
||
def _sync(self): | ||
while True: | ||
connection, _ = self._server.accept() | ||
try: | ||
recv_data = _socket_recv(connection) | ||
msg: SocketRequest = pickle.loads(recv_data) | ||
response = SocketResponse() | ||
if msg.method == "put": | ||
self.put(**msg.args) | ||
elif msg.method == "get": | ||
response = QueueGetResponse() | ||
response.obj = self.get(**msg.args) | ||
elif msg.method == "qsize": | ||
response = QueueSizeResponse() | ||
response.size = self.qsize() | ||
elif msg.method == "empty": | ||
response = QueueEmptyResponse() | ||
response.empty = self.empty() | ||
response.status = SUCCESS_CODE | ||
except Exception: | ||
response = SocketResponse() | ||
response.status = ERROR_CODE | ||
message = pickle.dumps(response) | ||
_socket_send(connection, message) | ||
connection, _ = self._server.accept() | ||
try: | ||
self.deal_client_msg(connection) | ||
finally: | ||
connection.close() | ||
except Exception as e: | ||
logger.error(f"An error in SharedQueue occurred: {e}") | ||
|
||
def put(self, obj, block=True, timeout=None): | ||
"""Put an object into the queue.""" | ||
|
@@ -471,28 +497,38 @@ def __init__(self, name="", create=False): | |
name=f"shard_dict_{name}", create=self._create | ||
) | ||
|
||
def deal_client_msg(self, connection): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are 3 methods with the same name There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
try: | ||
rcv_data = _socket_recv(connection) | ||
msg: SocketRequest = pickle.loads(rcv_data) | ||
response = DictMessage() | ||
if msg.method == "set": | ||
self.set(**msg.args) | ||
elif msg.method == "get": | ||
response = DictMessage() | ||
response.meta_dict = self.get(**msg.args) | ||
response.status = SUCCESS_CODE | ||
except Exception as e: | ||
response = SocketResponse() | ||
response.status = ERROR_CODE | ||
logger.error(e) | ||
finally: | ||
if not self._shared_queue.empty(): | ||
self._shared_queue.get(1) | ||
message = pickle.dumps(response) | ||
_socket_send(connection, message) | ||
|
||
def _sync(self): | ||
while True: | ||
connection, _ = self._server.accept() | ||
try: | ||
recv_data = _socket_recv(connection) | ||
msg: SocketRequest = pickle.loads(recv_data) | ||
response = DictMessage() | ||
if msg.method == "set": | ||
self.set(**msg.args) | ||
elif msg.method == "get": | ||
response = DictMessage() | ||
response.meta_dict = self.get(**msg.args) | ||
response.status = SUCCESS_CODE | ||
connection, _ = self._server.accept() | ||
try: | ||
self.deal_client_msg(connection) | ||
finally: | ||
# 确保连接被关闭 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use English annotations. |
||
connection.close() | ||
except Exception as e: | ||
response = SocketResponse() | ||
response.status = ERROR_CODE | ||
logger.error(e) | ||
finally: | ||
if not self._shared_queue.empty(): | ||
self._shared_queue.get(1) | ||
message = pickle.dumps(response) | ||
_socket_send(connection, message) | ||
logger.error(f"An error in SharedDict occurred: {e}") | ||
|
||
def set(self, new_dict): | ||
""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deal_client_msg
->_deal_client_msg
which is a private method.