aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gn3/file_utils.py28
1 files changed, 15 insertions, 13 deletions
diff --git a/gn3/file_utils.py b/gn3/file_utils.py
index 06a1b4c..c1c9d53 100644
--- a/gn3/file_utils.py
+++ b/gn3/file_utils.py
@@ -5,10 +5,10 @@ import os
import random
import string
import tarfile
-
from functools import partial
from typing import Dict
from typing import List
+
from werkzeug.utils import secure_filename
@@ -20,8 +20,8 @@ def get_hash_of_files(files: List[str]) -> str:
raise FileNotFoundError
with open(file_path, "rb") as file_:
for buf in iter(partial(file_.read, 4096), b''):
- md5hash.update(bytearray(hashlib.md5(buf).hexdigest(),
- "utf-8"))
+ md5hash.update(bytearray(
+ hashlib.md5(buf).hexdigest(), "utf-8"))
return md5hash.hexdigest()
@@ -29,9 +29,10 @@ def get_dir_hash(directory: str) -> str:
"""Return the hash of a DIRECTORY"""
if not os.path.exists(directory):
raise FileNotFoundError
- all_files = [os.path.join(root, names)
- for root, _, files in os.walk(directory)
- for names in sorted(files)]
+ all_files = [
+ os.path.join(root, names) for root, _, files in os.walk(directory)
+ for names in sorted(files)
+ ]
return get_hash_of_files(all_files)
@@ -45,11 +46,14 @@ def jsonfile_to_dict(json_file: str) -> Dict:
def generate_random_n_string(n_length: int) -> str:
"""Generate a random string that is N chars long"""
- return ''.join(random.choice(string.ascii_uppercase + string.digits)
- for _ in range(n_length))
+ return ''.join(
+ random.choice(string.ascii_uppercase + string.digits)
+ for _ in range(n_length))
-def extract_uploaded_file(gzipped_file, target_dir: str, token="") -> Dict:
+def extract_uploaded_file(gzipped_file,
+ target_dir: str,
+ token: str = "") -> Dict:
"""Get the (directory) hash of extracted contents of GZIPPED_FILE; and move
contents to TARGET_DIR/<dir-hash>.
@@ -57,10 +61,8 @@ contents to TARGET_DIR/<dir-hash>.
if not token:
token = (f"{generate_random_n_string(6)}-"
f"{generate_random_n_string(6)}")
- tar_target_loc = os.path.join(
- target_dir,
- token,
- secure_filename(gzipped_file.filename))
+ tar_target_loc = os.path.join(target_dir, token,
+ secure_filename(gzipped_file.filename))
try:
if not os.path.exists(os.path.join(target_dir, token)):
os.mkdir(os.path.join(target_dir, token))