Réutilisation de la connexion Solr, pour ne pas en ouvrir plus que nécessaire.
[Plinn.git] / catalog.py
1 # -*- coding: utf-8 -*-
2 from App.class_init import InitializeClass
3 from AccessControl import ClassSecurityInfo
4 from Products.CMFCore.interfaces import IIndexableObject
5 from Products.CMFCore.CatalogTool import CatalogTool as BaseCatalogTool
6 from Products.CMFCore.CatalogTool import IndexableObjectWrapper
7 from Products.PageTemplates.PageTemplateFile import PageTemplateFile
8 from Products.CMFCore.permissions import ModifyPortalContent
9 from zope.component import queryMultiAdapter
10 from Products.ZCatalog.Catalog import Catalog
11 import transaction
12 from solr import *
13
14 # imports for Catalog class
15 from Products.PluginIndexes.interfaces import ILimitedResultIndex
16 from Products.ZCatalog.Lazy import LazyMap, LazyCat, LazyValues
17 from BTrees.IIBTree import intersection, IISet
18 from BTrees.IIBTree import weightedIntersection
19 import warnings
20
# Name of the volatile attribute under which the per-transaction Solr
# connection is cached on the catalog tool.
_VOLATILE_SOLR_NAME = '_v_solrConnection'


class SolrTransactionHook(object):
    """After-commit hook coupling the Solr commit to the ZODB commit.

    An instance is called with the transaction status: a true value means
    the ZODB transaction was committed, a false value means it failed.
    On success the pending Solr changes are committed; in every case the
    cached connection is closed and removed from ``context`` so that the
    next transaction opens a fresh one.
    """

    def __init__(self, context):
        # Object carrying the cached connection under _VOLATILE_SOLR_NAME.
        self.context = context

    def __call__(self, status):
        """Commit (when ``status`` is true) and release the cached connection.

        An exception raised by the Solr commit is propagated to the caller,
        but only after the connection has been closed and forgotten.
        """
        con = getattr(self.context, _VOLATILE_SOLR_NAME, None)
        if con is None:
            # The cached connection has disappeared before the hook ran
            # (presumably because volatile attributes may be dropped at any
            # time -- TODO confirm); nothing is left to commit or close.
            warnings.warn('Solr connection vanished before the transaction '
                          'hook ran; pending Solr changes were not committed.',
                          RuntimeWarning)
            return
        try:
            if status:
                con.commit()
        finally:
            # Forget the connection first, so that a failure in close()
            # cannot leave a dead connection cached on the context.
            try:
                delattr(self.context, _VOLATILE_SOLR_NAME)
            except AttributeError:
                pass
            con.close()
36
class CatalogTool(BaseCatalogTool):
    """Portal catalog that also sends a set of "delegated" indexes to Solr.

    Every (re)index / unindex operation is first applied to the regular
    catalog through the base class, then mirrored to the Solr server at
    ``solr_url`` for the attributes named in ``delegatedIndexes``.

    NOTE(review): the methods below call ``self.__url(...)``.  Inside this
    class the name is mangled to ``_CatalogTool__url``; it is not defined
    here, so it is presumably resolved on the CMFCore base class, which is
    also named ``CatalogTool``.  Renaming this class would break that
    lookup -- confirm before changing the class name.
    """
    meta_type = 'Plinn Catalog'
    security = ClassSecurityInfo()
    # Insert a "Solr" management tab after the first five tabs of the base.
    manage_options = (BaseCatalogTool.manage_options[:5] +
                      ({'label': 'Solr', 'action': 'manage_solr'},) +
                      BaseCatalogTool.manage_options[5:])
    manage_solr = PageTemplateFile('www/manage_solr', globals())

    def __init__(self, idxs=[]):
        """Create the tool with a Solr-delegating catalog.

        ``idxs`` is accepted for signature compatibility and is not used.
        """
        super(CatalogTool, self).__init__()
        self._catalog = DelegatedCatalog(self)
        self.solr_url = 'http://localhost:8983/solr'
        self.delegatedIndexes = ('Title', 'Description', 'SearchableText')

    def _getSolrConnection(self):
        """Return the Solr connection cached for the current transaction.

        The first call opens the connection, caches it in a volatile
        attribute and registers a ``SolrTransactionHook`` as after-commit
        hook of the current transaction; that hook commits/closes the
        connection and removes the cached attribute.
        """
        con = getattr(self, _VOLATILE_SOLR_NAME, None)
        if con is None:
            con = SolrConnection(self.solr_url)
            setattr(self, _VOLATILE_SOLR_NAME, con)
            transaction.get().addAfterCommitHook(SolrTransactionHook(self))
        return con

    security.declarePrivate('solrAdd')
    def solrAdd(self, object, idxs=[], uid=None):
        """Send a Solr document built from ``object``.

        object -- the content object, or an IIndexableObject wrapper of it.
        idxs   -- names of the attributes to send; an empty list means all
                  of ``delegatedIndexes``.
        uid    -- Solr document id; computed with ``self.__url(object)``
                  when not given.

        The document only contains 'id' and the requested attributes.
        A Solr "add" replaces the whole document stored under the same id,
        so callers should pass the complete set of delegated attributes.
        """
        if IIndexableObject.providedBy(object):
            w = object
        else:
            w = queryMultiAdapter((object, self), IIndexableObject)
            if w is None:
                # BBB
                w = IndexableObjectWrapper(object, self)

        if not uid:
            uid = self.__url(object)
        if idxs == []:
            idxs = self.delegatedIndexes

        data = {'id': uid}
        for name in idxs:
            attr = getattr(w, name, '')
            # Indexable attributes may be plain values or zero-arg methods.
            data[name] = attr() if callable(attr) else attr
        self._getSolrConnection().add(**data)

    # PortalCatalog api overloads
    security.declareProtected(ModifyPortalContent, 'indexObject')
    def indexObject(self, object):
        """ Add to catalog and send to Solr """
        super(CatalogTool, self).indexObject(object)
        self.solrAdd(object)

    security.declarePrivate('reindexObject')
    def reindexObject(self, object, idxs=[], update_metadata=1, uid=None):
        """Update the catalog, then refresh the Solr document if needed.

        Solr is only contacted when the reindex concerns at least one
        delegated index (an empty ``idxs`` means "all indexes").  In that
        case the complete set of delegated attributes is re-sent: sending
        only the requested subset would replace the stored Solr document
        with a partial one and lose the other delegated fields.
        """
        super(CatalogTool, self).reindexObject(object,
                                               idxs=idxs,
                                               update_metadata=update_metadata,
                                               uid=uid)
        if idxs != []:
            # Filter out invalid indexes and those not delegated to Solr.
            valid_indexes = self._catalog.indexes.keys()
            affected = [i for i in idxs
                        if i in valid_indexes and i in self.delegatedIndexes]
        else:
            affected = self.delegatedIndexes

        if affected:
            self.solrAdd(object, idxs=self.delegatedIndexes, uid=uid)

    security.declarePrivate('unindexObject')
    def unindexObject(self, object):
        """Remove from catalog and delete the matching Solr document.
        """
        super(CatalogTool, self).unindexObject(object)
        c = self._getSolrConnection()
        url = self.__url(object)
        c.delete(id=url)

InitializeClass(CatalogTool)
113
114
class DelegatedCatalog(Catalog):
    """Catalog whose search delegates some indexes to Solr.

    The indexes named in ``zcat.delegatedIndexes`` are answered by the Solr
    server at ``zcat.solr_url``; the remaining query keys are handled by
    the regular catalog indexes and intersected with the Solr result.
    """

    def __init__(self, zcat, brains=None):
        Catalog.__init__(self, brains=brains)
        # The owning catalog tool; provides solr_url and delegatedIndexes.
        self.zcat = zcat

    def _solrPhrase(self, value):
        """Escape ``value`` for use inside a double-quoted Solr phrase.

        Backslashes and double quotes are backslash-escaped so that a search
        term cannot terminate the phrase and alter the query.  Values
        without a ``replace`` method (non-strings) are returned unchanged
        and end up formatted with ``%s`` as before.
        """
        if hasattr(value, 'replace'):
            value = value.replace('\\', '\\\\').replace('"', '\\"')
        return value

    def delegateSearch(self, query, plan):
        """Run the delegated part of ``query`` against Solr.

        The delegated keys are removed from ``query`` and from ``plan``
        (both are modified in place) so that the regular index loop does
        not handle them again.

        Falsy return values:
        None signifies: no delegation, the other indexes must still be
        queried.
        Empty IISet(): the delegation produced no result, the search can
        stop.
        """
        delegated = set(query.keys()).intersection(self.zcat.delegatedIndexes)
        if not delegated:
            return None

        delegatedQuery = {}
        for name in delegated:
            delegatedQuery[name] = query.pop(name)
            try:
                plan.remove(name)
            except ValueError:
                # The plan did not mention this index.
                pass

        q = ' AND '.join(['%s:"%s"' % (name, self._solrPhrase(value))
                          for name, value in delegatedQuery.items()])
        c = SolrConnection(self.zcat.solr_url)
        try:
            # rows=len(self): ask for as many rows as there are cataloged
            # records, i.e. do not truncate the Solr result.
            resp = c.query(q, fields='id', rows=len(self))
        finally:
            c.close()

        # Translate Solr document ids to catalog record ids, ignoring
        # documents unknown to this catalog (a record id of 0 is kept).
        rids = [self.uids.get(r['id']) for r in resp.results]
        return IISet([rid for rid in rids if rid is not None])

    def search(self, query, sort_index=None, reverse=0, limit=None, merge=1):
        """Iterate through the indexes, applying the query to each one. If
        merge is true then return a lazy result set (sorted if appropriate)
        otherwise return the raw (possibly scored) results for later merging.
        Limit is used in conjuntion with sorting or scored results to inform
        the catalog how many results you are really interested in. The catalog
        can then use optimizations to save time and memory. The number of
        results is not guaranteed to fall within the limit however, you should
        still slice or batch the results as usual."""

        rs = None # resultset

        # Indexes fulfill a fairly large contract here. We hand each
        # index the query mapping we are given (which may be composed
        # of some combination of web request, kw mappings or plain old dicts)
        # and the index decides what to do with it. If the index finds work
        # for itself in the query, it returns the results and a tuple of
        # the attributes that were used. If the index finds nothing for it
        # to do then it returns None.

        # Canonicalize the request into a sensible query before passing it on
        query = self.make_query(query)

        cr = self.getCatalogPlan(query)
        cr.start()

        plan = cr.plan()
        if not plan:
            plan = self._sorted_search_indexes(query)

        # Delegation: the Solr result (if any) becomes the initial result
        # set; an empty (but not None) result means Solr matched nothing.
        rs = self.delegateSearch(query, plan)
        if rs is not None and not rs:
            return LazyCat([])

        indexes = self.indexes.keys()
        for i in plan:
            if i not in indexes:
                # We can have bogus keys or the plan can contain index names
                # that have been removed in the meantime
                continue

            index = self.getIndex(i)
            _apply_index = getattr(index, "_apply_index", None)
            if _apply_index is None:
                continue

            cr.start_split(i)
            limit_result = ILimitedResultIndex.providedBy(index)
            if limit_result:
                r = _apply_index(query, rs)
            else:
                r = _apply_index(query)

            if r is not None:
                r, u = r
                # Short circuit if empty result
                # BBB: We can remove the "r is not None" check in Zope 2.14
                # once we don't need to support the "return everything" case
                # anymore
                if r is not None and not r:
                    cr.stop_split(i, result=None, limit=limit_result)
                    return LazyCat([])

                # provide detailed info about the pure intersection time
                intersect_id = i + '#intersection'
                cr.start_split(intersect_id)
                # weightedIntersection preserves the values from any mappings
                # we get, as some indexes don't return simple sets
                if hasattr(rs, 'items') or hasattr(r, 'items'):
                    _, rs = weightedIntersection(rs, r)
                else:
                    rs = intersection(rs, r)

                cr.stop_split(intersect_id)

                # consider the time it takes to intersect the index result with
                # the total resultset to be part of the index time
                cr.stop_split(i, result=r, limit=limit_result)
                if not rs:
                    break
            else:
                cr.stop_split(i, result=None, limit=limit_result)

        # Try to deduce the sort limit from batching arguments
        b_start = int(query.get('b_start', 0))
        b_size = query.get('b_size', None)
        if b_size is not None:
            b_size = int(b_size)

        if b_size is not None:
            limit = b_start + b_size
        elif limit and b_size is None:
            b_size = limit

        if rs is None:
            # None of the indexes found anything to do with the query
            # We take this to mean that the query was empty (an empty filter)
            # and so we return everything in the catalog
            warnings.warn('Your query %s produced no query restriction. '
                          'Currently the entire catalog content is returned. '
                          'In Zope 2.14 this will result in an empty LazyCat '
                          'to be returned.' % repr(cr.make_key(query)),
                          DeprecationWarning, stacklevel=3)

            rlen = len(self)
            if sort_index is None:
                sequence, slen = self._limit_sequence(self.data.items(), rlen,
                                                      b_start, b_size)
                result = LazyMap(self.instantiate, sequence, slen,
                                 actual_result_count=rlen)
            else:
                cr.start_split('sort_on')
                result = self.sortResults(
                    self.data, sort_index, reverse, limit, merge,
                    actual_result_count=rlen, b_start=b_start,
                    b_size=b_size)
                cr.stop_split('sort_on', None)
        elif rs:
            # We got some results from the indexes.
            # Sort and convert to sequences.
            # XXX: The check for 'values' is really stupid since we call
            # items() and *not* values()
            rlen = len(rs)
            if sort_index is None and hasattr(rs, 'items'):
                # having a 'items' means we have a data structure with
                # scores.  Build a new result set, sort it by score, reverse
                # it, compute the normalized score, and Lazify it.

                if not merge:
                    # Don't bother to sort here, return a list of
                    # three tuples to be passed later to mergeResults
                    # note that data_record_normalized_score_ cannot be
                    # calculated and will always be 1 in this case
                    getitem = self.__getitem__
                    result = [(score, (1, score, rid), getitem)
                              for rid, score in rs.items()]
                else:
                    cr.start_split('sort_on')

                    rs = rs.byValue(0) # sort it by score
                    max_score = float(rs[0][0])
                    getitem = self.__getitem__

                    # Here we define our getter function inline so that
                    # we can conveniently store the max value as a default arg
                    # and make the normalized score computation lazy
                    def getScoredResult(item, max=max_score, getitem=getitem):
                        """
                        Returns the result object for a (score, rid) pair,
                        with its normalized score computed lazily.
                        """
                        score, key = item
                        # Build the result through __getitem__ with a
                        # (normalized_score, score, rid) tuple -- the same
                        # protocol as the non-merge branch above -- instead
                        # of wrapping it by hand with aq_parent, a name this
                        # module never imports.
                        return getitem((int(100. * score / max), score, key))

                    sequence, slen = self._limit_sequence(rs, rlen, b_start,
                                                          b_size)
                    result = LazyMap(getScoredResult, sequence, slen,
                                     actual_result_count=rlen)
                    cr.stop_split('sort_on', None)

            elif sort_index is None and not hasattr(rs, 'values'):
                # no scores
                if hasattr(rs, 'keys'):
                    rs = rs.keys()
                sequence, slen = self._limit_sequence(rs, rlen, b_start,
                                                      b_size)
                result = LazyMap(self.__getitem__, sequence, slen,
                                 actual_result_count=rlen)
            else:
                # sort.  If there are scores, then this block is not
                # reached, therefore 'sort-on' does not happen in the
                # context of a text index query.  This should probably
                # sort by relevance first, then the 'sort-on' attribute.
                cr.start_split('sort_on')
                result = self.sortResults(rs, sort_index, reverse, limit,
                                          merge, actual_result_count=rlen,
                                          b_start=b_start, b_size=b_size)
                cr.stop_split('sort_on', None)
        else:
            # Empty result set
            result = LazyCat([])
        cr.stop()
        return result
328 cr.stop()
329 return result