lint
[Plinn.git] / catalog.py
1 # -*- coding: utf-8 -*-
2 from App.class_init import InitializeClass
3 from AccessControl import ClassSecurityInfo
4 from Products.CMFCore.interfaces import IIndexableObject
5 from Products.CMFCore.CatalogTool import CatalogTool as BaseCatalogTool
6 from Products.CMFCore.CatalogTool import IndexableObjectWrapper
7 from Products.PageTemplates.PageTemplateFile import PageTemplateFile
8 from Products.CMFCore.permissions import ModifyPortalContent, ManagePortal
9 from zope.component import queryMultiAdapter
10 from Products.ZCatalog.Catalog import Catalog
11 import transaction
12 from solr import *
13
14 # imports for Catalog class
15 from Products.PluginIndexes.interfaces import ILimitedResultIndex
16 from Products.ZCatalog.Lazy import LazyMap, LazyCat, LazyValues
17 from BTrees.IIBTree import intersection, IISet
18 from BTrees.IIBTree import weightedIntersection
19 import warnings
20
# Name of the volatile attribute under which the catalog tool caches its
# Solr connection for the duration of a transaction.
_VOLATILE_SOLR_NAME = '_v_solrConnection'


class SolrTransactionHook:
    """Solr commit coupled to the ZODB commit.

    Instances are callables meant to be registered as transaction
    after-commit hooks. When called, the hook commits (on success only)
    and closes the Solr connection, then removes the cached connection
    from ``context`` so that a fresh one is opened next time.

    context -- object on which the connection is cached under
               ``_VOLATILE_SOLR_NAME``.
    con     -- the Solr connection; only ``commit()`` and ``close()`` are
               used here.
    """

    def __init__(self, context, con):
        self.context = context
        self.con = con

    def __call__(self, status):
        """Finish the Solr side of the transaction.

        status -- truthy when the pending Solr changes must be committed;
                  falsy to simply drop the connection.

        Any exception raised by ``commit()`` or ``close()`` propagates,
        but the connection is still closed and the cached attribute is
        still removed, so a failed commit cannot leave a stale connection
        behind on ``context``.
        """
        try:
            try:
                if status:
                    self.con.commit()
            finally:
                # Close in every case, including a failed commit.
                self.con.close()
        finally:
            # The cached attribute may already be gone; that is not an error.
            try:
                delattr(self.context, _VOLATILE_SOLR_NAME)
            except AttributeError:
                pass
39
class CatalogTool(BaseCatalogTool):
    """Portal catalog that mirrors a set of "delegated" indexes into Solr.

    Every (re)indexing operation is first applied to the regular catalog,
    then the values of the delegated indexes are sent to the Solr server
    at ``solr_url``. Searches on delegated indexes are answered by Solr
    through ``DelegatedCatalog``.
    """
    meta_type = 'Plinn Catalog'
    security = ClassSecurityInfo()
    # Insert a "Solr" management tab after the first five inherited tabs.
    manage_options = (BaseCatalogTool.manage_options[:5] +
                      ({'label' : 'Solr', 'action' : 'manage_solr'},) +
                      BaseCatalogTool.manage_options[5:])
    manage_solr = PageTemplateFile('www/manage_solr.pt', globals(), __name__='manage_solr')

    def __init__(self, idxs=[]):
        """Set up the tool with a Solr-delegating catalog.

        idxs -- accepted for signature compatibility; not used here (and
                never mutated, so the shared default is harmless).
        """
        super(CatalogTool, self).__init__()
        # Replace the stock catalog by the one that delegates to Solr.
        self._catalog = DelegatedCatalog(self)
        self.solr_url = 'http://localhost:8983/solr'
        self.delegatedIndexes = ('Title', 'Description', 'SearchableText')

    security.declarePublic('getDelegatedIndexes')
    def getDelegatedIndexes(self):
        """Return the tuple of index names delegated to Solr."""
        return self.delegatedIndexes

    security.declareProtected(ManagePortal, 'setDelegatedIndexes')
    def setDelegatedIndexes(self, indexes, REQUEST=None):
        """Replace the names of the indexes delegated to Solr.

        indexes -- iterable of index names; surrounding whitespace is
                   stripped and blank entries are dropped. A single string
                   is accepted too and is split into lines (one name per
                   line) instead of being iterated character by character.
        REQUEST -- when given, the browser is redirected back to the Solr
                   management tab with a confirmation message.
        """
        if hasattr(indexes, 'splitlines'):
            # A bare string: one index name per line.
            indexes = indexes.splitlines()
        self.delegatedIndexes = tuple([i.strip() for i in indexes if i.strip()])
        if REQUEST:
            REQUEST.RESPONSE.redirect(self.absolute_url() + '/manage_solr?manage_tabs_message=Saved changes.')

    def _getSolrConnection(self):
        """Return the Solr connection of the current transaction.

        The connection is created on first use, cached on the tool under
        ``_VOLATILE_SOLR_NAME`` and tied to the current transaction by a
        ``SolrTransactionHook`` after-commit hook, which commits/closes it
        and removes the cached attribute.

        NOTE(review): only an after-commit hook is registered here; verify
        what happens to the cached connection when the transaction is
        aborted instead of committed.
        """
        con = getattr(self, _VOLATILE_SOLR_NAME, None)
        if con is None:
            con = SolrConnection(self.solr_url)
            setattr(self, _VOLATILE_SOLR_NAME, con)
            txn = transaction.get()
            txn.addAfterCommitHook(SolrTransactionHook(self, con))
        return con

    security.declarePrivate('solrAdd')
    def solrAdd(self, object, idxs=[], uid=None):
        """Send the values of ``idxs`` for ``object`` to Solr.

        object -- the content object, or an IIndexableObject wrapper of it.
        idxs   -- names of the attributes to send; an empty list means all
                  delegated indexes.
        uid    -- Solr document id; defaults to the catalog uid of
                  ``object``.

        Missing attributes are sent as an empty string; callable
        attributes are called to obtain the value.
        """
        if IIndexableObject.providedBy(object):
            w = object
        else:
            w = queryMultiAdapter((object, self), IIndexableObject)
            if w is None:
                # BBB
                w = IndexableObjectWrapper(object, self)

        # self.__url is name-mangled to _CatalogTool__url. The CMFCore base
        # class is also named CatalogTool, so this presumably resolves to
        # the base class's private helper -- check before renaming this
        # class.
        uid = uid if uid else self.__url(object)
        idxs = idxs if idxs != [] else self.delegatedIndexes
        data = {'id' : uid}
        for name in idxs:
            attr = getattr(w, name, '')
            data[name] = attr() if callable(attr) else attr
        c = self._getSolrConnection()
        c.add(**data)

    # PortalCatalog api overloads
    security.declareProtected(ModifyPortalContent, 'indexObject')
    def indexObject(self, object):
        """ Add to catalog and send to Solr """
        super(CatalogTool, self).indexObject(object)
        self.solrAdd(object)

    security.declarePrivate('reindexObject')
    def reindexObject(self, object, idxs=[], update_metadata=1, uid=None):
        """Update the catalog, then refresh the Solr document if needed.

        object, idxs, update_metadata, uid -- passed unchanged to the
        inherited ``reindexObject``.

        Solr is only contacted when the reindexing concerns at least one
        delegated index: either ``idxs`` is empty (reindex everything) or
        it names an existing catalog index that is also delegated.

        When it is contacted, ALL delegated fields are sent, not just the
        ones named in ``idxs``: adding a document to Solr replaces the
        whole stored document for that id, so sending a subset of the
        fields would erase the others.
        """
        super(CatalogTool, self).reindexObject(object,
                                               idxs=idxs,
                                               update_metadata=update_metadata,
                                               uid=uid)
        delegated = self.delegatedIndexes
        if idxs != []:
            # Filter out invalid indexes.
            valid_indexes = self._catalog.indexes.keys()
            concerned = [i for i in idxs
                         if i in valid_indexes and i in delegated]
        else:
            concerned = delegated

        if concerned:
            self.solrAdd(object, idxs=delegated, uid=uid)

    security.declarePrivate('unindexObject')
    def unindexObject(self, object):
        """Remove from catalog and delete the matching Solr document.
        """
        super(CatalogTool, self).unindexObject(object)
        c = self._getSolrConnection()
        url = self.__url(object)
        c.delete(id=url)

InitializeClass(CatalogTool)
129
130
class DelegatedCatalog(Catalog):
    """This is where searches are actually delegated to Solr."""

    def __init__(self, zcat, brains=None):
        """Create the catalog.

        zcat   -- the owning catalog tool; its ``delegatedIndexes`` and
                  ``solr_url`` attributes are read at search time.
        brains -- passed unchanged to ``Catalog.__init__``.
        """
        Catalog.__init__(self, brains=brains)
        self.zcat = zcat

    def delegateSearch(self, query, plan):
        """Answer the delegated part of ``query`` with Solr.

        The entries of ``query`` whose key is a delegated index are removed
        from ``query`` (and from ``plan``) and combined with AND into one
        Solr query; each value is sent as a quoted phrase.

        Falsy return values:
        None means that nothing was delegated: the other indexes must
        still be queried.
        An empty IISet means that the delegated search matched nothing:
        the search can stop.

        Otherwise the IISet holds the record ids of the Solr hits that are
        known to this catalog.
        """
        indexes = set(query.keys()).intersection(set(self.zcat.delegatedIndexes))
        if not indexes:
            return None
        delegatedQuery = {}
        for i in indexes:
            delegatedQuery[i] = query.pop(i)
            try:
                plan.remove(i)
            except ValueError:
                pass

        clauses = []
        for name, value in delegatedQuery.items():
            # Escape backslashes and double quotes so that the value cannot
            # break out of the quoted phrase.
            text = ('%s' % (value,)).replace('\\', '\\\\').replace('"', '\\"')
            clauses.append('%s:"%s"' % (name, text))
        q = ' AND '.join(clauses)

        c = SolrConnection(self.zcat.solr_url)
        try:
            resp = c.query(q, fields='id', rows=len(self))
        finally:
            # Do not leak the connection when the query fails.
            c.close()

        # Solr ids unknown to this catalog are skipped. Compare against
        # None explicitly: 0 is a legitimate record id.
        rids = [self.uids.get(r['id']) for r in resp.results]
        return IISet([rid for rid in rids if rid is not None])

    def search(self, query, sort_index=None, reverse=0, limit=None, merge=1):
        """Iterate through the indexes, applying the query to each one. If
        merge is true then return a lazy result set (sorted if appropriate)
        otherwise return the raw (possibly scored) results for later merging.
        Limit is used in conjunction with sorting or scored results to inform
        the catalog how many results you are really interested in. The catalog
        can then use optimizations to save time and memory. The number of
        results is not guaranteed to fall within the limit however, you should
        still slice or batch the results as usual.

        Before the regular indexes are consulted, the delegated indexes are
        answered by ``delegateSearch``; its result seeds the result set that
        the remaining indexes are intersected with."""

        rs = None # resultset

        # Indexes fulfill a fairly large contract here. We hand each
        # index the query mapping we are given (which may be composed
        # of some combination of web request, kw mappings or plain old dicts)
        # and the index decides what to do with it. If the index finds work
        # for itself in the query, it returns the results and a tuple of
        # the attributes that were used. If the index finds nothing for it
        # to do then it returns None.

        # Canonicalize the request into a sensible query before passing it on
        query = self.make_query(query)

        cr = self.getCatalogPlan(query)
        cr.start()

        plan = cr.plan()
        if not plan:
            plan = self._sorted_search_indexes(query)

        # delegation: an empty (but not None) result means Solr matched
        # nothing, so the whole search is empty.
        rs = self.delegateSearch(query, plan)
        if rs is not None and not rs:
            return LazyCat([])

        indexes = self.indexes.keys()
        for i in plan:
            if i not in indexes:
                # We can have bogus keys or the plan can contain index names
                # that have been removed in the meantime
                continue

            index = self.getIndex(i)
            _apply_index = getattr(index, "_apply_index", None)
            if _apply_index is None:
                continue

            cr.start_split(i)
            limit_result = ILimitedResultIndex.providedBy(index)
            if limit_result:
                r = _apply_index(query, rs)
            else:
                r = _apply_index(query)

            if r is not None:
                r, u = r
                # Short circuit if empty result
                # BBB: We can remove the "r is not None" check in Zope 2.14
                # once we don't need to support the "return everything" case
                # anymore
                if r is not None and not r:
                    cr.stop_split(i, result=None, limit=limit_result)
                    return LazyCat([])

                # provide detailed info about the pure intersection time
                intersect_id = i + '#intersection'
                cr.start_split(intersect_id)
                # weightedIntersection preserves the values from any mappings
                # we get, as some indexes don't return simple sets
                if hasattr(rs, 'items') or hasattr(r, 'items'):
                    _, rs = weightedIntersection(rs, r)
                else:
                    rs = intersection(rs, r)

                cr.stop_split(intersect_id)

                # consider the time it takes to intersect the index result with
                # the total resultset to be part of the index time
                cr.stop_split(i, result=r, limit=limit_result)
                if not rs:
                    break
            else:
                cr.stop_split(i, result=None, limit=limit_result)

        # Try to deduce the sort limit from batching arguments
        b_start = int(query.get('b_start', 0))
        b_size = query.get('b_size', None)
        if b_size is not None:
            b_size = int(b_size)

        if b_size is not None:
            limit = b_start + b_size
        elif limit and b_size is None:
            b_size = limit

        if rs is None:
            # None of the indexes found anything to do with the query
            # We take this to mean that the query was empty (an empty filter)
            # and so we return everything in the catalog
            warnings.warn('Your query %s produced no query restriction. '
                          'Currently the entire catalog content is returned. '
                          'In Zope 2.14 this will result in an empty LazyCat '
                          'to be returned.' % repr(cr.make_key(query)),
                          DeprecationWarning, stacklevel=3)

            rlen = len(self)
            if sort_index is None:
                sequence, slen = self._limit_sequence(self.data.items(), rlen,
                                                      b_start, b_size)
                result = LazyMap(self.instantiate, sequence, slen,
                                 actual_result_count=rlen)
            else:
                cr.start_split('sort_on')
                result = self.sortResults(
                    self.data, sort_index, reverse, limit, merge,
                    actual_result_count=rlen, b_start=b_start,
                    b_size=b_size)
                cr.stop_split('sort_on', None)
        elif rs:
            # We got some results from the indexes.
            # Sort and convert to sequences.
            # XXX: The check for 'values' is really stupid since we call
            # items() and *not* values()
            rlen = len(rs)
            if sort_index is None and hasattr(rs, 'items'):
                # having a 'items' means we have a data structure with
                # scores. Build a new result set, sort it by score, reverse
                # it, compute the normalized score, and Lazify it.

                if not merge:
                    # Don't bother to sort here, return a list of
                    # three tuples to be passed later to mergeResults
                    # note that data_record_normalized_score_ cannot be
                    # calculated and will always be 1 in this case
                    getitem = self.__getitem__
                    result = [(score, (1, score, rid), getitem)
                              for rid, score in rs.items()]
                else:
                    cr.start_split('sort_on')

                    # aq_parent is not imported explicitly at the top of
                    # this module; import it here so that the scored-result
                    # getter below cannot fail with a NameError.
                    from Acquisition import aq_parent

                    rs = rs.byValue(0) # sort it by score
                    max_score = float(rs[0][0])

                    # Here we define our getter function inline so that
                    # we can conveniently store the max value as a default arg
                    # and make the normalized score computation lazy
                    def getScoredResult(item, max=max_score, self=self):
                        """
                        Returns instances of self._v_brains, or whatever is
                        passed into self.useBrains.
                        """
                        score, key = item
                        r = self._v_result_class(self.data[key])\
                            .__of__(aq_parent(self))
                        r.data_record_id_ = key
                        r.data_record_score_ = score
                        r.data_record_normalized_score_ = int(100. * score / max)
                        return r

                    sequence, slen = self._limit_sequence(rs, rlen, b_start,
                                                          b_size)
                    result = LazyMap(getScoredResult, sequence, slen,
                                     actual_result_count=rlen)
                    cr.stop_split('sort_on', None)

            elif sort_index is None and not hasattr(rs, 'values'):
                # no scores
                if hasattr(rs, 'keys'):
                    rs = rs.keys()
                sequence, slen = self._limit_sequence(rs, rlen, b_start,
                                                      b_size)
                result = LazyMap(self.__getitem__, sequence, slen,
                                 actual_result_count=rlen)
            else:
                # sort. If there are scores, then this block is not
                # reached, therefore 'sort-on' does not happen in the
                # context of a text index query. This should probably
                # sort by relevance first, then the 'sort-on' attribute.
                cr.start_split('sort_on')
                result = self.sortResults(rs, sort_index, reverse, limit,
                                          merge, actual_result_count=rlen,
                                          b_start=b_start, b_size=b_size)
                cr.stop_split('sort_on', None)
        else:
            # Empty result set
            result = LazyCat([])
        cr.stop()
        return result