da1e4ccdac00aa67036aa52bb6c6fb2d6d66f359
[Plinn.git] / catalog.py
1 # -*- coding: utf-8 -*-
2 from App.class_init import InitializeClass
3 from AccessControl import ClassSecurityInfo
4 from Products.CMFCore.interfaces import IIndexableObject
5 from Products.CMFCore.CatalogTool import CatalogTool as BaseCatalogTool
6 from Products.CMFCore.CatalogTool import IndexableObjectWrapper
7 from Products.PageTemplates.PageTemplateFile import PageTemplateFile
8 from Products.CMFCore.permissions import ModifyPortalContent
9 from zope.component import queryMultiAdapter
10 from Products.ZCatalog.Catalog import Catalog
11 import transaction
12 from solr import *
13
14 # imports for Catalog class
15 from Products.PluginIndexes.interfaces import ILimitedResultIndex
16 from Products.ZCatalog.Lazy import LazyMap, LazyCat, LazyValues
17 from BTrees.IIBTree import intersection, IISet
18 from BTrees.IIBTree import weightedIntersection
19 import warnings
20
21 class SolrTransactionHook :
22 ''' commit solr couplé sur le commit de la ZODB '''
23 def __init__(self, connection) :
24 self.connection = connection
25
26 def __call__(self, status) :
27 if status :
28 self.connection.commit()
29 self.connection.close()
30 else :
31 self.connection.close()
32
33 class CatalogTool(BaseCatalogTool) :
34 meta_type = 'Legivoc Catalog'
35 security = ClassSecurityInfo()
36 manage_options = (BaseCatalogTool.manage_options[:5] +
37 ({'label' : 'Solr', 'action' : 'manage_solr'},) +
38 BaseCatalogTool.manage_options[5:])
39 manage_solr = PageTemplateFile('www/manage_solr', globals())
40
41
42 def __init__(self, idxs=[]) :
43 super(CatalogTool, self).__init__()
44 self._catalog = DelegatedCatalog(self)
45 self.solr_url = 'http://localhost:8983/solr'
46 self.delegatedIndexes = ('Title', 'Description', 'SearchableText')
47
48 security.declarePrivate('solrAdd')
49 def solrAdd(self, object, idxs=[], uid=None) :
50 if IIndexableObject.providedBy(object):
51 w = object
52 else:
53 w = queryMultiAdapter( (object, self), IIndexableObject )
54 if w is None:
55 # BBB
56 w = IndexableObjectWrapper(object, self)
57
58 uid = uid if uid else self.__url(object)
59 idxs = idxs if idxs !=[] else self.delegatedIndexes
60 data = {'id' : uid}
61 for name in idxs :
62 attr = getattr(w, name, '')
63 data[name] = attr() if callable(attr) else attr
64 c = SolrConnection(self.solr_url)
65 c.add(**data)
66 txn = transaction.get()
67 txn.addAfterCommitHook(SolrTransactionHook(c))
68
69
70 # PortalCatalog api overloads
71 security.declareProtected(ModifyPortalContent, 'indexObject')
72 def indexObject(self, object) :
73 """ Add to catalog and send to Solr """
74 super(CatalogTool, self).indexObject(object)
75 self.solrAdd(object)
76
77 security.declarePrivate('reindexObject')
78 def reindexObject(self, object, idxs=[], update_metadata=1, uid=None):
79 super(CatalogTool, self).reindexObject(object,
80 idxs=idxs,
81 update_metadata=update_metadata,
82 uid=uid)
83 if idxs != []:
84 # Filter out invalid indexes.
85 valid_indexes = self._catalog.indexes.keys()
86 idxs = [i for i in idxs if i in valid_indexes and i in self.delegatedIndexes]
87 else :
88 idxs = self.delegatedIndexes
89
90 if idxs :
91 self.solrAdd(object, idxs=idxs, uid=uid)
92
93 security.declarePrivate('unindexObject')
94 def unindexObject(self, object):
95 """Remove from catalog.
96 """
97 super(CatalogTool, self).unindexObject(object)
98 c = SolrConnection(self.solr_url)
99 url = self.__url(object)
100 c.delete(id=url)
101 txn = transaction.get()
102 txn.addAfterCommitHook(SolrTransactionHook(c))
103
104 InitializeClass(CatalogTool)
105
106
107 class DelegatedCatalog(Catalog) :
108 '''C'est ici qu'on délègue effectivement à Solr '''
109
110 def __init__(self, zcat, brains=None) :
111 Catalog.__init__(self, brains=brains)
112 self.zcat = zcat
113
114 def getDelegatedIndexes(self) :
115 return ('Title', 'Description', 'SearchableText') # <= TODO virer cette ligne
116 return self.zcat.delegatedIndexes
117
118 def delegateSearch(self, query, plan) :
119 '''
120 retours faux :
121 None signifie : pas de délégation, il faut continuer à interroger les autres index.
122 IISet() vide : pas de résultat lors de la délégation, on peut arrêter la recherche.
123 '''
124 indexes = set(plan).intersection(set(self.getDelegatedIndexes()))
125 delegatedQuery = {}
126 for i in indexes :
127 delegatedQuery[i] = query.pop(i)
128 plan.remove(i)
129 if not delegatedQuery :
130 return None
131 c = SolrConnection('http://localhost:8983/solr')
132 q =' AND '.join(['%s:"%s"' % item for item in delegatedQuery.items()])
133 resp = c.query(q, fields='id', rows=len(self))
134 c.close()
135 return IISet(filter(None, [self.uids.get(r['id']) for r in resp.results]))
136
137 def search(self, query, sort_index=None, reverse=0, limit=None, merge=1):
138 """Iterate through the indexes, applying the query to each one. If
139 merge is true then return a lazy result set (sorted if appropriate)
140 otherwise return the raw (possibly scored) results for later merging.
141 Limit is used in conjuntion with sorting or scored results to inform
142 the catalog how many results you are really interested in. The catalog
143 can then use optimizations to save time and memory. The number of
144 results is not guaranteed to fall within the limit however, you should
145 still slice or batch the results as usual."""
146
147 rs = None # resultset
148
149 # Indexes fulfill a fairly large contract here. We hand each
150 # index the query mapping we are given (which may be composed
151 # of some combination of web request, kw mappings or plain old dicts)
152 # and the index decides what to do with it. If the index finds work
153 # for itself in the query, it returns the results and a tuple of
154 # the attributes that were used. If the index finds nothing for it
155 # to do then it returns None.
156
157 # Canonicalize the request into a sensible query before passing it on
158 query = self.make_query(query)
159
160 cr = self.getCatalogPlan(query)
161 cr.start()
162
163 plan = cr.plan()
164 if not plan:
165 plan = self._sorted_search_indexes(query)
166
167 # délégation
168 rs = self.delegateSearch(query, plan)
169 if rs is not None and not rs :
170 return LazyCat([])
171
172 indexes = self.indexes.keys()
173 for i in plan:
174 if i not in indexes:
175 # We can have bogus keys or the plan can contain index names
176 # that have been removed in the meantime
177 continue
178
179 index = self.getIndex(i)
180 _apply_index = getattr(index, "_apply_index", None)
181 if _apply_index is None:
182 continue
183
184 cr.start_split(i)
185 limit_result = ILimitedResultIndex.providedBy(index)
186 if limit_result:
187 r = _apply_index(query, rs)
188 else:
189 r = _apply_index(query)
190
191 if r is not None:
192 r, u = r
193 # Short circuit if empty result
194 # BBB: We can remove the "r is not None" check in Zope 2.14
195 # once we don't need to support the "return everything" case
196 # anymore
197 if r is not None and not r:
198 cr.stop_split(i, result=None, limit=limit_result)
199 return LazyCat([])
200
201 # provide detailed info about the pure intersection time
202 intersect_id = i + '#intersection'
203 cr.start_split(intersect_id)
204 # weightedIntersection preserves the values from any mappings
205 # we get, as some indexes don't return simple sets
206 if hasattr(rs, 'items') or hasattr(r, 'items'):
207 _, rs = weightedIntersection(rs, r)
208 else:
209 rs = intersection(rs, r)
210
211 cr.stop_split(intersect_id)
212
213 # consider the time it takes to intersect the index result with
214 # the total resultset to be part of the index time
215 cr.stop_split(i, result=r, limit=limit_result)
216 if not rs:
217 break
218 else:
219 cr.stop_split(i, result=None, limit=limit_result)
220
221 # Try to deduce the sort limit from batching arguments
222 b_start = int(query.get('b_start', 0))
223 b_size = query.get('b_size', None)
224 if b_size is not None:
225 b_size = int(b_size)
226
227 if b_size is not None:
228 limit = b_start + b_size
229 elif limit and b_size is None:
230 b_size = limit
231
232 if rs is None:
233 # None of the indexes found anything to do with the query
234 # We take this to mean that the query was empty (an empty filter)
235 # and so we return everything in the catalog
236 warnings.warn('Your query %s produced no query restriction. '
237 'Currently the entire catalog content is returned. '
238 'In Zope 2.14 this will result in an empty LazyCat '
239 'to be returned.' % repr(cr.make_key(query)),
240 DeprecationWarning, stacklevel=3)
241
242 rlen = len(self)
243 if sort_index is None:
244 sequence, slen = self._limit_sequence(self.data.items(), rlen,
245 b_start, b_size)
246 result = LazyMap(self.instantiate, sequence, slen,
247 actual_result_count=rlen)
248 else:
249 cr.start_split('sort_on')
250 result = self.sortResults(
251 self.data, sort_index, reverse, limit, merge,
252 actual_result_count=rlen, b_start=b_start,
253 b_size=b_size)
254 cr.stop_split('sort_on', None)
255 elif rs:
256 # We got some results from the indexes.
257 # Sort and convert to sequences.
258 # XXX: The check for 'values' is really stupid since we call
259 # items() and *not* values()
260 rlen = len(rs)
261 if sort_index is None and hasattr(rs, 'items'):
262 # having a 'items' means we have a data structure with
263 # scores. Build a new result set, sort it by score, reverse
264 # it, compute the normalized score, and Lazify it.
265
266 if not merge:
267 # Don't bother to sort here, return a list of
268 # three tuples to be passed later to mergeResults
269 # note that data_record_normalized_score_ cannot be
270 # calculated and will always be 1 in this case
271 getitem = self.__getitem__
272 result = [(score, (1, score, rid), getitem)
273 for rid, score in rs.items()]
274 else:
275 cr.start_split('sort_on')
276
277 rs = rs.byValue(0) # sort it by score
278 max = float(rs[0][0])
279
280 # Here we define our getter function inline so that
281 # we can conveniently store the max value as a default arg
282 # and make the normalized score computation lazy
283 def getScoredResult(item, max=max, self=self):
284 """
285 Returns instances of self._v_brains, or whatever is
286 passed into self.useBrains.
287 """
288 score, key = item
289 r=self._v_result_class(self.data[key])\
290 .__of__(aq_parent(self))
291 r.data_record_id_ = key
292 r.data_record_score_ = score
293 r.data_record_normalized_score_ = int(100. * score / max)
294 return r
295
296 sequence, slen = self._limit_sequence(rs, rlen, b_start,
297 b_size)
298 result = LazyMap(getScoredResult, sequence, slen,
299 actual_result_count=rlen)
300 cr.stop_split('sort_on', None)
301
302 elif sort_index is None and not hasattr(rs, 'values'):
303 # no scores
304 if hasattr(rs, 'keys'):
305 rs = rs.keys()
306 sequence, slen = self._limit_sequence(rs, rlen, b_start,
307 b_size)
308 result = LazyMap(self.__getitem__, sequence, slen,
309 actual_result_count=rlen)
310 else:
311 # sort. If there are scores, then this block is not
312 # reached, therefore 'sort-on' does not happen in the
313 # context of a text index query. This should probably
314 # sort by relevance first, then the 'sort-on' attribute.
315 cr.start_split('sort_on')
316 result = self.sortResults(rs, sort_index, reverse, limit,
317 merge, actual_result_count=rlen, b_start=b_start,
318 b_size=b_size)
319 cr.stop_split('sort_on', None)
320 else:
321 # Empty result set
322 result = LazyCat([])
323 cr.stop()
324 return result