Fix high memory usage

* Do not load all domain names in a file to memory, use generator instead
* Do not construct async tasks for all domains, slice it by TASK_AMOUNT_MULTIPLIER
    instead

This modification decreases memory usage from ~470 MB down to ~35-40 MB
This commit is contained in:
ValdikSS 2022-08-07 17:38:50 +03:00
parent c4bd586105
commit 3b26ff5307

View File

@ -8,6 +8,7 @@ import dns.rdatatype
import dns.asyncresolver import dns.asyncresolver
import dns.exception import dns.exception
import dns._asyncio_backend import dns._asyncio_backend
import itertools
# DNS timeout (in seconds) for the initial DNS resolving pass # DNS timeout (in seconds) for the initial DNS resolving pass
INITIAL_PASS_TIMEOUT = 3 INITIAL_PASS_TIMEOUT = 3
@ -19,6 +20,9 @@ FINAL_PASS_TIMEOUT = 10
# Number of concurrent resolving 'threads' for final pass # Number of concurrent resolving 'threads' for final pass
FINAL_PASS_CONCURRENCY = 35 FINAL_PASS_CONCURRENCY = 35
# Multiplication number of asynchronous tasks to build for a single
# resolver loop
TASK_AMOUNT_MULTIPLIER = 5
NS_FILTER_SUBSTRINGS = ("parking", "expired", ".afternic.com", "parklogic", ".parktons.com", NS_FILTER_SUBSTRINGS = ("parking", "expired", ".afternic.com", "parklogic", ".parktons.com",
".above.com", ".ztomy.com", ".notneiron.com", ".ibspark.com", ".bodis.com", ".above.com", ".ztomy.com", ".notneiron.com", ".ibspark.com", ".bodis.com",
@ -49,17 +53,25 @@ class AZResolver(dns.asyncresolver.Resolver):
# Do not thread domain as broken if the answer is empty # Do not thread domain as broken if the answer is empty
pass pass
async def runTasksWithProgress(tasks): def tasksProvider(domainiter, resolver):
for domain in domainiter:
yield asyncio.ensure_future(resolver.nxresolve(domain.strip()))
async def runTasksWithProgress(tasks, tasknumber, concurrent_tasks):
progress = 0 progress = 0
old_progress = 0 old_progress = 0
ret = [] ret = []
for task in asyncio.as_completed(tasks): tasklist = list(itertools.islice(tasks, concurrent_tasks))
while tasklist:
for task in asyncio.as_completed(tasklist):
ret.append(await task) ret.append(await task)
progress = int(len(ret) / len(tasks) * 100) progress = int(len(ret) / tasknumber * 100)
if old_progress < progress: if old_progress < progress:
print("{}%...".format(progress), end='\r', file=sys.stderr, flush=True) print("{}%...".format(progress), end='\r', file=sys.stderr, flush=True)
old_progress = progress old_progress = progress
tasklist = list(itertools.islice(tasks, concurrent_tasks))
print(file=sys.stderr) print(file=sys.stderr)
return ret return ret
@ -74,44 +86,50 @@ async def main():
r.lifetime = INITIAL_PASS_TIMEOUT r.lifetime = INITIAL_PASS_TIMEOUT
# Load domain file list and schedule resolving # Load domain file list and schedule resolving
tasks = [] domaincount = 0
try: try:
with open(sys.argv[1], 'rb') as domainlist: with open(sys.argv[1], 'r') as domainlist:
for domain in domainlist: for domain in domainlist:
tasks.append(asyncio.ensure_future(r.nxresolve(domain.decode().strip()))) domaincount += 1
except OSError as e: except OSError as e:
print("Can't open file", sys.argv[1], e, file=sys.stderr) print("Can't open file", sys.argv[1], e, file=sys.stderr)
sys.exit(2) sys.exit(2)
print("Loaded list of {} elements, resolving NXDOMAINS".format(len(tasks)), file=sys.stderr) print("Loaded list of {} elements, resolving NXDOMAINS".format(domaincount), file=sys.stderr)
#sys.exit(0) #sys.exit(0)
try: try:
domainfile = open(sys.argv[1], 'r')
# Resolve domains, first try # Resolve domains, first try
nxresolved_first = await runTasksWithProgress(tasks) nxresolved_first = await runTasksWithProgress(
tasksProvider(domainfile, r), domaincount,
INITIAL_PASS_CONCURRENCY * TASK_AMOUNT_MULTIPLIER
)
nxresolved_first = list(filter(None, nxresolved_first)) nxresolved_first = list(filter(None, nxresolved_first))
print("Got {} broken domains, trying to resolve them again " print("Got {} broken domains, trying to resolve them again "
"to make sure".format(len(nxresolved_first)), file=sys.stderr) "to make sure".format(len(nxresolved_first)), file=sys.stderr)
# Second try # Second try
tasks = []
r.limitConcurrency(FINAL_PASS_CONCURRENCY) r.limitConcurrency(FINAL_PASS_CONCURRENCY)
r.timeout = FINAL_PASS_TIMEOUT r.timeout = FINAL_PASS_TIMEOUT
r.lifetime = FINAL_PASS_TIMEOUT r.lifetime = FINAL_PASS_TIMEOUT
for domain in nxresolved_first: nxresolved_second = await runTasksWithProgress(
tasks.append(asyncio.ensure_future(r.nxresolve(domain))) tasksProvider(nxresolved_first, r), len(nxresolved_first),
nxresolved_second = await runTasksWithProgress(tasks) FINAL_PASS_CONCURRENCY * TASK_AMOUNT_MULTIPLIER
)
nxresolved_second = list(filter(None, nxresolved_second)) nxresolved_second = list(filter(None, nxresolved_second))
print("Finally, got {} broken domains".format(len(nxresolved_second)), file=sys.stderr) print("Finally, got {} broken domains".format(len(nxresolved_second)), file=sys.stderr)
for domain in nxresolved_second: for domain in nxresolved_second:
print(domain) print(domain)
domainfile.close()
except (SystemExit, KeyboardInterrupt): except (SystemExit, KeyboardInterrupt):
for task in tasks: print("Exiting")
task.cancel()
if __name__ == '__main__': if __name__ == '__main__':