1 # Copyright (C) 2002-2007 Python Software Foundation
   2 # Author: Ben Gertzfield, Barry Warsaw
   3 # Contact: email-sig@python.org
   4 
   5 """Header encoding and decoding functionality."""
   6 
# Public names exported by a star-import of this module.
__all__ = [
    'Header',
    'decode_header',
    'make_header',
    ]
  12 
  13 import re
  14 import binascii
  15 
  16 import email.quoprimime
  17 import email.base64mime
  18 
  19 from email.errors import HeaderParseError
  20 from email import charset as _charset
# Module-level alias so Charset can be referenced without the module prefix.
Charset = _charset.Charset

# Small string/bytes constants shared by the code below.
NL = '\n'
SPACE = ' '
BSPACE = b' '
SPACE8 = ' ' * 8
EMPTYSTRING = ''
# Default maximum line length used by Header when maxlinelen is not given.
MAXLINELEN = 78
# Characters treated as folding whitespace when splitting header values.
FWS = ' \t'

# Pre-built Charset instances used as defaults by Header.
USASCII = Charset('us-ascii')
UTF8 = Charset('utf-8')

# Match encoded-word strings in the form =?charset?q?Hello_World?=
ecre = re.compile(r'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qb])    # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<encoded>.*?)      # non-greedy up to the next ?= is the encoded string
  \?=                   # literal ?=
  ''', re.VERBOSE | re.IGNORECASE | re.MULTILINE)

# Field name regexp, including trailing colon, but not separating whitespace,
# according to RFC 2822.  Character range is from exclamation mark to tilde.
# For use with .match()
fcre = re.compile(r'[\041-\176]+:$')

# Find a header embedded in a putative header value.  Used to check for
# header injection attack.
_embeded_header = re.compile(r'\n[^ \t]+:')



# Helpers
_max_append = email.quoprimime._max_append
  58 
  59 
  60 
  61 def decode_header(header):
  62     """Decode a message header value without converting charset.
  63 
  64     Returns a list of (string, charset) pairs containing each of the decoded
  65     parts of the header.  Charset is None for non-encoded parts of the header,
  66     otherwise a lower-case string containing the name of the character set
  67     specified in the encoded string.
  68 
  69     header may be a string that may or may not contain RFC2047 encoded words,
  70     or it may be a Header object.
  71 
  72     An email.errors.HeaderParseError may be raised when certain decoding error
  73     occurs (e.g. a base64 decoding exception).
  74     """
  75     # If it is a Header object, we can just return the encoded chunks.
  76     if hasattr(header, '_chunks'):
  77         return [(_charset._encode(string, str(charset)), str(charset))
  78                     for string, charset in header._chunks]
  79     # If no encoding, just return the header with no charset.
  80     if not ecre.search(header):
  81         return [(header, None)]
  82     # First step is to parse all the encoded parts into triplets of the form
  83     # (encoded_string, encoding, charset).  For unencoded strings, the last
  84     # two parts will be None.
  85     words = []
  86     for line in header.splitlines():
  87         parts = ecre.split(line)
  88         first = True
  89         while parts:
  90             unencoded = parts.pop(0)
  91             if first:
  92                 unencoded = unencoded.lstrip()
  93                 first = False
  94             if unencoded:
  95                 words.append((unencoded, None, None))
  96             if parts:
  97                 charset = parts.pop(0).lower()
  98                 encoding = parts.pop(0).lower()
  99                 encoded = parts.pop(0)
 100                 words.append((encoded, encoding, charset))
 101     # Now loop over words and remove words that consist of whitespace
 102     # between two encoded strings.
 103     import sys
 104     droplist = []
 105     for n, w in enumerate(words):
 106         if n>1 and w[1] and words[n-2][1] and words[n-1][0].isspace():
 107             droplist.append(n-1)
 108     for d in reversed(droplist):
 109         del words[d]
 110 
 111     # The next step is to decode each encoded word by applying the reverse
 112     # base64 or quopri transformation.  decoded_words is now a list of the
 113     # form (decoded_word, charset).
 114     decoded_words = []
 115     for encoded_string, encoding, charset in words:
 116         if encoding is None:
 117             # This is an unencoded word.
 118             decoded_words.append((encoded_string, charset))
 119         elif encoding == 'q':
 120             word = email.quoprimime.header_decode(encoded_string)
 121             decoded_words.append((word, charset))
 122         elif encoding == 'b':
 123             paderr = len(encoded_string) % 4   # Postel's law: add missing padding
 124             if paderr:
 125                 encoded_string += '==='[:4 - paderr]
 126             try:
 127                 word = email.base64mime.decode(encoded_string)
 128             except binascii.Error:
 129                 raise HeaderParseError('Base64 decoding error')
 130             else:
 131                 decoded_words.append((word, charset))
 132         else:
 133             raise AssertionError('Unexpected encoding: ' + encoding)
 134     # Now convert all words to bytes and collapse consecutive runs of
 135     # similarly encoded words.
 136     collapsed = []
 137     last_word = last_charset = None
 138     for word, charset in decoded_words:
 139         if isinstance(word, str):
 140             word = bytes(word, 'raw-unicode-escape')
 141         if last_word is None:
 142             last_word = word
 143             last_charset = charset
 144         elif charset != last_charset:
 145             collapsed.append((last_word, last_charset))
 146             last_word = word
 147             last_charset = charset
 148         elif last_charset is None:
 149             last_word += BSPACE + word
 150         else:
 151             last_word += word
 152     collapsed.append((last_word, last_charset))
 153     return collapsed
 154 
 155 
 156 
 157 def make_header(decoded_seq, maxlinelen=None, header_name=None,
 158                 continuation_ws=' '):
 159     """Create a Header from a sequence of pairs as returned by decode_header()
 160 
 161     decode_header() takes a header value string and returns a sequence of
 162     pairs of the format (decoded_string, charset) where charset is the string
 163     name of the character set.
 164 
 165     This function takes one of those sequence of pairs and returns a Header
 166     instance.  Optional maxlinelen, header_name, and continuation_ws are as in
 167     the Header constructor.
 168     """
 169     h = Header(maxlinelen=maxlinelen, header_name=header_name,
 170                continuation_ws=continuation_ws)
 171     for s, charset in decoded_seq:
 172         # None means us-ascii but we can simply pass it on to h.append()
 173         if charset is not None and not isinstance(charset, Charset):
 174             charset = Charset(charset)
 175         h.append(s, charset)
 176     return h
 177 
 178 
 179 
 180 class Header:
 181     def __init__(self, s=None, charset=None,
 182                  maxlinelen=None, header_name=None,
 183                  continuation_ws=' ', errors='strict'):
 184         """Create a MIME-compliant header that can contain many character sets.
 185 
 186         Optional s is the initial header value.  If None, the initial header
 187         value is not set.  You can later append to the header with .append()
 188         method calls.  s may be a byte string or a Unicode string, but see the
 189         .append() documentation for semantics.
 190 
 191         Optional charset serves two purposes: it has the same meaning as the
 192         charset argument to the .append() method.  It also sets the default
 193         character set for all subsequent .append() calls that omit the charset
 194         argument.  If charset is not provided in the constructor, the us-ascii
 195         charset is used both as s's initial charset and as the default for
 196         subsequent .append() calls.
 197 
 198         The maximum line length can be specified explicitly via maxlinelen. For
 199         splitting the first line to a shorter value (to account for the field
 200         header which isn't included in s, e.g. `Subject') pass in the name of
 201         the field in header_name.  The default maxlinelen is 78 as recommended
 202         by RFC 2822.
 203 
 204         continuation_ws must be RFC 2822 compliant folding whitespace (usually
 205         either a space or a hard tab) which will be prepended to continuation
 206         lines.
 207 
 208         errors is passed through to the .append() call.
 209         """
 210         if charset is None:
 211             charset = USASCII
 212         elif not isinstance(charset, Charset):
 213             charset = Charset(charset)
 214         self._charset = charset
 215         self._continuation_ws = continuation_ws
 216         self._chunks = []
 217         if s is not None:
 218             self.append(s, charset, errors)
 219         if maxlinelen is None:
 220             maxlinelen = MAXLINELEN
 221         self._maxlinelen = maxlinelen
 222         if header_name is None:
 223             self._headerlen = 0
 224         else:
 225             # Take the separating colon and space into account.
 226             self._headerlen = len(header_name) + 2
 227 
 228     def __str__(self):
 229         """Return the string value of the header."""
 230         self._normalize()
 231         uchunks = []
 232         lastcs = None
 233         lastspace = None
 234         for string, charset in self._chunks:
 235             # We must preserve spaces between encoded and non-encoded word
 236             # boundaries, which means for us we need to add a space when we go
 237             # from a charset to None/us-ascii, or from None/us-ascii to a
 238             # charset.  Only do this for the second and subsequent chunks.
 239             # Don't add a space if the None/us-ascii string already has
 240             # a space (trailing or leading depending on transition)
 241             nextcs = charset
 242             if nextcs == _charset.UNKNOWN8BIT:
 243                 original_bytes = string.encode('ascii', 'surrogateescape')
 244                 string = original_bytes.decode('ascii', 'replace')
 245             if uchunks:
 246                 hasspace = string and self._nonctext(string[0])
 247                 if lastcs not in (None, 'us-ascii'):
 248                     if nextcs in (None, 'us-ascii') and not hasspace:
 249                         uchunks.append(SPACE)
 250                         nextcs = None
 251                 elif nextcs not in (None, 'us-ascii') and not lastspace:
 252                     uchunks.append(SPACE)
 253             lastspace = string and self._nonctext(string[-1])
 254             lastcs = nextcs
 255             uchunks.append(string)
 256         return EMPTYSTRING.join(uchunks)
 257 
 258     # Rich comparison operators for equality only.  BAW: does it make sense to
 259     # have or explicitly disable <, <=, >, >= operators?
 260     def __eq__(self, other):
 261         # other may be a Header or a string.  Both are fine so coerce
 262         # ourselves to a unicode (of the unencoded header value), swap the
 263         # args and do another comparison.
 264         return other == str(self)
 265 
 266     def __ne__(self, other):
 267         return not self == other
 268 
 269     def append(self, s, charset=None, errors='strict'):
 270         """Append a string to the MIME header.
 271 
 272         Optional charset, if given, should be a Charset instance or the name
 273         of a character set (which will be converted to a Charset instance).  A
 274         value of None (the default) means that the charset given in the
 275         constructor is used.
 276 
 277         s may be a byte string or a Unicode string.  If it is a byte string
 278         (i.e. isinstance(s, str) is false), then charset is the encoding of
 279         that byte string, and a UnicodeError will be raised if the string
 280         cannot be decoded with that charset.  If s is a Unicode string, then
 281         charset is a hint specifying the character set of the characters in
 282         the string.  In either case, when producing an RFC 2822 compliant
 283         header using RFC 2047 rules, the string will be encoded using the
 284         output codec of the charset.  If the string cannot be encoded to the
 285         output codec, a UnicodeError will be raised.
 286 
 287         Optional `errors' is passed as the errors argument to the decode
 288         call if s is a byte string.
 289         """
 290         if charset is None:
 291             charset = self._charset
 292         elif not isinstance(charset, Charset):
 293             charset = Charset(charset)
 294         if not isinstance(s, str):
 295             input_charset = charset.input_codec or 'us-ascii'
 296             if input_charset == _charset.UNKNOWN8BIT:
 297                 s = s.decode('us-ascii', 'surrogateescape')
 298             else:
 299                 s = s.decode(input_charset, errors)
 300         # Ensure that the bytes we're storing can be decoded to the output
 301         # character set, otherwise an early error is raised.
 302         output_charset = charset.output_codec or 'us-ascii'
 303         if output_charset != _charset.UNKNOWN8BIT:
 304             try:
 305                 s.encode(output_charset, errors)
 306             except UnicodeEncodeError:
 307                 if output_charset!='us-ascii':
 308                     raise
 309                 charset = UTF8
 310         self._chunks.append((s, charset))
 311 
 312     def _nonctext(self, s):
 313         """True if string s is not a ctext character of RFC822.
 314         """
 315         return s.isspace() or s in ('(', ')', '\\')
 316 
 317     def encode(self, splitchars=';, \t', maxlinelen=None, linesep='\n'):
 318         r"""Encode a message header into an RFC-compliant format.
 319 
 320         There are many issues involved in converting a given string for use in
 321         an email header.  Only certain character sets are readable in most
 322         email clients, and as header strings can only contain a subset of
 323         7-bit ASCII, care must be taken to properly convert and encode (with
 324         Base64 or quoted-printable) header strings.  In addition, there is a
 325         75-character length limit on any given encoded header field, so
 326         line-wrapping must be performed, even with double-byte character sets.
 327 
 328         Optional maxlinelen specifies the maximum length of each generated
 329         line, exclusive of the linesep string.  Individual lines may be longer
 330         than maxlinelen if a folding point cannot be found.  The first line
 331         will be shorter by the length of the header name plus ": " if a header
 332         name was specified at Header construction time.  The default value for
 333         maxlinelen is determined at header construction time.
 334 
 335         Optional splitchars is a string containing characters which should be
 336         given extra weight by the splitting algorithm during normal header
 337         wrapping.  This is in very rough support of RFC 2822's `higher level
 338         syntactic breaks':  split points preceded by a splitchar are preferred
 339         during line splitting, with the characters preferred in the order in
 340         which they appear in the string.  Space and tab may be included in the
 341         string to indicate whether preference should be given to one over the
 342         other as a split point when other split chars do not appear in the line
 343         being split.  Splitchars does not affect RFC 2047 encoded lines.
 344 
 345         Optional linesep is a string to be used to separate the lines of
 346         the value.  The default value is the most useful for typical
 347         Python applications, but it can be set to \r\n to produce RFC-compliant
 348         line separators when needed.
 349         """
 350         self._normalize()
 351         if maxlinelen is None:
 352             maxlinelen = self._maxlinelen
 353         # A maxlinelen of 0 means don't wrap.  For all practical purposes,
 354         # choosing a huge number here accomplishes that and makes the
 355         # _ValueFormatter algorithm much simpler.
 356         if maxlinelen == 0:
 357             maxlinelen = 1000000
 358         formatter = _ValueFormatter(self._headerlen, maxlinelen,
 359                                     self._continuation_ws, splitchars)
 360         lastcs = None
 361         hasspace = lastspace = None
 362         for string, charset in self._chunks:
 363             if hasspace is not None:
 364                 hasspace = string and self._nonctext(string[0])
 365                 import sys
 366                 if lastcs not in (None, 'us-ascii'):
 367                     if not hasspace or charset not in (None, 'us-ascii'):
 368                         formatter.add_transition()
 369                 elif charset not in (None, 'us-ascii') and not lastspace:
 370                     formatter.add_transition()
 371             lastspace = string and self._nonctext(string[-1])
 372             lastcs = charset
 373             hasspace = False
 374             lines = string.splitlines()
 375             if lines:
 376                 formatter.feed('', lines[0], charset)
 377             else:
 378                 formatter.feed('', '', charset)
 379             for line in lines[1:]:
 380                 formatter.newline()
 381                 if charset.header_encoding is not None:
 382                     formatter.feed(self._continuation_ws, ' ' + line.lstrip(),
 383                                    charset)
 384                 else:
 385                     sline = line.lstrip()
 386                     fws = line[:len(line)-len(sline)]
 387                     formatter.feed(fws, sline, charset)
 388             if len(lines) > 1:
 389                 formatter.newline()
 390         if self._chunks:
 391             formatter.add_transition()
 392         value = formatter._str(linesep)
 393         if _embeded_header.search(value):
 394             raise HeaderParseError("header value appears to contain "
 395                 "an embedded header: {!r}".format(value))
 396         return value
 397 
 398     def _normalize(self):
 399         # Step 1: Normalize the chunks so that all runs of identical charsets
 400         # get collapsed into a single unicode string.
 401         chunks = []
 402         last_charset = None
 403         last_chunk = []
 404         for string, charset in self._chunks:
 405             if charset == last_charset:
 406                 last_chunk.append(string)
 407             else:
 408                 if last_charset is not None:
 409                     chunks.append((SPACE.join(last_chunk), last_charset))
 410                 last_chunk = [string]
 411                 last_charset = charset
 412         if last_chunk:
 413             chunks.append((SPACE.join(last_chunk), last_charset))
 414         self._chunks = chunks
 415 
 416 
 417 
 418 class _ValueFormatter:
 419     def __init__(self, headerlen, maxlen, continuation_ws, splitchars):
 420         self._maxlen = maxlen
 421         self._continuation_ws = continuation_ws
 422         self._continuation_ws_len = len(continuation_ws)
 423         self._splitchars = splitchars
 424         self._lines = []
 425         self._current_line = _Accumulator(headerlen)
 426 
 427     def _str(self, linesep):
 428         self.newline()
 429         return linesep.join(self._lines)
 430 
 431     def __str__(self):
 432         return self._str(NL)
 433 
 434     def newline(self):
 435         end_of_line = self._current_line.pop()
 436         if end_of_line != (' ', ''):
 437             self._current_line.push(*end_of_line)
 438         if len(self._current_line) > 0:
 439             if self._current_line.is_onlyws():
 440                 self._lines[-1] += str(self._current_line)
 441             else:
 442                 self._lines.append(str(self._current_line))
 443         self._current_line.reset()
 444 
 445     def add_transition(self):
 446         self._current_line.push(' ', '')
 447 
 448     def feed(self, fws, string, charset):
 449         # If the charset has no header encoding (i.e. it is an ASCII encoding)
 450         # then we must split the header at the "highest level syntactic break"
 451         # possible. Note that we don't have a lot of smarts about field
 452         # syntax; we just try to break on semi-colons, then commas, then
 453         # whitespace.  Eventually, this should be pluggable.
 454         if charset.header_encoding is None:
 455             self._ascii_split(fws, string, self._splitchars)
 456             return
 457         # Otherwise, we're doing either a Base64 or a quoted-printable
 458         # encoding which means we don't need to split the line on syntactic
 459         # breaks.  We can basically just find enough characters to fit on the
 460         # current line, minus the RFC 2047 chrome.  What makes this trickier
 461         # though is that we have to split at octet boundaries, not character
 462         # boundaries but it's only safe to split at character boundaries so at
 463         # best we can only get close.
 464         encoded_lines = charset.header_encode_lines(string, self._maxlengths())
 465         # The first element extends the current line, but if it's None then
 466         # nothing more fit on the current line so start a new line.
 467         try:
 468             first_line = encoded_lines.pop(0)
 469         except IndexError:
 470             # There are no encoded lines, so we're done.
 471             return
 472         if first_line is not None:
 473             self._append_chunk(fws, first_line)
 474         try:
 475             last_line = encoded_lines.pop()
 476         except IndexError:
 477             # There was only one line.
 478             return
 479         self.newline()
 480         self._current_line.push(self._continuation_ws, last_line)
 481         # Everything else are full lines in themselves.
 482         for line in encoded_lines:
 483             self._lines.append(self._continuation_ws + line)
 484 
 485     def _maxlengths(self):
 486         # The first line's length.
 487         yield self._maxlen - len(self._current_line)
 488         while True:
 489             yield self._maxlen - self._continuation_ws_len
 490 
 491     def _ascii_split(self, fws, string, splitchars):
 492         # The RFC 2822 header folding algorithm is simple in principle but
 493         # complex in practice.  Lines may be folded any place where "folding
 494         # white space" appears by inserting a linesep character in front of the
 495         # FWS.  The complication is that not all spaces or tabs qualify as FWS,
 496         # and we are also supposed to prefer to break at "higher level
 497         # syntactic breaks".  We can't do either of these without intimate
 498         # knowledge of the structure of structured headers, which we don't have
 499         # here.  So the best we can do here is prefer to break at the specified
 500         # splitchars, and hope that we don't choose any spaces or tabs that
 501         # aren't legal FWS.  (This is at least better than the old algorithm,
 502         # where we would sometimes *introduce* FWS after a splitchar, or the
 503         # algorithm before that, where we would turn all white space runs into
 504         # single spaces or tabs.)
 505         parts = re.split("(["+FWS+"]+)", fws+string)
 506         if parts[0]:
 507             parts[:0] = ['']
 508         else:
 509             parts.pop(0)
 510         for fws, part in zip(*[iter(parts)]*2):
 511             self._append_chunk(fws, part)
 512 
 513     def _append_chunk(self, fws, string):
 514         self._current_line.push(fws, string)
 515         if len(self._current_line) > self._maxlen:
 516             # Find the best split point, working backward from the end.
 517             # There might be none, on a long first line.
 518             for ch in self._splitchars:
 519                 for i in range(self._current_line.part_count()-1, 0, -1):
 520                     if ch.isspace():
 521                         fws = self._current_line[i][0]
 522                         if fws and fws[0]==ch:
 523                             break
 524                     prevpart = self._current_line[i-1][1]
 525                     if prevpart and prevpart[-1]==ch:
 526                         break
 527                 else:
 528                     continue
 529                 break
 530             else:
 531                 fws, part = self._current_line.pop()
 532                 if self._current_line._initial_size > 0:
 533                     # There will be a header, so leave it on a line by itself.
 534                     self.newline()
 535                     if not fws:
 536                         # We don't use continuation_ws here because the whitespace
 537                         # after a header should always be a space.
 538                         fws = ' '
 539                 self._current_line.push(fws, part)
 540                 return
 541             remainder = self._current_line.pop_from(i)
 542             self._lines.append(str(self._current_line))
 543             self._current_line.reset(remainder)
 544 
 545 
 546 class _Accumulator(list):
 547 
 548     def __init__(self, initial_size=0):
 549         self._initial_size = initial_size
 550         super().__init__()
 551 
 552     def push(self, fws, string):
 553         self.append((fws, string))
 554 
 555     def pop_from(self, i=0):
 556         popped = self[i:]
 557         self[i:] = []
 558         return popped
 559 
 560     def pop(self):
 561         if self.part_count()==0:
 562             return ('', '')
 563         return super().pop()
 564 
 565     def __len__(self):
 566         return sum((len(fws)+len(part) for fws, part in self),
 567                    self._initial_size)
 568 
 569     def __str__(self):
 570         return EMPTYSTRING.join((EMPTYSTRING.join((fws, part))
 571                                 for fws, part in self))
 572 
 573     def reset(self, startval=None):
 574         if startval is None:
 575             startval = []
 576         self[:] = startval
 577         self._initial_size = 0
 578 
 579     def is_onlyws(self):
 580         return self._initial_size==0 and (not self or str(self).isspace())
 581 
 582     def part_count(self):
 583         return super().__len__()