Basic support for threading. Rather messy -- roll on CSP for Python! v2.5rc1
author Adam Sampson <ats@offog.org>
Sat, 8 Oct 2005 21:20:08 +0000 (21:20 +0000)
committer Adam Sampson <ats@offog.org>
Sat, 8 Oct 2005 21:20:08 +0000 (21:20 +0000)
NEWS
PLUGINS
config
rawdoglib/rawdog.py

diff --git a/NEWS b/NEWS
index 3f1505567ba5a188f6d079e8db1762ddda64cbec..80273a2d6926e27c2b8a46693715399dc9b046b1 100644 (file)
--- a/NEWS
+++ b/NEWS
@@ -44,6 +44,14 @@ Change the "Error parsing feed" message to "Error fetching or parsing
 feed", since it really just indicates an error somewhere within
 feedparser (reported by Fred Barnes).
 
+Add support for using multiple threads when fetching feeds, which makes
+updates go much faster if you've got lots of feeds. (The state-updating
+part of the update is still done sequentially, since parallelising it
+would mean adding lots of locking and making the code very messy.) To
+use this, set "numthreads" to be greater than 0 in your config file.
+Since it changes the semantics of one of the plugin hooks, it's off by
+default.
+
 - rawdog 2.4
 
 Provide guid in item templates (suggested by Rick van Rein).
diff --git a/PLUGINS b/PLUGINS
index bae44c0e832861291c7e8be74bf6003bb9530e31..e0bb7486818755ee36d7a034ece8b0b3d506b047 100644 (file)
--- a/PLUGINS
+++ b/PLUGINS
@@ -180,12 +180,23 @@ be used to add extra template parameters.
 
 * feed: the Feed about to be updated
 
-Called before a feed is updated. This hook can be used to perform extra
-actions before fetching a feed.
+Called before a feed's content is fetched. This hook can be used to
+perform extra actions before fetching a feed. Note that if `numthreads`
+is set to a positive number in the config file, this hook may be called
+from a worker thread.
+
+### mid_update_feed(rawdog, config, feed, content)
+
+* feed: the Feed being updated
+* content: the feedparser output from the feed (may be None)
+
+Called after a feed's content has been fetched, but before rawdog's
+internal state has been updated. This hook can be used to modify
+feedparser's output.
 
 ### post_update_feed(rawdog, config, feed, seen_articles)
 
-* feed: the Feed about to be updated
+* feed: the Feed that has been updated
 * seen_articles: a boolean indicating whether any articles were read
   from the feed
 
diff --git a/config b/config
index b49209e3e8214b034d9b92b44d577a19a5478d9a..7801547230333ec736842381c7a958691c737db8 100644 (file)
--- a/config
+++ b/config
@@ -127,6 +127,13 @@ userefresh true
 # available.)
 showfeeds true
 
+# The number of concurrent threads that rawdog will use when fetching
+# feeds -- i.e. the number of feeds that rawdog will attempt to fetch at
+# the same time.  If you have a lot of feeds, setting this to be 20 or
+# so will significantly speed up updates. If this is set to 0, rawdog
+# will not use threads at all.
+numthreads 0
+
 # The time that rawdog will wait before considering a feed unreachable
 # when trying to connect. If you're getting lots of timeout errors and
 # are on a slow connection, increase this.
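diff --git a/rawdoglib/rawdog.py b/rawdoglib/rawdog.py
index 10c83d30cb7acb97ddffba4e8acf01d62d06d2c8..0e44accff9333473b1bea65dc25c3c44292fc7ed 100644 (file)
--- a/rawdoglib/rawdog.py
+++ b/rawdoglib/rawdog.py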
@@ -21,7 +21,7 @@ STATE_VERSION = 2
 import feedparser, feedfinder, plugins
 from persister import Persistable, Persister
 import os, time, sha, getopt, sys, re, cgi, socket, urllib2, calendar
-import string
+import string, threading
 from StringIO import StringIO
 
 def set_socket_timeout(n):
@@ -267,13 +267,9 @@ class Feed:
                        return 0
                else:
                        return 1
-       
-       def update(self, rawdog, now, config):
-               """Fetch articles from a feed and add them to the collection.
-               Returns True if any articles were read, False otherwise."""
 
-               articles = rawdog.articles
-               handlers = []
+       def fetch(self, rawdog, config):
+               """Fetch the current set of articles from the feed."""
 
                class DummyPasswordMgr:
                        def __init__(self, creds):
@@ -283,6 +279,8 @@ class Feed:
                        def find_user_password(self, realm, authuri):
                                return self.creds
 
+               handlers = []
+
                if self.args.has_key("user") and self.args.has_key("password"):
                        mgr = DummyPasswordMgr((self.args["user"], self.args["password"]))
                        handlers.append(urllib2.HTTPBasicAuthHandler(mgr))
@@ -300,19 +298,24 @@ class Feed:
 
                plugins.call_hook("add_urllib2_handlers", rawdog, config, self, handlers)
 
-               feedparser._FeedParserMixin.can_contain_relative_uris = ["url"]
-               feedparser._FeedParserMixin.can_contain_dangerous_markup = []
                try:
-                       p = feedparser.parse(self.url,
+                       return feedparser.parse(self.url,
                                etag = self.etag,
                                modified = self.modified,
                                agent = "rawdog/" + VERSION,
                                handlers = handlers)
-                       status = p.get("status")
                except:
-                       p = None
-                       status = None
+                       return None
 
+       def update(self, rawdog, now, config, p = None):
+               """Fetch articles from a feed and add them to the collection.
+               Returns True if any articles were read, False otherwise."""
+
+               if p is None:
+                       p = self.fetch(rawdog, config)
+               status = None
+               if p is not None:
+                       status = p.get("status")
                self.last_update = now
 
                error = None
@@ -372,6 +375,7 @@ class Feed:
 
                self.feed_info = p["feed"]
                feed = self.url
+               articles = rawdog.articles
 
                seen = {}
                sequence = 0
@@ -562,6 +566,7 @@ class Config:
 
        def __init__(self):
                self.files_loaded = []
+               self.loglock = threading.Lock()
                self.reset()
 
        def reset(self):
@@ -592,6 +597,7 @@ class Config:
                        "hideduplicates" : "",
                        "newfeedperiod" : "3h",
                        "changeconfig": 0,
+                       "numthreads": 0,
                        }
 
        def __getitem__(self, key): return self.config[key]
@@ -702,6 +708,8 @@ class Config:
                        self["newfeedperiod"] = l[1]
                elif l[0] == "changeconfig":
                        self["changeconfig"] = parse_bool(l[1])
+               elif l[0] == "numthreads":
+                       self["numthreads"] = int(l[1])
                elif l[0] == "include":
                        self.load(l[1], False)
                elif plugins.call_hook("config_option_arglines", self, l[0], l[1], arglines):
@@ -717,7 +725,9 @@ class Config:
        def log(self, *args):
                """If running in verbose mode, print a status message."""
                if self["verbose"]:
+                       self.loglock.acquire()
                        print >>sys.stderr, "".join(map(str, args))
+                       self.loglock.release()
 
        def bug(self, *args):
                """Report detection of a bug in rawdog."""
@@ -770,6 +780,60 @@ class ChangeFeedEditor:
                                line = line.replace(self.oldurl, self.newurl, 1)
                        outputfile.write(line)
 
+class FeedFetcher:
+       """Class that will handle fetching a set of feeds in parallel."""
+
+       def __init__(self, rawdog, feedlist, config):
+               self.rawdog = rawdog
+               self.config = config
+               self.lock = threading.Lock()
+               self.jobs = {}
+               for feed in feedlist:
+                       self.jobs[feed] = 1
+               self.results = {}
+
+       def worker(self, num):
+               rawdog = self.rawdog
+               config = self.config
+
+               config.log("Thread ", num, " starting")
+               while 1:
+                       self.lock.acquire()
+                       if self.jobs == {}:
+                               job = None
+                       else:
+                               job = self.jobs.keys()[0]
+                               del self.jobs[job]
+                       self.lock.release()
+                       if job is None:
+                               break
+
+                       config.log("Thread ", num, " fetching feed: ", job)
+                       feed = rawdog.feeds[job]
+                       plugins.call_hook("pre_update_feed", rawdog, config, feed)
+                       self.results[job] = feed.fetch(rawdog, config)
+               config.log("Thread ", num, " done")
+
+       def run(self, numworkers):
+               self.config.log("Thread farm starting with ", len(self.jobs), " jobs")
+               workers = []
+               for i in range(numworkers):
+                       self.lock.acquire()
+                       isempty = (self.jobs == {})
+                       self.lock.release()
+                       if isempty:
+                               # No jobs left in the queue -- don't bother
+                               # starting any more workers.
+                               break
+
+                       t = threading.Thread(target = self.worker, args = (i,))
+                       t.start()
+                       workers.append(t)
+               for worker in workers:
+                       worker.join()
+               self.config.log("Thread farm finished with ", len(self.results), " results")
+               return self.results
+
 class Rawdog(Persistable):
        """The aggregator itself."""
 
@@ -866,6 +930,8 @@ class Rawdog(Persistable):
                config.log("Starting update")
                now = time.time()
 
+               feedparser._FeedParserMixin.can_contain_relative_uris = ["url"]
+               feedparser._FeedParserMixin.can_contain_dangerous_markup = []
                set_socket_timeout(config["timeout"])
 
                if feedurl is None:
@@ -882,14 +948,25 @@ class Rawdog(Persistable):
                numfeeds = len(update_feeds)
                config.log("Will update ", numfeeds, " feeds")
 
+               if config["numthreads"] > 0:
+                       fetcher = FeedFetcher(self, update_feeds, config)
+                       prefetched = fetcher.run(config["numthreads"])
+               else:
+                       prefetched = {}
+
                count = 0
                seen_some_items = {}
                for url in update_feeds:
                        count += 1
                        config.log("Updating feed ", count, " of " , numfeeds, ": ", url)
                        feed = self.feeds[url]
-                       plugins.call_hook("pre_update_feed", self, config, feed)
-                       rc = feed.update(self, now, config)
+                       if url in prefetched:
+                               content = prefetched[url]
+                       else:
+                               plugins.call_hook("pre_update_feed", self, config, feed)
+                               content = feed.fetch(self, config)
+                       plugins.call_hook("mid_update_feed", self, config, feed, content)
+                       rc = feed.update(self, now, config, content)
                        plugins.call_hook("post_update_feed", self, config, feed, rc)
                        if rc:
                                seen_some_items[url] = 1