Module exchangelib.services.common
Functions
def attachment_ids_element(items, version, tag='m:AttachmentIds')
-
Expand source code
def attachment_ids_element(items, version, tag="m:AttachmentIds"): return _ids_element(items, AttachmentId, version, tag)
def folder_ids_element(folders, version, tag='m:FolderIds')
-
Expand source code
def folder_ids_element(folders, version, tag="m:FolderIds"): return _ids_element(folders, FolderId, version, tag)
def item_ids_element(items, version, tag='m:ItemIds')
-
Expand source code
def item_ids_element(items, version, tag="m:ItemIds"): return _ids_element(items, ItemId, version, tag)
def parse_folder_elem(elem, folder)
-
Expand source code
def parse_folder_elem(elem, folder): if isinstance(folder, RootOfHierarchy): res = folder.from_xml(elem=elem, account=folder.account) elif isinstance(folder, Folder): res = folder.from_xml_with_root(elem=elem, root=folder.root) else: raise ValueError(f"Unsupported folder class: {folder}") # Not all servers support fetching the DistinguishedFolderId field. Add it back here. if folder._distinguished_id and not res._distinguished_id: res._distinguished_id = folder._distinguished_id return res
def shape_element(tag, shape, additional_fields, version)
-
Expand source code
def shape_element(tag, shape, additional_fields, version): shape_elem = create_element(tag) add_xml_child(shape_elem, "t:BaseShape", shape) if additional_fields: additional_properties = create_element("t:AdditionalProperties") expanded_fields = chain(*(f.expand(version=version) for f in additional_fields)) # 'path' is insufficient to consistently sort additional properties. For example, we have both # 'contacts:Companies' and 'task:Companies' with path 'companies'. Sort by both 'field_uri' and 'path'. # Extended properties do not have a 'field_uri' value. set_xml_value( additional_properties, sorted(expanded_fields, key=lambda f: (getattr(f.field, "field_uri", ""), f.path)), version=version, ) shape_elem.append(additional_properties) return shape_elem
def to_item_id(item, item_cls)
-
Expand source code
def to_item_id(item, item_cls): # Coerce a tuple, dict or object to an 'item_cls' instance. Used to create [Parent][Item|Folder]Id instances from a # variety of input. if isinstance(item, (BaseItemId, AttachmentId)): # Allow any BaseItemId subclass to pass unaltered return item if isinstance(item, (BaseFolder, BaseItem)): return item.to_id() if isinstance(item, (str, tuple, list)): return item_cls(*item) return item_cls(item.id, item.changekey)
Classes
class EWSAccountService (*args, **kwargs)
-
Expand source code
class EWSAccountService(EWSService, metaclass=abc.ABCMeta): """Base class for services that act on items concerning a single Mailbox on the server.""" NO_VALID_SERVER_VERSIONS = ErrorInvalidSchemaVersionForMailboxVersion # Marks services that need affinity to the backend server prefer_affinity = False def __init__(self, *args, **kwargs): self.account = kwargs.pop("account") kwargs["protocol"] = self.account.protocol super().__init__(*args, **kwargs) @property def _version_hint(self): return self.account.version @_version_hint.setter def _version_hint(self, value): self.account.version = value def _handle_response_cookies(self, session): super()._handle_response_cookies(session=session) # See self._extra_headers() for documentation on affinity if self.prefer_affinity: for cookie in session.cookies: if cookie.name == "X-BackEndOverrideCookie": self.account.affinity_cookie = cookie.value break def _extra_headers(self): headers = super()._extra_headers() # See # https://blogs.msdn.microsoft.com/webdav_101/2015/05/11/best-practices-ews-authentication-and-access-issues/ headers["X-AnchorMailbox"] = self.account.primary_smtp_address # See # https://docs.microsoft.com/en-us/exchange/client-developer/exchange-web-services/how-to-maintain-affinity-between-group-of-subscriptions-and-mailbox-server if self.prefer_affinity: headers["X-PreferServerAffinity"] = "True" if self.account.affinity_cookie: headers["X-BackEndOverrideCookie"] = self.account.affinity_cookie return headers @property def _account_to_impersonate(self): if self.account.access_type == IMPERSONATION: return self.account.identity return super()._account_to_impersonate @property def _timezone(self): return self.account.default_timezone
Base class for services that act on items concerning a single Mailbox on the server.
Ancestors
Subclasses
- ArchiveItem
- EWSPagingService
- CreateAttachment
- CreateFolder
- CreateItem
- CreateUserConfiguration
- DeleteAttachment
- DeleteFolder
- DeleteItem
- DeleteUserConfiguration
- EmptyFolder
- ExportItems
- GetAttachment
- GetDelegate
- GetEvents
- GetFolder
- GetItem
- GetPersona
- GetStreamingEvents
- GetUserConfiguration
- GetUserOofSettings
- GetInboxRules
- UpdateInboxRules
- MarkAsJunk
- MoveFolder
- MoveItem
- SendItem
- SetUserOofSettings
- Subscribe
- SyncFolder
- Unsubscribe
- BaseUpdateService
- UpdateUserConfiguration
- UploadItems
Class variables
var prefer_affinity
Inherited members
class EWSPagingService (*args, **kwargs)
-
Expand source code
class EWSPagingService(EWSAccountService): PAGE_SIZE = 100 # A default page size for all paging services. This is the number of items we request per page paging_container_name = None # The name of the element that contains paging information and the paged results def __init__(self, *args, **kwargs): self.page_size = kwargs.pop("page_size", None) or self.PAGE_SIZE if not isinstance(self.page_size, int): raise InvalidTypeError("page_size", self.page_size, int) if self.page_size < 1: raise ValueError(f"'page_size' {self.page_size} must be a positive number") super().__init__(*args, **kwargs) def _response_generator(self, payload): """Send the payload to the server, and return the response. :param payload: payload as an XML object :return: the response, as XML objects """ response = self._get_response_xml(payload=payload) return (self._get_page(message) for message in response) def _paged_call(self, payload_func, max_items, folders, **kwargs): """Call a service that supports paging requests. Return a generator over all response items. Keeps track of all paging-related counters. """ paging_infos = {f: dict(item_count=0, next_offset=None) for f in folders} common_next_offset = kwargs["offset"] total_item_count = 0 while True: if not paging_infos: # Paging is done for all folders break log.debug("Getting page at offset %s (max_items %s)", common_next_offset, max_items) kwargs["offset"] = common_next_offset kwargs["folders"] = paging_infos.keys() # Only request the paging of the remaining folders. pages = self._get_pages(payload_func, kwargs, len(paging_infos)) for (page, next_offset), (f, paging_info) in zip(pages, list(paging_infos.items())): paging_info["next_offset"] = next_offset if isinstance(page, Exception): # Assume this folder no longer works. Don't attempt to page it again. log.debug("Exception occurred for folder %s. Removing.", f) del paging_infos[f] yield page continue if page is not None: for elem in self._get_elems_from_page(page, max_items, total_item_count): paging_info["item_count"] += 1 total_item_count += 1 yield elem if max_items and total_item_count >= max_items: # No need to continue. Break out of inner loop log.debug("'max_items' count reached (inner)") break if not paging_info["next_offset"]: # Paging is done for this folder. Don't attempt to page it again. log.debug("Paging has completed for folder %s. Removing.", f) del paging_infos[f] continue log.debug("Folder %s still has items", f) # Check sanity of paging offsets, but don't fail. When we are iterating huge collections that take a # long time to complete, the collection may change while we are iterating. This can affect the # 'next_offset' value and make it inconsistent with the number of already collected items. # We may have a mismatch if we stopped early due to reaching 'max_items'. if paging_info["next_offset"] != paging_info["item_count"] and ( not max_items or total_item_count < max_items ): log.warning( "Unexpected next offset: %s -> %s. Maybe the server-side collection has changed?", paging_info["item_count"], paging_info["next_offset"], ) # Also break out of outer loop if max_items and total_item_count >= max_items: log.debug("'max_items' count reached (outer)") break common_next_offset = self._get_next_offset(paging_infos.values()) if common_next_offset is None: # Paging is done for all folders break @staticmethod def _get_paging_values(elem): """Read paging information from the paging container element.""" offset_attr = elem.get("IndexedPagingOffset") next_offset = None if offset_attr is None else int(offset_attr) item_count = int(elem.get("TotalItemsInView")) is_last_page = elem.get("IncludesLastItemInRange").lower() in ("true", "0") log.debug("Got page with offset %s, item_count %s, last_page %s", next_offset, item_count, is_last_page) # Clean up contradictory paging values if next_offset is None and not is_last_page: log.debug("Not last page in range, but server didn't send a page offset. Assuming first page") next_offset = 1 if next_offset is not None and is_last_page: if next_offset != item_count: log.debug("Last page in range, but we still got an offset. Assuming paging has completed") next_offset = None if not item_count and not is_last_page: log.debug("Not last page in range, but also no items left. Assuming paging has completed") next_offset = None if item_count and next_offset == 0: log.debug("Non-zero offset, but also no items left. Assuming paging has completed") next_offset = None return item_count, next_offset def _get_page(self, message): """Get a single page from a request message, and return the container and next offset.""" paging_elem = self._get_element_container(message=message, name=self.paging_container_name) if isinstance(paging_elem, Exception): return paging_elem, None item_count, next_offset = self._get_paging_values(paging_elem) if not item_count: paging_elem = None return paging_elem, next_offset def _get_elems_from_page(self, elem, max_items, total_item_count): container = elem.find(self.element_container_name) if container is None: raise MalformedResponseError( f"No {self.element_container_name} elements in ResponseMessage ({xml_to_str(elem)})" ) for e in self._get_elements_in_container(container=container): if max_items and total_item_count >= max_items: # No need to continue. Break out of elements loop log.debug("'max_items' count reached (elements)") break yield e def _get_pages(self, payload_func, kwargs, expected_message_count): """Request a page, or a list of pages if multiple collections are pages in a single request. Return each page. """ payload = payload_func(**kwargs) page_elems = list(self._get_elements(payload=payload)) if len(page_elems) != expected_message_count: raise MalformedResponseError( f"Expected {expected_message_count} items in 'response', got {len(page_elems)}" ) return page_elems @staticmethod def _get_next_offset(paging_infos): next_offsets = {p["next_offset"] for p in paging_infos if p["next_offset"] is not None} if not next_offsets: # Paging is done for all messages return None # We cannot guarantee that all messages that have a next_offset also have the *same* next_offset. This is # because the collections that we are iterating may change while iterating. We'll do our best, but we cannot # guarantee 100% consistency when large collections are simultaneously being changed on the server. # # It's not possible to supply a per-folder offset when iterating multiple folders, so we'll just have to # choose something that is most likely to work. Select the lowest of all the values to at least make sure # we don't miss any items, although we may then get duplicates ¯\_(ツ)_/¯ if len(next_offsets) > 1: log.warning("Inconsistent next_offset values: %r. Using lowest value", next_offsets) return min(next_offsets)
Base class for services that act on items concerning a single Mailbox on the server.
Ancestors
Subclasses
Class variables
var PAGE_SIZE
var paging_container_name
Inherited members
class EWSService (protocol, chunk_size=None, timeout=None)
-
Expand source code
class EWSService(SupportedVersionClassMixIn, metaclass=abc.ABCMeta): """Base class for all EWS services.""" CHUNK_SIZE = 100 # A default chunk size for all services. This is the number of items we send in a single request SERVICE_NAME = None # The name of the SOAP service element_container_name = None # The name of the XML element wrapping the collection of returned items returns_elements = True # If False, the service does not return response elements, just the ResponseCode status # Return exception instance instead of raising exceptions for the following errors when contained in an element ERRORS_TO_CATCH_IN_RESPONSE = ( EWSWarning, ErrorCannotDeleteObject, ErrorInvalidChangeKey, ErrorItemNotFound, ErrorItemSave, ErrorInvalidIdMalformed, ErrorMessageSizeExceeded, ErrorCannotDeleteTaskOccurrence, ErrorMimeContentConversionFailed, ErrorRecurrenceHasNoOccurrence, ErrorCorruptData, ErrorItemCorrupt, ErrorMailRecipientNotFound, ) # Similarly, define the warnings we want to return un-raised WARNINGS_TO_CATCH_IN_RESPONSE = ErrorBatchProcessingStopped # Define the warnings we want to ignore, to let response processing proceed WARNINGS_TO_IGNORE_IN_RESPONSE = () # The exception type to raise when all attempted API versions failed NO_VALID_SERVER_VERSIONS = ErrorInvalidServerVersion NS_MAP = {k: v for k, v in ns_translation.items() if k in ("s", "m", "t")} def __init__(self, protocol, chunk_size=None, timeout=None): self.chunk_size = chunk_size or self.CHUNK_SIZE if not isinstance(self.chunk_size, int): raise InvalidTypeError("chunk_size", chunk_size, int) if self.chunk_size < 1: raise ValueError(f"'chunk_size' {self.chunk_size} must be a positive number") if self.supported_from and protocol.version.build < self.supported_from: raise NotImplementedError( f"Service {self.SERVICE_NAME!r} only supports server versions from {self.supported_from or '*'} to " f"{self.deprecated_from or '*'} (server has {protocol.version})" ) self.protocol = protocol # Allow a service to override the default protocol timeout. Useful for streaming services self.timeout = timeout # Controls whether the HTTP request should be streaming or fetch everything at once self.streaming = False # Streaming connection variables self._streaming_session = None self._streaming_response = None def __del__(self): # pylint: disable=bare-except try: if self.streaming: # Make sure to clean up lingering resources self.stop_streaming() except Exception: # nosec # __del__ should never fail pass # The following two methods are the minimum required to be implemented by subclasses, but the name and number of # kwargs differs between services. Therefore, we cannot make these methods abstract. # @abc.abstractmethod # def call(self, **kwargs): # """Defines the arguments required by the service. Arguments are basic Python types or EWSElement objects. # Returns either XML objects or EWSElement objects. # """" # pass # @abc.abstractmethod # def get_payload(self, **kwargs): # """Using the arguments from .call(), return the payload expected by the service, as an XML object. The XML # object should consist of a SERVICE_NAME element and everything within that. # """ # pass def get(self, expect_result=True, **kwargs): """Like .call(), but expects exactly one result from the server, or zero when expect_result=False, or either zero or one when expect_result=None. Returns either one object or None. :param expect_result: None, True, or False :param kwargs: Same as arguments for .call() :return: Same as .call(), but returns either None or exactly one item """ res = list(self.call(**kwargs)) # Raise any errors for r in res: if isinstance(r, Exception): raise r if expect_result is None and not res: # Allow empty result return None if expect_result is False: if res: raise ValueError(f"Expected result length 0, but got {res}") return None if len(res) != 1: raise ValueError(f"Expected result length 1, but got {res}") return res[0] def parse(self, xml): """Used mostly for testing, when we want to parse static XML data.""" resp = DummyResponse(content=xml, streaming=self.streaming) _, body = self._get_soap_parts(response=resp) return self._elems_to_objs(self._get_elements_in_response(response=self._get_soap_messages(body=body))) def wrap(self, content, api_version=None): """Generate the necessary boilerplate XML for a raw SOAP request. The XML is specific to the server version. ExchangeImpersonation allows to act as the user we want to impersonate. RequestServerVersion element on MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/requestserverversion ExchangeImpersonation element on MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/exchangeimpersonation TimeZoneContent element on MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/timezonecontext :param content: :param api_version: """ envelope = create_element("s:Envelope", nsmap=self.NS_MAP) header = create_element("s:Header") if api_version: request_server_version = create_element("t:RequestServerVersion", attrs=dict(Version=api_version)) header.append(request_server_version) identity = self._account_to_impersonate if identity: add_xml_child(header, "t:ExchangeImpersonation", identity) timezone = self._timezone if timezone: timezone_context = create_element("t:TimeZoneContext") timezone_definition = create_element("t:TimeZoneDefinition", attrs=dict(Id=timezone.ms_id)) timezone_context.append(timezone_definition) header.append(timezone_context) if len(header): envelope.append(header) body = create_element("s:Body") body.append(content) envelope.append(body) return xml_to_str(envelope, encoding=DEFAULT_ENCODING, xml_declaration=True) def _elems_to_objs(self, elems): """Takes a generator of XML elements and exceptions. Returns the equivalent Python objects (or exceptions).""" for elem in elems: # Allow None here. Some services don't return an ID if the target folder is outside the mailbox. if isinstance(elem, (Exception, type(None))): yield elem continue yield self._elem_to_obj(elem) def _elem_to_obj(self, elem): """Convert a single XML element to a single Python object""" if not self.returns_elements: raise RuntimeError("Incorrect call to method when 'returns_elements' is False") raise NotImplementedError() @property def _version_hint(self): # We may be here due to version guessing in Protocol.version, so we can't use the self.protocol.version property return self.protocol.config.version @_version_hint.setter def _version_hint(self, value): self.protocol.config.version = value def _extra_headers(self): headers = {} identity = self._account_to_impersonate if identity and identity.primary_smtp_address: # See # https://blogs.msdn.microsoft.com/webdav_101/2015/05/11/best-practices-ews-authentication-and-access-issues/ headers["X-AnchorMailbox"] = identity.primary_smtp_address return headers @property def _account_to_impersonate(self): if self.protocol and isinstance(self.protocol.credentials, BaseOAuth2Credentials): return self.protocol.credentials.identity return None @property def _timezone(self): return None def _response_generator(self, payload): """Send the payload to the server, and return the response. :param payload: payload as an XML object :return: the response, as XML objects """ response = self._get_response_xml(payload=payload) return self._get_elements_in_response(response=response) def _chunked_get_elements(self, payload_func, items, **kwargs): """Yield elements in a response. Like ._get_elements(), but chop items into suitable chunks and send multiple requests. :param payload_func: A reference to .payload() :param items: An iterable of items (messages, folders, etc.) to process :param kwargs: Same as arguments for .call(), except for the 'items' argument :return: Same as ._get_elements() """ # If the input for a service is a QuerySet, it can be difficult to remove exceptions before now filtered_items = filter(lambda item: not isinstance(item, Exception), items) for i, chunk in enumerate(chunkify(filtered_items, self.chunk_size), start=1): log.debug("Processing chunk %s containing %s items", i, len(chunk)) yield from self._get_elements(payload=payload_func(chunk, **kwargs)) def stop_streaming(self): if not self.streaming: raise RuntimeError("Attempt to stop a non-streaming service") if self._streaming_response: self._streaming_response.close() # Release memory self._streaming_response = None if self._streaming_session: self.protocol.release_session(self._streaming_session) self._streaming_session = None def _get_elements(self, payload): """Send the payload to be sent and parsed. Handles and re-raise exceptions that are not meant to be returned to the caller as exception objects. Retry the request according to the retry policy. """ wait = self.protocol.RETRY_WAIT while True: try: # Create a generator over the response elements so exceptions in response elements are also raised # here and can be handled. yield from self._response_generator(payload=payload) # TODO: Restore session pool size on succeeding request? return except TokenExpiredError: # Retry immediately continue except ErrorServerBusy as e: if not e.back_off: e.back_off = wait self._handle_backoff(e) except ErrorExceededConnectionCount as e: # ErrorExceededConnectionCount indicates that the connecting user has too many open TCP connections to # the server. Decrease our session pool size and retry immediately. try: self.protocol.decrease_poolsize() continue except SessionPoolMinSizeReached: # We're already as low as we can go. Let the user handle this. raise e except ErrorTimeoutExpired as e: # ErrorTimeoutExpired can be caused by a busy server, or by an overly large request. If it's the latter, # we don't want to continue hammering the server with this request indefinitely. Instead, lower the # connection count, if possible, and retry the request. if self.protocol.session_pool_size <= 1: # We're already as low as we can go. We can no longer use the session count to put less load # on the server. If this is a chunked request we could lower the chunk size, but we don't have a # way of doing that from this part of the code yet. Let the user handle this. raise e self._handle_backoff(ErrorServerBusy(f"Reraised from {e.__class__.__name__}({e})", back_off=wait)) except (ErrorTooManyObjectsOpened, ErrorInternalServerTransientError) as e: # ErrorTooManyObjectsOpened means there are too many connections to the Exchange database. This is very # often a symptom of sending too many requests. self._handle_backoff(ErrorServerBusy(f"Reraised from {e.__class__.__name__}({e})", back_off=wait)) finally: wait *= 2 # Increase delay for every retry if self.streaming: self.stop_streaming() def _handle_response_cookies(self, session): """Code to react on response cookies""" def _get_response(self, payload, api_version): """Send the actual HTTP request and get the response.""" if self.streaming: # Make sure to clean up lingering resources self.stop_streaming() session = self.protocol.get_session() r, session = post_ratelimited( protocol=self.protocol, session=session, url=self.protocol.service_endpoint, headers=self._extra_headers(), data=self.wrap( content=payload, api_version=api_version, ), stream=self.streaming, timeout=self.timeout or self.protocol.TIMEOUT, ) self._handle_response_cookies(session) if self.streaming: # We con only release the session when we have fully consumed the response. Save session and response # objects for later. self._streaming_session, self._streaming_response = session, r else: self.protocol.release_session(session) return r @classmethod def supported_api_versions(cls): """Return API versions supported by the service, sorted from newest to oldest""" return sorted({v.api_version for v in Version.all_versions() if cls.supports_version(v)}, reverse=True) def _api_versions_to_try(self): # Put the hint first in the list, and then all other versions except the hint, from newest to oldest return (self._version_hint.api_version,) + tuple( v for v in self.supported_api_versions() if v != self._version_hint.api_version ) def _get_response_xml(self, payload): """Send the payload to the server and return relevant elements from the result. Several things happen here: * Wraps the payload is wrapped in SOAP headers and sends to the server * Negotiates the Exchange API version and stores it in the protocol object * Handles connection errors and possibly re-raises them as ErrorServerBusy * Raises SOAP errors * Raises EWS errors or passes them on to the caller :param payload: The request payload, as an XML object :return: A generator of XML objects or None if the service does not return a result """ # Microsoft really doesn't want to make our lives easy. The server may report one version in our initial version # guessing tango, but then the server may decide that any arbitrary legacy backend server may actually process # the request for an account. Prepare to handle version-related errors and set the server version per-account. log.debug("Calling service %s", self.SERVICE_NAME) for api_version in self._api_versions_to_try(): log.debug("Trying API version %s", api_version) r = self._get_response(payload=payload, api_version=api_version) if self.streaming: # Let 'requests' decode raw data automatically r.raw.decode_content = True try: header, body = self._get_soap_parts(response=r) except Exception: r.close() # Release memory raise # The body may contain error messages from Exchange, but we still want to collect version info if header is not None: self._update_api_version(api_version=api_version, header=header) try: return self._get_soap_messages(body=body) except ( ErrorInvalidServerVersion, ErrorIncorrectSchemaVersion, ErrorInvalidRequest, ErrorInvalidSchemaVersionForMailboxVersion, ): # The guessed server version is wrong. Try the next version log.debug("API version %s was invalid", api_version) continue finally: if not self.streaming: # In streaming mode, we may not have accessed the raw stream yet. Caller must handle this. r.close() # Release memory raise self.NO_VALID_SERVER_VERSIONS(f"Tried versions {self._api_versions_to_try()} but all were invalid") def _handle_backoff(self, e): """Take a request from the server to back off and checks the retry policy for what to do. Re-raise the exception if conditions are not met. :param e: An ErrorServerBusy instance :return: """ log.debug("Got ErrorServerBusy (back off %s seconds)", e.back_off) # ErrorServerBusy is very often a symptom of sending too many requests. Scale back connections if possible. with suppress(SessionPoolMinSizeReached): self.protocol.decrease_poolsize() if self.protocol.retry_policy.fail_fast: raise e self.protocol.retry_policy.back_off(e.back_off) # We'll warn about this later if we actually need to sleep def _update_api_version(self, api_version, header): """Parse the server version contained in SOAP headers and update the version hint stored by the caller, if necessary. """ try: head_version = Version.from_soap_header(requested_api_version=api_version, header=header) except TransportError as te: log.debug("Failed to update version info (%s)", te) return if self._version_hint == head_version: # Nothing to do return log.debug("Found new version (%s -> %s)", self._version_hint, head_version) # The api_version that worked was different from our hint, or we never got a build version. Store the working # version. self._version_hint = head_version @classmethod def _response_tag(cls): """Return the name of the element containing the service response.""" return f"{{{MNS}}}{cls.SERVICE_NAME}Response" @staticmethod def _response_messages_tag(): """Return the name of the element containing service response messages.""" return f"{{{MNS}}}ResponseMessages" @classmethod def _response_message_tag(cls): """Return the name of the element of a single response message.""" return f"{{{MNS}}}{cls.SERVICE_NAME}ResponseMessage" @classmethod def _get_soap_parts(cls, response): """Split the SOAP response into its headers and body elements.""" try: root = to_xml(response.iter_content()) except ParseError as e: raise SOAPError(f"Bad SOAP response: {e}") header = root.find(f"{{{SOAPNS}}}Header") if header is None: # This is normal when the response contains SOAP-level errors log.debug("No header in XML response") body = root.find(f"{{{SOAPNS}}}Body") if body is None: raise MalformedResponseError("No Body element in SOAP response") return header, body def _get_soap_messages(self, body): """Return the elements in the response containing the response messages. Raises any SOAP exceptions.""" response = body.find(self._response_tag()) if response is None: fault = body.find(f"{{{SOAPNS}}}Fault") if fault is None: raise SOAPError(f"Unknown SOAP response (expected {self._response_tag()} or Fault): {xml_to_str(body)}") self._raise_soap_errors(fault=fault) # Will throw SOAPError or custom EWS error response_messages = response.find(self._response_messages_tag()) if response_messages is None: # Result isn't delivered in a list of FooResponseMessages, but directly in the FooResponse. Consumers expect # a list, so return a list return [response] return response_messages.findall(self._response_message_tag()) @classmethod def _raise_soap_errors(cls, fault): """Parse error messages contained in SOAP headers and raise as exceptions defined in this package.""" # Fault: See http://www.w3.org/TR/2000/NOTE-SOAP-20000508/#_Toc478383507 fault_code = get_xml_attr(fault, "faultcode") fault_string = get_xml_attr(fault, "faultstring") fault_actor = get_xml_attr(fault, "faultactor") detail = fault.find("detail") if detail is not None: code = get_xml_attr(detail, f"{{{ENS}}}ResponseCode") if code: code = code.strip() msg = get_xml_attr(detail, f"{{{ENS}}}Message") if msg: msg = msg.strip() msg_xml = detail.find(f"{{{TNS}}}MessageXml") # Crazy. Here, it's in the TNS namespace if code == "ErrorServerBusy": back_off = None with suppress(TypeError, AttributeError): value = msg_xml.find(f"{{{TNS}}}Value") if value.get("Name") == "BackOffMilliseconds": back_off = int(value.text) / 1000.0 # Convert to seconds raise ErrorServerBusy(msg, back_off=back_off) if code == "ErrorSchemaValidation" and msg_xml is not None: line_number = get_xml_attr(msg_xml, f"{{{TNS}}}LineNumber") line_position = get_xml_attr(msg_xml, f"{{{TNS}}}LinePosition") violation = get_xml_attr(msg_xml, f"{{{TNS}}}Violation") if violation: msg = f"{msg} {violation}" if line_number or line_position: msg = f"{msg} (line: {line_number} position: {line_position})" try: raise vars(errors)[code](msg) except KeyError: detail = f"{cls.SERVICE_NAME}: code: {code} msg: {msg} ({xml_to_str(detail)})" with suppress(KeyError): raise vars(errors)[fault_code](fault_string) raise SOAPError(f"SOAP error code: {fault_code} string: {fault_string} actor: {fault_actor} detail: {detail}") def _get_element_container(self, message, name=None): """Return the XML element in a response element that contains the elements we want the service to return. For example, in a GetFolder response, 'message' is the GetFolderResponseMessage element, and we return the 'Folders' element: <m:GetFolderResponseMessage ResponseClass="Success"> <m:ResponseCode>NoError</m:ResponseCode> <m:Folders> <t:Folder> <t:FolderId Id="AQApA=" ChangeKey="AQAAAB" /> [...] </t:Folder> </m:Folders> </m:GetFolderResponseMessage> Some service responses don't have a containing element for the returned elements ('name' is None). In that case, we return the 'SomeServiceResponseMessage' element. If the response contains a warning or an error message, we raise the relevant exception, unless the error class is contained in WARNINGS_TO_CATCH_IN_RESPONSE or ERRORS_TO_CATCH_IN_RESPONSE, in which case we return the exception instance. """ # ResponseClass is an XML attribute of various SomeServiceResponseMessage elements: Possible values are: # Success, Warning, Error. See e.g. # https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/finditemresponsemessage response_class = message.get("ResponseClass") # ResponseCode, MessageText: See # https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/responsecode response_code = get_xml_attr(message, f"{{{MNS}}}ResponseCode") if response_class == "Success" and response_code == "NoError": if not name: return message container = message.find(name) if container is None: raise MalformedResponseError(f"No {name} elements in ResponseMessage ({xml_to_str(message)})") return container if response_code == "NoError": return True # Raise any non-acceptable errors in the container, or return the container or the acceptable exception instance msg_text = get_xml_attr(message, f"{{{MNS}}}MessageText") msg_xml = message.find(f"{{{MNS}}}MessageXml") rule_errors = message.find(f"{{{MNS}}}RuleOperationErrors") if response_class == "Warning": try: raise self._get_exception(code=response_code, text=msg_text, msg_xml=msg_xml) except self.WARNINGS_TO_CATCH_IN_RESPONSE as e: return e except self.WARNINGS_TO_IGNORE_IN_RESPONSE as e: log.warning(str(e)) container = message.find(name) if container is None: raise MalformedResponseError(f"No {name} elements in ResponseMessage ({xml_to_str(message)})") return container # response_class == 'Error', or 'Success' and not 'NoError' try: raise self._get_exception(code=response_code, text=msg_text, msg_xml=msg_xml, rule_errors=rule_errors) except self.ERRORS_TO_CATCH_IN_RESPONSE as e: return e @staticmethod def _get_exception(code, text, msg_xml=None, rule_errors=None): """Parse error messages contained in EWS responses and raise as exceptions defined in this package.""" if not code: return TransportError(f"Empty ResponseCode in ResponseMessage (MessageText: {text}, MessageXml: {msg_xml})") if msg_xml is not None: # If this is an ErrorInvalidPropertyRequest error, the xml may contain a specific FieldURI for elem_cls in (FieldURI, IndexedFieldURI, ExtendedFieldURI, ExceptionFieldURI): elem = msg_xml.find(elem_cls.response_tag()) if elem is not None: field_uri = elem_cls.from_xml(elem, account=None) text += f" (field: {field_uri})" break # If this is an ErrorInvalidValueForProperty error, the xml may contain the name and value of the property if code == "ErrorInvalidValueForProperty": msg_parts = {} for elem in msg_xml.findall(f"{{{TNS}}}Value"): key, val = elem.get("Name"), elem.text if key: msg_parts[key] = val if msg_parts: text += f" ({', '.join(f'{k}: {v}' for k, v in msg_parts.items())})" # If this is an ErrorInternalServerError error, the xml may contain a more specific error code inner_code, inner_text = None, None for value_elem in msg_xml.findall(f"{{{TNS}}}Value"): name = value_elem.get("Name") if name == "InnerErrorResponseCode": inner_code = value_elem.text elif name == "InnerErrorMessageText": inner_text = value_elem.text if inner_code: try: # Raise the error as the inner error code return vars(errors)[inner_code](f"{inner_text} (raised from: {code}({text!r}))") except KeyError: # Inner code is unknown to us. Just append to the original text text += f" (inner error: {inner_code}({inner_text!r}))" if rule_errors is not None: for rule_error in rule_errors.findall(f"{{{TNS}}}RuleOperationError"): for error in rule_error.find(f"{{{TNS}}}ValidationErrors").findall(f"{{{TNS}}}Error"): field_uri = get_xml_attr(error, f"{{{TNS}}}FieldURI") error_code = get_xml_attr(error, f"{{{TNS}}}ErrorCode") error_message = get_xml_attr(error, f"{{{TNS}}}ErrorMessage") text += f" ({error_code} on field {field_uri}: {error_message})" try: # Raise the error corresponding to the ResponseCode return vars(errors)[code](text) except KeyError: # Should not happen return TransportError( f"Unknown ResponseCode in ResponseMessage: {code} (MessageText: {text}, MessageXml: {msg_xml})" ) def _get_elements_in_response(self, response): """Take a list of 'SomeServiceResponseMessage' elements and return the elements in each response message that we want the service to return. With e.g. 'CreateItem', we get a list of 'CreateItemResponseMessage' elements and return the 'Message' elements. <m:CreateItemResponseMessage ResponseClass="Success"> <m:ResponseCode>NoError</m:ResponseCode> <m:Items> <t:Message> <t:ItemId Id="AQApA=" ChangeKey="AQAAAB"/> </t:Message> </m:Items> </m:CreateItemResponseMessage> <m:CreateItemResponseMessage ResponseClass="Success"> <m:ResponseCode>NoError</m:ResponseCode> <m:Items> <t:Message> <t:ItemId Id="AQApB=" ChangeKey="AQAAAC"/> </t:Message> </m:Items> </m:CreateItemResponseMessage> :param response: a list of 'SomeServiceResponseMessage' XML objects :return: a generator of items as returned by '_get_elements_in_container() """ for msg in response: container_or_exc = self._get_element_container(message=msg, name=self.element_container_name) if isinstance(container_or_exc, (bool, Exception)): yield container_or_exc else: for c in self._get_elements_in_container(container=container_or_exc): yield c @classmethod def _get_elements_in_container(cls, container): """Return a list of response elements from an XML response element container. With e.g. 'CreateItem', 'Items' is the container element, and we return the 'Message' child elements: <m:Items> <t:Message> <t:ItemId Id="AQApA=" ChangeKey="AQAAAB"/> </t:Message> </m:Items> If the service does not return response elements, return True to indicate the status. Errors have already been raised. """ if cls.returns_elements: return list(container) return [True]
Base class for all EWS services.
Ancestors
Subclasses
- EWSAccountService
- ConvertId
- ExpandDL
- GetMailTips
- GetRoomLists
- GetRooms
- GetSearchableMailboxes
- GetServerTimeZones
- GetUserAvailability
- GetUserSettings
- ResolveNames
- SendNotification
Class variables
var CHUNK_SIZE
var ERRORS_TO_CATCH_IN_RESPONSE
var NO_VALID_SERVER_VERSIONS
-
Global error type within this module.
var NS_MAP
var SERVICE_NAME
var WARNINGS_TO_CATCH_IN_RESPONSE
-
Global error type within this module.
var WARNINGS_TO_IGNORE_IN_RESPONSE
var element_container_name
var returns_elements
Static methods
def supported_api_versions()
-
Return API versions supported by the service, sorted from newest to oldest
Methods
def get(self, expect_result=True, **kwargs)
-
Expand source code
def get(self, expect_result=True, **kwargs): """Like .call(), but expects exactly one result from the server, or zero when expect_result=False, or either zero or one when expect_result=None. Returns either one object or None. :param expect_result: None, True, or False :param kwargs: Same as arguments for .call() :return: Same as .call(), but returns either None or exactly one item """ res = list(self.call(**kwargs)) # Raise any errors for r in res: if isinstance(r, Exception): raise r if expect_result is None and not res: # Allow empty result return None if expect_result is False: if res: raise ValueError(f"Expected result length 0, but got {res}") return None if len(res) != 1: raise ValueError(f"Expected result length 1, but got {res}") return res[0]
Like .call(), but expects exactly one result from the server, or zero when expect_result=False, or either zero or one when expect_result=None. Returns either one object or None.
:param expect_result: None, True, or False :param kwargs: Same as arguments for .call() :return: Same as .call(), but returns either None or exactly one item
def parse(self, xml)
-
Expand source code
def parse(self, xml): """Used mostly for testing, when we want to parse static XML data.""" resp = DummyResponse(content=xml, streaming=self.streaming) _, body = self._get_soap_parts(response=resp) return self._elems_to_objs(self._get_elements_in_response(response=self._get_soap_messages(body=body)))
Used mostly for testing, when we want to parse static XML data.
def stop_streaming(self)
-
Expand source code
def stop_streaming(self): if not self.streaming: raise RuntimeError("Attempt to stop a non-streaming service") if self._streaming_response: self._streaming_response.close() # Release memory self._streaming_response = None if self._streaming_session: self.protocol.release_session(self._streaming_session) self._streaming_session = None
def wrap(self, content, api_version=None)
-
Expand source code
def wrap(self, content, api_version=None): """Generate the necessary boilerplate XML for a raw SOAP request. The XML is specific to the server version. ExchangeImpersonation allows to act as the user we want to impersonate. RequestServerVersion element on MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/requestserverversion ExchangeImpersonation element on MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/exchangeimpersonation TimeZoneContent element on MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/timezonecontext :param content: :param api_version: """ envelope = create_element("s:Envelope", nsmap=self.NS_MAP) header = create_element("s:Header") if api_version: request_server_version = create_element("t:RequestServerVersion", attrs=dict(Version=api_version)) header.append(request_server_version) identity = self._account_to_impersonate if identity: add_xml_child(header, "t:ExchangeImpersonation", identity) timezone = self._timezone if timezone: timezone_context = create_element("t:TimeZoneContext") timezone_definition = create_element("t:TimeZoneDefinition", attrs=dict(Id=timezone.ms_id)) timezone_context.append(timezone_definition) header.append(timezone_context) if len(header): envelope.append(header) body = create_element("s:Body") body.append(content) envelope.append(body) return xml_to_str(envelope, encoding=DEFAULT_ENCODING, xml_declaration=True)
Generate the necessary boilerplate XML for a raw SOAP request. The XML is specific to the server version. ExchangeImpersonation allows to act as the user we want to impersonate.
RequestServerVersion element on MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/requestserverversion
ExchangeImpersonation element on MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/exchangeimpersonation
TimeZoneContent element on MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/timezonecontext
:param content: :param api_version: