Merge 715e08b03bfafd578057d88c63aca622dad089f8 into 68f9b2859d45a60e699839f87b1b9558cd36a329

This commit is contained in:
Bob Firestone 2023-10-08 11:04:16 -03:00 committed by GitHub
commit a6dc6f3614
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -51,13 +51,13 @@ from yarg.exceptions import HTTPError
from pipreqs import __version__ from pipreqs import __version__
REGEXP = [ REGEXP = [
re.compile(r'^import (.+)$'), re.compile(r"^import (.+)$"),
re.compile(r'^from ((?!\.+).*?) import (?:.*)$') re.compile(r"^from ((?!\.+).*?) import (?:.*)$"),
] ]
@contextmanager @contextmanager
def _open(filename=None, mode='r'): def _open(filename=None, mode="r"):
"""Open a file or ``sys.stdout`` depending on the provided filename. """Open a file or ``sys.stdout`` depending on the provided filename.
Args: Args:
@ -70,13 +70,13 @@ def _open(filename=None, mode='r'):
A file handle. A file handle.
""" """
if not filename or filename == '-': if not filename or filename == "-":
if not mode or 'r' in mode: if not mode or "r" in mode:
file = sys.stdin file = sys.stdin
elif 'w' in mode: elif "w" in mode:
file = sys.stdout file = sys.stdout
else: else:
raise ValueError('Invalid mode for file: {}'.format(mode)) raise ValueError("Invalid mode for file: {}".format(mode))
else: else:
file = open(filename, mode) file = open(filename, mode)
@ -87,8 +87,7 @@ def _open(filename=None, mode='r'):
file.close() file.close()
def get_all_imports( def get_all_imports(path, encoding=None, extra_ignore_dirs=None, follow_links=True):
path, encoding=None, extra_ignore_dirs=None, follow_links=True):
imports = set() imports = set()
raw_imports = set() raw_imports = set()
candidates = [] candidates = []
@ -137,11 +136,11 @@ def get_all_imports(
# Cleanup: We only want to first part of the import. # Cleanup: We only want to first part of the import.
# Ex: from django.conf --> django.conf. But we only want django # Ex: from django.conf --> django.conf. But we only want django
# as an import. # as an import.
cleaned_name, _, _ = name.partition('.') cleaned_name, _, _ = name.partition(".")
imports.add(cleaned_name) imports.add(cleaned_name)
packages = imports - (set(candidates) & imports) packages = imports - (set(candidates) & imports)
logging.debug('Found packages: {0}'.format(packages)) logging.debug("Found packages: {0}".format(packages))
with open(join("stdlib"), "r") as f: with open(join("stdlib"), "r") as f:
data = {x.strip() for x in f} data = {x.strip() for x in f}
@ -155,61 +154,68 @@ def filter_line(line):
def generate_requirements_file(path, imports, symbol): def generate_requirements_file(path, imports, symbol):
with _open(path, "w") as out_file: with _open(path, "w") as out_file:
logging.debug('Writing {num} requirements: {imports} to {file}'.format( logging.debug(
"Writing {num} requirements: {imports} to {file}".format(
num=len(imports), num=len(imports),
file=path, file=path,
imports=", ".join([x['name'] for x in imports]) imports=", ".join([x["name"] for x in imports]),
)) )
fmt = '{name}' + symbol + '{version}' )
out_file.write('\n'.join( fmt = "{name}" + symbol + "{version}"
fmt.format(**item) if item['version'] else '{name}'.format(**item) out_file.write(
for item in imports) + '\n') "\n".join(
fmt.format(**item) if item["version"] else "{name}".format(**item)
for item in imports
)
+ "\n"
)
def output_requirements(imports, symbol): def output_requirements(imports, symbol):
generate_requirements_file('-', imports, symbol) generate_requirements_file("-", imports, symbol)
def get_imports_info( def get_imports_info(imports, pypi_server="https://pypi.python.org/pypi/", proxy=None):
imports, pypi_server="https://pypi.python.org/pypi/", proxy=None):
result = [] result = []
for item in imports: for item in imports:
try: try:
logging.warning( logging.warning(
'Import named "%s" not found locally. ' 'Import named "%s" not found locally. '
'Trying to resolve it at the PyPI server.', "Trying to resolve it at the PyPI server.",
item item,
) )
response = requests.get( response = requests.get(
"{0}{1}/json".format(pypi_server, item), proxies=proxy) "{0}{1}/json".format(pypi_server, item), proxies=proxy
)
if response.status_code == 200: if response.status_code == 200:
if hasattr(response.content, 'decode'): if hasattr(response.content, "decode"):
data = json2package(response.content.decode()) data = json2package(response.content.decode())
else: else:
data = json2package(response.content) data = json2package(response.content)
elif response.status_code >= 300: elif response.status_code >= 300:
raise HTTPError(status_code=response.status_code, raise HTTPError(
reason=response.reason) status_code=response.status_code, reason=response.reason
)
except HTTPError: except HTTPError:
logging.warning( logging.warning('Package "%s" does not exist or network problems', item)
'Package "%s" does not exist or network problems', item)
continue continue
logging.warning( logging.warning(
'Import named "%s" was resolved to "%s:%s" package (%s).\n' 'Import named "%s" was resolved to "%s:%s" package (%s).\n'
'Please, verify manually the final list of requirements.txt ' "Please, verify manually the final list of requirements.txt "
'to avoid possible dependency confusions.', "to avoid possible dependency confusions.",
item, item,
data.name, data.name,
data.latest_release_id, data.latest_release_id,
data.pypi_url data.pypi_url,
) )
result.append({'name': item, 'version': data.latest_release_id}) result.append({"name": item, "version": data.latest_release_id})
return result return result
def get_locally_installed_packages(encoding=None): def get_locally_installed_packages(encoding=None):
packages = [] packages = []
unique_package_names = set()
ignore = ["tests", "_tests", "egg", "EGG", "info"] ignore = ["tests", "_tests", "egg", "EGG", "info"]
for path in sys.path: for path in sys.path:
for root, dirs, files in os.walk(path): for root, dirs, files in os.walk(path):
@ -218,6 +224,7 @@ def get_locally_installed_packages(encoding=None):
item = os.path.join(root, item) item = os.path.join(root, item)
with open(item, "r", encoding=encoding) as f: with open(item, "r", encoding=encoding) as f:
package = root.split(os.sep)[-1].split("-") package = root.split(os.sep)[-1].split("-")
if package[0] not in unique_package_names:
try: try:
top_level_modules = f.read().strip().split("\n") top_level_modules = f.read().strip().split("\n")
except: # NOQA except: # NOQA
@ -229,25 +236,51 @@ def get_locally_installed_packages(encoding=None):
filtered_top_level_modules = list() filtered_top_level_modules = list()
for module in top_level_modules: for module in top_level_modules:
if ( if (module not in ignore) and (
(module not in ignore) and package[0] not in ignore
(package[0] not in ignore)
): ):
# append exported top level modules to the list # append exported top level modules to the list
filtered_top_level_modules.append(module) filtered_top_level_modules.append(module)
version = None version = None
if len(package) > 1: if len(package) > 1:
version = package[1].replace( version = (
".dist", "").replace(".egg", "") package[1].replace(".dist", "").replace(".egg", "")
)
# append package: top_level_modules pairs # append package: top_level_modules pairs
# instead of top_level_module: package pairs # instead of top_level_module: package pairs
packages.append({
'name': package[0], unique_package_names.add(package[0])
'version': version,
'exports': filtered_top_level_modules packages.append(
}) {
"name": package[0],
"version": version,
"exports": filtered_top_level_modules,
}
)
if "METADATA" in item:
item = os.path.join(root, item)
with open(item, "r", encoding=encoding) as file:
try:
data = {}
for line in file:
if ":" in line:
key, value = line.split(":", 1)
data[key.strip()] = value.strip()
name = data["Name"]
version = data["Version"]
if name and name not in unique_package_names:
unique_package_names.add(name)
packages.append(
{"name": name, "version": version, "exports": []}
)
except:
continue
return packages return packages
@ -260,14 +293,14 @@ def get_import_local(imports, encoding=None):
# if candidate import name matches export name # if candidate import name matches export name
# or candidate import name equals to the package name # or candidate import name equals to the package name
# append it to the result # append it to the result
if item in package['exports'] or item == package['name']: if item in package["exports"] or item == package["name"]:
result.append(package) result.append(package)
# removing duplicates of package/version # removing duplicates of package/version
# had to use second method instead of the previous one, # had to use second method instead of the previous one,
# because we have a list in the 'exports' field # because we have a list in the 'exports' field
# https://stackoverflow.com/questions/9427163/remove-duplicate-dict-in-list-in-python # https://stackoverflow.com/questions/9427163/remove-duplicate-dict-in-list-in-python
result_unique = [i for n, i in enumerate(result) if i not in result[n+1:]] result_unique = [i for n, i in enumerate(result) if i not in result[n + 1 :]]
return result_unique return result_unique
@ -298,7 +331,7 @@ def get_name_without_alias(name):
match = REGEXP[0].match(name.strip()) match = REGEXP[0].match(name.strip())
if match: if match:
name = match.groups(0)[0] name = match.groups(0)[0]
return name.partition(' as ')[0].partition('.')[0].strip() return name.partition(" as ")[0].partition(".")[0].strip()
def join(f): def join(f):
@ -384,7 +417,8 @@ def diff(file_, imports):
logging.info( logging.info(
"The following modules are in {} but do not seem to be imported: " "The following modules are in {} but do not seem to be imported: "
"{}".format(file_, ", ".join(x for x in modules_not_imported))) "{}".format(file_, ", ".join(x for x in modules_not_imported))
)
def clean(file_, imports): def clean(file_, imports):
@ -432,30 +466,38 @@ def dynamic_versioning(scheme, imports):
def init(args): def init(args):
encoding = args.get('--encoding') encoding = args.get("--encoding")
extra_ignore_dirs = args.get('--ignore') extra_ignore_dirs = args.get("--ignore")
follow_links = not args.get('--no-follow-links') follow_links = not args.get("--no-follow-links")
input_path = args['<path>'] input_path = args["<path>"]
if input_path is None: if input_path is None:
input_path = os.path.abspath(os.curdir) input_path = os.path.abspath(os.curdir)
if extra_ignore_dirs: if extra_ignore_dirs:
extra_ignore_dirs = extra_ignore_dirs.split(',') extra_ignore_dirs = extra_ignore_dirs.split(",")
path = (args["--savepath"] if args["--savepath"] else path = (
os.path.join(input_path, "requirements.txt")) args["--savepath"]
if (not args["--print"] if args["--savepath"]
else os.path.join(input_path, "requirements.txt")
)
if (
not args["--print"]
and not args["--savepath"] and not args["--savepath"]
and not args["--force"] and not args["--force"]
and os.path.exists(path)): and os.path.exists(path)
logging.warning("requirements.txt already exists, " ):
"use --force to overwrite it") logging.warning(
"requirements.txt already exists, " "use --force to overwrite it"
)
return return
candidates = get_all_imports(input_path, candidates = get_all_imports(
input_path,
encoding=encoding, encoding=encoding,
extra_ignore_dirs=extra_ignore_dirs, extra_ignore_dirs=extra_ignore_dirs,
follow_links=follow_links) follow_links=follow_links,
)
candidates = get_pkg_names(candidates) candidates = get_pkg_names(candidates)
logging.debug("Found imports: " + ", ".join(candidates)) logging.debug("Found imports: " + ", ".join(candidates))
pypi_server = "https://pypi.python.org/pypi/" pypi_server = "https://pypi.python.org/pypi/"
@ -464,11 +506,10 @@ def init(args):
pypi_server = args["--pypi-server"] pypi_server = args["--pypi-server"]
if args["--proxy"]: if args["--proxy"]:
proxy = {'http': args["--proxy"], 'https': args["--proxy"]} proxy = {"http": args["--proxy"], "https": args["--proxy"]}
if args["--use-local"]: if args["--use-local"]:
logging.debug( logging.debug("Getting package information ONLY from local installation.")
"Getting package information ONLY from local installation.")
imports = get_import_local(candidates, encoding=encoding) imports = get_import_local(candidates, encoding=encoding)
else: else:
logging.debug("Getting packages information from Local/PyPI") logging.debug("Getting packages information from Local/PyPI")
@ -478,20 +519,23 @@ def init(args):
# the list of exported modules, installed locally # the list of exported modules, installed locally
# and the package name is not in the list of local module names # and the package name is not in the list of local module names
# it add to difference # it add to difference
difference = [x for x in candidates if difference = [
x
for x in candidates
if
# aggregate all export lists into one # aggregate all export lists into one
# flatten the list # flatten the list
# check if candidate is in exports # check if candidate is in exports
x.lower() not in [y for x in local for y in x['exports']] x.lower() not in [y for x in local for y in x["exports"]] and
and
# check if candidate is package names # check if candidate is package names
x.lower() not in [x['name'] for x in local]] x.lower() not in [x["name"] for x in local]
]
imports = local + get_imports_info(difference, imports = local + get_imports_info(
proxy=proxy, difference, proxy=proxy, pypi_server=pypi_server
pypi_server=pypi_server) )
# sort imports based on lowercase name of package, similar to `pip freeze`. # sort imports based on lowercase name of package, similar to `pip freeze`.
imports = sorted(imports, key=lambda x: x['name'].lower()) imports = sorted(imports, key=lambda x: x["name"].lower())
if args["--diff"]: if args["--diff"]:
diff(args["--diff"], imports) diff(args["--diff"], imports)
@ -506,8 +550,10 @@ def init(args):
if scheme in ["compat", "gt", "no-pin"]: if scheme in ["compat", "gt", "no-pin"]:
imports, symbol = dynamic_versioning(scheme, imports) imports, symbol = dynamic_versioning(scheme, imports)
else: else:
raise ValueError("Invalid argument for mode flag, " raise ValueError(
"use 'compat', 'gt' or 'no-pin' instead") "Invalid argument for mode flag, "
"use 'compat', 'gt' or 'no-pin' instead"
)
else: else:
symbol = "==" symbol = "=="
@ -521,8 +567,8 @@ def init(args):
def main(): # pragma: no cover def main(): # pragma: no cover
args = docopt(__doc__, version=__version__) args = docopt(__doc__, version=__version__)
log_level = logging.DEBUG if args['--debug'] else logging.INFO log_level = logging.DEBUG if args["--debug"] else logging.INFO
logging.basicConfig(level=log_level, format='%(levelname)s: %(message)s') logging.basicConfig(level=log_level, format="%(levelname)s: %(message)s")
try: try:
init(args) init(args)
@ -530,5 +576,5 @@ def main(): # pragma: no cover
sys.exit(0) sys.exit(0)
if __name__ == '__main__': if __name__ == "__main__":
main() # pragma: no cover main() # pragma: no cover