import os
import re
import math
import pathlib
import logging
import argparse
import warnings
import functools
import deprecation
from ..utils.log import getLogger
from .. import LOCAL_FS_ENCODING
from ..__about__ import __version__
if hasattr(os, "fwalk"):
os_walk = functools.partial(os.fwalk, follow_symlinks=True)
def os_walk_unpack(w):
return w[0:3]
else:
os_walk = functools.partial(os.walk, followlinks=True)
[docs] def os_walk_unpack(w):
return w
log = getLogger(__name__)
[docs]@deprecation.deprecated(deprecated_in="0.9a2", removed_in="1.0", current_version=__version__,
details="Use eyed3.mimetype.guessMimetype() instead.")
def guessMimetype(filename, with_encoding=False):
from .. import mimetype
retval = mimetype.guessMimetype(filename)
if not with_encoding:
return retval
else:
warnings.warn("File character encoding no longer returned, value is None",
UserWarning, stacklevel=2)
return retval, None
[docs]def walk(handler, path, excludes=None, fs_encoding=LOCAL_FS_ENCODING, recursive=False):
"""A wrapper around os.walk which handles exclusion patterns and multiple
path types (str, pathlib.Path, bytes).
"""
if isinstance(path, pathlib.Path):
path = str(path)
else:
path = str(path, fs_encoding) if type(path) is not str else path
excludes = excludes if excludes else []
excludes_re = []
for e in excludes:
excludes_re.append(re.compile(e))
def _isExcluded(_p):
for ex in excludes_re:
match = ex.match(_p)
if match:
return True
return False
if not os.path.exists(path):
raise IOError("file not found: %s" % path)
elif os.path.isfile(path) and not _isExcluded(path):
# If not given a directory, invoke the handler and return
handler.handleFile(os.path.abspath(path))
return
for root, dirs, files in [os_walk_unpack(w) for w in os_walk(path)]:
root = root if type(root) is str else str(root, fs_encoding)
dirs.sort()
files.sort()
for f in files:
f = f if type(f) is str else str(f, fs_encoding)
f = os.path.abspath(os.path.join(root, f))
if not _isExcluded(f):
try:
handler.handleFile(f)
except StopIteration:
return
if files:
handler.handleDirectory(root, files)
if not recursive:
break
[docs]class FileHandler(object):
"""A handler interface for :func:`eyed3.utils.walk` callbacks."""
[docs] def handleFile(self, f):
"""Called for each file walked. The file ``f`` is the full path and
the return value is ignored. If the walk should abort the method should
raise a ``StopIteration`` exception."""
pass
[docs] def handleDirectory(self, d, files):
"""Called for each directory ``d`` **after** ``handleFile`` has been
called for each file in ``files``. ``StopIteration`` may be raised to
halt iteration."""
pass
[docs] def handleDone(self):
"""Called when there are no more files to handle."""
pass
def _requireArgType(arg_type, *args):
arg_indices = []
kwarg_names = []
for a in args:
if type(a) is int:
arg_indices.append(a)
else:
kwarg_names.append(a)
assert(arg_indices or kwarg_names)
def wrapper(fn):
def wrapped_fn(*args, **kwargs):
for i in arg_indices:
if i >= len(args):
# The ith argument is not there, as in optional arguments
break
if args[i] is not None and not isinstance(args[i], arg_type):
raise TypeError("%s(argument %d) must be %s" %
(fn.__name__, i, str(arg_type)))
for name in kwarg_names:
if (name in kwargs and kwargs[name] is not None and
not isinstance(kwargs[name], arg_type)):
raise TypeError("%s(argument %s) must be %s" %
(fn.__name__, name, str(arg_type)))
return fn(*args, **kwargs)
return wrapped_fn
return wrapper
[docs]def requireUnicode(*args):
"""Function decorator to enforce str/unicode argument types.
``None`` is a valid argument value, in all cases, regardless of not being
unicode. ``*args`` Positional arguments may be numeric argument index
values (requireUnicode(1, 3) - requires argument 1 and 3 are unicode)
or keyword argument names (requireUnicode("title")) or a combination
thereof.
"""
return _requireArgType(str, *args)
[docs]def requireBytes(*args):
"""Function decorator to enforce byte string argument types.
"""
return _requireArgType(bytes, *args)
# Number of bytes per KB (2^10)
KB_BYTES = 1024
# Number of bytes per MB (2^20)
MB_BYTES = 1048576
# Number of bytes per GB (2^30)
GB_BYTES = 1073741824
# Kilobytes abbreviation
KB_UNIT = "KB"
# Megabytes abbreviation
MB_UNIT = "MB"
# Gigabytes abbreviation
GB_UNIT = "GB"
[docs]def chunkCopy(src_fp, dest_fp, chunk_sz=(1024 * 512)):
"""Copy ``src_fp`` to ``dest_fp`` in ``chunk_sz`` byte increments."""
done = False
while not done:
data = src_fp.read(chunk_sz)
if data:
dest_fp.write(data)
else:
done = True
del data
[docs]class ArgumentParser(argparse.ArgumentParser):
"""Subclass of argparse.ArgumentParser that adds version and log level
options."""
def __init__(self, *args, **kwargs):
from eyed3 import version as VERSION
from eyed3.utils.log import LEVELS
from eyed3.utils.log import MAIN_LOGGER
def pop_kwarg(name, default):
if name in kwargs:
value = kwargs.pop(name) or default
else:
value = default
return value
main_logger = pop_kwarg("main_logger", MAIN_LOGGER)
version = pop_kwarg("version", VERSION)
self.log_levels = [logging.getLevelName(l).lower() for l in LEVELS]
formatter = argparse.RawDescriptionHelpFormatter
super(ArgumentParser, self).__init__(*args, formatter_class=formatter,
**kwargs)
self.add_argument("--version", action="version", version=version,
help="Display version information and exit")
debug_group = self.add_argument_group("Debugging")
debug_group.add_argument(
"-l", "--log-level", metavar="LEVEL[:LOGGER]",
action=LoggingAction, main_logger=main_logger,
help="Set a log level. This option may be specified multiple "
"times. If a logger name is specified than the level "
"applies only to that logger, otherwise the level is set "
"on the top-level logger. Acceptable levels are %s. " %
(", ".join("'%s'" % l for l in self.log_levels)))
debug_group.add_argument("--profile", action="store_true",
default=False, dest="debug_profile",
help="Run using python profiler.")
debug_group.add_argument("--pdb", action="store_true", dest="debug_pdb",
help="Drop into 'pdb' when errors occur.")
[docs]class LoggingAction(argparse._AppendAction):
def __init__(self, *args, **kwargs):
self.main_logger = kwargs.pop("main_logger")
super(LoggingAction, self).__init__(*args, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
values = values.split(':')
level, logger = values if len(values) > 1 else (values[0],
self.main_logger)
logger = logging.getLogger(logger)
try:
logger.setLevel(logging._nameToLevel[level.upper()])
except KeyError:
msg = f"invalid level choice: {level} (choose from {parser.log_levels})"
raise argparse.ArgumentError(self, msg)
super(LoggingAction, self).__call__(parser, namespace, values, option_string)
[docs]def datePicker(thing, prefer_recording_date=False):
"""This function returns a date of some sort, amongst all the possible
dates (members called release_date, original_release_date,
and recording_date of type eyed3.core.Date).
The order of preference is:
1) date of original release
2) date of this versions release
3) the recording date.
Unless ``prefer_recording_date`` is ``True`` in which case the order is
3, 1, 2.
``None`` will be returned if no dates are available."""
if not prefer_recording_date:
return (thing.original_release_date or
thing.release_date or
thing.recording_date)
else:
return (thing.recording_date or
thing.original_release_date or
thing.release_date)
[docs]def makeUniqueFileName(file_path, uniq=''):
"""The ``file_path`` is the desired file name, and it is returned if the
file does not exist. In the case that it already exists the path is
adjusted to be unique. First, the ``uniq`` string is added, and then
a couter is used to find a unique name."""
path = os.path.dirname(file_path)
file = os.path.basename(file_path)
name, ext = os.path.splitext(file)
count = 1
while os.path.exists(os.path.join(path, file)):
if uniq:
name = "%s_%s" % (name, uniq)
file = "".join([name, ext])
uniq = ''
else:
file = "".join(["%s_%s" % (name, count), ext])
count += 1
return os.path.join(path, file)
[docs]def b(x, encoder=None):
"""Converts `x` to a bytes string if not already.
:param x: The string.
:param encoder: Optional codec encoder to perform the conversion. The default is
`codecs.latin_1_encode`.
:return: The byte string if conversion was needed.
"""
if isinstance(x, bytes):
return x
else:
import codecs
encoder = encoder or codecs.latin_1_encode
return encoder(x)[0]