Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
6848f7dc6a
|
|||
|
e915c38d5f
|
|||
|
5443b5769f
|
@@ -60,7 +60,6 @@ class AIO(asyncio.Protocol):
|
||||
if not AIO.queue:
|
||||
raise RuntimeError("queue not set")
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.tmpfile = None
|
||||
|
||||
def _send_response(self, response):
|
||||
response = response.encode() + AIO.separator
|
||||
@@ -75,7 +74,9 @@ class AIO(asyncio.Protocol):
|
||||
self.transport = transport
|
||||
self.request_time = str(time.time())
|
||||
self.buffer = bytearray()
|
||||
self.data = bytearray()
|
||||
self.tmpfile = None
|
||||
self.fh = None
|
||||
self.fsize = 0
|
||||
self.command = None
|
||||
self.length = None
|
||||
self.all_chunks = False
|
||||
@@ -111,24 +112,33 @@ class AIO(asyncio.Protocol):
|
||||
self.length = struct.unpack(">I", self.buffer[0:4])[0]
|
||||
self.buffer = self.buffer[4:]
|
||||
if self.length == 0:
|
||||
self.logger.debug("{} got all chunks".format(self.peer))
|
||||
self.all_chunks = True
|
||||
tmpfile = os.path.join(AIO.config["tmpdir"], "uvscan_{}_{}".format(self.request_time, str(self.peer[1])))
|
||||
self.logger.debug("{} got last chunk, save data to {}".format(self.peer, tmpfile))
|
||||
with open(tmpfile, "wb") as f:
|
||||
self.tmpfile = tmpfile
|
||||
f.write(self.data)
|
||||
AIO.queue.put_nowait((AIO.config["uvscan_path"], tmpfile, self.process_uvscan_result))
|
||||
self.logger.info("{} queued uvscan of {}, queue size is {}".format(self.peer, tmpfile, AIO.queue.qsize()))
|
||||
self.fh.close()
|
||||
self.fh = None
|
||||
AIO.queue.put_nowait((AIO.config["uvscan_path"], self.tmpfile, self.process_uvscan_result))
|
||||
self.logger.info("{} queued uvscan of {}, queue size is {}".format(self.peer, self.tmpfile, AIO.queue.qsize()))
|
||||
break
|
||||
self.logger.debug("{} got chunk size of {} bytes".format(self.peer, self.length))
|
||||
self.logger.debug("{} chunk size is {} bytes".format(self.peer, self.length))
|
||||
else:
|
||||
if len(self.buffer) < self.length:
|
||||
self.logger.debug("{} got {} of {} bytes".format(self.peer, len(self.buffer), self.length))
|
||||
if len(self.buffer) == 0:
|
||||
break
|
||||
self.logger.debug("{} chunk complete ({} bytes)".format(self.peer, self.length))
|
||||
self.data.extend(self.buffer[0:self.length])
|
||||
self.buffer = self.buffer[self.length:]
|
||||
self.length = None
|
||||
if not self.fh:
|
||||
tmpfile = os.path.join(AIO.config["tmpdir"], "uvscan_{}_{}".format(self.request_time, str(self.peer[1])))
|
||||
self.logger.debug("{} saving data to {}".format(self.peer, tmpfile))
|
||||
self.fh = open(tmpfile, "wb")
|
||||
self.tmpfile = tmpfile
|
||||
left = self.length - self.fsize
|
||||
data = self.buffer[0:left]
|
||||
self.fh.write(data)
|
||||
self.buffer = self.buffer[len(data):]
|
||||
self.fsize += len(data)
|
||||
if self.fsize < self.length:
|
||||
self.logger.debug("{} got {} of {} bytes".format(self.peer, self.fsize, self.length))
|
||||
else:
|
||||
self.logger.debug("{} chunk complete ({} bytes)".format(self.peer, self.length))
|
||||
self.length = None
|
||||
self.fsize = 0
|
||||
|
||||
except (RuntimeError, IndexError, IOError, struct.error) as e:
|
||||
self.logger.warning("{} warning: {}".format(self.peer, e))
|
||||
@@ -155,6 +165,8 @@ class AIO(asyncio.Protocol):
|
||||
for entry in entries:
|
||||
AIO.queue.put_nowait(entry)
|
||||
self.logger.debug("{} removing temporary file {}".format(self.peer, self.tmpfile))
|
||||
if self.fh:
|
||||
self.fh.close()
|
||||
os.remove(self.tmpfile)
|
||||
self.logger.info("closed connection to {}".format(self.peer))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user