From be8e83def7db3a53281f374f8d9df41f59faac0b Mon Sep 17 00:00:00 2001
From: Adam Sampson
Date: Sat, 8 Oct 2005 21:20:08 +0000
Subject: [PATCH] Basic support for threading. Rather messy -- roll on CSP for
 Python!

---
 NEWS                |   8 ++++
 PLUGINS             |  17 +++++--
 config              |   7 +++
 rawdoglib/rawdog.py | 107 ++++++++++++++++++++++++++++++++++++++-------
 4 files changed, 121 insertions(+), 18 deletions(-)

diff --git a/NEWS b/NEWS
index 3f15055..80273a2 100644
--- a/NEWS
+++ b/NEWS
@@ -44,6 +44,14 @@ Change the "Error parsing feed" message to "Error fetching or parsing
 feed", since it really just indicates an error somewhere within
 feedparser (reported by Fred Barnes).
 
+Add support for using multiple threads when fetching feeds, which makes
+updates go much faster if you've got lots of feeds. (The state-updating
+part of the update is still done sequentially, since parallelising it
+would mean adding lots of locking and making the code very messy.) To
+use this, set "numthreads" to be greater than 0 in your config file.
+Since it changes the semantics of one of the plugin hooks, it's off by
+default.
+
 - rawdog 2.4
 
 Provide guid in item templates (suggested by Rick van Rein).
diff --git a/PLUGINS b/PLUGINS
index bae44c0..e0bb748 100644
--- a/PLUGINS
+++ b/PLUGINS
@@ -180,12 +180,23 @@ be used to add extra template parameters.
 
 * feed: the Feed about to be updated
 
-Called before a feed is updated. This hook can be used to perform extra
-actions before fetching a feed.
+Called before a feed's content is fetched. This hook can be used to
+perform extra actions before fetching a feed. Note that if `numthreads`
+is set to a positive number in the config file, this hook may be called
+from a worker thread.
+
+### mid_update_feed(rawdog, config, feed, content)
+
+* feed: the Feed being updated
+* content: the feedparser output from the feed (may be None)
+
+Called after a feed's content has been fetched, but before rawdog's
+internal state has been updated. This hook can be used to modify
+feedparser's output.
 
 ### post_update_feed(rawdog, config, feed, seen_articles)
 
-* feed: the Feed about to be updated
+* feed: the Feed that has been updated
 * seen_articles: a boolean indicating whether any articles were read
   from the feed
 
diff --git a/config b/config
index b49209e..7801547 100644
--- a/config
+++ b/config
@@ -127,6 +127,13 @@ userefresh true
 # available.)
 showfeeds true
 
+# The number of concurrent threads that rawdog will use when fetching
+# feeds -- i.e. the number of feeds that rawdog will attempt to fetch at
+# the same time. If you have a lot of feeds, setting this to be 20 or
+# so will significantly speed up updates. If this is set to 0, rawdog
+# will not use threads at all.
+numthreads 0
+
 # The time that rawdog will wait before considering a feed unreachable
 # when trying to connect. If you're getting lots of timeout errors and
 # are on a slow connection, increase this.
diff --git a/rawdoglib/rawdog.py b/rawdoglib/rawdog.py
index 10c83d3..0e44acc 100644
--- a/rawdoglib/rawdog.py
+++ b/rawdoglib/rawdog.py
@@ -21,7 +21,7 @@ STATE_VERSION = 2
 import feedparser, feedfinder, plugins
 from persister import Persistable, Persister
 import os, time, sha, getopt, sys, re, cgi, socket, urllib2, calendar
-import string
+import string, threading
 from StringIO import StringIO
 
 def set_socket_timeout(n):
@@ -267,13 +267,9 @@ class Feed:
 			return 0
 		else:
 			return 1
-
-	def update(self, rawdog, now, config):
-		"""Fetch articles from a feed and add them to the collection.
-		Returns True if any articles were read, False otherwise."""
-		articles = rawdog.articles
-		handlers = []
+	def fetch(self, rawdog, config):
+		"""Fetch the current set of articles from the feed."""
 
 		class DummyPasswordMgr:
 			def __init__(self, creds):
@@ -283,6 +279,8 @@ class Feed:
 			def find_user_password(self, realm, authuri):
 				return self.creds
 
+		handlers = []
+
 		if self.args.has_key("user") and self.args.has_key("password"):
 			mgr = DummyPasswordMgr((self.args["user"], self.args["password"]))
 			handlers.append(urllib2.HTTPBasicAuthHandler(mgr))
@@ -300,19 +298,24 @@ class Feed:
 		plugins.call_hook("add_urllib2_handlers", rawdog, config, self,
 			handlers)
 
-		feedparser._FeedParserMixin.can_contain_relative_uris = ["url"]
-		feedparser._FeedParserMixin.can_contain_dangerous_markup = []
 		try:
-			p = feedparser.parse(self.url,
+			return feedparser.parse(self.url,
 				etag = self.etag,
 				modified = self.modified,
 				agent = "rawdog/" + VERSION,
 				handlers = handlers)
-			status = p.get("status")
 		except:
-			p = None
-			status = None
+			return None
+
+	def update(self, rawdog, now, config, p = None):
+		"""Fetch articles from a feed and add them to the collection.
+		Returns True if any articles were read, False otherwise."""
+
+		if p is None:
+			p = self.fetch(rawdog, config)
+		status = None
+		if p is not None:
+			status = p.get("status")
 
 		self.last_update = now
 
 		error = None
@@ -372,6 +375,7 @@
 		self.feed_info = p["feed"]
 		feed = self.url
+		articles = rawdog.articles
 
 		seen = {}
 		sequence = 0
@@ -562,6 +566,7 @@ class Config:
 	def __init__(self):
 		self.files_loaded = []
+		self.loglock = threading.Lock()
 		self.reset()
 
 	def reset(self):
@@ -592,6 +597,7 @@
 			"hideduplicates" : "",
 			"newfeedperiod" : "3h",
 			"changeconfig": 0,
+			"numthreads": 0,
 		}
 
 	def __getitem__(self, key): return self.config[key]
@@ -702,6 +708,8 @@
 			self["newfeedperiod"] = l[1]
 		elif l[0] == "changeconfig":
 			self["changeconfig"] = parse_bool(l[1])
+		elif l[0] == "numthreads":
+			self["numthreads"] = int(l[1])
 		elif l[0] == "include":
 			self.load(l[1], False)
 		elif plugins.call_hook("config_option_arglines", self, l[0], l[1], arglines):
@@ -717,7 +725,9 @@
 	def log(self, *args):
 		"""If running in verbose mode, print a status message."""
 		if self["verbose"]:
+			self.loglock.acquire()
 			print >>sys.stderr, "".join(map(str, args))
+			self.loglock.release()
 
 	def bug(self, *args):
 		"""Report detection of a bug in rawdog."""
@@ -770,6 +780,60 @@ class ChangeFeedEditor:
 		line = line.replace(self.oldurl, self.newurl, 1)
 		outputfile.write(line)
 
+class FeedFetcher:
+	"""Class that will handle fetching a set of feeds in parallel."""
+
+	def __init__(self, rawdog, feedlist, config):
+		self.rawdog = rawdog
+		self.config = config
+		self.lock = threading.Lock()
+		self.jobs = {}
+		for feed in feedlist:
+			self.jobs[feed] = 1
+		self.results = {}
+
+	def worker(self, num):
+		rawdog = self.rawdog
+		config = self.config
+
+		config.log("Thread ", num, " starting")
+		while 1:
+			self.lock.acquire()
+			if self.jobs == {}:
+				job = None
+			else:
+				job = self.jobs.keys()[0]
+				del self.jobs[job]
+			self.lock.release()
+			if job is None:
+				break
+
+			config.log("Thread ", num, " fetching feed: ", job)
+			feed = rawdog.feeds[job]
+			plugins.call_hook("pre_update_feed", rawdog, config, feed)
+			self.results[job] = feed.fetch(rawdog, config)
+		config.log("Thread ", num, " done")
+
+	def run(self, numworkers):
+		self.config.log("Thread farm starting with ", len(self.jobs), " jobs")
+		workers = []
+		for i in range(numworkers):
+			self.lock.acquire()
+			isempty = (self.jobs == {})
+			self.lock.release()
+			if isempty:
+				# No jobs left in the queue -- don't bother
+				# starting any more workers.
+				break
+
+			t = threading.Thread(target = self.worker, args = (i,))
+			t.start()
+			workers.append(t)
+		for worker in workers:
+			worker.join()
+		self.config.log("Thread farm finished with ", len(self.results), " results")
+		return self.results
+
 class Rawdog(Persistable):
 	"""The aggregator itself."""
 
@@ -866,6 +930,8 @@ class Rawdog(Persistable):
 		config.log("Starting update")
 		now = time.time()
 
+		feedparser._FeedParserMixin.can_contain_relative_uris = ["url"]
+		feedparser._FeedParserMixin.can_contain_dangerous_markup = []
 		set_socket_timeout(config["timeout"])
 
 		if feedurl is None:
@@ -882,14 +948,25 @@ class Rawdog(Persistable):
 		numfeeds = len(update_feeds)
 		config.log("Will update ", numfeeds, " feeds")
 
+		if config["numthreads"] > 0:
+			fetcher = FeedFetcher(self, update_feeds, config)
+			prefetched = fetcher.run(config["numthreads"])
+		else:
+			prefetched = {}
+
 		count = 0
 		seen_some_items = {}
 		for url in update_feeds:
 			count += 1
 			config.log("Updating feed ", count, " of " , numfeeds, ": ", url)
 			feed = self.feeds[url]
-			plugins.call_hook("pre_update_feed", self, config, feed)
-			rc = feed.update(self, now, config)
+			if url in prefetched:
+				content = prefetched[url]
+			else:
+				plugins.call_hook("pre_update_feed", self, config, feed)
+				content = feed.fetch(self, config)
+			plugins.call_hook("mid_update_feed", self, config, feed, content)
+			rc = feed.update(self, now, config, content)
 			plugins.call_hook("post_update_feed", self, config, feed, rc)
 			if rc:
 				seen_some_items[url] = 1
-- 
2.35.1
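
For illustration, here is a minimal sketch of a plugin using the new
mid_update_feed hook. It assumes the standard rawdoglib.plugins.attach_hook()
interface described in PLUGINS; the plugin itself and the "[threaded] " title
prefix are purely hypothetical, not part of the patch.

import rawdoglib.plugins

def mid_update_feed(rawdog, config, feed, content):
	# content is the feedparser output for this feed; it may be None
	# if the fetch failed, so check it before touching it.
	if content is not None and content.has_key("feed"):
		title = content["feed"].get("title")
		if title:
			content["feed"]["title"] = "[threaded] " + title
	# rawdog hook functions conventionally return True.
	return True

rawdoglib.plugins.attach_hook("mid_update_feed", mid_update_feed)

Note that although the fetch itself may happen in a worker thread,
mid_update_feed is called from the sequential state-updating loop in
Rawdog.update, so a hook like this does not need its own locking.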