X-Git-Url: https://scm.cri.ensmp.fr/git/Plinn.git/blobdiff_plain/3c4367d8e03450e9a73e61f4247145d2b6c86a33..959d888c17d1403d2eeecc19bc4b5e2c8d1debf6:/Products/Plinn/utils.py?ds=sidebyside diff --git a/Products/Plinn/utils.py b/Products/Plinn/utils.py new file mode 100755 index 0000000..de68f22 --- /dev/null +++ b/Products/Plinn/utils.py @@ -0,0 +1,338 @@ +# -*- coding: utf-8 -*- +####################################################################################### +# Plinn - http://plinn.org # +# Copyright (C) 2005-2007 Benoît PIN # +# # +# This program is free software; you can redistribute it and/or # +# modify it under the terms of the GNU General Public License # +# as published by the Free Software Foundation; either version 2 # +# of the License, or (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program; if not, write to the Free Software # +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # +####################################################################################### +""" Plinn public utilities + + +""" + +import string +import re +from types import StringType +from random import randrange +from Acquisition import aq_base +from quopri import encodestring +from zope.globalrequest import getRequest +from AccessControl.PermissionRole import rolesForPermissionOn +from AccessControl import ModuleSecurityInfo +from AccessControl import getSecurityManager +from AccessControl.User import UnrestrictedUser +from OFS.CopySupport import _cb_decode, _cb_encode, cookie_path +from Products.CMFCore.utils import getToolByName, getUtilityByInterfaceName +from Products.CMFCore.exceptions import BadRequest +from Products.Utf8Splitter.Utf8Splitter import Utf8Utils +from Globals import REPLACEABLE, NOT_REPLACEABLE, UNIQUE +from zope.i18n import translate as i18ntranslate +from zope.i18n.interfaces import IUserPreferredLanguages +from zope.i18nmessageid import MessageFactory +from zope.component.interfaces import ComponentLookupError +from zope.dottedname.resolve import resolve as resolve_dotted_name +from zope.component import queryAdapter + +_marker = [] + +security = ModuleSecurityInfo( 'Products.Plinn.utils' ) + +security.declarePublic('thisObjectComeFromPortalSkin') +def thisObjectComeFromPortalSkin(ob, portal=None): + """ check if ob comes from portal_skins """ + if not portal : + portal = getToolByName(ob, 'portal_url') + portal = portal.getPortalObject() + + if ob.aq_self == portal.aq_self : + return False + + obId = ob.id + if callable(obId) : + obId = obId() + + sob = getattr(portal, obId, None) + + if sob is None : + return False + elif not(sob.aq_inner.aq_self is ob.aq_inner.aq_self) : + return False + else : + try : + portal._checkId(obId) + return True + except BadRequest : + return False + +security.declarePublic('listActionProviders_') +def listActionProviders_(context) : + atool = getToolByName(context, 'portal_actions') + return atool.listActionProviders() + +def capitalizeCompoundGivenName(givenName) : + givenName = givenName.strip() + givenNames = ' '.join(givenName.split('-')).split() + givenNameCapitalized = '-'.join(map(string.capitalize, givenNames)) + return givenNameCapitalized + + +def formatFullName(memberName, memberGivenName, memberId, nameBefore=1) : + memberName = memberName.decode('utf-8') + memberGivenName = memberGivenName.decode('utf-8') + memberFullName = u'' + if memberName and memberGivenName : + if nameBefore : + memberFullName = memberName.capitalize() + ' ' + capitalizeCompoundGivenName(memberGivenName) + else : + memberFullName = capitalizeCompoundGivenName(memberGivenName) + ' ' + memberName.capitalize() + + elif memberName and not memberGivenName : + memberFullName = memberName.capitalize() + + elif not memberName and memberGivenName : + memberFullName = capitalizeCompoundGivenName(memberGivenName) + + else : + memberFullName = memberId + + return memberFullName.encode('utf-8') + +# from OFS.ObjectManager #63 +bad_url_chars = re.compile(r'[^a-zA-Z0-9-_~,.$\(\)@]') + +security.declarePublic('makeValidId') +def makeValidId(self, id, allow_dup=0): + id = Utf8Utils.desacc(id) + id = bad_url_chars.sub('-', id) + # If allow_dup is false, an error will be raised if an object + # with the given id already exists. If allow_dup is true, + # only check that the id string contains no illegal chars; + # check_valid_id() will be called again later with allow_dup + # set to false before the object is added. + + makeRandomId = False + if id in ('.', '..'): + makeRandomId = True + if id.startswith('_'): + id = id.lstrip('_') + if id.startswith('aq_'): + id = id[3:] + + while id.endswith('__') : + id = id[:-1] + if not allow_dup: + obj = getattr(self, id, None) + if obj is not None: + # An object by the given id exists either in this + # ObjectManager or in the acquisition path. + flags = getattr(obj, '__replaceable__', NOT_REPLACEABLE) + if hasattr(aq_base(self), id): + # The object is located in this ObjectManager. + if not flags & REPLACEABLE: + makeRandomId = True + # else the object is replaceable even if the UNIQUE + # flag is set. + elif flags & UNIQUE: + makeRandomId = True + if id == 'REQUEST': + makeRandomId = True + + if makeRandomId is True : + id = str(randrange(2,10000)) + id + return id + + + +def _checkMemberPermission(userid, permission, obj, StringType = type('')): + user = obj.aq_inner.acl_users.getUser(userid) + roles = rolesForPermissionOn(permission, obj) + if type(roles) is StringType: + roles=[roles] + if user.allowed( obj, roles ): + return 1 + return 0 + +def getCPInfo(self) : + if self.REQUEST.RESPONSE.cookies.has_key('__cp') : + cp = self.REQUEST.RESPONSE.cookies['__cp']['value'] + else : + cp = self.REQUEST.get('__cp') + try: cp = _cb_decode(cp) + except: return None + return cp + + +def popCP(self, indexes=None) : + try: cp = _cb_decode(self.REQUEST['__cp']) + except: return + + paths = list(cp[1]) + if indexes is not None : + indexes = list(indexes) + indexes.sort() + indexes.reverse() + for index in indexes : + paths.pop(index) + else : + paths.pop() + + if not paths : + self.REQUEST.RESPONSE.expireCookie('__cp', path=self.REQUEST['BASEPATH1'] or "/") + else : + paths = tuple(paths) + cp = _cb_encode( (cp[0], paths) ) + resp = self.REQUEST['RESPONSE'] + resp.setCookie('__cp', cp, path='%s' % cookie_path(self.REQUEST)) + +security.declarePublic('Message') +Message = MessageFactory('plinn') + +security.declarePublic('translate') +def translate(message, context=None): + """ Translate i18n message. + """ + if isinstance(message, Exception): + try: + message = message[0] + except (TypeError, IndexError): + pass + if not context : + request = getRequest() + else : + request = context.REQUEST + return i18ntranslate(message, domain='plinn', context=request) + +security.declarePublic('desacc') +desacc = Utf8Utils.desacc + +security.declarePublic('getPreferredLanguages') +def getPreferredLanguages(context): + """ returns browser prefered languages""" + request = getattr(context, 'REQUEST', None) + if request is not None : + adapter = IUserPreferredLanguages(request, None) + if adapter is not None : + return adapter.getPreferredLanguages() + return [] + +security.declarePublic('getBestTranslationLanguage') +def getBestTranslationLanguage(langs, context): + """ returns best translation language according + to available languages (param langs) + and user preferences (retrieves by context) + """ + request = getattr(context, 'REQUEST', None) + if request : + negociator = getUtilityByInterfaceName('zope.i18n.interfaces.INegotiator') + return negociator.getLanguage(langs, request) or langs[0] + else : + return langs[0] + +security.declarePublic('getAdapterByInterface') +def getAdapterByInterface(ob, dotted_name, default=_marker) : + """ Get the adapter which provides the interface on the given object. + """ + try: + iface = resolve_dotted_name(dotted_name) + except ImportError: + if default is _marker: + raise ComponentLookupError, dotted_name + return default + + adapter = queryAdapter(ob, iface, default=default) + if adapter is _marker : + raise ComponentLookupError, "no adapter providing %r found on %r" % (dotted_name, ob) + + # the adapter must be wrapped to allow security mahinery to work. + if adapter != default : + return adapter.__of__(ob) + else : + return default + +security.declarePublic('encodeQuopriEmail') +def encodeQuopriEmail(name, email) : + qpName = encodestring(name).replace('=\n', '') + return '''"=?utf-8?q?%s?=" <%s>''' % (qpName, email) + +security.declarePublic('encodeMailHeader') +def encodeMailHeader(content) : + s = encodestring(content).replace('=\n', '') + s = s.replace('_', '=5F') + s = s.replace(' ', '_') + + lines = [] + STEP = 50 + start = 0 + stop = STEP + part = s[start:stop] + lines.append(part) + + while len(part) == STEP: + start = start + STEP + stop = stop + STEP + part = s[start:stop] + lines.append(part) + + lines = [' =?utf-8?Q?%s?=' % part for part in lines] + s = '\n'.join(lines) + s = s.strip() + return s + + +def _sudo(func, userid=None) : + """ + execute func or any callable object + without restriction. Used to switch off + security assertions (eg. checkPermission) encountered + during the execution. + """ + + sm = getSecurityManager() + restrictedUser = sm.getUser() + + if not userid : + userid = restrictedUser.getId() + + sm._context.user = UnrestrictedUser(userid, '', (), ()) + + deferedEx = None + try : + ret = func() + except Exception, e : + deferedEx = e + + sm._context.user = restrictedUser + + if deferedEx is not None : + raise e + + return ret + +security.declarePublic('searchContentsWithLocalRolesForAuthenticatedUser') +def searchContentsWithLocalRolesForAuthenticatedUser(**kw): + mtool = getUtilityByInterfaceName('Products.CMFCore.interfaces.IMembershipTool') + ctool = getUtilityByInterfaceName('Products.CMFCore.interfaces.ICatalogTool') + member = mtool.getAuthenticatedMember() + userid = member.getId() + userAndGroups = ['user:%s' % userid] + + getGroups = getattr(member, 'getGroups', None) + if getGroups is not None : + for group in getGroups(): + userAndGroups.append('user:'+group) + + kw[ 'allowedRolesAndUsers' ] = userAndGroups + + return ctool.unrestrictedSearchResults(**kw)