Source code for eyed3.utils

# -*- coding: utf-8 -*-
################################################################################
#  Copyright (C) 2002-2015  Travis Shirk <travis@pobox.com>
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, see <http://www.gnu.org/licenses/>.
#
################################################################################
from __future__ import print_function
import os
import re
import math
import pathlib
import logging
import argparse
import warnings
import magic
import functools

from ..compat import unicode, PY2
from ..utils.log import getLogger
from .. import LOCAL_ENCODING, LOCAL_FS_ENCODING

# Choose the directory-walking primitive once, at import time.  ``os.fwalk``
# entries carry a fourth item (a directory file descriptor) that ``os.walk``
# entries do not have, so a matching ``os_walk_unpack`` is defined next to
# each choice to give callers a uniform ``(root, dirs, files)`` triple.
if not hasattr(os, "fwalk"):
    os_walk = functools.partial(os.walk, followlinks=True)

    def os_walk_unpack(w):
        """Return the ``os.walk`` entry ``w`` unchanged."""
        return w

else:
    os_walk = functools.partial(os.fwalk, follow_symlinks=True)

    def os_walk_unpack(w):
        """Return the first three items of the ``os.fwalk`` entry ``w``."""
        return w[:3]
# Module-level logger, named after this module.
log = getLogger(__name__)

# Mime-type reported for files whose extension is listed in
# ID3_MIME_TYPE_EXTENSIONS.
ID3_MIME_TYPE = "application/x-id3"
# File extensions that are mapped directly to ID3_MIME_TYPE.
ID3_MIME_TYPE_EXTENSIONS = (".id3", ".tag")
class MagicTypes(magic.Magic):
    """A ``magic.Magic`` set up for mime-type lookups, with a shortcut for
    files carrying one of the ``ID3_MIME_TYPE_EXTENSIONS``."""

    def __init__(self):
        magic.Magic.__init__(self, mime=True, mime_encoding=False,
                             keep_going=True)

    def guess_type(self, filename, all_types=False):
        """Return the mime-type of ``filename``.

        When ``all_types`` is True a list of every candidate type is returned
        instead of only the first one.  Files whose extension is in
        ``ID3_MIME_TYPE_EXTENSIONS`` are reported as ``ID3_MIME_TYPE`` without
        consulting libmagic.
        """
        _, extension = os.path.splitext(filename)
        if extension in ID3_MIME_TYPE_EXTENSIONS:
            if all_types:
                return [ID3_MIME_TYPE]
            return ID3_MIME_TYPE

        try:
            detected = self.from_file(filename)
        except UnicodeEncodeError:
            # https://github.com/ahupp/python-magic/pull/144
            encoded_name = filename.encode("utf-8", 'surrogateescape')
            detected = self.from_file(encoded_name)

        # The result is split on this literal separator to obtain the
        # individual candidate types.
        candidates = detected.split(r"\012- ")
        if all_types:
            return candidates
        return candidates[0]
# Shared MagicTypes instance, created once when this module is imported.
_mime_types = MagicTypes()
def guessMimetype(filename, with_encoding=False, all_types=False):
    """Return the mime-type for ``filename`` (or a list of possible types
    when ``all_types`` is True).

    ``filename`` may be a string or a ``pathlib.Path``.

    If ``with_encoding`` is True a 2-tuple ``(mime, enc)`` is returned.  The
    encoding is no longer determined, so ``enc`` is always ``None`` and a
    ``UserWarning`` is issued.
    """
    if isinstance(filename, pathlib.Path):
        filename = str(filename)
    mime = _mime_types.guess_type(filename, all_types=all_types)

    if with_encoding:
        warnings.warn("File character encoding no longer returned, value is "
                      "None", UserWarning, stacklevel=2)
        return mime, None
    return mime
def walk(handler, path, excludes=None, fs_encoding=LOCAL_FS_ENCODING):
    """A wrapper around os.walk which handles exclusion patterns and multiple
    path types (unicode, pathlib.Path, bytes).

    ``handler``
        Object providing ``handleFile(path)`` and
        ``handleDirectory(root, files)`` callbacks.  Either callback may raise
        ``StopIteration`` to end the walk early.
    ``path``
        A file or directory.  A file is passed straight to
        ``handler.handleFile`` (unless excluded); a directory is walked.
    ``excludes``
        Optional iterable of regular expression strings.  A file whose
        absolute path matches any of them (``re.match``) is skipped.
    ``fs_encoding``
        Encoding used to decode ``bytes`` paths.

    Raises ``IOError`` when ``path`` does not exist.
    """
    if isinstance(path, pathlib.Path):
        path = str(path)
    elif type(path) is not unicode:
        path = unicode(path, fs_encoding)

    excludes_re = [re.compile(e) for e in (excludes or [])]

    def _isExcluded(_p):
        return any(ex.match(_p) for ex in excludes_re)

    if not os.path.exists(path):
        raise IOError("file not found: %s" % path)

    if os.path.isfile(path):
        # Not a directory: invoke the handler (unless excluded) and return.
        # An excluded file must not fall through to the directory walker,
        # there is nothing to walk.
        if not _isExcluded(path):
            try:
                handler.handleFile(os.path.abspath(path))
            except StopIteration:
                pass
        return

    # Iterate lazily so that sorting ``dirs`` in place determines the order in
    # which sub-directories are visited, and so that an early stop does not
    # pay for walking the whole tree up front.
    for entry in os_walk(path):
        root, dirs, files = os_walk_unpack(entry)
        if type(root) is not unicode:
            root = unicode(root, fs_encoding)
        dirs.sort()
        files.sort()

        for f in files:
            if type(f) is not unicode:
                f = unicode(f, fs_encoding)
            f = os.path.abspath(os.path.join(root, f))
            if _isExcluded(f):
                continue
            try:
                handler.handleFile(f)
            except StopIteration:
                return

        if files:
            try:
                handler.handleDirectory(root, files)
            except StopIteration:
                return
class FileHandler(object):
    """Callback interface for :func:`eyed3.utils.walk`.

    Every method here is a no-op; subclasses override the ones they need.
    """

    def handleFile(self, f):
        """Called for each file walked; ``f`` is the full path.

        The return value is ignored.  Raise ``StopIteration`` to abort the
        walk.
        """
        return None

    def handleDirectory(self, d, files):
        """Called for each directory ``d`` **after** ``handleFile`` has been
        called for each file in ``files``.

        ``StopIteration`` may be raised to halt iteration.
        """
        return None

    def handleDone(self):
        """Called when there are no more files to handle."""
        return None
def _requireArgType(arg_type, *args): arg_indices = [] kwarg_names = [] for a in args: if type(a) is int: arg_indices.append(a) else: kwarg_names.append(a) assert(arg_indices or kwarg_names) def wrapper(fn): def wrapped_fn(*args, **kwargs): for i in arg_indices: if i >= len(args): # The ith argument is not there, as in optional arguments break if args[i] is not None and not isinstance(args[i], arg_type): raise TypeError("%s(argument %d) must be %s" % (fn.__name__, i, str(arg_type))) for name in kwarg_names: if (name in kwargs and kwargs[name] is not None and not isinstance(kwargs[name], arg_type)): raise TypeError("%s(argument %s) must be %s" % (fn.__name__, name, str(arg_type))) return fn(*args, **kwargs) return wrapped_fn return wrapper
def requireUnicode(*args):
    """Function decorator to enforce unicode argument types.

    ``None`` is always an acceptable value for a checked argument.

    ``*args``
        Numeric values select positional arguments by index
        (``requireUnicode(1, 3)`` requires arguments 1 and 3 to be unicode),
        strings select keyword arguments by name
        (``requireUnicode("title")``); both kinds may be combined.
    """
    return _requireArgType(unicode, *args)
def requireBytes(*args):
    """Function decorator to enforce bytes argument types.

    ``None`` is a valid argument value, in all cases, regardless of not being
    bytes.

    ``*args``
        Positional arguments may be numeric argument index values
        (``requireBytes(1, 3)`` - requires argument 1 and 3 are bytes)
        or keyword argument names (``requireBytes("data")``) or a
        combination thereof.
    """
    return _requireArgType(bytes, *args)
def encodeUnicode(replace=True):
    """Deprecated decorator factory that encodes unicode arguments to bytes.

    Calling this always emits a ``DeprecationWarning``.

    When ``PY2`` is true the returned decorator encodes every positional and
    keyword argument whose type is exactly ``unicode`` with
    ``LOCAL_ENCODING`` before calling the wrapped function.  ``replace``
    selects the codec error handler: ``"replace"`` when True (the default),
    ``"strict"`` otherwise.

    When ``PY2`` is false the returned decorator hands the function back
    untouched.
    """
    warnings.warn("use compat PY2 and be more python3", DeprecationWarning,
                  stacklevel=2)

    if not PY2:
        # This decorator is used to encode unicode to bytes for sys.std*
        # write calls. In python3 unicode (or str) is required by these
        # functions, the encoding happens internally. So return a noop; it
        # must hand the function back, otherwise the decorated name would be
        # bound to None.
        def noop(fn):
            return fn
        return noop

    enc_err = "replace" if replace else "strict"

    def _encode(value):
        if type(value) is unicode:
            return value.encode(LOCAL_ENCODING, enc_err)
        return value

    def wrapper(fn):
        @functools.wraps(fn)
        def wrapped_fn(*args, **kwargs):
            args = tuple(_encode(a) for a in args)
            kwargs = dict((kw, _encode(v)) for kw, v in kwargs.items())
            return fn(*args, **kwargs)
        return wrapped_fn
    return wrapper
def formatTime(seconds, total=None, short=False):
    """Format ``seconds`` (number of seconds) as a string representation.

    ``seconds`` is rounded to the nearest whole second; ``None`` and negative
    values are treated as 0.

    When ``short`` is False (the default) the format is ``MM:SS``, or
    ``HH:MM:SS`` once there is at least one full hour.  If ``total`` is not
    None (and not 0) it is formatted the same way and appended to the result
    separated by ' / '.

    When ``short`` is True the result is exactly 6 characters long, built
    from the two largest applicable units, for example::

        01w03d
        02d04h
        01h05m
        01m04s
           15s

    ``total`` is ignored in the short format.
    """
    def time_tuple(ts):
        if ts is None or ts < 0:
            ts = 0
        hours, remainder = divmod(int(ts), 3600)
        mins, secs = divmod(remainder, 60)
        tstr = '%02d:%02d' % (mins, secs)
        if hours:
            tstr = '%02d:%s' % (hours, tstr)
        return (hours, mins, secs, tstr)

    # round() rejects None, so map it to 0 first.
    seconds = 0 if seconds is None else int(round(seconds))

    if not short:
        retval = time_tuple(seconds)[3]
        if total:
            retval += ' / %s' % time_tuple(total)[3]
        return retval

    units = [
        (u'y', 60 * 60 * 24 * 7 * 52),
        (u'w', 60 * 60 * 24 * 7),
        (u'd', 60 * 60 * 24),
        (u'h', 60 * 60),
        (u'm', 60),
        (u's', 1),
    ]

    if seconds < 60:
        # Left-padded so the result keeps the 6 character width.
        return u'   {0:02d}s'.format(seconds)

    # Pair each unit with the next smaller one: the larger unit carries the
    # quotient, the smaller one the remainder.
    for (unit1, limit1), (unit2, limit2) in zip(units, units[1:]):
        if seconds >= limit1:
            return u'{0:02d}{1}{2:02d}{3}'.format(
                seconds // limit1, unit1,
                (seconds % limit1) // limit2, unit2)

    # Defensive fallback; any value of 60 or more already matched the
    # minutes row above.
    return u'  ~inf'
# Byte counts and unit labels for kilobytes, megabytes and gigabytes.
KB_BYTES = 1024
"""Number of bytes per KB (2^10)"""
MB_BYTES = 1048576
"""Number of bytes per MB (2^20)"""
GB_BYTES = 1073741824
"""Number of bytes per GB (2^30)"""
KB_UNIT = "KB"
"""Kilobytes abbreviation"""
MB_UNIT = "MB"
"""Megabytes abbreviation"""
GB_UNIT = "GB"
"""Gigabytes abbreviation"""
def formatSize(size, short=False):
    """Format ``size`` (number of bytes) into string format doing KB, MB, or
    GB conversion where necessary.

    When ``short`` is False (the default) the value is printed with two
    decimals followed by the unit, the smallest unit being "Bytes" and the
    largest "GB" (powers of 1024); e.g. '1.00 KB'.

    The short version scales by powers of 1000 and is made of a value of at
    most three characters (right-aligned) plus a one character suffix taken
    from ' kMGTPEH' ('?' beyond that); e.g. '256 ', ' 64k', '1.1G'.
    """
    if not short:
        unit = "Bytes"
        if size >= GB_BYTES:
            size = float(size) / float(GB_BYTES)
            unit = GB_UNIT
        elif size >= MB_BYTES:
            size = float(size) / float(MB_BYTES)
            unit = MB_UNIT
        elif size >= KB_BYTES:
            size = float(size) / float(KB_BYTES)
            unit = KB_UNIT
        return "%.2f %s" % (size, unit)

    suffixes = u' kMGTPEH'

    if size >= 1:
        num_scale = int(math.floor(math.log(size) / math.log(1000)))
        # The float log ratio can land just off an exact power of 1000;
        # nudge the exponent so that 1000**num_scale <= size holds exactly.
        if size < 1000 ** num_scale:
            num_scale -= 1
        elif size >= 1000 ** (num_scale + 1):
            num_scale += 1
    else:
        # Zero, fractional and negative sizes are not scaled (and must not
        # reach math.log).
        num_scale = 0

    suffix = suffixes[num_scale] if num_scale < len(suffixes) else '?'

    # float() keeps the fractional digit regardless of the interpreter's
    # integer division rules.
    str_value = str(float(size) / (1000 ** num_scale))
    if len(str_value) >= 3 and str_value[2] == '.':
        # Two integer digits: drop the dangling decimal point.
        str_value = str_value[:2]
    else:
        str_value = str_value[:3]

    return "{0:>3s}{1}".format(str_value, suffix)
def formatTimeDelta(td):
    """Render the timedelta ``td`` as ``HH:MM:SS``, prefixed with
    ``"<n> days "`` when its ``days`` attribute is non-zero.
    """
    hours, remainder = divmod(td.seconds, 3600)
    mins, secs = divmod(remainder, 60)
    clock = "%02d:%02d:%02d" % (hours, mins, secs)
    if td.days:
        return "%d days %s" % (td.days, clock)
    return clock
def chunkCopy(src_fp, dest_fp, chunk_sz=(1024 * 512)):
    """Copy ``src_fp`` to ``dest_fp`` in ``chunk_sz`` byte increments.

    Reading stops at the first empty read from ``src_fp``.
    """
    while True:
        chunk = src_fp.read(chunk_sz)
        if not chunk:
            break
        dest_fp.write(chunk)
class ArgumentParser(argparse.ArgumentParser):
    """Subclass of argparse.ArgumentParser that adds version and log level
    options.

    Two extra keyword arguments are accepted and consumed before the base
    class is initialised: ``main_logger`` (default ``MAIN_LOGGER``) and
    ``version`` (default the eyed3 version).  The help formatter is always
    ``argparse.RawDescriptionHelpFormatter``.
    """

    def __init__(self, *args, **kwargs):
        from eyed3 import version as VERSION
        from eyed3.utils.log import LEVELS
        from eyed3.utils.log import MAIN_LOGGER

        def pop_kwarg(name, default):
            # A missing key and a falsy value both fall back to ``default``.
            return kwargs.pop(name, None) or default

        main_logger = pop_kwarg("main_logger", MAIN_LOGGER)
        version = pop_kwarg("version", VERSION)

        self.log_levels = [logging.getLevelName(lvl).lower()
                           for lvl in LEVELS]

        super(ArgumentParser, self).__init__(
            *args,
            formatter_class=argparse.RawDescriptionHelpFormatter,
            **kwargs)

        self.add_argument("--version", action="version", version=version,
                          help="Display version information and exit")

        level_names = ", ".join("'%s'" % lvl for lvl in self.log_levels)
        log_level_help = (
            "Set a log level. This option may be specified multiple "
            "times. If a logger name is specified than the level "
            "applies only to that logger, otherwise the level is set "
            "on the top-level logger. Acceptable levels are %s. "
            % level_names)

        debug_group = self.add_argument_group("Debugging")
        debug_group.add_argument(
            "-l", "--log-level", metavar="LEVEL[:LOGGER]",
            action=LoggingAction, main_logger=main_logger,
            help=log_level_help)
        debug_group.add_argument("--profile", action="store_true",
                                 default=False, dest="debug_profile",
                                 help="Run using python profiler.")
        debug_group.add_argument("--pdb", action="store_true",
                                 dest="debug_pdb",
                                 help="Drop into 'pdb' when errors occur.")
class LoggingAction(argparse._AppendAction):
    """argparse action for ``LEVEL[:LOGGER]`` options.

    Each occurrence sets the level on the named logger (or on the
    ``main_logger`` given at construction when no logger is named) and then
    appends the split value to the destination list.
    """

    def __init__(self, *args, **kwargs):
        # ``main_logger`` is required; it is consumed here and not passed on
        # to argparse.
        self.main_logger = kwargs.pop("main_logger")
        super(LoggingAction, self).__init__(*args, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        # Split on the first ':' only, anything after it is the logger name.
        values = values.split(':', 1)
        level = values[0]
        logger_name = values[1] if len(values) > 1 else self.main_logger

        logger = logging.getLogger(logger_name)
        try:
            # setLevel() resolves level names itself and raises ValueError
            # for unknown ones.
            logger.setLevel(level.upper())
        except ValueError:
            msg = "invalid level choice: %s (choose from %s)" % \
                  (level, parser.log_levels)
            raise argparse.ArgumentError(self, msg)

        super(LoggingAction, self).__call__(parser, namespace, values,
                                            option_string)
def datePicker(thing, prefer_recording_date=False):
    """Pick one date from ``thing``, which must have the attributes
    ``release_date``, ``original_release_date`` and ``recording_date``.

    The first truthy value is returned, searching in this order:

    1) date of original release
    2) date of this versions release
    3) the recording date

    unless ``prefer_recording_date`` is ``True``, in which case the order is
    3, 1, 2.  When no date is set the last value examined is returned
    (``None`` for unset dates).
    """
    if prefer_recording_date:
        return (thing.recording_date or
                thing.original_release_date or
                thing.release_date)
    return (thing.original_release_date or
            thing.release_date or
            thing.recording_date)
def makeUniqueFileName(file_path, uniq=u''):
    """Return ``file_path`` if nothing exists at that path, otherwise a
    variation of it that does not exist yet.

    When the path is taken, ``_<uniq>`` is appended to the base name first
    (if ``uniq`` is given); after that a counter starting at 1 is appended
    (``_1``, ``_2``, ...) until an unused name is found.  The extension is
    preserved.
    """
    directory, candidate = os.path.split(file_path)
    stem, ext = os.path.splitext(candidate)
    counter = 1

    while os.path.exists(os.path.join(directory, candidate)):
        if uniq:
            # Used once only; later attempts fall back to the counter.
            stem = "%s_%s" % (stem, uniq)
            candidate = stem + ext
            uniq = u''
        else:
            candidate = "%s_%s%s" % (stem, counter, ext)
            counter += 1

    return os.path.join(directory, candidate)