# -*- coding: utf-8 -*-
################################################################################
# Copyright (C) 2002-2015 Travis Shirk <travis@pobox.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
#
################################################################################
from __future__ import print_function
import os
import re
import math
import pathlib
import logging
import argparse
import warnings
import magic
import functools
from ..compat import unicode, PY2
from ..utils.log import getLogger
from .. import LOCAL_ENCODING, LOCAL_FS_ENCODING
# Select the directory walker once, at import time. Both variants are
# configured to follow symbolic links. ``os_walk_unpack`` normalizes whatever
# the chosen walker yields to a (root, dirs, files) triple.
if not hasattr(os, "fwalk"):
    os_walk = functools.partial(os.walk, followlinks=True)

    def os_walk_unpack(w):
        """Return the ``os.walk`` entry ``w`` unchanged."""
        return w
else:
    os_walk = functools.partial(os.fwalk, follow_symlinks=True)

    def os_walk_unpack(w):
        """Return only the first three items of the ``os.fwalk`` entry
        ``w``."""
        return w[:3]
# Module-level logger, named after this module.
log = getLogger(__name__)
# Mime type that MagicTypes.guess_type reports for ID3 tag files; these are
# recognized purely by file extension, before ``magic`` is consulted.
ID3_MIME_TYPE = "application/x-id3"
# File extensions that MagicTypes.guess_type maps directly to ID3_MIME_TYPE.
ID3_MIME_TYPE_EXTENSIONS = (".id3", ".tag")
class MagicTypes(magic.Magic):
    """A ``magic.Magic`` set up to report mime types, with a shortcut for
    ID3 tag files which are identified by file extension alone."""

    def __init__(self):
        magic.Magic.__init__(self, mime=True, mime_encoding=False,
                             keep_going=True)

    def guess_type(self, filename, all_types=False):
        """Return the mime type detected for ``filename``.

        With ``all_types`` set a list of every reported type is returned,
        otherwise only the first one (a string). Files whose extension is in
        ``ID3_MIME_TYPE_EXTENSIONS`` are reported as ``ID3_MIME_TYPE`` without
        inspecting their content.
        """
        _, extension = os.path.splitext(filename)
        if extension in ID3_MIME_TYPE_EXTENSIONS:
            return [ID3_MIME_TYPE] if all_types else ID3_MIME_TYPE

        try:
            detected = self.from_file(filename)
        except UnicodeEncodeError:
            # Retry with an explicitly encoded name, see
            # https://github.com/ahupp/python-magic/pull/144
            detected = self.from_file(filename.encode("utf-8", 'surrogateescape'))

        # With keep_going enabled the individual results arrive in a single
        # string, separated by this literal (backslash, "012", dash, space).
        candidates = detected.split(r"\012- ")
        return candidates if all_types else candidates[0]
# Shared MagicTypes instance, created once at import time and used by
# guessMimetype().
_mime_types = MagicTypes()
def guessMimetype(filename, with_encoding=False, all_types=False):
    """Return the mime-type for ``filename`` (or a list of possible types when
    ``all_types`` is True).

    ``filename`` may be a ``pathlib.Path``, in which case it is converted to a
    string first. When ``with_encoding`` is True a 2-tuple ``(mime, None)`` is
    returned and a ``UserWarning`` is issued; the encoding is no longer
    detected so the second item is always ``None``.
    """
    if isinstance(filename, pathlib.Path):
        filename = str(filename)

    mime = _mime_types.guess_type(filename, all_types=all_types)
    if with_encoding:
        warnings.warn("File character encoding no longer returned, value is None",
                      UserWarning, stacklevel=2)
        return mime, None
    return mime
def walk(handler, path, excludes=None, fs_encoding=LOCAL_FS_ENCODING):
    """A wrapper around os.walk which handles exclusion patterns and multiple
    path types (unicode, pathlib.Path, bytes).

    ``handler`` must provide ``handleFile(path)`` and
    ``handleDirectory(root, files)``. ``excludes`` is an optional sequence of
    regular expression strings; any file whose path matches one of them (using
    ``re.match``) is not passed to ``handleFile``. ``fs_encoding`` is used to
    decode ``path`` and any walked names that are not already unicode.

    If ``path`` is a regular file the handler is invoked for it (unless it is
    excluded) and nothing else happens. Otherwise the tree is walked top-down,
    directories and files in sorted order; ``handleFile`` is called with the
    absolute path of each non-excluded file and then, for every directory
    that contains files, ``handleDirectory`` is called with the directory and
    the complete (sorted) list of its file names.

    A ``StopIteration`` raised by either callback ends the walk.

    ``IOError`` is raised when ``path`` does not exist.
    """
    if isinstance(path, pathlib.Path):
        path = str(path)
    elif not isinstance(path, unicode):
        path = unicode(path, fs_encoding)

    excludes_re = [re.compile(e) for e in (excludes or [])]

    def _isExcluded(_p):
        return any(ex.match(_p) for ex in excludes_re)

    def _decode(name):
        return name if isinstance(name, unicode) else unicode(name, fs_encoding)

    if not os.path.exists(path):
        raise IOError("file not found: %s" % path)
    elif os.path.isfile(path):
        # Not a directory: invoke the handler (unless excluded) and return.
        # An excluded file must not fall through to the directory walker.
        if not _isExcluded(path):
            try:
                handler.handleFile(os.path.abspath(path))
            except StopIteration:
                pass
        return

    # Iterate lazily: sorting ``dirs`` in place then determines the order in
    # which sub-directories are visited, and an early abort stops the walk
    # instead of having already traversed the whole tree.
    for root, dirs, files in (os_walk_unpack(w) for w in os_walk(path)):
        root = _decode(root)
        dirs.sort()
        files.sort()

        for f in files:
            f = os.path.abspath(os.path.join(root, _decode(f)))
            if not _isExcluded(f):
                try:
                    handler.handleFile(f)
                except StopIteration:
                    return

        if files:
            try:
                handler.handleDirectory(root, files)
            except StopIteration:
                return
class FileHandler(object):
    """Callback interface used by :func:`eyed3.utils.walk`. Every method is a
    no-op here; subclasses override the ones they need."""

    def handleFile(self, f):
        """Invoked once per walked file with ``f``, the file's full path.
        Whatever is returned is ignored. Raise ``StopIteration`` to abort the
        walk."""

    def handleDirectory(self, d, files):
        """Invoked for directory ``d`` **after** ``handleFile`` has been
        called for each of the ``files`` it contains. ``StopIteration`` may be
        raised to halt iteration."""

    def handleDone(self):
        """Invoked when there are no more files to handle."""
def _requireArgType(arg_type, *args):
arg_indices = []
kwarg_names = []
for a in args:
if type(a) is int:
arg_indices.append(a)
else:
kwarg_names.append(a)
assert(arg_indices or kwarg_names)
def wrapper(fn):
def wrapped_fn(*args, **kwargs):
for i in arg_indices:
if i >= len(args):
# The ith argument is not there, as in optional arguments
break
if args[i] is not None and not isinstance(args[i], arg_type):
raise TypeError("%s(argument %d) must be %s" %
(fn.__name__, i, str(arg_type)))
for name in kwarg_names:
if (name in kwargs and kwargs[name] is not None and
not isinstance(kwargs[name], arg_type)):
raise TypeError("%s(argument %s) must be %s" %
(fn.__name__, name, str(arg_type)))
return fn(*args, **kwargs)
return wrapped_fn
return wrapper
def requireUnicode(*args):
    """Function decorator to enforce unicode argument types.

    ``*args`` selects the arguments to check: numeric values are positional
    argument indices (``requireUnicode(1, 3)`` requires that arguments 1 and 3
    are unicode), strings are keyword argument names
    (``requireUnicode("title")``), and both kinds may be combined. ``None`` is
    always accepted as an argument value even though it is not unicode.
    """
    unicode_check = _requireArgType(unicode, *args)
    return unicode_check
def requireBytes(*args):
    """Function decorator to enforce bytes argument types.

    ``*args`` selects the arguments to check: numeric values are positional
    argument indices (``requireBytes(1, 3)`` requires that arguments 1 and 3
    are bytes), strings are keyword argument names
    (``requireBytes("data")``), and both kinds may be combined. ``None`` is
    always accepted as an argument value even though it is not bytes.
    """
    bytes_check = _requireArgType(bytes, *args)
    return bytes_check
def encodeUnicode(replace=True):
    """Deprecated decorator factory; a ``DeprecationWarning`` is issued on
    every call.

    When ``PY2`` is true the returned decorator encodes every unicode
    positional and keyword argument value to ``LOCAL_ENCODING`` before
    calling the decorated function. Characters that cannot be encoded are
    replaced when ``replace`` is True, otherwise encoding is strict.

    When ``PY2`` is false the returned decorator is a plain pass-through that
    calls the decorated function with its arguments untouched.
    """
    warnings.warn("use compat PY2 and be more python3", DeprecationWarning,
                  stacklevel=2)
    enc_err = "replace" if replace else "strict"

    if PY2:
        def wrapper(fn):
            @functools.wraps(fn)
            def wrapped_fn(*args, **kwargs):
                args = tuple(a.encode(LOCAL_ENCODING, enc_err)
                             if type(a) is unicode else a
                             for a in args)
                for kw in kwargs:
                    if type(kwargs[kw]) is unicode:
                        kwargs[kw] = kwargs[kw].encode(LOCAL_ENCODING, enc_err)
                return fn(*args, **kwargs)
            return wrapped_fn
        return wrapper
    else:
        # This decorator is used to encode unicode to bytes for sys.std*
        # write calls. In python3 unicode (or str) is required by these
        # functions, the encoding happens internally.. So return a noop
        def noop(fn):
            @functools.wraps(fn)
            def call(*args, **kwargs):
                return fn(*args, **kwargs)
            # Without this return every decorated function became None.
            return call
        return noop
KB_BYTES = 2 ** 10
"""Number of bytes per KB (2^10)"""
MB_BYTES = 2 ** 20
"""Number of bytes per MB (2^20)"""
GB_BYTES = 2 ** 30
"""Number of bytes per GB (2^30)"""
KB_UNIT = "KB"
"""Kilobytes abbreviation"""
MB_UNIT = "MB"
"""Megabytes abbreviation"""
GB_UNIT = "GB"
"""Gigabytes abbreviation"""
def chunkCopy(src_fp, dest_fp, chunk_sz=(1024 * 512)):
    """Copy the remaining content of ``src_fp`` to ``dest_fp``, reading and
    writing at most ``chunk_sz`` bytes at a time. Copying stops at the first
    empty read."""
    while True:
        chunk = src_fp.read(chunk_sz)
        if not chunk:
            break
        dest_fp.write(chunk)
class ArgumentParser(argparse.ArgumentParser):
    """Subclass of argparse.ArgumentParser that adds version and log level
    options.

    Two extra keyword arguments are consumed before the remaining ones are
    handed to ``argparse.ArgumentParser``: ``main_logger`` (logger name used
    by ``--log-level`` when no logger is given) and ``version`` (string shown
    by ``--version``). Missing or false values fall back to
    ``eyed3.utils.log.MAIN_LOGGER`` and ``eyed3.version``.

    ``formatter_class`` defaults to ``argparse.RawDescriptionHelpFormatter``
    but may be overridden by the caller.

    The options added are ``--version``, and in a "Debugging" group
    ``-l/--log-level``, ``--profile`` (dest ``debug_profile``) and ``--pdb``
    (dest ``debug_pdb``). The accepted level names are kept in
    ``self.log_levels``.
    """

    def __init__(self, *args, **kwargs):
        from eyed3 import version as VERSION
        from eyed3.utils.log import LEVELS
        from eyed3.utils.log import MAIN_LOGGER

        main_logger = kwargs.pop("main_logger", None) or MAIN_LOGGER
        version = kwargs.pop("version", None) or VERSION

        self.log_levels = [logging.getLevelName(lvl).lower() for lvl in LEVELS]

        # Only a default: passing formatter_class explicitly used to fail
        # with "got multiple values for keyword argument".
        kwargs.setdefault("formatter_class",
                          argparse.RawDescriptionHelpFormatter)
        super(ArgumentParser, self).__init__(*args, **kwargs)

        self.add_argument("--version", action="version", version=version,
                          help="Display version information and exit")

        debug_group = self.add_argument_group("Debugging")
        debug_group.add_argument(
            "-l", "--log-level", metavar="LEVEL[:LOGGER]",
            action=LoggingAction, main_logger=main_logger,
            help="Set a log level. This option may be specified multiple "
                 "times. If a logger name is specified than the level "
                 "applies only to that logger, otherwise the level is set "
                 "on the top-level logger. Acceptable levels are %s. " %
                 (", ".join("'%s'" % lvl for lvl in self.log_levels)))
        debug_group.add_argument("--profile", action="store_true",
                                 default=False, dest="debug_profile",
                                 help="Run using python profiler.")
        debug_group.add_argument("--pdb", action="store_true", dest="debug_pdb",
                                 help="Drop into 'pdb' when errors occur.")
class LoggingAction(argparse._AppendAction):
    """An append action for ``LEVEL[:LOGGER]`` option values which also
    applies the requested level to the named logger.

    The action requires a ``main_logger`` keyword argument: the name of the
    logger that is configured when the option value carries no ``:LOGGER``
    part.
    """

    def __init__(self, *args, **kwargs):
        self.main_logger = kwargs.pop("main_logger")
        super(LoggingAction, self).__init__(*args, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        """Set the level described by ``values`` and append the parsed
        ``[level]`` or ``[level, logger]`` list to the namespace.

        ``argparse.ArgumentError`` is raised for an unknown level name.
        """
        # Split on the first colon only; anything after it is the logger
        # name, so extra colons no longer break the unpacking below.
        values = values.split(':', 1)
        if len(values) > 1:
            level, logger_name = values
        else:
            level, logger_name = values[0], self.main_logger

        logger = logging.getLogger(logger_name)
        try:
            # setLevel accepts level names and raises ValueError for unknown
            # ones, so no private logging tables are needed.
            logger.setLevel(level.upper())
        except ValueError:
            log_levels = getattr(parser, "log_levels", None)
            if log_levels is not None:
                msg = "invalid level choice: %s (choose from %s)" % \
                      (level, log_levels)
            else:
                # Parser is not the ArgumentParser of this module.
                msg = "invalid level choice: %s" % level
            raise argparse.ArgumentError(self, msg)

        super(LoggingAction, self).__call__(parser, namespace, values,
                                            option_string)
def datePicker(thing, prefer_recording_date=False):
    """Pick one date from ``thing``, an object with the members
    ``release_date``, ``original_release_date`` and ``recording_date`` (of
    type eyed3.core.Date).

    By default the candidates are tried in this order:
      1) date of original release
      2) date of this versions release
      3) the recording date.
    With ``prefer_recording_date`` set to ``True`` the order is 3, 1, 2.

    The first candidate that is true is returned. When none is, the last
    candidate tried is returned (``None`` if no dates are available)."""
    if prefer_recording_date:
        order = ("recording_date", "original_release_date", "release_date")
    else:
        order = ("original_release_date", "release_date", "recording_date")

    picked = None
    for member in order:
        # Members are read lazily, stopping at the first usable one.
        picked = getattr(thing, member)
        if picked:
            break
    return picked
def makeUniqueFileName(file_path, uniq=u''):
    """Return ``file_path`` if no such file exists, otherwise a variation of
    it that does not exist yet.

    When the path is taken the ``uniq`` string (if any) is first appended to
    the base name, in front of the extension. If that is taken as well, or no
    ``uniq`` was given, a counter starting at 1 is appended instead and
    incremented until an unused name is found."""
    directory, base_name = os.path.split(file_path)
    stem, suffix = os.path.splitext(base_name)

    candidate = os.path.join(directory, base_name)
    counter = 1
    while os.path.exists(candidate):
        if uniq:
            # The uniq string is used once and stays part of the stem.
            stem = "%s_%s" % (stem, uniq)
            new_name = stem + suffix
            uniq = u''
        else:
            new_name = ("%s_%s" % (stem, counter)) + suffix
            counter += 1
        candidate = os.path.join(directory, new_name)

    return candidate