4b925bfdf23047067b6c0793b1a3be2948b4dc77
[Plinn.git] / catalog.py
1 # -*- coding: utf-8 -*-
2 from App.class_init import InitializeClass
3 from AccessControl import ClassSecurityInfo
4 from Products.CMFCore.interfaces import IIndexableObject
5 from Products.CMFCore.CatalogTool import CatalogTool as BaseCatalogTool
6 from Products.CMFCore.CatalogTool import IndexableObjectWrapper
7 from Products.PageTemplates.PageTemplateFile import PageTemplateFile
8 from Products.CMFCore.permissions import ModifyPortalContent
9 from zope.component import queryMultiAdapter
10 from Products.ZCatalog.Catalog import Catalog
11 import transaction
12 from solr import *
13
14 # imports for Catalog class
15 from Products.PluginIndexes.interfaces import ILimitedResultIndex
16 from Products.ZCatalog.Lazy import LazyMap, LazyCat, LazyValues
17 from BTrees.IIBTree import intersection, IISet
18 from BTrees.IIBTree import weightedIntersection
19 import warnings
20
class SolrTransactionHook :
    '''ZODB after-commit hook paired with a Solr connection.

    When the ZODB transaction commits successfully the pending Solr
    changes are committed as well; the connection is closed in every
    case (success or abort).
    '''
    def __init__(self, connection) :
        self.connection = connection

    def __call__(self, status) :
        # 'status' is the ZODB commit outcome passed by the transaction
        # machinery: commit Solr only on success, always close.
        if status :
            self.connection.commit()
        self.connection.close()
32
class CatalogTool(BaseCatalogTool) :
    """CMF catalog tool that mirrors a set of text indexes into Solr.

    Local (ZODB) cataloguing is done by the base CatalogTool; every
    index / reindex / unindex operation additionally forwards the
    "delegated" indexes (Title, Description, SearchableText) to a Solr
    server.  Solr commits are deferred to a ZODB after-commit hook
    (SolrTransactionHook) so Solr only sees successfully committed data.
    """
    meta_type = 'Plinn Catalog'
    security = ClassSecurityInfo()
    # Insert a "Solr" management tab after the five standard tabs.
    manage_options = (BaseCatalogTool.manage_options[:5] +
                      ({'label' : 'Solr', 'action' : 'manage_solr'},) +
                      BaseCatalogTool.manage_options[5:])
    manage_solr = PageTemplateFile('www/manage_solr', globals())


    def __init__(self, idxs=None) :
        # BUG FIX: mutable default argument ([]) replaced by None.
        # 'idxs' was unused in the original body; kept for signature
        # compatibility with existing callers.
        super(CatalogTool, self).__init__()
        self._catalog = DelegatedCatalog(self)
        self.solr_url = 'http://localhost:8983/solr'
        self.delegatedIndexes = ('Title', 'Description', 'SearchableText')

    security.declarePrivate('solrAdd')
    def solrAdd(self, object, idxs=None, uid=None) :
        """Push the values of *idxs* for *object* to Solr.

        idxs defaults to all delegated indexes; uid defaults to the
        object's catalog url.  The actual Solr commit is deferred to
        an after-commit hook on the current ZODB transaction.
        """
        # BUG FIX: mutable default argument ([]) replaced by None; an
        # empty or missing 'idxs' still falls back to delegatedIndexes,
        # and passing None explicitly no longer crashes the loop below.
        if IIndexableObject.providedBy(object):
            w = object
        else:
            w = queryMultiAdapter( (object, self), IIndexableObject )
            if w is None:
                # BBB
                w = IndexableObjectWrapper(object, self)

        uid = uid if uid else self.__url(object)
        idxs = idxs if idxs else self.delegatedIndexes
        data = {'id' : uid}
        for name in idxs :
            attr = getattr(w, name, '')
            data[name] = attr() if callable(attr) else attr
        c = SolrConnection(self.solr_url)
        c.add(**data)
        txn = transaction.get()
        # Commit/close the Solr connection only once the ZODB commit
        # outcome is known.
        txn.addAfterCommitHook(SolrTransactionHook(c))


    # PortalCatalog api overloads
    security.declareProtected(ModifyPortalContent, 'indexObject')
    def indexObject(self, object) :
        """ Add to catalog and send to Solr """
        super(CatalogTool, self).indexObject(object)
        self.solrAdd(object)

    security.declarePrivate('reindexObject')
    def reindexObject(self, object, idxs=None, update_metadata=1, uid=None):
        """Reindex in the ZODB catalog, then resend the delegated
        part of the data to Solr.
        """
        # BUG FIX: mutable default argument ([]) replaced by None;
        # normalized back to [] for the base class call, which expects
        # "empty means reindex everything".
        if idxs is None:
            idxs = []
        super(CatalogTool, self).reindexObject(object,
                                               idxs=idxs,
                                               update_metadata=update_metadata,
                                               uid=uid)
        if idxs:
            # Filter out invalid indexes and keep only the delegated ones.
            valid_indexes = self._catalog.indexes.keys()
            idxs = [i for i in idxs if i in valid_indexes and i in self.delegatedIndexes]
        else :
            idxs = self.delegatedIndexes

        if idxs :
            self.solrAdd(object, idxs=idxs, uid=uid)

    security.declarePrivate('unindexObject')
    def unindexObject(self, object):
        """Remove from catalog and from Solr.
        """
        super(CatalogTool, self).unindexObject(object)
        c = SolrConnection(self.solr_url)
        url = self.__url(object)
        c.delete(id=url)
        txn = transaction.get()
        txn.addAfterCommitHook(SolrTransactionHook(c))

InitializeClass(CatalogTool)
105
106
class DelegatedCatalog(Catalog) :
    '''ZCatalog Catalog subclass where part of the query is effectively
    delegated to Solr.  'search' is a fork of the stock Catalog.search
    (Zope 2.13 era) with a delegation step inserted before the index loop.
    '''

    def __init__(self, zcat, brains=None) :
        # zcat: the owning CatalogTool, read for solr_url and
        # delegatedIndexes at query time.
        Catalog.__init__(self, brains=brains)
        self.zcat = zcat

    def delegateSearch(self, query, plan) :
        '''Run the Solr-delegated part of *query*, removing the handled
        criteria from *query* and *plan* in place.

        Falsy return values:
        None         : no delegation happened; the remaining indexes must
                       still be queried.
        empty IISet(): the delegation produced no result; the whole
                       search can stop here.
        '''
        indexes = set(query.keys()).intersection(set(self.zcat.delegatedIndexes))
        if not indexes :
            return None
        delegatedQuery = {}
        for i in indexes :
            delegatedQuery[i] = query.pop(i)
            # the plan may not mention this index; ignore that case
            try : plan.remove(i)
            except ValueError : pass
        c = SolrConnection(self.zcat.solr_url)
        # AND-join of field:"value" clauses; one clause per delegated index.
        q =' AND '.join(['%s:"%s"' % item for item in delegatedQuery.items()])
        resp = c.query(q, fields='id', rows=len(self))
        c.close()
        # Map Solr document ids back to catalog rids, dropping unknown ids.
        return IISet(filter(None, [self.uids.get(r['id']) for r in resp.results]))

    def search(self, query, sort_index=None, reverse=0, limit=None, merge=1):
        """Iterate through the indexes, applying the query to each one. If
        merge is true then return a lazy result set (sorted if appropriate)
        otherwise return the raw (possibly scored) results for later merging.
        Limit is used in conjuntion with sorting or scored results to inform
        the catalog how many results you are really interested in. The catalog
        can then use optimizations to save time and memory. The number of
        results is not guaranteed to fall within the limit however, you should
        still slice or batch the results as usual."""

        rs = None # resultset

        # Indexes fulfill a fairly large contract here. We hand each
        # index the query mapping we are given (which may be composed
        # of some combination of web request, kw mappings or plain old dicts)
        # and the index decides what to do with it. If the index finds work
        # for itself in the query, it returns the results and a tuple of
        # the attributes that were used. If the index finds nothing for it
        # to do then it returns None.

        # Canonicalize the request into a sensible query before passing it on
        query = self.make_query(query)

        cr = self.getCatalogPlan(query)
        cr.start()

        plan = cr.plan()
        if not plan:
            plan = self._sorted_search_indexes(query)

        # delegation: ask Solr first; a non-None empty result means Solr
        # matched nothing, so the intersection is necessarily empty too.
        rs = self.delegateSearch(query, plan)
        if rs is not None and not rs :
            return LazyCat([])

        indexes = self.indexes.keys()
        for i in plan:
            if i not in indexes:
                # We can have bogus keys or the plan can contain index names
                # that have been removed in the meantime
                continue

            index = self.getIndex(i)
            _apply_index = getattr(index, "_apply_index", None)
            if _apply_index is None:
                continue

            cr.start_split(i)
            limit_result = ILimitedResultIndex.providedBy(index)
            if limit_result:
                r = _apply_index(query, rs)
            else:
                r = _apply_index(query)

            if r is not None:
                r, u = r
                # Short circuit if empty result
                # BBB: We can remove the "r is not None" check in Zope 2.14
                # once we don't need to support the "return everything" case
                # anymore
                if r is not None and not r:
                    cr.stop_split(i, result=None, limit=limit_result)
                    return LazyCat([])

                # provide detailed info about the pure intersection time
                intersect_id = i + '#intersection'
                cr.start_split(intersect_id)
                # weightedIntersection preserves the values from any mappings
                # we get, as some indexes don't return simple sets
                if hasattr(rs, 'items') or hasattr(r, 'items'):
                    _, rs = weightedIntersection(rs, r)
                else:
                    rs = intersection(rs, r)

                cr.stop_split(intersect_id)

                # consider the time it takes to intersect the index result with
                # the total resultset to be part of the index time
                cr.stop_split(i, result=r, limit=limit_result)
                if not rs:
                    break
            else:
                cr.stop_split(i, result=None, limit=limit_result)

        # Try to deduce the sort limit from batching arguments
        b_start = int(query.get('b_start', 0))
        b_size = query.get('b_size', None)
        if b_size is not None:
            b_size = int(b_size)

        if b_size is not None:
            limit = b_start + b_size
        elif limit and b_size is None:
            b_size = limit

        if rs is None:
            # None of the indexes found anything to do with the query
            # We take this to mean that the query was empty (an empty filter)
            # and so we return everything in the catalog
            warnings.warn('Your query %s produced no query restriction. '
                          'Currently the entire catalog content is returned. '
                          'In Zope 2.14 this will result in an empty LazyCat '
                          'to be returned.' % repr(cr.make_key(query)),
                          DeprecationWarning, stacklevel=3)

            rlen = len(self)
            if sort_index is None:
                sequence, slen = self._limit_sequence(self.data.items(), rlen,
                    b_start, b_size)
                result = LazyMap(self.instantiate, sequence, slen,
                    actual_result_count=rlen)
            else:
                cr.start_split('sort_on')
                result = self.sortResults(
                    self.data, sort_index, reverse, limit, merge,
                        actual_result_count=rlen, b_start=b_start,
                        b_size=b_size)
                cr.stop_split('sort_on', None)
        elif rs:
            # We got some results from the indexes.
            # Sort and convert to sequences.
            # XXX: The check for 'values' is really stupid since we call
            # items() and *not* values()
            rlen = len(rs)
            if sort_index is None and hasattr(rs, 'items'):
                # having a 'items' means we have a data structure with
                # scores. Build a new result set, sort it by score, reverse
                # it, compute the normalized score, and Lazify it.

                if not merge:
                    # Don't bother to sort here, return a list of
                    # three tuples to be passed later to mergeResults
                    # note that data_record_normalized_score_ cannot be
                    # calculated and will always be 1 in this case
                    getitem = self.__getitem__
                    result = [(score, (1, score, rid), getitem)
                            for rid, score in rs.items()]
                else:
                    cr.start_split('sort_on')

                    rs = rs.byValue(0) # sort it by score
                    max = float(rs[0][0])

                    # Here we define our getter function inline so that
                    # we can conveniently store the max value as a default arg
                    # and make the normalized score computation lazy
                    def getScoredResult(item, max=max, self=self):
                        """
                        Returns instances of self._v_brains, or whatever is
                        passed into self.useBrains.
                        """
                        score, key = item
                        # NOTE(review): aq_parent is referenced here but never
                        # imported in this module — this scored branch would
                        # raise NameError if reached; confirm and add
                        # 'from Acquisition import aq_parent' at file top.
                        r=self._v_result_class(self.data[key])\
                              .__of__(aq_parent(self))
                        r.data_record_id_ = key
                        r.data_record_score_ = score
                        r.data_record_normalized_score_ = int(100. * score / max)
                        return r

                    sequence, slen = self._limit_sequence(rs, rlen, b_start,
                        b_size)
                    result = LazyMap(getScoredResult, sequence, slen,
                        actual_result_count=rlen)
                    cr.stop_split('sort_on', None)

            elif sort_index is None and not hasattr(rs, 'values'):
                # no scores
                if hasattr(rs, 'keys'):
                    rs = rs.keys()
                sequence, slen = self._limit_sequence(rs, rlen, b_start,
                    b_size)
                result = LazyMap(self.__getitem__, sequence, slen,
                    actual_result_count=rlen)
            else:
                # sort. If there are scores, then this block is not
                # reached, therefore 'sort-on' does not happen in the
                # context of a text index query. This should probably
                # sort by relevance first, then the 'sort-on' attribute.
                cr.start_split('sort_on')
                result = self.sortResults(rs, sort_index, reverse, limit,
                    merge, actual_result_count=rlen, b_start=b_start,
                    b_size=b_size)
                cr.stop_split('sort_on', None)
        else:
            # Empty result set
            result = LazyCat([])
        cr.stop()
        return result