3 Commits

Author SHA1 Message Date
6848f7dc6a Another small fix 2020-03-09 20:59:36 +01:00
e915c38d5f Another memory optimization 2020-03-09 20:49:38 +01:00
5443b5769f Optimize memory consumption 2020-03-09 18:10:35 +01:00

View File

@@ -60,7 +60,6 @@ class AIO(asyncio.Protocol):
if not AIO.queue:
raise RuntimeError("queue not set")
self.logger = logging.getLogger(__name__)
self.tmpfile = None
def _send_response(self, response):
response = response.encode() + AIO.separator
@@ -75,7 +74,9 @@ class AIO(asyncio.Protocol):
self.transport = transport
self.request_time = str(time.time())
self.buffer = bytearray()
self.data = bytearray()
self.tmpfile = None
self.fh = None
self.fsize = 0
self.command = None
self.length = None
self.all_chunks = False
@@ -111,24 +112,33 @@ class AIO(asyncio.Protocol):
self.length = struct.unpack(">I", self.buffer[0:4])[0]
self.buffer = self.buffer[4:]
if self.length == 0:
self.logger.debug("{} got all chunks".format(self.peer))
self.all_chunks = True
tmpfile = os.path.join(AIO.config["tmpdir"], "uvscan_{}_{}".format(self.request_time, str(self.peer[1])))
self.logger.debug("{} got last chunk, save data to {}".format(self.peer, tmpfile))
with open(tmpfile, "wb") as f:
self.tmpfile = tmpfile
f.write(self.data)
AIO.queue.put_nowait((AIO.config["uvscan_path"], tmpfile, self.process_uvscan_result))
self.logger.info("{} queued uvscan of {}, queue size is {}".format(self.peer, tmpfile, AIO.queue.qsize()))
self.fh.close()
self.fh = None
AIO.queue.put_nowait((AIO.config["uvscan_path"], self.tmpfile, self.process_uvscan_result))
self.logger.info("{} queued uvscan of {}, queue size is {}".format(self.peer, self.tmpfile, AIO.queue.qsize()))
break
self.logger.debug("{} got chunk size of {} bytes".format(self.peer, self.length))
self.logger.debug("{} chunk size is {} bytes".format(self.peer, self.length))
else:
if len(self.buffer) < self.length:
self.logger.debug("{} got {} of {} bytes".format(self.peer, len(self.buffer), self.length))
if len(self.buffer) == 0:
break
self.logger.debug("{} chunk complete ({} bytes)".format(self.peer, self.length))
self.data.extend(self.buffer[0:self.length])
self.buffer = self.buffer[self.length:]
self.length = None
if not self.fh:
tmpfile = os.path.join(AIO.config["tmpdir"], "uvscan_{}_{}".format(self.request_time, str(self.peer[1])))
self.logger.debug("{} saving data to {}".format(self.peer, tmpfile))
self.fh = open(tmpfile, "wb")
self.tmpfile = tmpfile
left = self.length - self.fsize
data = self.buffer[0:left]
self.fh.write(data)
self.buffer = self.buffer[len(data):]
self.fsize += len(data)
if self.fsize < self.length:
self.logger.debug("{} got {} of {} bytes".format(self.peer, self.fsize, self.length))
else:
self.logger.debug("{} chunk complete ({} bytes)".format(self.peer, self.length))
self.length = None
self.fsize = 0
except (RuntimeError, IndexError, IOError, struct.error) as e:
self.logger.warning("{} warning: {}".format(self.peer, e))
@@ -155,6 +165,8 @@ class AIO(asyncio.Protocol):
for entry in entries:
AIO.queue.put_nowait(entry)
self.logger.debug("{} removing temporary file {}".format(self.peer, self.tmpfile))
if self.fh:
self.fh.close()
os.remove(self.tmpfile)
self.logger.info("closed connection to {}".format(self.peer))