source: python/trunk/Lib/mimetools.py@ 382

Last change on this file since 382 was 2, checked in by Yuri Dario, 15 years ago

Initial import for vendor code.

  • Property svn:eol-style set to native
File size: 7.0 KB
Line 
1"""Various tools used by MIME-reading or MIME-writing programs."""
2
3
4import os
5import sys
6import tempfile
7from warnings import filterwarnings, catch_warnings
8with catch_warnings():
9 if sys.py3kwarning:
10 filterwarnings("ignore", ".*rfc822 has been removed", DeprecationWarning)
11 import rfc822
12
13from warnings import warnpy3k
14warnpy3k("in 3.x, mimetools has been removed in favor of the email package",
15 stacklevel=2)
16
17__all__ = ["Message","choose_boundary","encode","decode","copyliteral",
18 "copybinary"]
19
20class Message(rfc822.Message):
21 """A derived class of rfc822.Message that knows about MIME headers and
22 contains some hooks for decoding encoded and multipart messages."""
23
24 def __init__(self, fp, seekable = 1):
25 rfc822.Message.__init__(self, fp, seekable)
26 self.encodingheader = \
27 self.getheader('content-transfer-encoding')
28 self.typeheader = \
29 self.getheader('content-type')
30 self.parsetype()
31 self.parseplist()
32
33 def parsetype(self):
34 str = self.typeheader
35 if str is None:
36 str = 'text/plain'
37 if ';' in str:
38 i = str.index(';')
39 self.plisttext = str[i:]
40 str = str[:i]
41 else:
42 self.plisttext = ''
43 fields = str.split('/')
44 for i in range(len(fields)):
45 fields[i] = fields[i].strip().lower()
46 self.type = '/'.join(fields)
47 self.maintype = fields[0]
48 self.subtype = '/'.join(fields[1:])
49
50 def parseplist(self):
51 str = self.plisttext
52 self.plist = []
53 while str[:1] == ';':
54 str = str[1:]
55 if ';' in str:
56 # XXX Should parse quotes!
57 end = str.index(';')
58 else:
59 end = len(str)
60 f = str[:end]
61 if '=' in f:
62 i = f.index('=')
63 f = f[:i].strip().lower() + \
64 '=' + f[i+1:].strip()
65 self.plist.append(f.strip())
66 str = str[end:]
67
68 def getplist(self):
69 return self.plist
70
71 def getparam(self, name):
72 name = name.lower() + '='
73 n = len(name)
74 for p in self.plist:
75 if p[:n] == name:
76 return rfc822.unquote(p[n:])
77 return None
78
79 def getparamnames(self):
80 result = []
81 for p in self.plist:
82 i = p.find('=')
83 if i >= 0:
84 result.append(p[:i].lower())
85 return result
86
87 def getencoding(self):
88 if self.encodingheader is None:
89 return '7bit'
90 return self.encodingheader.lower()
91
92 def gettype(self):
93 return self.type
94
95 def getmaintype(self):
96 return self.maintype
97
98 def getsubtype(self):
99 return self.subtype
100
101
102
103
104# Utility functions
105# -----------------
106
107try:
108 import thread
109except ImportError:
110 import dummy_thread as thread
111_counter_lock = thread.allocate_lock()
112del thread
113
114_counter = 0
115def _get_next_counter():
116 global _counter
117 _counter_lock.acquire()
118 _counter += 1
119 result = _counter
120 _counter_lock.release()
121 return result
122
123_prefix = None
124
125def choose_boundary():
126 """Return a string usable as a multipart boundary.
127
128 The string chosen is unique within a single program run, and
129 incorporates the user id (if available), process id (if available),
130 and current time. So it's very unlikely the returned string appears
131 in message text, but there's no guarantee.
132
133 The boundary contains dots so you have to quote it in the header."""
134
135 global _prefix
136 import time
137 if _prefix is None:
138 import socket
139 try:
140 hostid = socket.gethostbyname(socket.gethostname())
141 except socket.gaierror:
142 hostid = '127.0.0.1'
143 try:
144 uid = repr(os.getuid())
145 except AttributeError:
146 uid = '1'
147 try:
148 pid = repr(os.getpid())
149 except AttributeError:
150 pid = '1'
151 _prefix = hostid + '.' + uid + '.' + pid
152 return "%s.%.3f.%d" % (_prefix, time.time(), _get_next_counter())
153
154
155# Subroutines for decoding some common content-transfer-types
156
157def decode(input, output, encoding):
158 """Decode common content-transfer-encodings (base64, quopri, uuencode)."""
159 if encoding == 'base64':
160 import base64
161 return base64.decode(input, output)
162 if encoding == 'quoted-printable':
163 import quopri
164 return quopri.decode(input, output)
165 if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
166 import uu
167 return uu.decode(input, output)
168 if encoding in ('7bit', '8bit'):
169 return output.write(input.read())
170 if encoding in decodetab:
171 pipethrough(input, decodetab[encoding], output)
172 else:
173 raise ValueError, \
174 'unknown Content-Transfer-Encoding: %s' % encoding
175
176def encode(input, output, encoding):
177 """Encode common content-transfer-encodings (base64, quopri, uuencode)."""
178 if encoding == 'base64':
179 import base64
180 return base64.encode(input, output)
181 if encoding == 'quoted-printable':
182 import quopri
183 return quopri.encode(input, output, 0)
184 if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
185 import uu
186 return uu.encode(input, output)
187 if encoding in ('7bit', '8bit'):
188 return output.write(input.read())
189 if encoding in encodetab:
190 pipethrough(input, encodetab[encoding], output)
191 else:
192 raise ValueError, \
193 'unknown Content-Transfer-Encoding: %s' % encoding
194
195# The following is no longer used for standard encodings
196
197# XXX This requires that uudecode and mmencode are in $PATH
198
199uudecode_pipe = '''(
200TEMP=/tmp/@uu.$$
201sed "s%^begin [0-7][0-7]* .*%begin 600 $TEMP%" | uudecode
202cat $TEMP
203rm $TEMP
204)'''
205
206decodetab = {
207 'uuencode': uudecode_pipe,
208 'x-uuencode': uudecode_pipe,
209 'uue': uudecode_pipe,
210 'x-uue': uudecode_pipe,
211 'quoted-printable': 'mmencode -u -q',
212 'base64': 'mmencode -u -b',
213}
214
215encodetab = {
216 'x-uuencode': 'uuencode tempfile',
217 'uuencode': 'uuencode tempfile',
218 'x-uue': 'uuencode tempfile',
219 'uue': 'uuencode tempfile',
220 'quoted-printable': 'mmencode -q',
221 'base64': 'mmencode -b',
222}
223
224def pipeto(input, command):
225 pipe = os.popen(command, 'w')
226 copyliteral(input, pipe)
227 pipe.close()
228
229def pipethrough(input, command, output):
230 (fd, tempname) = tempfile.mkstemp()
231 temp = os.fdopen(fd, 'w')
232 copyliteral(input, temp)
233 temp.close()
234 pipe = os.popen(command + ' <' + tempname, 'r')
235 copybinary(pipe, output)
236 pipe.close()
237 os.unlink(tempname)
238
239def copyliteral(input, output):
240 while 1:
241 line = input.readline()
242 if not line: break
243 output.write(line)
244
245def copybinary(input, output):
246 BUFSIZE = 8192
247 while 1:
248 line = input.read(BUFSIZE)
249 if not line: break
250 output.write(line)
Note: See TracBrowser for help on using the repository browser.