1 # Copyright (C) 2001-2010 Python Software Foundation
   2 # Author: Barry Warsaw
   3 # Contact: email-sig@python.org
   4 
   5 """Miscellaneous utilities."""
   6 
   7 __all__ = [
   8     'collapse_rfc2231_value',
   9     'decode_params',
  10     'decode_rfc2231',
  11     'encode_rfc2231',
  12     'formataddr',
  13     'formatdate',
  14     'format_datetime',
  15     'getaddresses',
  16     'make_msgid',
  17     'mktime_tz',
  18     'parseaddr',
  19     'parsedate',
  20     'parsedate_tz',
  21     'parsedate_to_datetime',
  22     'unquote',
  23     ]
  24 
  25 import os
  26 import re
  27 import time
  28 import base64
  29 import random
  30 import socket
  31 import datetime
  32 import urllib.parse
  33 import warnings
  34 from io import StringIO
  35 
  36 from email._parseaddr import quote
  37 from email._parseaddr import AddressList as _AddressList
  38 from email._parseaddr import mktime_tz
  39 
  40 from email._parseaddr import parsedate, parsedate_tz, _parsedate_tz
  41 
  42 from quopri import decodestring as _qdecode
  43 
  44 # Intrapackage imports
  45 from email.encoders import _bencode, _qencode
  46 from email.charset import Charset
  47 
  48 COMMASPACE = ', '
  49 EMPTYSTRING = ''
  50 UEMPTYSTRING = ''
  51 CRLF = '\r\n'
  52 TICK = "'"
  53 
  54 specialsre = re.compile(r'[][\\()<>@,:;".]')
  55 escapesre = re.compile(r'[\\"]')
  56 
  57 def _has_surrogates(s):
  58     """Return True if s contains surrogate-escaped binary data."""
  59     # This check is based on the fact that unless there are surrogates, utf8
  60     # (Python's default encoding) can encode any string.  This is the fastest
  61     # way to check for surrogates, see issue 11454 for timings.
  62     try:
  63         s.encode()
  64         return False
  65     except UnicodeEncodeError:
  66         return True
  67 
  68 # How to deal with a string containing bytes before handing it to the
  69 # application through the 'normal' interface.
  70 def _sanitize(string):
  71     # Turn any escaped bytes into unicode 'unknown' char.  If the escaped
  72     # bytes happen to be utf-8 they will instead get decoded, even if they
  73     # were invalid in the charset the source was supposed to be in.  This
  74     # seems like it is not a bad thing; a defect was still registered.
  75     original_bytes = string.encode('utf-8', 'surrogateescape')
  76     return original_bytes.decode('utf-8', 'replace')
  77 
  78 
  79 
  80 # Helpers
  81 
  82 def formataddr(pair, charset='utf-8'):
  83     """The inverse of parseaddr(), this takes a 2-tuple of the form
  84     (realname, email_address) and returns the string value suitable
  85     for an RFC 2822 From, To or Cc header.
  86 
  87     If the first element of pair is false, then the second element is
  88     returned unmodified.
  89 
  90     Optional charset if given is the character set that is used to encode
  91     realname in case realname is not ASCII safe.  Can be an instance of str or
  92     a Charset-like object which has a header_encode method.  Default is
  93     'utf-8'.
  94     """
  95     name, address = pair
  96     # The address MUST (per RFC) be ascii, so raise an UnicodeError if it isn't.
  97     address.encode('ascii')
  98     if name:
  99         try:
 100             name.encode('ascii')
 101         except UnicodeEncodeError:
 102             if isinstance(charset, str):
 103                 charset = Charset(charset)
 104             encoded_name = charset.header_encode(name)
 105             return "%s <%s>" % (encoded_name, address)
 106         else:
 107             quotes = ''
 108             if specialsre.search(name):
 109                 quotes = '"'
 110             name = escapesre.sub(r'\\\g<0>', name)
 111             return '%s%s%s <%s>' % (quotes, name, quotes, address)
 112     return address
 113 
 114 
 115 
 116 def getaddresses(fieldvalues):
 117     """Return a list of (REALNAME, EMAIL) for each fieldvalue."""
 118     all = COMMASPACE.join(fieldvalues)
 119     a = _AddressList(all)
 120     return a.addresslist
 121 
 122 
 123 
 124 ecre = re.compile(r'''
 125   =\?                   # literal =?
 126   (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
 127   \?                    # literal ?
 128   (?P<encoding>[qb])    # either a "q" or a "b", case insensitive
 129   \?                    # literal ?
 130   (?P<atom>.*?)         # non-greedy up to the next ?= is the atom
 131   \?=                   # literal ?=
 132   ''', re.VERBOSE | re.IGNORECASE)
 133 
 134 
 135 def _format_timetuple_and_zone(timetuple, zone):
 136     return '%s, %02d %s %04d %02d:%02d:%02d %s' % (
 137         ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'][timetuple[6]],
 138         timetuple[2],
 139         ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
 140          'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'][timetuple[1] - 1],
 141         timetuple[0], timetuple[3], timetuple[4], timetuple[5],
 142         zone)
 143 
 144 def formatdate(timeval=None, localtime=False, usegmt=False):
 145     """Returns a date string as specified by RFC 2822, e.g.:
 146 
 147     Fri, 09 Nov 2001 01:08:47 -0000
 148 
 149     Optional timeval if given is a floating point time value as accepted by
 150     gmtime() and localtime(), otherwise the current time is used.
 151 
 152     Optional localtime is a flag that when True, interprets timeval, and
 153     returns a date relative to the local timezone instead of UTC, properly
 154     taking daylight savings time into account.
 155 
 156     Optional argument usegmt means that the timezone is written out as
 157     an ascii string, not numeric one (so "GMT" instead of "+0000"). This
 158     is needed for HTTP, and is only used when localtime==False.
 159     """
 160     # Note: we cannot use strftime() because that honors the locale and RFC
 161     # 2822 requires that day and month names be the English abbreviations.
 162     if timeval is None:
 163         timeval = time.time()
 164     if localtime:
 165         now = time.localtime(timeval)
 166         # Calculate timezone offset, based on whether the local zone has
 167         # daylight savings time, and whether DST is in effect.
 168         if time.daylight and now[-1]:
 169             offset = time.altzone
 170         else:
 171             offset = time.timezone
 172         hours, minutes = divmod(abs(offset), 3600)
 173         # Remember offset is in seconds west of UTC, but the timezone is in
 174         # minutes east of UTC, so the signs differ.
 175         if offset > 0:
 176             sign = '-'
 177         else:
 178             sign = '+'
 179         zone = '%s%02d%02d' % (sign, hours, minutes // 60)
 180     else:
 181         now = time.gmtime(timeval)
 182         # Timezone offset is always -0000
 183         if usegmt:
 184             zone = 'GMT'
 185         else:
 186             zone = '-0000'
 187     return _format_timetuple_and_zone(now, zone)
 188 
 189 def format_datetime(dt, usegmt=False):
 190     """Turn a datetime into a date string as specified in RFC 2822.
 191 
 192     If usegmt is True, dt must be an aware datetime with an offset of zero.  In
 193     this case 'GMT' will be rendered instead of the normal +0000 required by
 194     RFC2822.  This is to support HTTP headers involving date stamps.
 195     """
 196     now = dt.timetuple()
 197     if usegmt:
 198         if dt.tzinfo is None or dt.tzinfo != datetime.timezone.utc:
 199             raise ValueError("usegmt option requires a UTC datetime")
 200         zone = 'GMT'
 201     elif dt.tzinfo is None:
 202         zone = '-0000'
 203     else:
 204         zone = dt.strftime("%z")
 205     return _format_timetuple_and_zone(now, zone)
 206 
 207 
 208 def make_msgid(idstring=None, domain=None):
 209     """Returns a string suitable for RFC 2822 compliant Message-ID, e.g:
 210 
 211     <20020201195627.33539.96671@nightshade.la.mastaler.com>
 212 
 213     Optional idstring if given is a string used to strengthen the
 214     uniqueness of the message id.  Optional domain if given provides the
 215     portion of the message id after the '@'.  It defaults to the locally
 216     defined hostname.
 217     """
 218     timeval = time.time()
 219     utcdate = time.strftime('%Y%m%d%H%M%S', time.gmtime(timeval))
 220     pid = os.getpid()
 221     randint = random.randrange(100000)
 222     if idstring is None:
 223         idstring = ''
 224     else:
 225         idstring = '.' + idstring
 226     if domain is None:
 227         domain = socket.getfqdn()
 228     msgid = '<%s.%s.%s%s@%s>' % (utcdate, pid, randint, idstring, domain)
 229     return msgid
 230 
 231 
 232 def parsedate_to_datetime(data):
 233     *dtuple, tz = _parsedate_tz(data)
 234     if tz is None:
 235         return datetime.datetime(*dtuple[:6])
 236     return datetime.datetime(*dtuple[:6],
 237             tzinfo=datetime.timezone(datetime.timedelta(seconds=tz)))
 238 
 239 
 240 def parseaddr(addr):
 241     addrs = _AddressList(addr).addresslist
 242     if not addrs:
 243         return '', ''
 244     return addrs[0]
 245 
 246 
 247 # rfc822.unquote() doesn't properly de-backslash-ify in Python pre-2.3.
 248 def unquote(str):
 249     """Remove quotes from a string."""
 250     if len(str) > 1:
 251         if str.startswith('"') and str.endswith('"'):
 252             return str[1:-1].replace('\\\\', '\\').replace('\\"', '"')
 253         if str.startswith('<') and str.endswith('>'):
 254             return str[1:-1]
 255     return str
 256 
 257 
 258 
 259 # RFC2231-related functions - parameter encoding and decoding
 260 def decode_rfc2231(s):
 261     """Decode string according to RFC 2231"""
 262     parts = s.split(TICK, 2)
 263     if len(parts) <= 2:
 264         return None, None, s
 265     return parts
 266 
 267 
 268 def encode_rfc2231(s, charset=None, language=None):
 269     """Encode string according to RFC 2231.
 270 
 271     If neither charset nor language is given, then s is returned as-is.  If
 272     charset is given but not language, the string is encoded using the empty
 273     string for language.
 274     """
 275     s = urllib.parse.quote(s, safe='', encoding=charset or 'ascii')
 276     if charset is None and language is None:
 277         return s
 278     if language is None:
 279         language = ''
 280     return "%s'%s'%s" % (charset, language, s)
 281 
 282 
 283 rfc2231_continuation = re.compile(r'^(?P<name>\w+)\*((?P<num>[0-9]+)\*?)?$',
 284     re.ASCII)
 285 
 286 def decode_params(params):
 287     """Decode parameters list according to RFC 2231.
 288 
 289     params is a sequence of 2-tuples containing (param name, string value).
 290     """
 291     # Copy params so we don't mess with the original
 292     params = params[:]
 293     new_params = []
 294     # Map parameter's name to a list of continuations.  The values are a
 295     # 3-tuple of the continuation number, the string value, and a flag
 296     # specifying whether a particular segment is %-encoded.
 297     rfc2231_params = {}
 298     name, value = params.pop(0)
 299     new_params.append((name, value))
 300     while params:
 301         name, value = params.pop(0)
 302         if name.endswith('*'):
 303             encoded = True
 304         else:
 305             encoded = False
 306         value = unquote(value)
 307         mo = rfc2231_continuation.match(name)
 308         if mo:
 309             name, num = mo.group('name', 'num')
 310             if num is not None:
 311                 num = int(num)
 312             rfc2231_params.setdefault(name, []).append((num, value, encoded))
 313         else:
 314             new_params.append((name, '"%s"' % quote(value)))
 315     if rfc2231_params:
 316         for name, continuations in rfc2231_params.items():
 317             value = []
 318             extended = False
 319             # Sort by number
 320             continuations.sort()
 321             # And now append all values in numerical order, converting
 322             # %-encodings for the encoded segments.  If any of the
 323             # continuation names ends in a *, then the entire string, after
 324             # decoding segments and concatenating, must have the charset and
 325             # language specifiers at the beginning of the string.
 326             for num, s, encoded in continuations:
 327                 if encoded:
 328                     # Decode as "latin-1", so the characters in s directly
 329                     # represent the percent-encoded octet values.
 330                     # collapse_rfc2231_value treats this as an octet sequence.
 331                     s = urllib.parse.unquote(s, encoding="latin-1")
 332                     extended = True
 333                 value.append(s)
 334             value = quote(EMPTYSTRING.join(value))
 335             if extended:
 336                 charset, language, value = decode_rfc2231(value)
 337                 new_params.append((name, (charset, language, '"%s"' % value)))
 338             else:
 339                 new_params.append((name, '"%s"' % value))
 340     return new_params
 341 
 342 def collapse_rfc2231_value(value, errors='replace',
 343                            fallback_charset='us-ascii'):
 344     if not isinstance(value, tuple) or len(value) != 3:
 345         return unquote(value)
 346     # While value comes to us as a unicode string, we need it to be a bytes
 347     # object.  We do not want bytes() normal utf-8 decoder, we want a straight
 348     # interpretation of the string as character bytes.
 349     charset, language, text = value
 350     rawbytes = bytes(text, 'raw-unicode-escape')
 351     try:
 352         return str(rawbytes, charset, errors)
 353     except LookupError:
 354         # charset is not a known codec.
 355         return unquote(text)
 356 
 357 
 358 #
 359 # datetime doesn't provide a localtime function yet, so provide one.  Code
 360 # adapted from the patch in issue 9527.  This may not be perfect, but it is
 361 # better than not having it.
 362 #
 363 
 364 def localtime(dt=None, isdst=-1):
 365     """Return local time as an aware datetime object.
 366 
 367     If called without arguments, return current time.  Otherwise *dt*
 368     argument should be a datetime instance, and it is converted to the
 369     local time zone according to the system time zone database.  If *dt* is
 370     naive (that is, dt.tzinfo is None), it is assumed to be in local time.
 371     In this case, a positive or zero value for *isdst* causes localtime to
 372     presume initially that summer time (for example, Daylight Saving Time)
 373     is or is not (respectively) in effect for the specified time.  A
 374     negative value for *isdst* causes the localtime() function to attempt
 375     to divine whether summer time is in effect for the specified time.
 376 
 377     """
 378     if dt is None:
 379         return datetime.datetime.now(datetime.timezone.utc).astimezone()
 380     if dt.tzinfo is not None:
 381         return dt.astimezone()
 382     # We have a naive datetime.  Convert to a (localtime) timetuple and pass to
 383     # system mktime together with the isdst hint.  System mktime will return
 384     # seconds since epoch.
 385     tm = dt.timetuple()[:-1] + (isdst,)
 386     seconds = time.mktime(tm)
 387     localtm = time.localtime(seconds)
 388     try:
 389         delta = datetime.timedelta(seconds=localtm.tm_gmtoff)
 390         tz = datetime.timezone(delta, localtm.tm_zone)
 391     except AttributeError:
 392         # Compute UTC offset and compare with the value implied by tm_isdst.
 393         # If the values match, use the zone name implied by tm_isdst.
 394         delta = dt - datetime.datetime(*time.gmtime(seconds)[:6])
 395         dst = time.daylight and localtm.tm_isdst > 0
 396         gmtoff = -(time.altzone if dst else time.timezone)
 397         if delta == datetime.timedelta(seconds=gmtoff):
 398             tz = datetime.timezone(delta, time.tzname[dst])
 399         else:
 400             tz = datetime.timezone(delta)
 401     return dt.replace(tzinfo=tz)