Another memory optimization
This commit is contained in:
@@ -60,7 +60,6 @@ class AIO(asyncio.Protocol):
|
|||||||
if not AIO.queue:
|
if not AIO.queue:
|
||||||
raise RuntimeError("queue not set")
|
raise RuntimeError("queue not set")
|
||||||
self.logger = logging.getLogger(__name__)
|
self.logger = logging.getLogger(__name__)
|
||||||
self.tmpfile = None
|
|
||||||
|
|
||||||
def _send_response(self, response):
|
def _send_response(self, response):
|
||||||
response = response.encode() + AIO.separator
|
response = response.encode() + AIO.separator
|
||||||
@@ -75,7 +74,9 @@ class AIO(asyncio.Protocol):
|
|||||||
self.transport = transport
|
self.transport = transport
|
||||||
self.request_time = str(time.time())
|
self.request_time = str(time.time())
|
||||||
self.buffer = bytearray()
|
self.buffer = bytearray()
|
||||||
self.data = bytearray()
|
self.tmpfile = None
|
||||||
|
self.fh = None
|
||||||
|
self.fsize = 0
|
||||||
self.command = None
|
self.command = None
|
||||||
self.length = None
|
self.length = None
|
||||||
self.all_chunks = False
|
self.all_chunks = False
|
||||||
@@ -111,25 +112,32 @@ class AIO(asyncio.Protocol):
|
|||||||
self.length = struct.unpack(">I", self.buffer[0:4])[0]
|
self.length = struct.unpack(">I", self.buffer[0:4])[0]
|
||||||
self.buffer = self.buffer[4:]
|
self.buffer = self.buffer[4:]
|
||||||
if self.length == 0:
|
if self.length == 0:
|
||||||
|
self.logger.debug("{} got all chunks".format(self.peer))
|
||||||
self.all_chunks = True
|
self.all_chunks = True
|
||||||
tmpfile = os.path.join(AIO.config["tmpdir"], "uvscan_{}_{}".format(self.request_time, str(self.peer[1])))
|
self.fh.close()
|
||||||
self.logger.debug("{} got last chunk, save data to {}".format(self.peer, tmpfile))
|
self.fh = None
|
||||||
with open(tmpfile, "wb") as f:
|
AIO.queue.put_nowait((AIO.config["uvscan_path"], self.tmpfile, self.process_uvscan_result))
|
||||||
self.tmpfile = tmpfile
|
self.logger.info("{} queued uvscan of {}, queue size is {}".format(self.peer, self.tmpfile, AIO.queue.qsize()))
|
||||||
f.write(self.data)
|
|
||||||
self.data = bytearray()
|
|
||||||
AIO.queue.put_nowait((AIO.config["uvscan_path"], tmpfile, self.process_uvscan_result))
|
|
||||||
self.logger.info("{} queued uvscan of {}, queue size is {}".format(self.peer, tmpfile, AIO.queue.qsize()))
|
|
||||||
break
|
break
|
||||||
self.logger.debug("{} got chunk size of {} bytes".format(self.peer, self.length))
|
self.logger.debug("{} chunk size is {} bytes".format(self.peer, self.length))
|
||||||
else:
|
else:
|
||||||
if len(self.buffer) < self.length:
|
if len(self.buffer) == 0:
|
||||||
self.logger.debug("{} got {} of {} bytes".format(self.peer, len(self.buffer), self.length))
|
|
||||||
break
|
break
|
||||||
self.logger.debug("{} chunk complete ({} bytes)".format(self.peer, self.length))
|
if not self.fh:
|
||||||
self.data.extend(self.buffer[0:self.length])
|
tmpfile = os.path.join(AIO.config["tmpdir"], "uvscan_{}_{}".format(self.request_time, str(self.peer[1])))
|
||||||
self.buffer = self.buffer[self.length:]
|
self.logger.debug("{} saving data to {}".format(self.peer, tmpfile))
|
||||||
self.length = None
|
self.fh = open(tmpfile, "wb")
|
||||||
|
self.tmpfile = tmpfile
|
||||||
|
left = self.length - self.fsize
|
||||||
|
data = self.buffer[0:left]
|
||||||
|
self.fh.write(data)
|
||||||
|
self.buffer = self.buffer[len(data):]
|
||||||
|
self.fsize += len(data)
|
||||||
|
if self.fsize < self.length:
|
||||||
|
self.logger.debug("{} got {} of {} bytes".format(self.peer, self.fsize, self.length))
|
||||||
|
else:
|
||||||
|
self.logger.debug("{} chunk complete ({} bytes)".format(self.peer, self.length))
|
||||||
|
self.length = None
|
||||||
|
|
||||||
except (RuntimeError, IndexError, IOError, struct.error) as e:
|
except (RuntimeError, IndexError, IOError, struct.error) as e:
|
||||||
self.logger.warning("{} warning: {}".format(self.peer, e))
|
self.logger.warning("{} warning: {}".format(self.peer, e))
|
||||||
@@ -156,6 +164,8 @@ class AIO(asyncio.Protocol):
|
|||||||
for entry in entries:
|
for entry in entries:
|
||||||
AIO.queue.put_nowait(entry)
|
AIO.queue.put_nowait(entry)
|
||||||
self.logger.debug("{} removing temporary file {}".format(self.peer, self.tmpfile))
|
self.logger.debug("{} removing temporary file {}".format(self.peer, self.tmpfile))
|
||||||
|
if self.fh:
|
||||||
|
self.fh.close()
|
||||||
os.remove(self.tmpfile)
|
os.remove(self.tmpfile)
|
||||||
self.logger.info("closed connection to {}".format(self.peer))
|
self.logger.info("closed connection to {}".format(self.peer))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user