Source code for pooltool.utils

"""Utilities"""

import importlib.util
import linecache
import os
import pickle
import tracemalloc

import pandas as pd
import pprofile
from panda3d.core import Filename


class classproperty(property):
    """Property variant whose getter receives the owning class

    The wrapped function is called with the class rather than an instance,
    whether the attribute is looked up on the class itself or on one of its
    instances.

    Examples:

        >>> from pooltool.utils import classproperty
        >>> class Test:
        ...     @classproperty
        ...     def foo(cls):
        ...         return cls.__name__
        >>> Test.foo
        'Test'
    """

    def __get__(self, owner_self, owner_cls):  # type: ignore
        # The instance (`owner_self`) is intentionally unused: only the class
        # is forwarded to the getter.
        getter = self.fget
        return getter(owner_cls)  # type: ignore
def save_pickle(x, path):
    """Pickle the object `x` and write it to the filepath `path`

    The file is opened in binary write mode, so any existing content at `path`
    is replaced.
    """
    with open(path, mode="wb") as sink:
        pickle.Pickler(sink).dump(x)
def load_pickle(path):
    """Return the object unpickled from the filepath `path`

    NOTE(review): unpickling can execute arbitrary code, so `path` should only
    ever point at files from a trusted source.
    """
    with open(path, mode="rb") as source:
        return pickle.Unpickler(source).load()
def is_pickleable(obj):
    """Return whether `obj` can be serialized with pickle

    Adapted from https://stackoverflow.com/a/53398070

    The check is performed by actually pickling `obj` in memory, so it costs as
    much as a real `pickle.dumps` call.

    Parameters
    ==========
    obj : object
        The object to test.

    Returns
    =======
    bool
        True if `pickle.dumps(obj)` succeeds, False if it fails with one of the
        errors pickle raises for unsupported objects.
    """
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        # AttributeError is included because, on many Python versions, objects
        # that cannot be looked up by qualified name (e.g. functions defined
        # inside another function) fail with "Can't pickle local object"
        # raised as AttributeError rather than PicklingError.
        return False
    return True
def panda_path(path) -> str:
    """Convert an OS-specific path into its Panda3D `Filename` string form

    The path is stringified, handed to `Filename.fromOsSpecific`, passed
    through `makeTrueCase`, and returned as a `str`.
    """
    filename = Filename.fromOsSpecific(str(path))
    filename.makeTrueCase()
    return str(filename)
def get_total_memory_usage(keep_raw=False):
    """Get the total memory of this process, including its children

    The resident set size (rss) of the current process is summed with that of
    every child process, found recursively.

    Parameters
    ==========
    keep_raw : bool, False
        By default a human readable format is returned, e.g. "1.41 GB". If
        keep_raw, the raw number is returned, e.g. 1515601920

    Returns
    =======
    str or int
        The formatted size, or the raw number when `keep_raw` is set. If psutil
        is not installed, "??" is returned regardless of `keep_raw`.
    """
    # psutil is optional; without it there is nothing to measure
    if importlib.util.find_spec("psutil") is None:
        return "??"

    import psutil

    this_process = psutil.Process(os.getpid())
    total_rss = this_process.memory_info().rss

    for descendant in this_process.children(recursive=True):
        try:
            total_rss += descendant.memory_info().rss
        except Exception:
            # Best effort: a child whose memory cannot be read is left out of
            # the total (presumably it exited or is inaccessible).
            pass

    if keep_raw:
        return total_rss
    return human_readable_file_size(total_rss)
def display_top_memory_usage(snapshot, key_type="lineno", limit=10):
    """A pretty-print for the tracemalloc memory usage module

    Modified from https://docs.python.org/3/library/tracemalloc.html

    Traces from "<frozen importlib._bootstrap>" and "<unknown>" are dropped and
    the remaining statistics are grouped by `key_type`. The first `limit`
    entries are printed, each followed by its source line when one can be read.
    Entries beyond `limit` are summed into a single "other" line, and the
    overall total is printed last.

    Examples
    ========
    >>> import tracemalloc
    >>> import pooltool.utils as utils
    >>> tracemalloc.start()
    >>> snap = tracemalloc.take_snapshot()
    >>> utils.display_top_memory_usage(snap)
    Top 10 lines
    #1: anvio/bamops.py:160: 4671.3 KiB
     constants.cigar_consumption,
    #2: anvio/bamops.py:96: 2571.6 KiB
     self.cigartuples = np.array(read.cigartuples)
    #3: python3.6/linecache.py:137: 1100.0 KiB
     lines = fp.readlines()
    #4: <frozen importlib._bootstrap_external>:487: 961.4 KiB
    #5: typing/templates.py:627: 334.3 KiB
     return type(base)(name, (base,), dct)
    #6: typing/templates.py:923: 315.7 KiB
     class Template(cls):
    #7: python3.6/_weakrefset.py:84: 225.2 KiB
     self.data.add(ref(item, self._remove))
    #8: targets/npyimpl.py:411: 143.2 KiB
     class _KernelImpl(_Kernel):
    #9: _vendor/pyparsing.py:3349: 139.7 KiB
     self.errmsg = "Expected " + _ustr(self)
    #10: typing/context.py:456: 105.1 KiB
     def on_disposal(wr, pop=self._globals.pop):
    3212 other: 4611.9 KiB
    Total allocated size: 15179.4 KiB
    """
    excluded = (
        tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
        tracemalloc.Filter(False, "<unknown>"),
    )
    stats = snapshot.filter_traces(excluded).statistics(key_type)
    leading, remainder = stats[:limit], stats[limit:]

    print("Top %s lines" % limit)
    for rank, entry in enumerate(leading, start=1):
        frame = entry.traceback[0]
        # Keep only the last two path components: "module/file.py"
        short_name = os.sep.join(frame.filename.split(os.sep)[-2:])
        size_kib = entry.size / 1024
        print(
            "#%s: %s:%s: %.1f KiB" % (rank, short_name, frame.lineno, size_kib)
        )
        source_line = linecache.getline(frame.filename, frame.lineno).strip()
        if source_line:
            print(" %s" % source_line)

    if remainder:
        remainder_size = sum(entry.size for entry in remainder)
        print("%s other: %.1f KiB" % (len(remainder), remainder_size / 1024))

    total_size = sum(entry.size for entry in stats)
    print("Total allocated size: %.1f KiB" % (total_size / 1024))
def memory_usage_to_dataframe(snapshot, key_type="lineno", limit=10):
    """
    Tabulate the largest allocations of a tracemalloc snapshot as a DataFrame.

    Traces from "<frozen importlib._bootstrap>",
    "<frozen importlib._bootstrap_external>" and "<unknown>" are excluded
    before the statistics are computed.

    Args:
        snapshot (tracemalloc.Snapshot): The snapshot of memory allocation.
        key_type (str): The key used to group the statistics.
        limit (int): The number of leading entries that get their own row.

    Returns:
        pd.DataFrame: One "Top" row per leading entry and, when entries remain
        beyond `limit`, a final "Other" row holding their combined size. The
        columns are "Rank", "File", "Line Number", "Memory Usage (KiB)",
        "Code Snippet" and "Category".
    """
    excluded = (
        tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
        tracemalloc.Filter(False, "<frozen importlib._bootstrap_external>"),
        tracemalloc.Filter(False, "<unknown>"),
    )
    stats = snapshot.filter_traces(excluded).statistics(key_type)
    leading, remainder = stats[:limit], stats[limit:]
    frames = [entry.traceback[0] for entry in leading]

    # One list per column, filled from the leading entries
    table = {
        "Rank": list(range(1, len(leading) + 1)),
        "File": [
            os.sep.join(frame.filename.split(os.sep)[-2:]) for frame in frames
        ],
        "Line Number": [frame.lineno for frame in frames],
        "Memory Usage (KiB)": [entry.size / 1024 for entry in leading],
        "Code Snippet": [
            linecache.getline(frame.filename, frame.lineno).strip()
            for frame in frames
        ],
        "Category": ["Top"] * len(leading),
    }

    # Everything past `limit` collapses into a single aggregate row
    if remainder:
        other_row = {
            "Rank": None,
            "File": None,
            "Line Number": None,
            "Memory Usage (KiB)": sum(entry.size for entry in remainder) / 1024,
            "Code Snippet": None,
            "Category": "Other",
        }
        for column, value in other_row.items():
            table[column].append(value)

    return pd.DataFrame(table)
def human_readable_file_size(nbytes):
    """Format a number of bytes as a human readable string, e.g. "1.41 GB"

    The value is repeatedly divided by 1024 until it fits the largest suitable
    unit (up to "PB"). It is then printed with at most two decimals, with
    trailing zeros removed.

    Parameters
    ==========
    nbytes : int or float
        The size in bytes.

    Returns
    =======
    str
        The formatted size followed by its unit, e.g. "0 B", "1.5 KB", "1 MB".
    """
    suffixes = ["B", "KB", "MB", "GB", "TB", "PB"]
    if nbytes == 0:
        return "0 B"

    largest = len(suffixes) - 1
    size = nbytes
    i = 0
    # Pick the unit from the value as it will be displayed (two decimals).
    # Otherwise sizes just below a boundary, e.g. 1048575 bytes, would print
    # as "1024 KB" instead of "1 MB".
    while i < largest and round(size, 2) >= 1024:
        size /= 1024.0
        i += 1

    text = ("%.2f" % size).rstrip("0").rstrip(".")
    return "%s %s" % (text, suffixes[i])
class PProfile(pprofile.Profile):
    """Small wrapper for pprofile that accepts a filepath and outputs cachegrind file

    Used as a context manager. When `should_run` is False, entering returns the
    object itself and exiting does nothing. Otherwise the parent class's
    enter/exit are called, and the stats are dumped to `path` on exit.
    """

    def __init__(self, path, should_run: bool = True):
        self.path = path
        self.should_run = should_run
        pprofile.Profile.__init__(self)

    def __enter__(self):
        if not self.should_run:
            return self
        return super().__enter__()

    def __exit__(self, *args):
        if not self.should_run:
            return
        pprofile.Profile.__exit__(self, *args)
        # Write the collected stats once the parent's exit has run
        self.dump_stats(self.path)