From 3b26ff5307c325d65ccf943ebe8e9bafeedb5754 Mon Sep 17 00:00:00 2001 From: ValdikSS Date: Sun, 7 Aug 2022 17:38:50 +0300 Subject: [PATCH] Fix high memory usage * Do not load all domain names in a file to memory, use generator instead * Do not construct async tasks for all domains, slice it by TASK_AMOUNT_MULTIPLIER instead This modification decreases memory usage from ~470 MB down to ~35-40 MB --- scripts/resolve-dns-nxdomain.py | 54 ++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/scripts/resolve-dns-nxdomain.py b/scripts/resolve-dns-nxdomain.py index b947d91..736b87c 100755 --- a/scripts/resolve-dns-nxdomain.py +++ b/scripts/resolve-dns-nxdomain.py @@ -8,6 +8,7 @@ import dns.rdatatype import dns.asyncresolver import dns.exception import dns._asyncio_backend +import itertools # DNS timeout (in seconds) for the initial DNS resolving pass INITIAL_PASS_TIMEOUT = 3 @@ -19,6 +20,9 @@ FINAL_PASS_TIMEOUT = 10 # Number of concurrent resolving 'threads' for final pass FINAL_PASS_CONCURRENCY = 35 +# Multiplication number of asynchronous tasks to build for a single +# resolver loop +TASK_AMOUNT_MULTIPLIER = 5 NS_FILTER_SUBSTRINGS = ("parking", "expired", ".afternic.com", "parklogic", ".parktons.com", ".above.com", ".ztomy.com", ".notneiron.com", ".ibspark.com", ".bodis.com", @@ -49,17 +53,25 @@ class AZResolver(dns.asyncresolver.Resolver): # Do not thread domain as broken if the answer is empty pass -async def runTasksWithProgress(tasks): +def tasksProvider(domainiter, resolver): + for domain in domainiter: + yield asyncio.ensure_future(resolver.nxresolve(domain.strip())) + +async def runTasksWithProgress(tasks, tasknumber, concurrent_tasks): progress = 0 old_progress = 0 ret = [] - for task in asyncio.as_completed(tasks): - ret.append(await task) - progress = int(len(ret) / len(tasks) * 100) - if old_progress < progress: - print("{}%...".format(progress), end='\r', file=sys.stderr, flush=True) - old_progress = progress + tasklist = list(itertools.islice(tasks, concurrent_tasks)) + while tasklist: + for task in asyncio.as_completed(tasklist): + ret.append(await task) + progress = int(len(ret) / tasknumber * 100) + if old_progress < progress: + print("{}%...".format(progress), end='\r', file=sys.stderr, flush=True) + old_progress = progress + tasklist = list(itertools.islice(tasks, concurrent_tasks)) + print(file=sys.stderr) return ret @@ -74,44 +86,50 @@ async def main(): r.lifetime = INITIAL_PASS_TIMEOUT # Load domain file list and schedule resolving - tasks = [] + domaincount = 0 try: - with open(sys.argv[1], 'rb') as domainlist: + with open(sys.argv[1], 'r') as domainlist: for domain in domainlist: - tasks.append(asyncio.ensure_future(r.nxresolve(domain.decode().strip()))) + domaincount += 1 except OSError as e: print("Can't open file", sys.argv[1], e, file=sys.stderr) sys.exit(2) - print("Loaded list of {} elements, resolving NXDOMAINS".format(len(tasks)), file=sys.stderr) + print("Loaded list of {} elements, resolving NXDOMAINS".format(domaincount), file=sys.stderr) #sys.exit(0) try: + domainfile = open(sys.argv[1], 'r') + # Resolve domains, first try - nxresolved_first = await runTasksWithProgress(tasks) + nxresolved_first = await runTasksWithProgress( + tasksProvider(domainfile, r), domaincount, + INITIAL_PASS_CONCURRENCY * TASK_AMOUNT_MULTIPLIER + ) nxresolved_first = list(filter(None, nxresolved_first)) print("Got {} broken domains, trying to resolve them again " "to make sure".format(len(nxresolved_first)), file=sys.stderr) # Second try - tasks = [] r.limitConcurrency(FINAL_PASS_CONCURRENCY) r.timeout = FINAL_PASS_TIMEOUT r.lifetime = FINAL_PASS_TIMEOUT - for domain in nxresolved_first: - tasks.append(asyncio.ensure_future(r.nxresolve(domain))) - nxresolved_second = await runTasksWithProgress(tasks) + nxresolved_second = await runTasksWithProgress( + tasksProvider(nxresolved_first, r), len(nxresolved_first), + FINAL_PASS_CONCURRENCY * TASK_AMOUNT_MULTIPLIER + ) nxresolved_second = list(filter(None, nxresolved_second)) print("Finally, got {} broken domains".format(len(nxresolved_second)), file=sys.stderr) for domain in nxresolved_second: print(domain) + domainfile.close() + except (SystemExit, KeyboardInterrupt): - for task in tasks: - task.cancel() + print("Exiting") if __name__ == '__main__':