Bugfix : la connexion Solr peut se volatiliser plus vite que le hook de transaction…
[Plinn.git] / catalog.py
1 # -*- coding: utf-8 -*-
2 from App.class_init import InitializeClass
3 from AccessControl import ClassSecurityInfo
4 from Products.CMFCore.interfaces import IIndexableObject
5 from Products.CMFCore.CatalogTool import CatalogTool as BaseCatalogTool
6 from Products.CMFCore.CatalogTool import IndexableObjectWrapper
7 from Products.PageTemplates.PageTemplateFile import PageTemplateFile
8 from Products.CMFCore.permissions import ModifyPortalContent
9 from zope.component import queryMultiAdapter
10 from Products.ZCatalog.Catalog import Catalog
11 import transaction
12 from solr import *
13
14 # imports for Catalog class
15 from Products.PluginIndexes.interfaces import ILimitedResultIndex
16 from Products.ZCatalog.Lazy import LazyMap, LazyCat, LazyValues
17 from BTrees.IIBTree import intersection, IISet
18 from BTrees.IIBTree import weightedIntersection
19 import warnings
20
# Name of the volatile (non-persisted) attribute under which the Solr
# connection is cached on the catalog tool for the current transaction.
_VOLATILE_SOLR_NAME = '_v_solrConnection'

class SolrTransactionHook :
    '''After-commit hook coupling the Solr commit with the ZODB commit.'''

    def __init__(self, context, con) :
        self.context = context
        self.con = con

    def __call__(self, status) :
        # ``status`` is true when the ZODB transaction committed successfully:
        # only then is the pending Solr work committed; the connection is
        # closed in every case.
        connection = self.con
        if status :
            connection.commit()
        connection.close()
        try :
            # The volatile attribute can vanish before the hook runs
            # (volatile attributes are not guaranteed to survive); treat
            # "already gone" as success.
            delattr(self.context, _VOLATILE_SOLR_NAME)
        except AttributeError :
            pass
class CatalogTool(BaseCatalogTool) :
    """Portal catalog that mirrors a subset of its indexes into Solr.

    The indexes listed in ``delegatedIndexes`` are pushed to a Solr server
    on every (re)index operation; DelegatedCatalog (below) hands queries on
    those indexes over to Solr instead of the local ZCatalog indexes.
    """
    meta_type = 'Plinn Catalog'
    security = ClassSecurityInfo()
    manage_options = (BaseCatalogTool.manage_options[:5] +
                      ({'label' : 'Solr', 'action' : 'manage_solr'},) +
                      BaseCatalogTool.manage_options[5:])
    manage_solr = PageTemplateFile('www/manage_solr', globals())


    def __init__(self, idxs=()) :
        # ``idxs`` is accepted for interface compatibility but, as in the
        # historical implementation, it is not used.
        super(CatalogTool, self).__init__()
        self._catalog = DelegatedCatalog(self)
        self.solr_url = 'http://localhost:8983/solr'
        self.delegatedIndexes = ('Title', 'Description', 'SearchableText')

    def _getSolrConnection(self) :
        """Return the per-transaction Solr connection.

        The connection is created lazily, cached in a volatile attribute
        (never persisted by the ZODB) and an after-commit hook is registered
        so that it is committed/closed when the transaction ends.
        """
        # getattr with a default rather than hasattr: Python 2's hasattr
        # swallows *any* exception raised while loading the attribute, which
        # could silently recreate connections on real errors.
        con = getattr(self, _VOLATILE_SOLR_NAME, None)
        if con is None :
            con = SolrConnection(self.solr_url)
            setattr(self, _VOLATILE_SOLR_NAME, con)
            txn = transaction.get()
            txn.addAfterCommitHook(SolrTransactionHook(self, con))
        return con

    security.declarePrivate('solrAdd')
    def solrAdd(self, object, idxs=(), uid=None) :
        """Send the delegated index values of ``object`` to Solr.

        object -- content object, or an object already providing
                  IIndexableObject
        idxs   -- names of indexes to push; empty means all delegated indexes
        uid    -- Solr document id; defaults to the catalog url of ``object``
        """
        if IIndexableObject.providedBy(object):
            w = object
        else:
            w = queryMultiAdapter( (object, self), IIndexableObject )
            if w is None:
                # BBB
                w = IndexableObjectWrapper(object, self)

        # Name mangling resolves self.__url to _CatalogTool__url, which the
        # base class (also named CatalogTool) defines.
        uid = uid if uid else self.__url(object)
        idxs = list(idxs) if idxs else self.delegatedIndexes
        data = {'id' : uid}
        for name in idxs :
            attr = getattr(w, name, '')
            data[name] = attr() if callable(attr) else attr
        c = self._getSolrConnection()
        c.add(**data)


    # PortalCatalog api overloads
    security.declareProtected(ModifyPortalContent, 'indexObject')
    def indexObject(self, object) :
        """ Add to catalog and send to Solr """
        super(CatalogTool, self).indexObject(object)
        self.solrAdd(object)

    security.declarePrivate('reindexObject')
    def reindexObject(self, object, idxs=None, update_metadata=1, uid=None):
        """Update the catalog entry, then mirror delegated indexes to Solr."""
        if idxs is None:
            # Preserve the historical default ([]) seen by the base class.
            idxs = []
        super(CatalogTool, self).reindexObject(object,
                                               idxs=idxs,
                                               update_metadata=update_metadata,
                                               uid=uid)
        if idxs:
            # Filter out invalid indexes and keep only the delegated ones.
            valid_indexes = self._catalog.indexes.keys()
            idxs = [i for i in idxs if i in valid_indexes and i in self.delegatedIndexes]
        else :
            idxs = self.delegatedIndexes

        if idxs :
            self.solrAdd(object, idxs=idxs, uid=uid)

    security.declarePrivate('unindexObject')
    def unindexObject(self, object):
        """Remove from catalog and from Solr.
        """
        super(CatalogTool, self).unindexObject(object)
        c = self._getSolrConnection()
        url = self.__url(object)
        c.delete(id=url)
114
# Apply the Zope security declarations made above on CatalogTool.
InitializeClass(CatalogTool)
116
117
class DelegatedCatalog(Catalog) :
    '''ZCatalog catalog that actually delegates part of the query to Solr.

    delegateSearch() pops the criteria targeting the delegated indexes out
    of the query and resolves them through Solr before the regular index
    machinery runs; search() is a copy of Products.ZCatalog.Catalog.search
    with that delegation step inserted.
    '''

    def __init__(self, zcat, brains=None) :
        # zcat: the owning CatalogTool, used for solr_url / delegatedIndexes.
        Catalog.__init__(self, brains=brains)
        self.zcat = zcat

    def delegateSearch(self, query, plan) :
        '''Resolve through Solr the part of ``query`` that concerns the
        delegated indexes.

        Falsy return values:
        None         : no delegation; the other indexes must be queried.
        empty IISet(): the delegated query matched nothing; the whole
                       search can stop immediately.
        Side effect: the delegated criteria are popped from ``query`` and
        removed from ``plan``.
        '''
        indexes = set(query.keys()).intersection(set(self.zcat.delegatedIndexes))
        if not indexes :
            return None
        delegatedQuery = {}
        for i in indexes :
            delegatedQuery[i] = query.pop(i)
            try : plan.remove(i)
            except ValueError : pass
        terms = []
        for name, value in delegatedQuery.items() :
            if isinstance(value, basestring) :
                # Escape characters that would break out of the phrase
                # query (Solr/Lucene treat \ and " specially).
                value = value.replace('\\', '\\\\').replace('"', '\\"')
            terms.append('%s:"%s"' % (name, value))
        q = ' AND '.join(terms)
        c = SolrConnection(self.zcat.solr_url)
        try :
            resp = c.query(q, fields='id', rows=len(self))
        finally :
            # Close the connection even when Solr is unreachable or the
            # query fails; it is not transaction-managed here.
            c.close()
        # Map Solr document ids back to catalog record ids, dropping ids
        # unknown to this catalog.
        return IISet(filter(None, [self.uids.get(r['id']) for r in resp.results]))

    def search(self, query, sort_index=None, reverse=0, limit=None, merge=1):
        """Iterate through the indexes, applying the query to each one. If
        merge is true then return a lazy result set (sorted if appropriate)
        otherwise return the raw (possibly scored) results for later merging.
        Limit is used in conjuntion with sorting or scored results to inform
        the catalog how many results you are really interested in. The catalog
        can then use optimizations to save time and memory. The number of
        results is not guaranteed to fall within the limit however, you should
        still slice or batch the results as usual."""

        # Upstream Catalog.py imports aq_parent at module level; this copied
        # method lost that import, which made the scored-result branch below
        # raise NameError. Import it locally instead.
        from Acquisition import aq_parent

        rs = None # resultset

        # Indexes fulfill a fairly large contract here. We hand each
        # index the query mapping we are given (which may be composed
        # of some combination of web request, kw mappings or plain old dicts)
        # and the index decides what to do with it. If the index finds work
        # for itself in the query, it returns the results and a tuple of
        # the attributes that were used. If the index finds nothing for it
        # to do then it returns None.

        # Canonicalize the request into a sensible query before passing it on
        query = self.make_query(query)

        cr = self.getCatalogPlan(query)
        cr.start()

        plan = cr.plan()
        if not plan:
            plan = self._sorted_search_indexes(query)

        # delegation to Solr
        rs = self.delegateSearch(query, plan)
        if rs is not None and not rs :
            return LazyCat([])

        indexes = self.indexes.keys()
        for i in plan:
            if i not in indexes:
                # We can have bogus keys or the plan can contain index names
                # that have been removed in the meantime
                continue

            index = self.getIndex(i)
            _apply_index = getattr(index, "_apply_index", None)
            if _apply_index is None:
                continue

            cr.start_split(i)
            limit_result = ILimitedResultIndex.providedBy(index)
            if limit_result:
                r = _apply_index(query, rs)
            else:
                r = _apply_index(query)

            if r is not None:
                r, u = r
                # Short circuit if empty result
                # BBB: We can remove the "r is not None" check in Zope 2.14
                # once we don't need to support the "return everything" case
                # anymore
                if r is not None and not r:
                    cr.stop_split(i, result=None, limit=limit_result)
                    return LazyCat([])

                # provide detailed info about the pure intersection time
                intersect_id = i + '#intersection'
                cr.start_split(intersect_id)
                # weightedIntersection preserves the values from any mappings
                # we get, as some indexes don't return simple sets
                if hasattr(rs, 'items') or hasattr(r, 'items'):
                    _, rs = weightedIntersection(rs, r)
                else:
                    rs = intersection(rs, r)

                cr.stop_split(intersect_id)

                # consider the time it takes to intersect the index result with
                # the total resultset to be part of the index time
                cr.stop_split(i, result=r, limit=limit_result)
                if not rs:
                    break
            else:
                cr.stop_split(i, result=None, limit=limit_result)

        # Try to deduce the sort limit from batching arguments
        b_start = int(query.get('b_start', 0))
        b_size = query.get('b_size', None)
        if b_size is not None:
            b_size = int(b_size)

        if b_size is not None:
            limit = b_start + b_size
        elif limit and b_size is None:
            b_size = limit

        if rs is None:
            # None of the indexes found anything to do with the query
            # We take this to mean that the query was empty (an empty filter)
            # and so we return everything in the catalog
            warnings.warn('Your query %s produced no query restriction. '
                          'Currently the entire catalog content is returned. '
                          'In Zope 2.14 this will result in an empty LazyCat '
                          'to be returned.' % repr(cr.make_key(query)),
                          DeprecationWarning, stacklevel=3)

            rlen = len(self)
            if sort_index is None:
                sequence, slen = self._limit_sequence(self.data.items(), rlen,
                    b_start, b_size)
                result = LazyMap(self.instantiate, sequence, slen,
                    actual_result_count=rlen)
            else:
                cr.start_split('sort_on')
                result = self.sortResults(
                    self.data, sort_index, reverse, limit, merge,
                        actual_result_count=rlen, b_start=b_start,
                        b_size=b_size)
                cr.stop_split('sort_on', None)
        elif rs:
            # We got some results from the indexes.
            # Sort and convert to sequences.
            # XXX: The check for 'values' is really stupid since we call
            # items() and *not* values()
            rlen = len(rs)
            if sort_index is None and hasattr(rs, 'items'):
                # having a 'items' means we have a data structure with
                # scores. Build a new result set, sort it by score, reverse
                # it, compute the normalized score, and Lazify it.

                if not merge:
                    # Don't bother to sort here, return a list of
                    # three tuples to be passed later to mergeResults
                    # note that data_record_normalized_score_ cannot be
                    # calculated and will always be 1 in this case
                    getitem = self.__getitem__
                    result = [(score, (1, score, rid), getitem)
                            for rid, score in rs.items()]
                else:
                    cr.start_split('sort_on')

                    rs = rs.byValue(0) # sort it by score
                    max = float(rs[0][0])

                    # Here we define our getter function inline so that
                    # we can conveniently store the max value as a default arg
                    # and make the normalized score computation lazy
                    def getScoredResult(item, max=max, self=self):
                        """
                        Returns instances of self._v_brains, or whatever is
                        passed into self.useBrains.
                        """
                        score, key = item
                        r=self._v_result_class(self.data[key])\
                              .__of__(aq_parent(self))
                        r.data_record_id_ = key
                        r.data_record_score_ = score
                        r.data_record_normalized_score_ = int(100. * score / max)
                        return r

                    sequence, slen = self._limit_sequence(rs, rlen, b_start,
                        b_size)
                    result = LazyMap(getScoredResult, sequence, slen,
                        actual_result_count=rlen)
                    cr.stop_split('sort_on', None)

            elif sort_index is None and not hasattr(rs, 'values'):
                # no scores
                if hasattr(rs, 'keys'):
                    rs = rs.keys()
                sequence, slen = self._limit_sequence(rs, rlen, b_start,
                    b_size)
                result = LazyMap(self.__getitem__, sequence, slen,
                    actual_result_count=rlen)
            else:
                # sort. If there are scores, then this block is not
                # reached, therefore 'sort-on' does not happen in the
                # context of a text index query. This should probably
                # sort by relevance first, then the 'sort-on' attribute.
                cr.start_split('sort_on')
                result = self.sortResults(rs, sort_index, reverse, limit,
                    merge, actual_result_count=rlen, b_start=b_start,
                    b_size=b_size)
                cr.stop_split('sort_on', None)
        else:
            # Empty result set
            result = LazyCat([])
        cr.stop()
        return result