handle FileNotFoundError in parse_requirements function

This commit is contained in:
mateuslatrova 2023-10-19 19:52:08 -03:00 committed by Alan Barzilay
parent 55eee298ec
commit 368e9ae7e7
2 changed files with 124 additions and 99 deletions

View File

@ -50,14 +50,11 @@ from yarg.exceptions import HTTPError
from pipreqs import __version__ from pipreqs import __version__
REGEXP = [ REGEXP = [re.compile(r"^import (.+)$"), re.compile(r"^from ((?!\.+).*?) import (?:.*)$")]
re.compile(r'^import (.+)$'),
re.compile(r'^from ((?!\.+).*?) import (?:.*)$')
]
@contextmanager @contextmanager
def _open(filename=None, mode='r'): def _open(filename=None, mode="r"):
"""Open a file or ``sys.stdout`` depending on the provided filename. """Open a file or ``sys.stdout`` depending on the provided filename.
Args: Args:
@ -70,13 +67,13 @@ def _open(filename=None, mode='r'):
A file handle. A file handle.
""" """
if not filename or filename == '-': if not filename or filename == "-":
if not mode or 'r' in mode: if not mode or "r" in mode:
file = sys.stdin file = sys.stdin
elif 'w' in mode: elif "w" in mode:
file = sys.stdout file = sys.stdout
else: else:
raise ValueError('Invalid mode for file: {}'.format(mode)) raise ValueError("Invalid mode for file: {}".format(mode))
else: else:
file = open(filename, mode) file = open(filename, mode)
@ -87,8 +84,7 @@ def _open(filename=None, mode='r'):
file.close() file.close()
def get_all_imports( def get_all_imports(path, encoding=None, extra_ignore_dirs=None, follow_links=True):
path, encoding=None, extra_ignore_dirs=None, follow_links=True):
imports = set() imports = set()
raw_imports = set() raw_imports = set()
candidates = [] candidates = []
@ -137,11 +133,11 @@ def get_all_imports(
# Cleanup: We only want to first part of the import. # Cleanup: We only want to first part of the import.
# Ex: from django.conf --> django.conf. But we only want django # Ex: from django.conf --> django.conf. But we only want django
# as an import. # as an import.
cleaned_name, _, _ = name.partition('.') cleaned_name, _, _ = name.partition(".")
imports.add(cleaned_name) imports.add(cleaned_name)
packages = imports - (set(candidates) & imports) packages = imports - (set(candidates) & imports)
logging.debug('Found packages: {0}'.format(packages)) logging.debug("Found packages: {0}".format(packages))
with open(join("stdlib"), "r") as f: with open(join("stdlib"), "r") as f:
data = {x.strip() for x in f} data = {x.strip() for x in f}
@ -151,56 +147,55 @@ def get_all_imports(
def generate_requirements_file(path, imports, symbol): def generate_requirements_file(path, imports, symbol):
with _open(path, "w") as out_file: with _open(path, "w") as out_file:
logging.debug('Writing {num} requirements: {imports} to {file}'.format( logging.debug(
num=len(imports), "Writing {num} requirements: {imports} to {file}".format(
file=path, num=len(imports), file=path, imports=", ".join([x["name"] for x in imports])
imports=", ".join([x['name'] for x in imports]) )
)) )
fmt = '{name}' + symbol + '{version}' fmt = "{name}" + symbol + "{version}"
out_file.write('\n'.join( out_file.write(
fmt.format(**item) if item['version'] else '{name}'.format(**item) "\n".join(
for item in imports) + '\n') fmt.format(**item) if item["version"] else "{name}".format(**item)
for item in imports
)
+ "\n"
)
def output_requirements(imports, symbol): def output_requirements(imports, symbol):
generate_requirements_file('-', imports, symbol) generate_requirements_file("-", imports, symbol)
def get_imports_info( def get_imports_info(imports, pypi_server="https://pypi.python.org/pypi/", proxy=None):
imports, pypi_server="https://pypi.python.org/pypi/", proxy=None):
result = [] result = []
for item in imports: for item in imports:
try: try:
logging.warning( logging.warning(
'Import named "%s" not found locally. ' 'Import named "%s" not found locally. ' "Trying to resolve it at the PyPI server.",
'Trying to resolve it at the PyPI server.', item,
item
) )
response = requests.get( response = requests.get("{0}{1}/json".format(pypi_server, item), proxies=proxy)
"{0}{1}/json".format(pypi_server, item), proxies=proxy)
if response.status_code == 200: if response.status_code == 200:
if hasattr(response.content, 'decode'): if hasattr(response.content, "decode"):
data = json2package(response.content.decode()) data = json2package(response.content.decode())
else: else:
data = json2package(response.content) data = json2package(response.content)
elif response.status_code >= 300: elif response.status_code >= 300:
raise HTTPError(status_code=response.status_code, raise HTTPError(status_code=response.status_code, reason=response.reason)
reason=response.reason)
except HTTPError: except HTTPError:
logging.warning( logging.warning('Package "%s" does not exist or network problems', item)
'Package "%s" does not exist or network problems', item)
continue continue
logging.warning( logging.warning(
'Import named "%s" was resolved to "%s:%s" package (%s).\n' 'Import named "%s" was resolved to "%s:%s" package (%s).\n'
'Please, verify manually the final list of requirements.txt ' "Please, verify manually the final list of requirements.txt "
'to avoid possible dependency confusions.', "to avoid possible dependency confusions.",
item, item,
data.name, data.name,
data.latest_release_id, data.latest_release_id,
data.pypi_url data.pypi_url,
) )
result.append({'name': item, 'version': data.latest_release_id}) result.append({"name": item, "version": data.latest_release_id})
return result return result
@ -225,25 +220,23 @@ def get_locally_installed_packages(encoding=None):
filtered_top_level_modules = list() filtered_top_level_modules = list()
for module in top_level_modules: for module in top_level_modules:
if ( if (module not in ignore) and (package[0] not in ignore):
(module not in ignore) and
(package[0] not in ignore)
):
# append exported top level modules to the list # append exported top level modules to the list
filtered_top_level_modules.append(module) filtered_top_level_modules.append(module)
version = None version = None
if len(package) > 1: if len(package) > 1:
version = package[1].replace( version = package[1].replace(".dist", "").replace(".egg", "")
".dist", "").replace(".egg", "")
# append package: top_level_modules pairs # append package: top_level_modules pairs
# instead of top_level_module: package pairs # instead of top_level_module: package pairs
packages.append({ packages.append(
'name': package[0], {
'version': version, "name": package[0],
'exports': filtered_top_level_modules "version": version,
}) "exports": filtered_top_level_modules,
}
)
return packages return packages
@ -256,14 +249,14 @@ def get_import_local(imports, encoding=None):
# if candidate import name matches export name # if candidate import name matches export name
# or candidate import name equals to the package name # or candidate import name equals to the package name
# append it to the result # append it to the result
if item in package['exports'] or item == package['name']: if item in package["exports"] or item == package["name"]:
result.append(package) result.append(package)
# removing duplicates of package/version # removing duplicates of package/version
# had to use second method instead of the previous one, # had to use second method instead of the previous one,
# because we have a list in the 'exports' field # because we have a list in the 'exports' field
# https://stackoverflow.com/questions/9427163/remove-duplicate-dict-in-list-in-python # https://stackoverflow.com/questions/9427163/remove-duplicate-dict-in-list-in-python
result_unique = [i for n, i in enumerate(result) if i not in result[n+1:]] result_unique = [i for n, i in enumerate(result) if i not in result[n + 1:]]
return result_unique return result_unique
@ -294,7 +287,7 @@ def get_name_without_alias(name):
match = REGEXP[0].match(name.strip()) match = REGEXP[0].match(name.strip())
if match: if match:
name = match.groups(0)[0] name = match.groups(0)[0]
return name.partition(' as ')[0].partition('.')[0].strip() return name.partition(" as ")[0].partition(".")[0].strip()
def join(f): def join(f):
@ -308,6 +301,9 @@ def parse_requirements(file_):
delimiter, get module name by element index, create a dict consisting of delimiter, get module name by element index, create a dict consisting of
module:version, and add dict to list of parsed modules. module:version, and add dict to list of parsed modules.
If file ´file_´ is not found in the system, the program will print a
helpful message and end its execution immediately.
Args: Args:
file_: File to parse. file_: File to parse.
@ -324,9 +320,12 @@ def parse_requirements(file_):
try: try:
f = open(file_, "r") f = open(file_, "r")
except OSError: except FileNotFoundError:
logging.error("Failed on file: {}".format(file_)) print(f"File {file_} was not found. Please, fix it and run again.")
raise sys.exit(1)
except OSError as error:
logging.error(f"There was an error opening the file {file_}: {str(error)}")
raise error
else: else:
try: try:
data = [x.strip() for x in f.readlines() if x != "\n"] data = [x.strip() for x in f.readlines() if x != "\n"]
@ -353,6 +352,7 @@ def parse_requirements(file_):
return modules return modules
def compare_modules(file_, imports): def compare_modules(file_, imports):
"""Compare modules in a file to imported modules in a project. """Compare modules in a file to imported modules in a project.
@ -379,7 +379,8 @@ def diff(file_, imports):
logging.info( logging.info(
"The following modules are in {} but do not seem to be imported: " "The following modules are in {} but do not seem to be imported: "
"{}".format(file_, ", ".join(x for x in modules_not_imported))) "{}".format(file_, ", ".join(x for x in modules_not_imported))
)
def clean(file_, imports): def clean(file_, imports):
@ -427,30 +428,34 @@ def dynamic_versioning(scheme, imports):
def init(args): def init(args):
encoding = args.get('--encoding') encoding = args.get("--encoding")
extra_ignore_dirs = args.get('--ignore') extra_ignore_dirs = args.get("--ignore")
follow_links = not args.get('--no-follow-links') follow_links = not args.get("--no-follow-links")
input_path = args['<path>'] input_path = args["<path>"]
if input_path is None: if input_path is None:
input_path = os.path.abspath(os.curdir) input_path = os.path.abspath(os.curdir)
if extra_ignore_dirs: if extra_ignore_dirs:
extra_ignore_dirs = extra_ignore_dirs.split(',') extra_ignore_dirs = extra_ignore_dirs.split(",")
path = (args["--savepath"] if args["--savepath"] else path = (
os.path.join(input_path, "requirements.txt")) args["--savepath"] if args["--savepath"] else os.path.join(input_path, "requirements.txt")
if (not args["--print"] )
and not args["--savepath"] if (
and not args["--force"] not args["--print"]
and os.path.exists(path)): and not args["--savepath"]
logging.warning("requirements.txt already exists, " and not args["--force"]
"use --force to overwrite it") and os.path.exists(path)
):
logging.warning("requirements.txt already exists, " "use --force to overwrite it")
return return
candidates = get_all_imports(input_path, candidates = get_all_imports(
encoding=encoding, input_path,
extra_ignore_dirs=extra_ignore_dirs, encoding=encoding,
follow_links=follow_links) extra_ignore_dirs=extra_ignore_dirs,
follow_links=follow_links,
)
candidates = get_pkg_names(candidates) candidates = get_pkg_names(candidates)
logging.debug("Found imports: " + ", ".join(candidates)) logging.debug("Found imports: " + ", ".join(candidates))
pypi_server = "https://pypi.python.org/pypi/" pypi_server = "https://pypi.python.org/pypi/"
@ -459,11 +464,10 @@ def init(args):
pypi_server = args["--pypi-server"] pypi_server = args["--pypi-server"]
if args["--proxy"]: if args["--proxy"]:
proxy = {'http': args["--proxy"], 'https': args["--proxy"]} proxy = {"http": args["--proxy"], "https": args["--proxy"]}
if args["--use-local"]: if args["--use-local"]:
logging.debug( logging.debug("Getting package information ONLY from local installation.")
"Getting package information ONLY from local installation.")
imports = get_import_local(candidates, encoding=encoding) imports = get_import_local(candidates, encoding=encoding)
else: else:
logging.debug("Getting packages information from Local/PyPI") logging.debug("Getting packages information from Local/PyPI")
@ -473,20 +477,21 @@ def init(args):
# the list of exported modules, installed locally # the list of exported modules, installed locally
# and the package name is not in the list of local module names # and the package name is not in the list of local module names
# it add to difference # it add to difference
difference = [x for x in candidates if difference = [
# aggregate all export lists into one x
# flatten the list for x in candidates
# check if candidate is in exports if
x.lower() not in [y for x in local for y in x['exports']] # aggregate all export lists into one
and # flatten the list
# check if candidate is package names # check if candidate is in exports
x.lower() not in [x['name'] for x in local]] x.lower() not in [y for x in local for y in x["exports"]] and
# check if candidate is package names
x.lower() not in [x["name"] for x in local]
]
imports = local + get_imports_info(difference, imports = local + get_imports_info(difference, proxy=proxy, pypi_server=pypi_server)
proxy=proxy,
pypi_server=pypi_server)
# sort imports based on lowercase name of package, similar to `pip freeze`. # sort imports based on lowercase name of package, similar to `pip freeze`.
imports = sorted(imports, key=lambda x: x['name'].lower()) imports = sorted(imports, key=lambda x: x["name"].lower())
if args["--diff"]: if args["--diff"]:
diff(args["--diff"], imports) diff(args["--diff"], imports)
@ -501,8 +506,9 @@ def init(args):
if scheme in ["compat", "gt", "no-pin"]: if scheme in ["compat", "gt", "no-pin"]:
imports, symbol = dynamic_versioning(scheme, imports) imports, symbol = dynamic_versioning(scheme, imports)
else: else:
raise ValueError("Invalid argument for mode flag, " raise ValueError(
"use 'compat', 'gt' or 'no-pin' instead") "Invalid argument for mode flag, " "use 'compat', 'gt' or 'no-pin' instead"
)
else: else:
symbol = "==" symbol = "=="
@ -516,8 +522,8 @@ def init(args):
def main(): # pragma: no cover def main(): # pragma: no cover
args = docopt(__doc__, version=__version__) args = docopt(__doc__, version=__version__)
log_level = logging.DEBUG if args['--debug'] else logging.INFO log_level = logging.DEBUG if args["--debug"] else logging.INFO
logging.basicConfig(level=log_level, format='%(levelname)s: %(message)s') logging.basicConfig(level=log_level, format="%(levelname)s: %(message)s")
try: try:
init(args) init(args)
@ -525,5 +531,5 @@ def main(): # pragma: no cover
sys.exit(0) sys.exit(0)
if __name__ == '__main__': if __name__ == "__main__":
main() # pragma: no cover main() # pragma: no cover

View File

@ -8,11 +8,12 @@ test_pipreqs
Tests for `pipreqs` module. Tests for `pipreqs` module.
""" """
import io from io import StringIO
import sys from unittest.mock import patch
import unittest import unittest
import os import os
import requests import requests
import sys
from pipreqs import pipreqs from pipreqs import pipreqs
@ -79,6 +80,7 @@ class TestPipreqs(unittest.TestCase):
self.requirements_path = os.path.join(self.project, "requirements.txt") self.requirements_path = os.path.join(self.project, "requirements.txt")
self.alt_requirement_path = os.path.join(self.project, "requirements2.txt") self.alt_requirement_path = os.path.join(self.project, "requirements2.txt")
self.non_existing_filepath = "xpto"
def test_get_all_imports(self): def test_get_all_imports(self):
imports = pipreqs.get_all_imports(self.project) imports = pipreqs.get_all_imports(self.project)
@ -478,7 +480,7 @@ class TestPipreqs(unittest.TestCase):
It should print to stdout the same content as requeriments.txt It should print to stdout the same content as requeriments.txt
""" """
capturedOutput = io.StringIO() capturedOutput = StringIO()
sys.stdout = capturedOutput sys.stdout = capturedOutput
pipreqs.init( pipreqs.init(
@ -540,6 +542,23 @@ class TestPipreqs(unittest.TestCase):
self.assertListEqual(parsed_requirements, expected_parsed_requirements) self.assertListEqual(parsed_requirements, expected_parsed_requirements)
@patch("sys.exit")
def test_parse_requirements_handles_file_not_found(self, exit_mock):
captured_output = StringIO()
sys.stdout = captured_output
# This assertion is needed, because since "sys.exit" is mocked, the program won't end,
# and the code that is after the except block will be run
with self.assertRaises(UnboundLocalError):
pipreqs.parse_requirements(self.non_existing_filepath)
exit_mock.assert_called_once_with(1)
printed_text = captured_output.getvalue().strip()
sys.stdout = sys.__stdout__
self.assertEqual(printed_text, "File xpto was not found. Please, fix it and run again.")
def tearDown(self): def tearDown(self):
""" """
Remove requiremnts.txt files that were written Remove requiremnts.txt files that were written