From 2f1992f179d107f5ae338c107ee06af12ea23e0e Mon Sep 17 00:00:00 2001 From: Adam Sampson Date: Sat, 12 Jul 2003 16:10:40 +0000 Subject: [PATCH] Replaced with feedparser.py. --- rawdoglib/rssparser.py | 525 ----------------------------------------- 1 file changed, 525 deletions(-) delete mode 100644 rawdoglib/rssparser.py diff --git a/rawdoglib/rssparser.py b/rawdoglib/rssparser.py deleted file mode 100644 index 3563460..0000000 --- a/rawdoglib/rssparser.py +++ /dev/null @@ -1,525 +0,0 @@ -#!/usr/bin/python -"""Ultra-liberal RSS parser - -Visit http://diveintomark.org/projects/rss_parser/ for the latest version - -Handles RSS 0.9x and RSS 1.0 feeds - -RSS 0.9x elements: -- title, link, description, webMaster, managingEditor, language - copyright, lastBuildDate, pubDate - -RSS 1.0 elements: -- dc:rights, dc:language, dc:creator, dc:date, dc:subject, - content:encoded - -Things it handles that choke other RSS parsers: -- bastard combinations of RSS 0.9x and RSS 1.0 (most Movable Type feeds) -- illegal XML characters (most Radio feeds) -- naked and/or invalid HTML in description (The Register) -- content:encoded in item element (Aaron Swartz) -- guid in item element (Scripting News) -- fullitem in item element (Jon Udell) -- non-standard namespaces (BitWorking) - -Requires Python 2.2 or later - -Modifications made by Adam Sampson for use in rawdog: -- timeout increased to 30 seconds from 10 (since pulling Advogato's recentlog - takes about 20s, even on my ADSL!) -- when invoked as a script, print the full parser output; this is useful for - debugging -""" - -__author__ = "Mark Pilgrim (f8dy@diveintomark.org)" -__copyright__ = "Copyright 2002, Mark Pilgrim" -__contributors__ = ["Jason Diamond (jason@injektilo.org)"] -__license__ = "GPL" -__history__ = """ -1.0 - 9/27/2002 - MAP - fixed namespace processing on prefixed RSS 2.0 elements, - added Simon Fell's test suite -1.1 - 9/29/2002 - MAP - fixed infinite loop on incomplete CDATA sections -2.0 - 10/19/2002 - JD - use inchannel to watch out for image and textinput elements which can - also contain title, link, and description elements - JD - check for isPermaLink="false" attribute on guid elements - JD - replaced openAnything with open_resource supporting ETag and - If-Modified-Since request headers - JD - parse now accepts etag, modified, agent, and referrer optional - arguments - JD - modified parse to return a dictionary instead of a tuple so that any - etag or modified information can be returned and cached by the caller -2.0.1 - 10/21/2002 - MAP - changed parse() so that if we don't get anything - because of etag/modified, return the old etag/modified to the caller to - indicate why nothing is being returned -2.0.2 - 10/21/2002 - JB - added the inchannel to the if statement, otherwise its - useless. Fixes the problem JD was addressing by adding it. -2.1 - 11/14/2002 - MAP - added gzip support -2.2 - 1/27/2003 - MAP - added attribute support, admin:generatorAgent. - start_admingeneratoragent is an example of how to handle elements with - only attributes, no content. -""" - -try: - import timeoutsocket # http://www.timo-tasi.org/python/timeoutsocket.py - timeoutsocket.setDefaultSocketTimeout(30) -except ImportError: - pass -import cgi, re, sgmllib, string, StringIO, urllib, gzip -sgmllib.tagfind = re.compile('[a-zA-Z][-_.:a-zA-Z0-9]*') - -def decodeEntities(data): - data = data or '' - data = data.replace('<', '<') - data = data.replace('>', '>') - data = data.replace('"', '"') - data = data.replace(''', "'") - data = data.replace('&', '&') - return data - -class RSSParser(sgmllib.SGMLParser): - namespaces = {"http://backend.userland.com/rss": "", - "http://backend.userland.com/rss2": "", - "http://purl.org/rss/1.0/": "", - "http://purl.org/rss/1.0/modules/textinput/": "ti", - "http://purl.org/rss/1.0/modules/company/": "co", - "http://purl.org/rss/1.0/modules/syndication/": "sy", - "http://purl.org/dc/elements/1.1/": "dc", - "http://webns.net/mvcb/": "admin"} - - def reset(self): - self.channel = {} - self.items = [] - self.elementstack = [] - self.inchannel = 0 - self.initem = 0 - self.namespacemap = {} - sgmllib.SGMLParser.reset(self) - - def push(self, element, expectingText): - self.elementstack.append([element, expectingText, []]) - - def pop(self, element): - if not self.elementstack: return - if self.elementstack[-1][0] != element: return - element, expectingText, pieces = self.elementstack.pop() - if not expectingText: return - output = "".join(pieces) - output = decodeEntities(output) - if self.initem: - self.items[-1][element] = output - elif self.inchannel: - self.channel[element] = output - - def _addNamespaces(self, attrs): - for prefix, value in attrs: - if not prefix.startswith("xmlns:"): continue - prefix = prefix[6:] - if self.namespaces.has_key(value): - self.namespacemap[prefix] = self.namespaces[value] - - def _mapToStandardPrefix(self, name): - colonpos = name.find(':') - if colonpos <> -1: - prefix = name[:colonpos] - suffix = name[colonpos+1:] - prefix = self.namespacemap.get(prefix, prefix) - name = prefix + ':' + suffix - return name - - def _getAttribute(self, attrs, name): - value = [v for k, v in attrs if self._mapToStandardPrefix(k) == name] - if value: - value = value[0] - else: - value = None - return value - - def start_channel(self, attrs): - self.push('channel', 0) - self.inchannel = 1 - - def end_channel(self): - self.pop('channel') - self.inchannel = 0 - - def start_item(self, attrs): - self.items.append({}) - self.push('item', 0) - self.initem = 1 - - def end_item(self): - self.pop('item') - self.initem = 0 - - def start_dc_language(self, attrs): - self.push('language', 1) - start_language = start_dc_language - - def end_dc_language(self): - self.pop('language') - end_language = end_dc_language - - def start_dc_creator(self, attrs): - self.push('creator', 1) - start_managingeditor = start_dc_creator - start_webmaster = start_dc_creator - - def end_dc_creator(self): - self.pop('creator') - end_managingeditor = end_dc_creator - end_webmaster = end_dc_creator - - def start_dc_rights(self, attrs): - self.push('rights', 1) - start_copyright = start_dc_rights - - def end_dc_rights(self): - self.pop('rights') - end_copyright = end_dc_rights - - def start_dc_date(self, attrs): - self.push('date', 1) - start_lastbuilddate = start_dc_date - start_pubdate = start_dc_date - - def end_dc_date(self): - self.pop('date') - end_lastbuilddate = end_dc_date - end_pubdate = end_dc_date - - def start_dc_subject(self, attrs): - self.push('category', 1) - - def end_dc_subject(self): - self.pop('category') - - def start_link(self, attrs): - self.push('link', self.inchannel or self.initem) - - def end_link(self): - self.pop('link') - - def start_guid(self, attrs): - self.guidislink = ('ispermalink', 'false') not in attrs - self.push('guid', 1) - - def end_guid(self): - self.pop('guid') - if self.guidislink: - self.items[-1]['link'] = self.items[-1]['guid'] - - def start_title(self, attrs): - self.push('title', self.inchannel or self.initem) - - def start_description(self, attrs): - self.push('description', self.inchannel or self.initem) - - def start_content_encoded(self, attrs): - self.push('content_encoded', 1) - start_fullitem = start_content_encoded - - def end_content_encoded(self): - self.pop('content_encoded') - end_fullitem = end_content_encoded - - def start_admin_generatoragent(self, attrs): - self.push('generator', 1) - value = self._getAttribute(attrs, 'rdf:resource') - if value: - self.elementstack[-1][2].append(value) - self.pop('generator') - - def unknown_starttag(self, tag, attrs): - self._addNamespaces(attrs) - colonpos = tag.find(':') - if colonpos <> -1: - prefix = tag[:colonpos] - suffix = tag[colonpos+1:] - prefix = self.namespacemap.get(prefix, prefix) - if prefix: - prefix = prefix + '_' - methodname = 'start_' + prefix + suffix - try: - method = getattr(self, methodname) - return method(attrs) - except AttributeError: - return self.push(prefix + suffix, 0) - return self.push(tag, 0) - - def unknown_endtag(self, tag): - colonpos = tag.find(':') - if colonpos <> -1: - prefix = tag[:colonpos] - suffix = tag[colonpos+1:] - prefix = self.namespacemap.get(prefix, prefix) - if prefix: - prefix = prefix + '_' - methodname = 'end_' + prefix + suffix - try: - method = getattr(self, methodname) - return method() - except AttributeError: - return self.pop(prefix + suffix) - return self.pop(tag) - - def handle_charref(self, ref): - # called for each character reference, e.g. for " ", ref will be "160" - # Reconstruct the original character reference. - if not self.elementstack: return - self.elementstack[-1][2].append("&#%(ref)s;" % locals()) - - def handle_entityref(self, ref): - # called for each entity reference, e.g. for "©", ref will be "copy" - # Reconstruct the original entity reference. - if not self.elementstack: return - self.elementstack[-1][2].append("&%(ref)s;" % locals()) - - def handle_data(self, text): - # called for each block of plain text, i.e. outside of any tag and - # not containing any character or entity references - if not self.elementstack: return - self.elementstack[-1][2].append(text) - - def handle_comment(self, text): - # called for each comment, e.g. - pass - - def handle_pi(self, text): - # called for each processing instruction, e.g. - pass - - def handle_decl(self, text): - # called for the DOCTYPE, if present, e.g. - # - pass - - _new_declname_match = re.compile(r'[a-zA-Z][-_.a-zA-Z0-9:]*\s*').match - def _scan_name(self, i, declstartpos): - rawdata = self.rawdata - n = len(rawdata) - if i == n: - return None, -1 - m = self._new_declname_match(rawdata, i) - if m: - s = m.group() - name = s.strip() - if (i + len(s)) == n: - return None, -1 # end of buffer - return string.lower(name), m.end() - else: - self.updatepos(declstartpos, i) - self.error("expected name token") - - def parse_declaration(self, i): - # override internal declaration handler to handle CDATA blocks - if self.rawdata[i:i+9] == '', i) - if k == -1: k = len(self.rawdata) - self.handle_data(cgi.escape(self.rawdata[i+9:k])) - return k+3 - return sgmllib.SGMLParser.parse_declaration(self, i) - -def open_resource(source, etag=None, modified=None, agent=None, referrer=None): - """ - URI, filename, or string --> stream - - This function lets you define parsers that take any input source - (URL, pathname to local or network file, or actual data as a string) - and deal with it in a uniform manner. Returned object is guaranteed - to have all the basic stdio read methods (read, readline, readlines). - Just .close() the object when you're done with it. - - If the etag argument is supplied, it will be used as the value of an - If-None-Match request header. - - If the modified argument is supplied, it must be a tuple of 9 integers - as returned by gmtime() in the standard Python time module. This MUST - be in GMT (Greenwich Mean Time). The formatted date/time will be used - as the value of an If-Modified-Since request header. - - If the agent argument is supplied, it will be used as the value of a - User-Agent request header. - - If the referrer argument is supplied, it will be used as the value of a - Referer[sic] request header. - - The optional arguments are only used if the source argument is an HTTP - URL and the urllib2 module is importable (i.e., you must be using Python - version 2.0 or higher). - """ - - if hasattr(source, "read"): - return source - - if source == "-": - return sys.stdin - - # try to open with urllib2 (to use optional headers) - try: - import urllib2 - request = urllib2.Request(source) - if etag: - request.add_header("If-None-Match", etag) - if modified: - request.add_header("If-Modified-Since", format_http_date(modified)) - if agent: - request.add_header("User-Agent", agent) - if referrer: - # http://www.dictionary.com/search?q=referer - request.add_header("Referer", referrer) - request.add_header("Accept-encoding", "gzip") - try: - return urllib2.urlopen(request) - except urllib2.HTTPError: - # either the resource is not modified or some other HTTP - # error occurred so return an empty resource - return StringIO.StringIO("") - except: - # source must not be a valid URL but it might be a valid filename - pass - except ImportError: - # urllib2 isn't available so try to open with urllib - try: - return urllib.urlopen(source) - except: - # source still might be a filename - pass - - # try to open with native open function (if source is a filename) - try: - return open(source) - except: - pass - - # treat source as string - return StringIO.StringIO(str(source)) - -def get_etag(resource): - """ - Get the ETag associated with a response returned from a call to - open_resource(). - - If the resource was not returned from an HTTP server or the server did - not specify an ETag for the resource, this will return None. - """ - - if hasattr(resource, "info"): - return resource.info().getheader("ETag") - return None - -def get_modified(resource): - """ - Get the Last-Modified timestamp for a response returned from a call to - open_resource(). - - If the resource was not returned from an HTTP server or the server did - not specify a Last-Modified timestamp, this function will return None. - Otherwise, it returns a tuple of 9 integers as returned by gmtime() in - the standard Python time module(). - """ - - if hasattr(resource, "info"): - last_modified = resource.info().getheader("Last-Modified") - if last_modified: - return parse_http_date(last_modified) - return None - -short_weekdays = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] -long_weekdays = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] -months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] - -def format_http_date(date): - """ - Formats a tuple of 9 integers into an RFC 1123-compliant timestamp as - required in RFC 2616. We don't use time.strftime() since the %a and %b - directives can be affected by the current locale (HTTP dates have to be - in English). The date MUST be in GMT (Greenwich Mean Time). - """ - - return "%s, %02d %s %04d %02d:%02d:%02d GMT" % (short_weekdays[date[6]], date[2], months[date[1] - 1], date[0], date[3], date[4], date[5]) - -rfc1123_match = re.compile(r"(?P[A-Z][a-z]{2}), (?P\d{2}) (?P[A-Z][a-z]{2}) (?P\d{4}) (?P\d{2}):(?P\d{2}):(?P\d{2}) GMT").match -rfc850_match = re.compile(r"(?P[A-Z][a-z]+), (?P\d{2})-(?P[A-Z][a-z]{2})-(?P\d{2}) (?P\d{2}):(?P\d{2}):(?P\d{2}) GMT").match -asctime_match = re.compile(r"(?P[A-Z][a-z]{2}) (?P[A-Z][a-z]{2}) ?(?P\d\d?) (?P\d{2}):(?P\d{2}):(?P\d{2}) (?P\d{4})").match - -def parse_http_date(date): - """ - Parses any of the three HTTP date formats into a tuple of 9 integers as - returned by time.gmtime(). This should not use time.strptime() since - that function is not available on all platforms and could also be - affected by the current locale. - """ - - date = str(date) - year = 0 - weekdays = short_weekdays - - m = rfc1123_match(date) - if not m: - m = rfc850_match(date) - if m: - year = 1900 - weekdays = long_weekdays - else: - m = asctime_match(date) - if not m: - return None - - try: - year = year + int(m.group("year")) - month = months.index(m.group("month")) + 1 - day = int(m.group("day")) - hour = int(m.group("hour")) - minute = int(m.group("minute")) - second = int(m.group("second")) - weekday = weekdays.index(m.group("weekday")) - a = int((14 - month) / 12) - julian_day = (day - 32045 + int(((153 * (month + (12 * a) - 3)) + 2) / 5) + int((146097 * (year + 4800 - a)) / 400)) - (int((146097 * (year + 4799)) / 400) - 31738) + 1 - daylight_savings_flag = 0 - return (year, month, day, hour, minute, second, weekday, julian_day, daylight_savings_flag) - except: - # the month or weekday lookup probably failed indicating an invalid timestamp - return None - -def parse(uri, etag=None, modified=None, agent=None, referrer=None): - r = RSSParser() - f = open_resource(uri, etag=etag, modified=modified, agent=agent, referrer=referrer) - data = f.read() - if hasattr(f, "headers"): - if f.headers.get('content-encoding', None) == 'gzip': - data = gzip.GzipFile(fileobj=StringIO.StringIO(data)).read() - r.feed(data) - result = {"channel": r.channel, "items": r.items} - newEtag = get_etag(f) - if newEtag: result["etag"] = newEtag - elif etag: result["etag"] = etag - newModified = get_modified(f) - if newModified: result["modified"] = newModified - elif modified: result["modified"] = modified - f.close() - return result - -TEST_SUITE = ('http://www.pocketsoap.com/rssTests/rss1.0withModules.xml', - 'http://www.pocketsoap.com/rssTests/rss1.0withModulesNoDefNS.xml', - 'http://www.pocketsoap.com/rssTests/rss1.0withModulesNoDefNSLocalNameClash.xml', - 'http://www.pocketsoap.com/rssTests/rss2.0noNSwithModules.xml', - 'http://www.pocketsoap.com/rssTests/rss2.0noNSwithModulesLocalNameClash.xml', - 'http://www.pocketsoap.com/rssTests/rss2.0NSwithModules.xml', - 'http://www.pocketsoap.com/rssTests/rss2.0NSwithModulesNoDefNS.xml', - 'http://www.pocketsoap.com/rssTests/rss2.0NSwithModulesNoDefNSLocalNameClash.xml') - -if __name__ == '__main__': - import sys - if sys.argv[1:]: - urls = sys.argv[1:] - else: - urls = TEST_SUITE - from pprint import pprint - for url in urls: - print url - print - result = parse(url) - pprint(result) - print -- 2.35.1