1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
"""Procedures that operate on files/ directories"""
import errno
import hashlib
import json
import os
import random
import string
import tarfile
from functools import partial
from typing import Dict
from typing import List
from typing import ValuesView
from werkzeug.utils import secure_filename
from flask import current_app
def get_tmpdir() -> str:
    """Return the temp directory named by the Flask app's "TMPDIR" config
    entry, using "/tmp" when that entry is missing or empty.

    FileNotFoundError is raised when the chosen location is not an
    existing directory.  The app itself is expected to initialise its
    TMPDIR setting from the environment.
    """
    tmpdir = current_app.config.get("TMPDIR") or "/tmp"
    if os.path.isdir(tmpdir):
        return tmpdir
    raise FileNotFoundError(
        errno.ENOENT,
        os.strerror(errno.ENOENT),
        f"TMPDIR {tmpdir} is not a valid directory"
    )
def assert_path_exists(path: str, throw_error: bool = True) -> bool:
    """Check that PATH names an existing file (per os.path.isfile).

    Return True when it does.  When it does not, raise FileNotFoundError
    if THROW_ERROR is set, otherwise return False.
    """
    if os.path.isfile(path):
        return True
    if not throw_error:
        return False
    raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path)
def assert_paths_exist(paths: ValuesView, throw_error: bool = True) -> bool:
    """Given a collection of PATHS, check that every one of them exists.

    Each path is checked with assert_path_exists.  With THROW_ERROR set
    (the default) FileNotFoundError is raised for the first missing path.
    With THROW_ERROR unset, False is returned as soon as a missing path
    is found.  True is returned when every path exists, which includes
    the case where PATHS is empty.
    """
    # Propagate the per-path result: previously it was discarded, so the
    # function reported success for missing paths when THROW_ERROR was
    # unset.
    return all(assert_path_exists(path, throw_error) for path in paths)
def get_hash_of_files(files: List[str]) -> str:
    """Given a list of FILES, return a combined MD5 hash of their contents
    as a hex string.

    FILES are processed in sorted order, so the result does not depend on
    the order in which they are listed.  Each file is read in 4096-byte
    chunks and the hex digest of every chunk is fed into the overall
    hash.  An empty list yields the MD5 digest of no data.

    FileNotFoundError (carrying the offending path) is raised if any of
    the FILES does not exist.
    """
    md5hash = hashlib.md5()
    for file_path in sorted(files):
        # Open directly instead of testing os.path.exists first: this
        # avoids a check-then-use race and the FileNotFoundError raised
        # by open() names the missing path.
        with open(file_path, "rb") as file_:
            # The chunk size is part of the hash definition; changing it
            # would change the digests produced for the same files.
            for buf in iter(partial(file_.read, 4096), b''):
                md5hash.update(
                    hashlib.md5(buf).hexdigest().encode("utf-8"))
    return md5hash.hexdigest()
def get_dir_hash(directory: str) -> str:
    """Return the hash of a DIRECTORY.

    Every file found while walking DIRECTORY (recursively) is collected
    and the list is handed to get_hash_of_files, whose result is
    returned.

    FileNotFoundError (carrying DIRECTORY as its filename) is raised if
    DIRECTORY does not exist.
    """
    if not os.path.exists(directory):
        # os.walk ignores listing errors by default, so a missing
        # directory has to be reported explicitly.  Include errno and
        # the path, as the other helpers in this module do.
        raise FileNotFoundError(
            errno.ENOENT, os.strerror(errno.ENOENT), directory)
    all_files = [
        os.path.join(root, names) for root, _, files in os.walk(directory)
        for names in sorted(files)
    ]
    return get_hash_of_files(all_files)
def jsonfile_to_dict(json_file: str) -> Dict:
    """Given a JSON_FILE, return its parsed contents (a python dict for a
    top-level JSON object).

    The file is read as UTF-8.  A missing file surfaces as the
    FileNotFoundError raised by open(); malformed content surfaces as
    the error raised by json.load.
    """
    # The former trailing `raise FileNotFoundError` could never run: the
    # `with` body always returns or propagates an exception.
    with open(json_file, encoding="utf-8") as _file:
        return json.load(_file)
def generate_random_n_string(n_length: int) -> str:
    """Return a random string N_LENGTH chars long, made up of uppercase
    ASCII letters and digits.
    """
    alphabet = string.ascii_uppercase + string.digits
    picked = []
    for _ in range(n_length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
def _tar_members_stay_within(tar: tarfile.TarFile, destination: str) -> bool:
    """Return True if every member of TAR would land inside DESTINATION."""
    root = os.path.realpath(destination)

    def _inside_root(candidate: str) -> bool:
        resolved = os.path.realpath(candidate)
        return os.path.commonpath([root, resolved]) == root

    for member in tar.getmembers():
        # os.path.join discards ROOT when the member name is absolute, so
        # absolute names are rejected by the containment check as well.
        member_path = os.path.join(root, member.name)
        if member.isdev() or not _inside_root(member_path):
            return False
        if member.issym():
            # Symlink targets are relative to the link's own directory.
            link_target = os.path.join(
                os.path.dirname(member_path), member.linkname)
        elif member.islnk():
            # Hard-link targets are relative to the archive root.
            link_target = os.path.join(root, member.linkname)
        else:
            continue
        if not _inside_root(link_target):
            return False
    return True


def extract_uploaded_file(gzipped_file,
                          target_dir: str,
                          token: str = "") -> Dict:
    """Save GZIPPED_FILE under TARGET_DIR/TOKEN and extract its contents
    into that same directory.

    GZIPPED_FILE must provide a `filename` attribute and a `save(path)`
    method (presumably a werkzeug FileStorage upload -- confirm against
    the callers).  When TOKEN is empty a random one of the form
    "XXXXXX-XXXXXX" is generated.  TARGET_DIR/TOKEN is created if it
    does not exist yet; TARGET_DIR itself must already exist.

    Returns {"status": 0, "token": TOKEN} on success, and
    {"status": 128, "error": "gzip failed to unpack file"} if saving or
    unpacking fails, or if the archive holds members that would be
    written outside TARGET_DIR/TOKEN.
    """
    if not token:
        token = (f"{generate_random_n_string(6)}-"
                 f"{generate_random_n_string(6)}")
    extraction_dir = os.path.join(target_dir, token)
    tar_target_loc = os.path.join(extraction_dir,
                                  secure_filename(gzipped_file.filename))
    try:
        if not os.path.exists(extraction_dir):
            os.mkdir(extraction_dir)
        gzipped_file.save(tar_target_loc)
        # Extract next to the saved archive, i.e. into TARGET_DIR/TOKEN
        with tarfile.open(tar_target_loc) as tar:
            # The archive is user-supplied: refuse members ("../x",
            # absolute names, escaping links, device nodes) that would
            # be written outside the extraction directory.
            if not _tar_members_stay_within(tar, extraction_dir):
                return {"status": 128,
                        "error": "gzip failed to unpack file"}
            tar.extractall(path=extraction_dir)
    # Any failure is reported through the status dict rather than raised.
    # pylint: disable=W0703
    except Exception:
        return {"status": 128, "error": "gzip failed to unpack file"}
    return {"status": 0, "token": token}
# pylint: disable=unused-argument
def cache_ipfs_file(ipfs_file: str,
                    cache_dir: str,
                    ipfs_addr: str = "/ip4/127.0.0.1/tcp/5001") -> str:
    """Intended to look IPFS_FILE up in CACHE_DIR, caching it first when
    it is absent, and to return the cached file's location.

    Currently a stub: none of the arguments are used and an empty
    string is always returned.
    """
    # IPFS httpclient doesn't work in Python3
    cached_location = ""
    return cached_location
|