From 8f8e075541db665c496040bb141dd789b242fc1a Mon Sep 17 00:00:00 2001 From: Thomas Oettli Date: Fri, 31 Jan 2020 11:45:17 +0100 Subject: [PATCH] Limit number of parallel scans --- uvscand/__init__.py | 51 +++++++++++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/uvscand/__init__.py b/uvscand/__init__.py index 0ca1bbf..32f75e7 100644 --- a/uvscand/__init__.py +++ b/uvscand/__init__.py @@ -31,25 +31,34 @@ from subprocess import Popen, PIPE uvscan_regex = re.compile(r"Found:?(?: the| potentially unwanted program| (?:virus|trojan) or variant)? (.+?)(?:\.| (?:virus |trojan )?)", re.MULTILINE) -async def run(uvscan, filename): - proc = await asyncio.create_subprocess_exec(uvscan, "--secure", "--mime", "--noboot", "--panalyse", "--manalyse", filename, stdout=asyncio.subprocess.PIPE) - stdout, _ = await proc.communicate() - if proc.returncode == 13: - match = uvscan_regex.search(stdout.decode()) - name = match.group(1) if match else "UNKNOWN" - result = "stream: {} FOUND".format(name) - else: - result = "stream: OK" - return result +async def uvscan_worker(queue): + while True: + job = await queue.get() + if job is None: + await queue.put(None) + break + uvscan, filename, cb = job + proc = await asyncio.create_subprocess_exec(uvscan, "--secure", "--mime", "--noboot", "--panalyse", "--manalyse", filename, stdout=asyncio.subprocess.PIPE) + stdout, _ = await proc.communicate() + if proc.returncode == 13: + match = uvscan_regex.search(stdout.decode()) + name = match.group(1) if match else "UNKNOWN" + result = "stream: {} FOUND".format(name) + else: + result = "stream: OK" + cb(result) class AIO(asyncio.Protocol): config = None + queue = asyncio.Queue() separator = b"\x00" def __init__(self): if not AIO.config: raise RuntimeError("configuration not set") + if not AIO.queue: + raise RuntimeError("queue not set") self.logger = logging.getLogger(__name__) self.data = bytearray() self.tmpfile = None @@ -89,16 +98,12 @@ class AIO(asyncio.Protocol): f.write(self.data[pos:pos + length]) pos += length self.logger.debug("starting uvscan for file {}".format(self.tmpfile)) - task = asyncio.async(run(AIO.config["uvscan_path"], self.tmpfile)) - task.add_done_callback(self.handle_uvscan_result) + asyncio.async(AIO.queue.put((AIO.config["uvscan_path"], self.tmpfile, self.send_response))) else: raise RuntimeError("unknown command") except (RuntimeError, IndexError, IOError, struct.error) as e: self.send_response(str(e)) - def handle_uvscan_result(self, task): - self.send_response(task.result()) - def send_response(self, response): response = response.encode() response += AIO.separator @@ -120,6 +125,8 @@ def main(): formatter_class=lambda prog: argparse.HelpFormatter(prog, max_help_position=45, width=140)) parser.add_argument("-c", "--config", help="List of config files to read.", nargs="+", default=["/etc/uvscand.conf"]) + parser.add_argument("-m", "--maxprocs", help="Maximum number of parallel scan processes.", + type=int, default=8) parser.add_argument("-d", "--debug", help="Log debugging messages.", action="store_true") args = parser.parse_args() @@ -169,8 +176,15 @@ def main(): if not os.path.isfile(config["uvscan_path"]) or not os.access(config["uvscan_path"], os.X_OK): logger.error("uvscan binary '{}' does not exist or is not executable".format(config["uvscan_path"])) sys.exit(1) - loop = asyncio.get_event_loop() + + # setup protocol AIO.config = config + + # start uvscan workers + loop = asyncio.get_event_loop() + workers = [loop.create_task(uvscan_worker(AIO.queue)) for _ in range(args.maxprocs)] + + # start server coro = loop.create_server(AIO, config["bind_address"], config["bind_port"]) server = loop.run_until_complete(coro) logger.info("uvscand started") @@ -180,8 +194,13 @@ def main(): except KeyboardInterrupt: pass + # close server server.close() loop.run_until_complete(server.wait_closed()) + + # shutdown uvscan workers + loop.run_until_complete(AIO.queue.put(None)) + loop.run_until_complete(asyncio.wait(workers)) loop.close() logger.info("uvscand stopped")