import feedparser, feedfinder, plugins
from persister import Persistable, Persister
import os, time, sha, getopt, sys, re, cgi, socket, urllib2, calendar
-import string
+import string, threading
from StringIO import StringIO
def set_socket_timeout(n):
return 0
else:
return 1
-
- def update(self, rawdog, now, config):
- """Fetch articles from a feed and add them to the collection.
- Returns True if any articles were read, False otherwise."""
- articles = rawdog.articles
- handlers = []
+ def fetch(self, rawdog, config):
+ """Fetch the current set of articles from the feed."""
class DummyPasswordMgr:
def __init__(self, creds):
def find_user_password(self, realm, authuri):
return self.creds
+ handlers = []
+
if self.args.has_key("user") and self.args.has_key("password"):
mgr = DummyPasswordMgr((self.args["user"], self.args["password"]))
handlers.append(urllib2.HTTPBasicAuthHandler(mgr))
plugins.call_hook("add_urllib2_handlers", rawdog, config, self, handlers)
- feedparser._FeedParserMixin.can_contain_relative_uris = ["url"]
- feedparser._FeedParserMixin.can_contain_dangerous_markup = []
try:
- p = feedparser.parse(self.url,
+ return feedparser.parse(self.url,
etag = self.etag,
modified = self.modified,
agent = "rawdog/" + VERSION,
handlers = handlers)
- status = p.get("status")
except:
- p = None
- status = None
+ return None
+ def update(self, rawdog, now, config, p = None):
+ """Fetch articles from a feed and add them to the collection.
+ Returns True if any articles were read, False otherwise."""
+
+ if p is None:
+ p = self.fetch(rawdog, config)
+ status = None
+ if p is not None:
+ status = p.get("status")
self.last_update = now
error = None
self.feed_info = p["feed"]
feed = self.url
+ articles = rawdog.articles
seen = {}
sequence = 0
def __init__(self):
self.files_loaded = []
+ self.loglock = threading.Lock()
self.reset()
def reset(self):
"hideduplicates" : "",
"newfeedperiod" : "3h",
"changeconfig": 0,
+ "numthreads": 0,
}
def __getitem__(self, key): return self.config[key]
self["newfeedperiod"] = l[1]
elif l[0] == "changeconfig":
self["changeconfig"] = parse_bool(l[1])
+ elif l[0] == "numthreads":
+ self["numthreads"] = int(l[1])
elif l[0] == "include":
self.load(l[1], False)
elif plugins.call_hook("config_option_arglines", self, l[0], l[1], arglines):
def log(self, *args):
"""If running in verbose mode, print a status message."""
if self["verbose"]:
+ self.loglock.acquire()
print >>sys.stderr, "".join(map(str, args))
+ self.loglock.release()
def bug(self, *args):
"""Report detection of a bug in rawdog."""
line = line.replace(self.oldurl, self.newurl, 1)
outputfile.write(line)
+class FeedFetcher:
+	"""Class that will handle fetching a set of feeds in parallel."""
+
+	def __init__(self, rawdog, feedlist, config):
+		self.rawdog = rawdog
+		self.config = config
+		self.lock = threading.Lock()
+		# Pending feed URLs, stored as dict keys (used as a set);
+		# guarded by self.lock.
+		self.jobs = {}
+		for feed in feedlist:
+			self.jobs[feed] = 1
+		# Map of feed URL -> fetched result; guarded by self.lock.
+		self.results = {}
+
+	def worker(self, num):
+		"""Worker thread body: pull feed URLs off the shared job
+		queue until it is empty, fetching each feed and recording
+		the result."""
+		rawdog = self.rawdog
+		config = self.config
+
+		config.log("Thread ", num, " starting")
+		while 1:
+			# Pop an arbitrary pending job under the lock.  The
+			# try/finally guarantees the lock is released even if
+			# something unexpected raises, so one failing worker
+			# cannot deadlock all the others.
+			self.lock.acquire()
+			try:
+				if self.jobs == {}:
+					job = None
+				else:
+					job = self.jobs.keys()[0]
+					del self.jobs[job]
+			finally:
+				self.lock.release()
+			if job is None:
+				break
+
+			config.log("Thread ", num, " fetching feed: ", job)
+			feed = rawdog.feeds[job]
+			plugins.call_hook("pre_update_feed", rawdog, config, feed)
+			result = feed.fetch(rawdog, config)
+			# The results dict is shared between all the workers,
+			# so writes to it must also happen under the lock.
+			self.lock.acquire()
+			try:
+				self.results[job] = result
+			finally:
+				self.lock.release()
+		config.log("Thread ", num, " done")
+
+	def run(self, numworkers):
+		"""Fetch all the feeds using up to numworkers threads.
+		Returns a map of feed URL to fetch result."""
+		self.config.log("Thread farm starting with ", len(self.jobs), " jobs")
+		workers = []
+		for i in range(numworkers):
+			self.lock.acquire()
+			try:
+				isempty = (self.jobs == {})
+			finally:
+				self.lock.release()
+			if isempty:
+				# No jobs left in the queue -- don't bother
+				# starting any more workers.
+				break
+
+			t = threading.Thread(target = self.worker, args = (i,))
+			t.start()
+			workers.append(t)
+		for worker in workers:
+			worker.join()
+		self.config.log("Thread farm finished with ", len(self.results), " results")
+		return self.results
+
class Rawdog(Persistable):
"""The aggregator itself."""
config.log("Starting update")
now = time.time()
+ feedparser._FeedParserMixin.can_contain_relative_uris = ["url"]
+ feedparser._FeedParserMixin.can_contain_dangerous_markup = []
set_socket_timeout(config["timeout"])
if feedurl is None:
numfeeds = len(update_feeds)
config.log("Will update ", numfeeds, " feeds")
+ if config["numthreads"] > 0:
+ fetcher = FeedFetcher(self, update_feeds, config)
+ prefetched = fetcher.run(config["numthreads"])
+ else:
+ prefetched = {}
+
count = 0
seen_some_items = {}
for url in update_feeds:
count += 1
config.log("Updating feed ", count, " of " , numfeeds, ": ", url)
feed = self.feeds[url]
- plugins.call_hook("pre_update_feed", self, config, feed)
- rc = feed.update(self, now, config)
+ if url in prefetched:
+ content = prefetched[url]
+ else:
+ plugins.call_hook("pre_update_feed", self, config, feed)
+ content = feed.fetch(self, config)
+ plugins.call_hook("mid_update_feed", self, config, feed, content)
+ rc = feed.update(self, now, config, content)
plugins.call_hook("post_update_feed", self, config, feed, rc)
if rc:
seen_some_items[url] = 1