From e915c38d5f64391d3bb282efaf576aa2741b2776 Mon Sep 17 00:00:00 2001 From: Thomas Oettli Date: Mon, 9 Mar 2020 20:49:38 +0100 Subject: [PATCH] Another memory optimization --- uvscand/__init__.py | 44 +++++++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/uvscand/__init__.py b/uvscand/__init__.py index 30d0342..ff3a930 100644 --- a/uvscand/__init__.py +++ b/uvscand/__init__.py @@ -60,7 +60,6 @@ class AIO(asyncio.Protocol): if not AIO.queue: raise RuntimeError("queue not set") self.logger = logging.getLogger(__name__) - self.tmpfile = None def _send_response(self, response): response = response.encode() + AIO.separator @@ -75,7 +74,9 @@ class AIO(asyncio.Protocol): self.transport = transport self.request_time = str(time.time()) self.buffer = bytearray() - self.data = bytearray() + self.tmpfile = None + self.fh = None + self.fsize = 0 self.command = None self.length = None self.all_chunks = False @@ -111,25 +112,32 @@ class AIO(asyncio.Protocol): self.length = struct.unpack(">I", self.buffer[0:4])[0] self.buffer = self.buffer[4:] if self.length == 0: + self.logger.debug("{} got all chunks".format(self.peer)) self.all_chunks = True - tmpfile = os.path.join(AIO.config["tmpdir"], "uvscan_{}_{}".format(self.request_time, str(self.peer[1]))) - self.logger.debug("{} got last chunk, save data to {}".format(self.peer, tmpfile)) - with open(tmpfile, "wb") as f: - self.tmpfile = tmpfile - f.write(self.data) - self.data = bytearray() - AIO.queue.put_nowait((AIO.config["uvscan_path"], tmpfile, self.process_uvscan_result)) - self.logger.info("{} queued uvscan of {}, queue size is {}".format(self.peer, tmpfile, AIO.queue.qsize())) + self.fh.close() + self.fh = None + AIO.queue.put_nowait((AIO.config["uvscan_path"], self.tmpfile, self.process_uvscan_result)) + self.logger.info("{} queued uvscan of {}, queue size is {}".format(self.peer, self.tmpfile, AIO.queue.qsize())) break - self.logger.debug("{} got chunk size of {} bytes".format(self.peer, self.length)) + self.logger.debug("{} chunk size is {} bytes".format(self.peer, self.length)) else: - if len(self.buffer) < self.length: - self.logger.debug("{} got {} of {} bytes".format(self.peer, len(self.buffer), self.length)) + if len(self.buffer) == 0: break - self.logger.debug("{} chunk complete ({} bytes)".format(self.peer, self.length)) - self.data.extend(self.buffer[0:self.length]) - self.buffer = self.buffer[self.length:] - self.length = None + if not self.fh: + tmpfile = os.path.join(AIO.config["tmpdir"], "uvscan_{}_{}".format(self.request_time, str(self.peer[1]))) + self.logger.debug("{} saving data to {}".format(self.peer, tmpfile)) + self.fh = open(tmpfile, "wb") + self.tmpfile = tmpfile + left = self.length - self.fsize + data = self.buffer[0:left] + self.fh.write(data) + self.buffer = self.buffer[len(data):] + self.fsize += len(data) + if self.fsize < self.length: + self.logger.debug("{} got {} of {} bytes".format(self.peer, self.fsize, self.length)) + else: + self.logger.debug("{} chunk complete ({} bytes)".format(self.peer, self.length)) + self.length = None except (RuntimeError, IndexError, IOError, struct.error) as e: self.logger.warning("{} warning: {}".format(self.peer, e)) @@ -156,6 +164,8 @@ class AIO(asyncio.Protocol): for entry in entries: AIO.queue.put_nowait(entry) self.logger.debug("{} removing temporary file {}".format(self.peer, self.tmpfile)) + if self.fh: + self.fh.close() os.remove(self.tmpfile) self.logger.info("closed connection to {}".format(self.peer))