Limit number of parallel scans

This commit is contained in:
2020-01-31 11:45:17 +01:00
parent 62c7398b1c
commit 8f8e075541

View File

@@ -31,7 +31,13 @@ from subprocess import Popen, PIPE
uvscan_regex = re.compile(r"Found:?(?: the| potentially unwanted program| (?:virus|trojan) or variant)? (.+?)(?:\.| (?:virus |trojan )?)", re.MULTILINE) uvscan_regex = re.compile(r"Found:?(?: the| potentially unwanted program| (?:virus|trojan) or variant)? (.+?)(?:\.| (?:virus |trojan )?)", re.MULTILINE)
async def run(uvscan, filename): async def uvscan_worker(queue):
while True:
job = await queue.get()
if job is None:
await queue.put(None)
break
uvscan, filename, cb = job
proc = await asyncio.create_subprocess_exec(uvscan, "--secure", "--mime", "--noboot", "--panalyse", "--manalyse", filename, stdout=asyncio.subprocess.PIPE) proc = await asyncio.create_subprocess_exec(uvscan, "--secure", "--mime", "--noboot", "--panalyse", "--manalyse", filename, stdout=asyncio.subprocess.PIPE)
stdout, _ = await proc.communicate() stdout, _ = await proc.communicate()
if proc.returncode == 13: if proc.returncode == 13:
@@ -40,16 +46,19 @@ async def run(uvscan, filename):
result = "stream: {} FOUND".format(name) result = "stream: {} FOUND".format(name)
else: else:
result = "stream: OK" result = "stream: OK"
return result cb(result)
class AIO(asyncio.Protocol): class AIO(asyncio.Protocol):
config = None config = None
queue = asyncio.Queue()
separator = b"\x00" separator = b"\x00"
def __init__(self): def __init__(self):
if not AIO.config: if not AIO.config:
raise RuntimeError("configuration not set") raise RuntimeError("configuration not set")
if not AIO.queue:
raise RuntimeError("queue not set")
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.data = bytearray() self.data = bytearray()
self.tmpfile = None self.tmpfile = None
@@ -89,16 +98,12 @@ class AIO(asyncio.Protocol):
f.write(self.data[pos:pos + length]) f.write(self.data[pos:pos + length])
pos += length pos += length
self.logger.debug("starting uvscan for file {}".format(self.tmpfile)) self.logger.debug("starting uvscan for file {}".format(self.tmpfile))
task = asyncio.async(run(AIO.config["uvscan_path"], self.tmpfile)) asyncio.async(AIO.queue.put((AIO.config["uvscan_path"], self.tmpfile, self.send_response)))
task.add_done_callback(self.handle_uvscan_result)
else: else:
raise RuntimeError("unknown command") raise RuntimeError("unknown command")
except (RuntimeError, IndexError, IOError, struct.error) as e: except (RuntimeError, IndexError, IOError, struct.error) as e:
self.send_response(str(e)) self.send_response(str(e))
def handle_uvscan_result(self, task):
self.send_response(task.result())
def send_response(self, response): def send_response(self, response):
response = response.encode() response = response.encode()
response += AIO.separator response += AIO.separator
@@ -120,6 +125,8 @@ def main():
formatter_class=lambda prog: argparse.HelpFormatter(prog, max_help_position=45, width=140)) formatter_class=lambda prog: argparse.HelpFormatter(prog, max_help_position=45, width=140))
parser.add_argument("-c", "--config", help="List of config files to read.", nargs="+", parser.add_argument("-c", "--config", help="List of config files to read.", nargs="+",
default=["/etc/uvscand.conf"]) default=["/etc/uvscand.conf"])
parser.add_argument("-m", "--maxprocs", help="Maximum number of parallel scan processes.",
type=int, default=8)
parser.add_argument("-d", "--debug", help="Log debugging messages.", action="store_true") parser.add_argument("-d", "--debug", help="Log debugging messages.", action="store_true")
args = parser.parse_args() args = parser.parse_args()
@@ -169,8 +176,15 @@ def main():
if not os.path.isfile(config["uvscan_path"]) or not os.access(config["uvscan_path"], os.X_OK): if not os.path.isfile(config["uvscan_path"]) or not os.access(config["uvscan_path"], os.X_OK):
logger.error("uvscan binary '{}' does not exist or is not executable".format(config["uvscan_path"])) logger.error("uvscan binary '{}' does not exist or is not executable".format(config["uvscan_path"]))
sys.exit(1) sys.exit(1)
loop = asyncio.get_event_loop()
# setup protocol
AIO.config = config AIO.config = config
# start uvscan workers
loop = asyncio.get_event_loop()
workers = [loop.create_task(uvscan_worker(AIO.queue)) for _ in range(args.maxprocs)]
# start server
coro = loop.create_server(AIO, config["bind_address"], config["bind_port"]) coro = loop.create_server(AIO, config["bind_address"], config["bind_port"])
server = loop.run_until_complete(coro) server = loop.run_until_complete(coro)
logger.info("uvscand started") logger.info("uvscand started")
@@ -180,8 +194,13 @@ def main():
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
# close server
server.close() server.close()
loop.run_until_complete(server.wait_closed()) loop.run_until_complete(server.wait_closed())
# shutdown uvscan workers
loop.run_until_complete(AIO.queue.put(None))
loop.run_until_complete(asyncio.wait(workers))
loop.close() loop.close()
logger.info("uvscand stopped") logger.info("uvscand stopped")