refactor(pipreqs): Refactor core code, remove unnecessary prints, rename vars

This commit is contained in:
Vadim Kravcenko 2015-05-10 10:58:05 +02:00
parent dc512ae423
commit 3fc82c5743
3 changed files with 60 additions and 57 deletions

View File

@@ -28,46 +28,44 @@ REGEXP = [
] ]
def get_all_imports(start_path): def get_all_imports(path):
imports = [] imports = []
packages = [] candidates = []
logging.debug('Traversing tree, start: {0}'.format(start_path))
for root, dirs, files in os.walk(start_path): for root, dirs, files in os.walk(path):
packages.append(os.path.basename(root)) candidates.append(os.path.basename(root))
files = [fn for fn in files if os.path.splitext(fn)[1] == ".py"] files = [fn for fn in files if os.path.splitext(fn)[1] == ".py"]
packages += [os.path.splitext(fn)[0] for fn in files] candidates += [os.path.splitext(fn)[0] for fn in files]
for file_name in files: for file_name in files:
with open(os.path.join(root, file_name), "r") as file_object: with open(os.path.join(root, file_name), "r") as f:
lines = filter( lines = filter(filter_line, map(lambda l: l.strip(), f))
lambda l: len(l) > 0, map(lambda l: l.strip(), file_object))
for line in lines: for line in lines:
if line[0] == "#":
continue
if "(" in line: if "(" in line:
break break
for rex in REGEXP: for rex in REGEXP:
s = rex.match(line) s = rex.findall(line)
if not s: for item in s:
continue res = map(get_name_without_alias, item.split(","))
for item in s.groups(): imports = [x for x in imports + res if len(x) > 0]
if "," in item:
for match in item.split(","): packages = set(imports) - set(set(candidates) & set(imports))
imports.append(get_import_name_without_alias(match)) logging.debug('Found packages: {0}'.format(packages))
else:
imports.append(get_import_name_without_alias(item)) with open(join("stdlib"), "r") as f:
third_party_packages = set(imports) - set(set(packages) & set(imports))
logging.debug(
'Found third-party packages: {0}'.format(third_party_packages))
with open(os.path.join(os.path.dirname(__file__), "stdlib"), "r") as f:
data = [x.strip() for x in f.readlines()] data = [x.strip() for x in f.readlines()]
return sorted(list(set(third_party_packages) - set(data))) return sorted(list(set(packages) - set(data)))
def filter_line(l):
return len(l) > 0 and l[0] != "#"
def generate_requirements_file(path, imports): def generate_requirements_file(path, imports):
with open(path, "w") as out_file: with open(path, "w") as out_file:
logging.debug('Writing {num} requirements to {file}'.format( logging.debug('Writing {num} requirements: {imports} to {file}'.format(
num=len(imports), num=len(imports),
file=path file=path,
imports=", ".join([x['name'] for x in imports])
)) ))
fmt = '{name} == {version}' fmt = '{name} == {version}'
out_file.write('\n'.join(fmt.format(**item) out_file.write('\n'.join(fmt.format(**item)
@@ -80,17 +78,16 @@ def get_imports_info(imports):
try: try:
data = yarg.get(item) data = yarg.get(item)
except HTTPError: except HTTPError:
logging.debug('Package does not exist or network problems') logging.debug(
'Package %s does not exist or network problems', item)
continue continue
last_release = data.latest_release_id result.append({'name': item, 'version': data.latest_release_id})
result.append({'name': item, 'version': last_release})
return result return result
def get_locally_installed_packages(): def get_locally_installed_packages():
path = get_python_lib()
packages = {} packages = {}
for root, dirs, files in os.walk(path): for root, dirs, files in os.walk(get_python_lib()):
for item in files: for item in files:
if "top_level" in item: if "top_level" in item:
with open(os.path.join(root, item), "r") as f: with open(os.path.join(root, item), "r") as f:
@@ -114,9 +111,9 @@ def get_import_local(imports):
return result return result
def get_pkg_names_from_import_names(pkgs): def get_pkg_names(pkgs):
result = [] result = []
with open(os.path.join(os.path.dirname(__file__), "mapping"), "r") as f: with open(join("mapping"), "r") as f:
data = [x.strip().split(":") for x in f.readlines()] data = [x.strip().split(":") for x in f.readlines()]
for pkg in pkgs: for pkg in pkgs:
toappend = pkg toappend = pkg
@@ -128,33 +125,36 @@ def get_pkg_names_from_import_names(pkgs):
return result return result
def get_import_name_without_alias(import_name): def get_name_without_alias(name):
return import_name.partition(' as ')[0].partition('.')[0].strip() if "import" in name:
name = REGEXP[0].match(name.strip()).groups(0)[0]
return name.partition(' as ')[0].partition('.')[0].strip()
def join(f):
return os.path.join(os.path.dirname(__file__), f)
def init(args): def init(args):
print("Looking for imports") candidates = get_all_imports(args['<path>'])
imports = get_all_imports(args['<path>']) candidates = get_pkg_names(get_all_imports(args['<path>']))
imports = get_pkg_names_from_import_names(imports) logging.debug("Found imports: " + ", ".join(candidates))
print("Found third-party imports: " + ", ".join(imports))
if args['--use-local']: if args['--use-local']:
print( logging.debug(
"Getting package version information ONLY from local installation.") "Getting package information ONLY from local installation.")
imports_with_info = get_import_local(imports) imports = get_import_local(candidates)
else: else:
print( logging.debug("Getting packages information from Local/PyPI")
"Getting latest version information about packages from Local/PyPI") local = get_import_local(candidates)
imports_local = get_import_local(imports) # Get packages that were not found locally
difference = [x for x in imports if x not in [z['name'] difference = [x for x in candidates if x not in [z['name']
for z in imports_local]] for z in local]]
imports_pypi = get_imports_info(difference) imports = local + get_imports_info(difference)
imports_with_info = imports_local + imports_pypi
print("Imports written to requirements file:", ", ".join(
[x['name'] for x in imports_with_info]))
path = args[ path = args[
"--savepath"] if args["--savepath"] else os.path.join(args['<path>'], "requirements.txt") "--savepath"] if args["--savepath"] else os.path.join(args['<path>'], "requirements.txt")
generate_requirements_file(path, imports_with_info) generate_requirements_file(path, imports)
print("Successfully saved requirements file in " + path) print("Successfully saved requirements file in " + path)

View File

@@ -11,6 +11,7 @@ import signal
import bs4 import bs4
import requests import requests
import nonexistendmodule import nonexistendmodule
import boto as b, import peewee as p,
# import django # import django
import flask.ext.somext import flask.ext.somext
from sqlalchemy import model from sqlalchemy import model
@@ -23,3 +24,5 @@ import models
def main(): def main():
pass pass
import after_method_should_be_ignored

View File

@@ -18,7 +18,7 @@ class TestPipreqs(unittest.TestCase):
def setUp(self): def setUp(self):
self.modules = ['flask', 'requests', 'sqlalchemy', self.modules = ['flask', 'requests', 'sqlalchemy',
'docopt', 'ujson', 'nonexistendmodule', 'bs4'] 'docopt', 'boto', 'peewee', 'ujson', 'nonexistendmodule', 'bs4',]
self.modules2 = ['beautifulsoup4'] self.modules2 = ['beautifulsoup4']
self.project = os.path.join(os.path.dirname(__file__), "_data") self.project = os.path.join(os.path.dirname(__file__), "_data")
self.requirements_path = os.path.join(self.project, "requirements.txt") self.requirements_path = os.path.join(self.project, "requirements.txt")
@@ -27,7 +27,7 @@ class TestPipreqs(unittest.TestCase):
def test_get_all_imports(self): def test_get_all_imports(self):
imports = pipreqs.get_all_imports(self.project) imports = pipreqs.get_all_imports(self.project)
self.assertEqual(len(imports), 7, "Incorrect Imports array length") self.assertEqual(len(imports), 9)
for item in imports: for item in imports:
self.assertTrue( self.assertTrue(
item.lower() in self.modules, "Import is missing: " + item) item.lower() in self.modules, "Import is missing: " + item)
@@ -43,7 +43,7 @@ class TestPipreqs(unittest.TestCase):
with_info = pipreqs.get_imports_info(imports) with_info = pipreqs.get_imports_info(imports)
# Should contain only 5 Elements without the "nonexistendmodule" # Should contain only 5 Elements without the "nonexistendmodule"
self.assertEqual( self.assertEqual(
len(with_info), 5, "Length of imports array with info is wrong") len(with_info), 7)
for item in with_info: for item in with_info:
self.assertTrue(item['name'].lower( self.assertTrue(item['name'].lower(
) in self.modules, "Import item appears to be missing " + item['name']) ) in self.modules, "Import item appears to be missing " + item['name'])
@@ -84,7 +84,7 @@ class TestPipreqs(unittest.TestCase):
def test_get_import_name_without_alias(self): def test_get_import_name_without_alias(self):
import_name_with_alias = "requests as R" import_name_with_alias = "requests as R"
expected_import_name_without_alias = "requests" expected_import_name_without_alias = "requests"
import_name_without_aliases = pipreqs.get_import_name_without_alias(import_name_with_alias) import_name_without_aliases = pipreqs.get_name_without_alias(import_name_with_alias)
self.assertEqual(import_name_without_aliases, expected_import_name_without_alias) self.assertEqual(import_name_without_aliases, expected_import_name_without_alias)
def tearDown(self): def tearDown(self):