Does appengine-mapreduce have a limit on operations?

masterofskull

New Member
I am working on a project that requires a big knowledgebase to be constructed based on word co-occurrences in text. As I have researched, a similar approach has not been tried in appengine. I would like to use appengine's flexibility and scalability, to be able to serve the knowledgebase and do reasoning on it to a wide scale of users.So far I have come up with a mapreduce implementation based on the demo app for the pipeline. The source texts are stored in in the blobstore as zipped files containing one xml document, each containing a variable number of articles (as much as 30000).The first step was to adapt the current \[code\]BlobstoreZipLineInputReader\[/code\], so that it parses the xml file, retrieving the relevant information from it. The XMLParser class uses the lxml iterparse approach to retrieve the xml elements to process from http://www.ibm.com/developerworks/xml/library/x-hiperfparse/, and returns an iterator.The modified class \[code\]BlobstoreXMLZipLineInputReader\[/code\] has a slightly different \[code\]next\[/code\] function:\[code\]def next(self): if not self._filestream: if not self._zip: self._zip = zipfile.ZipFile(self._reader(self._blob_key)) self._entries = self._zip.infolist()[self._start_file_index: self._end_file_index] self._entries.reverse() if not self._entries: raise StopIteration() entry = self._entries.pop() parser = XMLParser() # the result here is an iterator with the individual articles self._filestream = parser.parseXML(self._zip.open(entry.filename)) try: article = self._filestream.next() self._article_index += 1 except StopIteration: article = None if not article: self._filestream.close() self._filestream = None self._start_file_index += 1 self._initial_offset = 0 return self.next() return ((self._blob_key, self._start_file_index, self._article_index), article)\[/code\]The map function will then receive each of these articles, split by sentences, and then split by words:\[code\]def map_function(data): """Word count map function.""" (entry, article) = data for s in split_into_sentences(article.body): for w in split_into_words(s.lower()): if w not in STOPWORDS: yield (w, article.id)\[/code\]And the reducer aggregates words, and joins the ids for the articles they appear on:\[code\]def reduce_function(key, values): """Word count reduce function.""" yield "%s: %s\n" % (key, list(set(values)))\[/code\]This works beautifully on both the dev server and the live setup up to around 10000 texts (there are not that many words on them). It generally takes no more than 10 seconds. The problem is when it goes a bit over that, and mapreduce seems to hang processing the job continuously. The number of processed items per shard just increments, and my write op limits are soon reached. Q1. Is there somehow a limit in how many map operations the mapreduce pipeline can do before it starts "behaving badly"?
Q2. Would there be a better approach to my problem?
Q3. I know this has been asked before, but can I circumvent the temporary mapreduce datastore writes? they're killing me...
P.S.: here's my main mapreduce call:\[code\]class XMLArticlePipeline(base_handler.PipelineBase): def run(self, filekey, blobkey): output = yield mapreduce_pipeline.MapreducePipeline( "process_xml", "backend.build_knowledgebase.map_function", "backend.build_knowledgebase.reduce_function", "backend.build_knowledgebase.BlobstoreXMLZipLineInputReader", "mapreduce.output_writers.BlobstoreOutputWriter", mapper_params={ "blob_keys": [blobkey], }, reducer_params={ "mime_type": "text/plain", }, shards=12) yield StoreOutput(filekey, output)\[/code\]EDIT.: I get some weird errors in the dev server when running a neverending job:\[code\][App Instance] [0] [dev_appserver_multiprocess.py:821] INFO Exception in HandleRequestThreadTraceback (most recent call last): File "/Applications/GoogleAppEngineLauncher.app/Contents/Resources/GoogleAppEngine-default.bundle/Contents/Resources/google_appengine/google/appengine/tools/dev_appserver_multiprocess.py", line 819, in run HandleRequestDirectly(request, client_address) File "/Applications/GoogleAppEngineLauncher.app/Contents/Resources/GoogleAppEngine-default.bundle/Contents/Resources/google_appengine/google/appengine/tools/dev_appserver_multiprocess.py", line 957, in HandleRequestDirectly HttpServer(), request, client_address) File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 310, in process_request self.finish_request(request, client_address) File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 323, in finish_request self.RequestHandlerClass(request, client_address, self) File "/Applications/GoogleAppEngineLauncher.app/Contents/Resources/GoogleAppEngine-default.bundle/Contents/Resources/google_appengine/google/appengine/tools/dev_appserver.py", line 2579, in __init__ BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs) File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 641, in __init__ self.finish() File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 694, in finish self.wfile.flush() File "/usr/local/Cellar/python/2.7.2/lib/python2.7/socket.py", line 303, in flush self._sock.sendall(view[write_offset:write_offset+buffer_size])error: [Errno 32] Broken pipe\[/code\]
 
Back
Top