tackle the problem of memory leak
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import time
|
||||
from select import select
|
||||
from communication.world_parser import WorldParser
|
||||
|
||||
@@ -10,15 +12,32 @@ class Server:
|
||||
def __init__(self, host: str, port: int, world_parser: WorldParser):
|
||||
self.world_parser: WorldParser = world_parser
|
||||
self.__host: str = host
|
||||
self.__port: str = port
|
||||
self.__socket: socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
self.__port: int = port
|
||||
self.__socket: socket.socket = self._create_socket()
|
||||
self.__send_buff = []
|
||||
self.__rcv_buffer_size = 1024
|
||||
self.__rcv_buffer_default_size = 1024
|
||||
self.__max_msg_size = 1048576
|
||||
self.__shrink_threshold = 8192
|
||||
self.__shrink_after_msgs = 200
|
||||
self.__small_msg_streak = 0
|
||||
self.__rcv_buffer = bytearray(self.__rcv_buffer_size)
|
||||
|
||||
def _create_socket(self) -> socket.socket:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
return sock
|
||||
|
||||
def connect(self) -> None:
|
||||
logger.info("Connecting to server at %s:%d...", self.__host, self.__port)
|
||||
|
||||
# Always reconnect with a fresh socket object.
|
||||
try:
|
||||
self.__socket.close()
|
||||
except OSError:
|
||||
pass
|
||||
self.__socket = self._create_socket()
|
||||
|
||||
while True:
|
||||
try:
|
||||
self.__socket.connect((self.__host, self.__port))
|
||||
@@ -27,12 +46,19 @@ class Server:
|
||||
logger.error(
|
||||
"Connection refused. Make sure the server is running and listening on {self.__host}:{self.__port}."
|
||||
)
|
||||
time.sleep(0.05)
|
||||
|
||||
logger.info(f"Server connection established to {self.__host}:{self.__port}.")
|
||||
|
||||
def shutdown(self) -> None:
|
||||
self.__socket.close()
|
||||
self.__socket.shutdown(socket.SHUT_RDWR)
|
||||
try:
|
||||
self.__socket.shutdown(socket.SHUT_RDWR)
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
self.__socket.close()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def send_immediate(self, msg: str) -> None:
|
||||
"""
|
||||
@@ -72,37 +98,50 @@ class Server:
|
||||
self.commit(msg)
|
||||
self.send()
|
||||
|
||||
def receive(self) -> None:
|
||||
"""
|
||||
Receive the next message from the TCP/IP socket and updates world
|
||||
"""
|
||||
def receive(self):
|
||||
|
||||
# Receive message length information
|
||||
if (
|
||||
self.__socket.recv_into(
|
||||
self.__rcv_buffer, nbytes=4, flags=socket.MSG_WAITALL
|
||||
while True:
|
||||
|
||||
if (
|
||||
self.__socket.recv_into(
|
||||
self.__rcv_buffer, nbytes=4, flags=socket.MSG_WAITALL
|
||||
) != 4
|
||||
):
|
||||
raise ConnectionResetError
|
||||
|
||||
msg_size = int.from_bytes(self.__rcv_buffer[:4], byteorder="big", signed=False)
|
||||
|
||||
# Guard against corrupted frame lengths that would trigger huge allocations.
|
||||
if msg_size <= 0 or msg_size > self.__max_msg_size:
|
||||
raise ConnectionResetError
|
||||
|
||||
if msg_size > self.__rcv_buffer_size:
|
||||
self.__rcv_buffer_size = msg_size
|
||||
self.__rcv_buffer = bytearray(self.__rcv_buffer_size)
|
||||
|
||||
if (
|
||||
self.__socket.recv_into(
|
||||
self.__rcv_buffer, nbytes=msg_size, flags=socket.MSG_WAITALL
|
||||
) != msg_size
|
||||
):
|
||||
raise ConnectionResetError
|
||||
|
||||
self.world_parser.parse(
|
||||
message=self.__rcv_buffer[:msg_size].decode()
|
||||
)
|
||||
!= 4
|
||||
):
|
||||
raise ConnectionResetError
|
||||
|
||||
msg_size = int.from_bytes(self.__rcv_buffer[:4], byteorder="big", signed=False)
|
||||
if msg_size <= self.__shrink_threshold and self.__rcv_buffer_size > self.__rcv_buffer_default_size:
|
||||
self.__small_msg_streak += 1
|
||||
if self.__small_msg_streak >= self.__shrink_after_msgs:
|
||||
self.__rcv_buffer_size = self.__rcv_buffer_default_size
|
||||
self.__rcv_buffer = bytearray(self.__rcv_buffer_size)
|
||||
self.__small_msg_streak = 0
|
||||
else:
|
||||
self.__small_msg_streak = 0
|
||||
|
||||
# Ensure receive buffer is large enough to hold the message
|
||||
if msg_size > self.__rcv_buffer_size:
|
||||
self.__rcv_buffer_size = msg_size
|
||||
self.__rcv_buffer = bytearray(self.__rcv_buffer_size)
|
||||
|
||||
# Receive message with the specified length
|
||||
if (
|
||||
self.__socket.recv_into(
|
||||
self.__rcv_buffer, nbytes=msg_size, flags=socket.MSG_WAITALL
|
||||
)
|
||||
!= msg_size
|
||||
):
|
||||
raise ConnectionResetError
|
||||
|
||||
self.world_parser.parse(message=self.__rcv_buffer[:msg_size].decode())
|
||||
# 如果socket没有更多数据就退出
|
||||
if len(select([self.__socket], [], [], 0.0)[0]) == 0:
|
||||
break
|
||||
|
||||
def commit_beam(self, pos2d: list, rotation: float) -> None:
|
||||
assert len(pos2d) == 2
|
||||
|
||||
Reference in New Issue
Block a user