Effective delegation of search to Solr.
[Plinn.git] / catalog.py
# -*- coding: utf-8 -*-
from App.class_init import InitializeClass
from AccessControl import ClassSecurityInfo
from Products.CMFCore.interfaces import IIndexableObject
from Products.CMFCore.CatalogTool import CatalogTool as BaseCatalogTool
from Products.CMFCore.CatalogTool import IndexableObjectWrapper
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.CMFCore.permissions import ModifyPortalContent
from zope.component import queryMultiAdapter
from Products.ZCatalog.Catalog import Catalog
import transaction
from solr import *

# imports for Catalog class
from Products.PluginIndexes.interfaces import ILimitedResultIndex
from Products.ZCatalog.Lazy import LazyMap, LazyCat, LazyValues
from BTrees.IIBTree import intersection, IISet
from BTrees.IIBTree import weightedIntersection
from Acquisition import aq_parent  # needed by the copied search() method below
import warnings

class SolrTransactionHook :
    ''' Solr commit coupled with the ZODB transaction commit '''
    def __init__(self, connection) :
        self.connection = connection

    def __call__(self, status) :
        if status :
            self.connection.commit()
            self.connection.close()
        else :
            self.connection.close()

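# Illustrative usage of SolrTransactionHook (a sketch, not part of the
# original API; URL and field values below are made-up examples):
#
#     c = SolrConnection('http://localhost:8983/solr')
#     c.add(id='http://portal/front-page', Title='Front page')
#     transaction.get().addAfterCommitHook(SolrTransactionHook(c))
#
# The hook receives the transaction status: on a successful ZODB commit it
# commits the pending Solr documents, otherwise it just closes the
# connection without committing.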
class CatalogTool(BaseCatalogTool) :
    meta_type = 'Legivoc Catalog'
    security = ClassSecurityInfo()
    manage_options = (BaseCatalogTool.manage_options[:5] +
                      ({'label' : 'Solr', 'action' : 'manage_solr'},) +
                      BaseCatalogTool.manage_options[5:])
    manage_solr = PageTemplateFile('www/manage_solr', globals())


    def __init__(self, idxs=[]) :
        super(CatalogTool, self).__init__()
        self._catalog = DelegatedCatalog(self)
        self.solr_url = 'http://localhost:8983/solr'
        self.delegatedIndexes = ('Title', 'Description', 'SearchableText')

    security.declarePrivate('solrAdd')
    def solrAdd(self, object, idxs=[], uid=None) :
        if IIndexableObject.providedBy(object):
            w = object
        else:
            w = queryMultiAdapter( (object, self), IIndexableObject )
            if w is None:
                # BBB
                w = IndexableObjectWrapper(object, self)

        uid = uid if uid else self.__url(object)
        idxs = idxs if idxs != [] else self.delegatedIndexes
        data = {'id' : uid}
        for name in idxs :
            attr = getattr(w, name, '')
            data[name] = attr() if callable(attr) else attr
        c = SolrConnection(self.solr_url)
        c.add(**data)
        txn = transaction.get()
        txn.addAfterCommitHook(SolrTransactionHook(c))
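        # Example of the document sent to Solr by this method (illustrative
        # values only): with the default delegated indexes, `data` looks like
        #   {'id': 'http://portal/front-page',
        #    'Title': 'Front page',
        #    'Description': '...',
        #    'SearchableText': '...'}
        # The 'id' is the catalog uid of the object, which is also used as
        # the Solr key when deleting in unindexObject().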


    # PortalCatalog api overloads
    security.declareProtected(ModifyPortalContent, 'indexObject')
    def indexObject(self, object) :
        """ Add to catalog and send to Solr """
        super(CatalogTool, self).indexObject(object)
        self.solrAdd(object)

    security.declarePrivate('reindexObject')
    def reindexObject(self, object, idxs=[], update_metadata=1, uid=None):
        super(CatalogTool, self).reindexObject(object,
                                               idxs=idxs,
                                               update_metadata=update_metadata,
                                               uid=uid)
        if idxs != []:
            # Filter out invalid indexes.
            valid_indexes = self._catalog.indexes.keys()
            idxs = [i for i in idxs if i in valid_indexes and i in self.delegatedIndexes]
        else :
            idxs = self.delegatedIndexes

        if idxs :
            self.solrAdd(object, idxs=idxs, uid=uid)
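        # Example of the filtering above (hypothetical call): with
        # idxs=['Title', 'modified'], only 'Title' is both a valid catalog
        # index and a delegated one, so Solr receives just the Title field;
        # with idxs=[] (the default) all delegated indexes are re-sent.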

    security.declarePrivate('unindexObject')
    def unindexObject(self, object):
        """Remove from catalog.
        """
        super(CatalogTool, self).unindexObject(object)
        c = SolrConnection(self.solr_url)
        url = self.__url(object)
        c.delete(id=url)
        txn = transaction.get()
        txn.addAfterCommitHook(SolrTransactionHook(c))

InitializeClass(CatalogTool)


class DelegatedCatalog(Catalog) :
    '''This is where the search is actually delegated to Solr. '''

    def __init__(self, zcat, brains=None) :
        Catalog.__init__(self, brains=brains)
        self.zcat = zcat

    def getDelegatedIndexes(self) :
        return ('Title', 'Description', 'SearchableText') # <= TODO remove this line
        return self.zcat.delegatedIndexes

    def delegateSearch(self, query, plan) :
        '''
        Falsy return values:
        None means: no delegation, the remaining indexes still have to be queried.
        Empty IISet(): the delegation returned no result, the search can stop here.
        '''
        indexes = set(plan).intersection(set(self.getDelegatedIndexes()))
        delegatedQuery = {}
        for i in indexes :
            delegatedQuery[i] = query.pop(i)
            plan.remove(i)
        if not delegatedQuery :
            return None
        c = SolrConnection(self.zcat.solr_url)
        q = ' AND '.join(['%s:"%s"' % item for item in delegatedQuery.items()])
        resp = c.query(q, fields='id')
        return IISet(filter(None, [self.uids.get(r['id']) for r in resp.results]))
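        # Worked example (assumed values): with
        #   query = {'SearchableText': 'budget', 'portal_type': 'Document'}
        #   plan  = ['SearchableText', 'portal_type']
        # 'SearchableText' is popped from both, Solr is queried for
        #   SearchableText:"budget"
        # and the matching Solr ids are mapped back to catalog rids through
        # self.uids; 'portal_type' stays in the plan for the regular indexes.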

    def search(self, query, sort_index=None, reverse=0, limit=None, merge=1):
        """Iterate through the indexes, applying the query to each one. If
        merge is true then return a lazy result set (sorted if appropriate)
        otherwise return the raw (possibly scored) results for later merging.
        Limit is used in conjunction with sorting or scored results to inform
        the catalog how many results you are really interested in. The catalog
        can then use optimizations to save time and memory. The number of
        results is not guaranteed to fall within the limit however, you should
        still slice or batch the results as usual."""

        rs = None # resultset

        # Indexes fulfill a fairly large contract here. We hand each
        # index the query mapping we are given (which may be composed
        # of some combination of web request, kw mappings or plain old dicts)
        # and the index decides what to do with it. If the index finds work
        # for itself in the query, it returns the results and a tuple of
        # the attributes that were used. If the index finds nothing for it
        # to do then it returns None.

        # Canonicalize the request into a sensible query before passing it on
        query = self.make_query(query)

        cr = self.getCatalogPlan(query)
        cr.start()

        plan = cr.plan()
        if not plan:
            plan = self._sorted_search_indexes(query)

        # delegation to Solr
        rs = self.delegateSearch(query, plan)
        if rs is not None and not rs :
            return LazyCat([])

        indexes = self.indexes.keys()
        for i in plan:
            if i not in indexes:
                # We can have bogus keys or the plan can contain index names
                # that have been removed in the meantime
                continue

            index = self.getIndex(i)
            _apply_index = getattr(index, "_apply_index", None)
            if _apply_index is None:
                continue

            cr.start_split(i)
            limit_result = ILimitedResultIndex.providedBy(index)
            if limit_result:
                r = _apply_index(query, rs)
            else:
                r = _apply_index(query)

            if r is not None:
                r, u = r
                # Short circuit if empty result
                # BBB: We can remove the "r is not None" check in Zope 2.14
                # once we don't need to support the "return everything" case
                # anymore
                if r is not None and not r:
                    cr.stop_split(i, result=None, limit=limit_result)
                    return LazyCat([])

                # provide detailed info about the pure intersection time
                intersect_id = i + '#intersection'
                cr.start_split(intersect_id)
                # weightedIntersection preserves the values from any mappings
                # we get, as some indexes don't return simple sets
                if hasattr(rs, 'items') or hasattr(r, 'items'):
                    _, rs = weightedIntersection(rs, r)
                else:
                    rs = intersection(rs, r)

                cr.stop_split(intersect_id)

                # consider the time it takes to intersect the index result with
                # the total resultset to be part of the index time
                cr.stop_split(i, result=r, limit=limit_result)
                if not rs:
                    break
            else:
                cr.stop_split(i, result=None, limit=limit_result)

        # Try to deduce the sort limit from batching arguments
        b_start = int(query.get('b_start', 0))
        b_size = query.get('b_size', None)
        if b_size is not None:
            b_size = int(b_size)

        if b_size is not None:
            limit = b_start + b_size
        elif limit and b_size is None:
            b_size = limit

        if rs is None:
            # None of the indexes found anything to do with the query
            # We take this to mean that the query was empty (an empty filter)
            # and so we return everything in the catalog
            warnings.warn('Your query %s produced no query restriction. '
                          'Currently the entire catalog content is returned. '
                          'In Zope 2.14 this will result in an empty LazyCat '
                          'to be returned.' % repr(cr.make_key(query)),
                          DeprecationWarning, stacklevel=3)

            rlen = len(self)
            if sort_index is None:
                sequence, slen = self._limit_sequence(self.data.items(), rlen,
                                                      b_start, b_size)
                result = LazyMap(self.instantiate, sequence, slen,
                                 actual_result_count=rlen)
            else:
                cr.start_split('sort_on')
                result = self.sortResults(
                    self.data, sort_index, reverse, limit, merge,
                    actual_result_count=rlen, b_start=b_start,
                    b_size=b_size)
                cr.stop_split('sort_on', None)
        elif rs:
            # We got some results from the indexes.
            # Sort and convert to sequences.
            # XXX: The check for 'values' is really stupid since we call
            # items() and *not* values()
            rlen = len(rs)
            if sort_index is None and hasattr(rs, 'items'):
                # having a 'items' means we have a data structure with
                # scores. Build a new result set, sort it by score, reverse
                # it, compute the normalized score, and Lazify it.

                if not merge:
                    # Don't bother to sort here, return a list of
                    # three tuples to be passed later to mergeResults
                    # note that data_record_normalized_score_ cannot be
                    # calculated and will always be 1 in this case
                    getitem = self.__getitem__
                    result = [(score, (1, score, rid), getitem)
                              for rid, score in rs.items()]
                else:
                    cr.start_split('sort_on')

                    rs = rs.byValue(0)  # sort it by score
                    max = float(rs[0][0])

                    # Here we define our getter function inline so that
                    # we can conveniently store the max value as a default arg
                    # and make the normalized score computation lazy
                    def getScoredResult(item, max=max, self=self):
                        """
                        Returns instances of self._v_brains, or whatever is
                        passed into self.useBrains.
                        """
                        score, key = item
                        r = self._v_result_class(self.data[key])\
                            .__of__(aq_parent(self))
                        r.data_record_id_ = key
                        r.data_record_score_ = score
                        r.data_record_normalized_score_ = int(100. * score / max)
                        return r

                    sequence, slen = self._limit_sequence(rs, rlen, b_start,
                                                          b_size)
                    result = LazyMap(getScoredResult, sequence, slen,
                                     actual_result_count=rlen)
                    cr.stop_split('sort_on', None)

            elif sort_index is None and not hasattr(rs, 'values'):
                # no scores
                if hasattr(rs, 'keys'):
                    rs = rs.keys()
                sequence, slen = self._limit_sequence(rs, rlen, b_start,
                                                      b_size)
                result = LazyMap(self.__getitem__, sequence, slen,
                                 actual_result_count=rlen)
            else:
                # sort. If there are scores, then this block is not
                # reached, therefore 'sort-on' does not happen in the
                # context of a text index query. This should probably
                # sort by relevance first, then the 'sort-on' attribute.
                cr.start_split('sort_on')
                result = self.sortResults(rs, sort_index, reverse, limit,
                    merge, actual_result_count=rlen, b_start=b_start,
                    b_size=b_size)
                cr.stop_split('sort_on', None)
        else:
            # Empty result set
            result = LazyCat([])
        cr.stop()
        return result
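# End-to-end sketch of a delegated query (assumptions: this tool is installed
# as portal_catalog and a Solr core is reachable at solr_url):
#
#     brains = portal.portal_catalog(SearchableText='budget',
#                                    portal_type='Document')
#
# DelegatedCatalog.search() first calls delegateSearch(), which resolves
# SearchableText against Solr and returns a set of rids; the remaining
# criteria (portal_type here) are then applied by the ordinary ZCatalog
# indexes and intersected with that set.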