Module exchangelib.services
Implement a selection of EWS services (operations).
Exchange is very picky about things like the order of XML elements in SOAP requests, so we need to generate the XML ourselves instead of taking advantage of Python SOAP libraries and the WSDL file.
Exchange EWS operations overview: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/ews-operations-in-exchange
Sub-modules
exchangelib.services.archive_item
exchangelib.services.common
exchangelib.services.convert_id
exchangelib.services.copy_item
exchangelib.services.create_attachment
exchangelib.services.create_folder
exchangelib.services.create_item
exchangelib.services.create_user_configuration
exchangelib.services.delete_attachment
exchangelib.services.delete_folder
exchangelib.services.delete_item
exchangelib.services.delete_user_configuration
exchangelib.services.empty_folder
exchangelib.services.expand_dl
exchangelib.services.export_items
exchangelib.services.find_folder
exchangelib.services.find_item
exchangelib.services.find_people
exchangelib.services.get_attachment
exchangelib.services.get_delegate
exchangelib.services.get_events
exchangelib.services.get_folder
exchangelib.services.get_item
exchangelib.services.get_mail_tips
exchangelib.services.get_persona
exchangelib.services.get_room_lists
exchangelib.services.get_rooms
exchangelib.services.get_searchable_mailboxes
exchangelib.services.get_server_time_zones
exchangelib.services.get_streaming_events
exchangelib.services.get_user_availability
exchangelib.services.get_user_configuration
exchangelib.services.get_user_oof_settings
exchangelib.services.get_user_settings
exchangelib.services.inbox_rules
exchangelib.services.mark_as_junk
exchangelib.services.move_folder
exchangelib.services.move_item
exchangelib.services.resolve_names
exchangelib.services.send_item
exchangelib.services.send_notification
exchangelib.services.set_user_oof_settings
exchangelib.services.subscribe
-
The 'Subscribe' service has three different modes - pull, push and streaming - with different signatures. Implement as three distinct classes.
exchangelib.services.sync_folder_hierarchy
exchangelib.services.sync_folder_items
exchangelib.services.unsubscribe
exchangelib.services.update_folder
exchangelib.services.update_item
exchangelib.services.update_user_configuration
exchangelib.services.upload_items
Classes
class ArchiveItem (*args, **kwargs)
-
Expand source code
class ArchiveItem(EWSAccountService):
    """Move items to a folder in the archive mailbox.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/archiveitem-operation
    """

    SERVICE_NAME = "ArchiveItem"
    element_container_name = f"{{{MNS}}}Items"
    supported_from = EXCHANGE_2013

    def call(self, items, to_folder):
        """Move a list of items to a specific folder in the archive mailbox.

        :param items: a list of (id, changekey) tuples or Item objects
        :param to_folder: folder that is sent as the 'm:ArchiveSourceFolderId' element of the request
        :return: the response elements after conversion by _elems_to_objs()
        """
        response_elems = self._chunked_get_elements(self.get_payload, items=items, to_folder=to_folder)
        return self._elems_to_objs(response_elems)

    def _elem_to_obj(self, elem):
        # Only the ID is extracted from each response element.
        return Item.id_from_xml(elem)

    def get_payload(self, items, to_folder):
        """Build the request element: the folder reference first, then the item IDs."""
        request = create_element(f"m:{self.SERVICE_NAME}")
        folder_elem = folder_ids_element(
            folders=[to_folder], version=self.account.version, tag="m:ArchiveSourceFolderId"
        )
        request.append(folder_elem)
        request.append(item_ids_element(items=items, version=self.account.version))
        return request
Ancestors
Class variables
var SERVICE_NAME
var element_container_name
var supported_from
Methods
def call(self, items, to_folder)
-
Expand source code
def call(self, items, to_folder):
    """Move a list of items to a specific folder in the archive mailbox.

    :param items: a list of (id, changekey) tuples or Item objects
    :param to_folder: folder passed on to the payload builder together with the items
    :return: the response elements after conversion by _elems_to_objs()
    """
    response_elems = self._chunked_get_elements(self.get_payload, items=items, to_folder=to_folder)
    return self._elems_to_objs(response_elems)
Move a list of items to a specific folder in the archive mailbox.
:param items: a list of (id, changekey) tuples or Item objects :param to_folder:
:return: the response elements converted by _elems_to_objs() (each element is parsed with Item.id_from_xml())
def get_payload(self, items, to_folder)
-
Expand source code
def get_payload(self, items, to_folder):
    """Build the request element for this service.

    The folder is serialized first, as an 'm:ArchiveSourceFolderId' element, followed by the item IDs.

    :param items: the items whose IDs are added to the request
    :param to_folder: the folder to reference in the request
    :return: the request element
    """
    request = create_element(f"m:{self.SERVICE_NAME}")
    folder_elem = folder_ids_element(
        folders=[to_folder], version=self.account.version, tag="m:ArchiveSourceFolderId"
    )
    request.append(folder_elem)
    request.append(item_ids_element(items=items, version=self.account.version))
    return request
Inherited members
class ConvertId (protocol, chunk_size=None, timeout=None)
-
Expand source code
class ConvertId(EWSService):
    """Take a list of IDs to convert. Returns a list of converted IDs or exception instances, in the same order as
    the input list.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/convertid-operation
    """

    SERVICE_NAME = "ConvertId"
    # Maps the tag of a response element to the class that knows how to parse it
    cls_map = {cls.response_tag(): cls for cls in (AlternateId, AlternatePublicFolderId, AlternatePublicFolderItemId)}

    def call(self, items, destination_format):
        """Convert 'items' to 'destination_format'.

        :raises InvalidEnumValue: if 'destination_format' is not one of ID_FORMATS
        """
        if destination_format not in ID_FORMATS:
            raise InvalidEnumValue("destination_format", destination_format, ID_FORMATS)
        response_elems = self._chunked_get_elements(
            self.get_payload, items=items, destination_format=destination_format
        )
        return self._elems_to_objs(response_elems)

    def _elem_to_obj(self, elem):
        id_cls = self.cls_map[elem.tag]
        return id_cls.from_xml(elem, account=None)

    def get_payload(self, items, destination_format):
        """Build the request element; every item must be an instance of one of the alternate ID classes.

        :raises InvalidTypeError: if an item has an unsupported type
        """
        supported_item_classes = AlternateId, AlternatePublicFolderId, AlternatePublicFolderItemId
        request = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(DestinationFormat=destination_format))
        source_ids = create_element("m:SourceIds")
        for source_id in items:
            if isinstance(source_id, supported_item_classes):
                set_xml_value(source_ids, source_id, version=None)
                continue
            raise InvalidTypeError("item", source_id, supported_item_classes)
        request.append(source_ids)
        return request

    @classmethod
    def _get_elements_in_container(cls, container):
        # We may have other elements in here, e.g. 'ResponseCode'. Filter away those.
        matches = []
        for id_cls in (AlternateId, AlternatePublicFolderId, AlternatePublicFolderItemId):
            matches += container.findall(id_cls.response_tag())
        return matches
Take a list of IDs to convert. Returns a list of converted IDs or exception instances, in the same order as the input list.
MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/convertid-operation
Ancestors
Class variables
var SERVICE_NAME
var cls_map
Methods
def call(self, items, destination_format)
-
Expand source code
def call(self, items, destination_format):
    """Convert 'items' to 'destination_format'.

    :param items: the IDs to convert
    :param destination_format: the target format; must be one of ID_FORMATS
    :return: the response elements after conversion by _elems_to_objs()
    :raises InvalidEnumValue: if 'destination_format' is not one of ID_FORMATS
    """
    if destination_format not in ID_FORMATS:
        raise InvalidEnumValue("destination_format", destination_format, ID_FORMATS)
    response_elems = self._chunked_get_elements(
        self.get_payload, items=items, destination_format=destination_format
    )
    return self._elems_to_objs(response_elems)
def get_payload(self, items, destination_format)
-
Expand source code
def get_payload(self, items, destination_format):
    """Build the request element, with the IDs to convert wrapped in an 'm:SourceIds' element.

    :param items: instances of AlternateId, AlternatePublicFolderId or AlternatePublicFolderItemId
    :param destination_format: value of the 'DestinationFormat' attribute of the request
    :return: the request element
    :raises InvalidTypeError: if an item has an unsupported type
    """
    supported_item_classes = AlternateId, AlternatePublicFolderId, AlternatePublicFolderItemId
    request = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(DestinationFormat=destination_format))
    source_ids = create_element("m:SourceIds")
    for source_id in items:
        if isinstance(source_id, supported_item_classes):
            set_xml_value(source_ids, source_id, version=None)
            continue
        raise InvalidTypeError("item", source_id, supported_item_classes)
    request.append(source_ids)
    return request
Inherited members
class CopyItem (*args, **kwargs)
-
Expand source code
class CopyItem(move_item.MoveItem):
    """Reuses the MoveItem implementation; only the service name differs.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/copyitem-operation
    """

    SERVICE_NAME = "CopyItem"
MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/copyitem-operation
Ancestors
Class variables
var SERVICE_NAME
Inherited members
class CreateAttachment (*args, **kwargs)
-
Expand source code
class CreateAttachment(EWSAccountService):
    """Create attachments on a parent item.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createattachment-operation
    """

    SERVICE_NAME = "CreateAttachment"
    element_container_name = f"{{{MNS}}}Attachments"
    # Maps the tag of a response element to the attachment class that parses it
    cls_map = {cls.response_tag(): cls for cls in (FileAttachment, ItemAttachment)}

    def call(self, parent_item, items):
        response_elems = self._chunked_get_elements(self.get_payload, items=items, parent_item=parent_item)
        return self._elems_to_objs(response_elems)

    def _elem_to_obj(self, elem):
        attachment_cls = self.cls_map[elem.tag]
        return attachment_cls.from_xml(elem=elem, account=self.account)

    def get_payload(self, items, parent_item):
        """Build the request element: the parent item ID followed by an 'm:Attachments' container."""
        request = create_element(f"m:{self.SERVICE_NAME}")
        version = self.account.version
        parent_id = parent_item
        if isinstance(parent_id, BaseItem):
            # to_item_id() would convert this to a normal ItemId, but the service wants a ParentItemId
            parent_id = ParentItemId(parent_id.id, parent_id.changekey)
        set_xml_value(request, to_item_id(parent_id, ParentItemId), version=version)
        attachments_elem = create_element("m:Attachments")
        for attachment in items:
            set_xml_value(attachments_elem, attachment, version=version)
        request.append(attachments_elem)
        return request
Ancestors
Class variables
var SERVICE_NAME
var cls_map
var element_container_name
Methods
def call(self, parent_item, items)
-
Expand source code
def call(self, parent_item, items):
    """Create the attachments in 'items' on 'parent_item'.

    :param parent_item: the item to attach to
    :param items: the attachments to create
    :return: the response elements after conversion by _elems_to_objs()
    """
    response_elems = self._chunked_get_elements(self.get_payload, items=items, parent_item=parent_item)
    return self._elems_to_objs(response_elems)
def get_payload(self, items, parent_item)
-
Expand source code
def get_payload(self, items, parent_item):
    """Build the request element: the parent item ID followed by an 'm:Attachments' container.

    :param items: the attachments to serialize into the container
    :param parent_item: the item to attach to; BaseItem instances are converted to a ParentItemId first
    :return: the request element
    """
    request = create_element(f"m:{self.SERVICE_NAME}")
    version = self.account.version
    parent_id = parent_item
    if isinstance(parent_id, BaseItem):
        # to_item_id() would convert this to a normal ItemId, but the service wants a ParentItemId
        parent_id = ParentItemId(parent_id.id, parent_id.changekey)
    set_xml_value(request, to_item_id(parent_id, ParentItemId), version=version)
    attachments_elem = create_element("m:Attachments")
    for attachment in items:
        set_xml_value(attachments_elem, attachment, version=version)
    request.append(attachments_elem)
    return request
Inherited members
class CreateFolder (*args, **kwargs)
-
Expand source code
class CreateFolder(EWSAccountService):
    """Create folders below a parent folder.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createfolder-operation
    """

    SERVICE_NAME = "CreateFolder"
    element_container_name = f"{{{MNS}}}Folders"
    ERRORS_TO_CATCH_IN_RESPONSE = EWSAccountService.ERRORS_TO_CATCH_IN_RESPONSE + (ErrorFolderExists,)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # A hack to communicate parsing args to _elems_to_objs()
        self.folders = []

    def call(self, parent_folder, folders):
        # We can't easily find the correct folder class from the returned XML. Instead, return objects with the same
        # class as the folder instance it was requested with.
        #
        # Convert to a list, in case 'folders' is a generator. We're iterating twice.
        self.folders = list(folders)
        response_elems = self._chunked_get_elements(
            self.get_payload,
            items=self.folders,
            parent_folder=parent_folder,
        )
        return self._elems_to_objs(response_elems)

    def _elems_to_objs(self, elems):
        # Pair each response element with the folder it was requested for. Exceptions are passed through as-is.
        for requested_folder, elem in zip(self.folders, elems):
            if not isinstance(elem, Exception):
                yield parse_folder_elem(elem=elem, folder=requested_folder)
            else:
                yield elem

    def get_payload(self, folders, parent_folder):
        """Build the request element: the parent folder ID followed by an 'm:Folders' container."""
        request = create_element(f"m:{self.SERVICE_NAME}")
        parent_elem = folder_ids_element(folders=[parent_folder], version=self.account.version, tag="m:ParentFolderId")
        request.append(parent_elem)
        folders_elem = create_element("m:Folders")
        for new_folder in folders:
            set_xml_value(folders_elem, new_folder, version=self.account.version)
        request.append(folders_elem)
        return request
Ancestors
Class variables
var ERRORS_TO_CATCH_IN_RESPONSE
var SERVICE_NAME
var element_container_name
Methods
def call(self, parent_folder, folders)
-
Expand source code
def call(self, parent_folder, folders):
    """Create 'folders' below 'parent_folder'.

    The folders are also stored on the instance, where _elems_to_objs() picks them up again.

    :param parent_folder: the folder to create the new folders in
    :param folders: the folders to create; may be a generator
    :return: the response elements after conversion by _elems_to_objs()
    """
    # We can't easily find the correct folder class from the returned XML. Instead, return objects with the same
    # class as the folder instance it was requested with.
    #
    # Convert to a list, in case 'folders' is a generator. We're iterating twice.
    self.folders = list(folders)
    response_elems = self._chunked_get_elements(
        self.get_payload,
        items=self.folders,
        parent_folder=parent_folder,
    )
    return self._elems_to_objs(response_elems)
def get_payload(self, folders, parent_folder)
-
Expand source code
def get_payload(self, folders, parent_folder):
    """Build the request element: the parent folder ID followed by an 'm:Folders' container.

    :param folders: the folders to serialize into the container
    :param parent_folder: the folder referenced by the 'm:ParentFolderId' element
    :return: the request element
    """
    request = create_element(f"m:{self.SERVICE_NAME}")
    parent_elem = folder_ids_element(folders=[parent_folder], version=self.account.version, tag="m:ParentFolderId")
    request.append(parent_elem)
    folders_elem = create_element("m:Folders")
    for new_folder in folders:
        set_xml_value(folders_elem, new_folder, version=self.account.version)
    request.append(folders_elem)
    return request
Inherited members
class CreateInboxRule (*args, **kwargs)
-
Expand source code
class CreateInboxRule(UpdateInboxRules):
    """UpdateInboxRules variant that wraps the rule in a create-rule operation.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/updateinboxrules-operation#updateinboxrules-create-rule-request-example
    """

    def _get_operation(self, rule):
        create_op = CreateRuleOperation(rule=rule)
        return Operations(create_rule_operation=create_op)
Ancestors
Inherited members
class CreateItem (*args, **kwargs)
-
Expand source code
class CreateItem(EWSAccountService):
    """Take a folder and a list of items, and create the items. Response elements are parsed into BulkCreateResult
    objects; when the response container holds no elements, a single True is returned instead.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createitem-operation
    """

    SERVICE_NAME = "CreateItem"
    element_container_name = f"{{{MNS}}}Items"

    def call(self, items, folder, message_disposition, send_meeting_invitations):
        """Validate the arguments and send the items to the server.

        :param items: the items to create
        :param folder: a BaseFolder or FolderId, or None. Required in save-only mode, must be None in send-only mode,
            and defaults to the 'sent' folder of the account in send-and-save-copy mode
        :param message_disposition: one of MESSAGE_DISPOSITION_CHOICES
        :param send_meeting_invitations: one of SEND_MEETING_INVITATIONS_CHOICES
        :return: the response elements after conversion by _elems_to_objs()
        :raises InvalidEnumValue: if one of the enum arguments has an invalid value
        :raises InvalidTypeError: if 'folder' is neither None, a BaseFolder nor a FolderId
        :raises ValueError: if 'folder' belongs to another account
        :raises AttributeError: if 'folder' does not match the requirements of 'message_disposition'
        """
        if message_disposition not in MESSAGE_DISPOSITION_CHOICES:
            raise InvalidEnumValue("message_disposition", message_disposition, MESSAGE_DISPOSITION_CHOICES)
        if send_meeting_invitations not in SEND_MEETING_INVITATIONS_CHOICES:
            raise InvalidEnumValue(
                "send_meeting_invitations", send_meeting_invitations, SEND_MEETING_INVITATIONS_CHOICES
            )
        if folder is not None:
            if not isinstance(folder, (BaseFolder, FolderId)):
                raise InvalidTypeError("folder", folder, (BaseFolder, FolderId))
            # NOTE(review): FolderId instances pass the type check above - confirm that they expose an 'account'
            # attribute, otherwise this comparison fails for them.
            if folder.account != self.account:
                raise ValueError("Folder must belong to account")
        if message_disposition == SAVE_ONLY and folder is None:
            raise AttributeError("Folder must be supplied when in save-only mode")
        if message_disposition == SEND_AND_SAVE_COPY and folder is None:
            folder = self.account.sent  # 'Sent' is default EWS behaviour
        if message_disposition == SEND_ONLY and folder is not None:
            raise AttributeError("Folder must be None in send-only mode")
        return self._elems_to_objs(
            self._chunked_get_elements(
                self.get_payload,
                items=items,
                folder=folder,
                message_disposition=message_disposition,
                send_meeting_invitations=send_meeting_invitations,
            )
        )

    def _elem_to_obj(self, elem):
        # Booleans come from the fallback in _get_elements_in_container() and are passed through unchanged
        if isinstance(elem, bool):
            return elem
        return BulkCreateResult.from_xml(elem=elem, account=self.account)

    @classmethod
    def _get_elements_in_container(cls, container):
        # Fall back to a single True when the container holds no elements
        res = super()._get_elements_in_container(container)
        return res or [True]

    def get_payload(self, items, folder, message_disposition, send_meeting_invitations):
        """Take a list of Item objects (CalendarItem, Message etc.) and return the XML for a CreateItem request.
        The items are converted to XML elements.

        MessageDisposition is only applicable to email messages, where it is required.

        SendMeetingInvitations is required for calendar items. It is also applicable to tasks, meeting request
        responses (see
        https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createitem-operation-meeting-request
        ) and sharing invitation accepts (see
        https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createitem-acceptsharinginvitation
        ). The last two are not supported yet.

        :param items: the items to serialize into the 'm:Items' container
        :param folder: if truthy, added as the 'm:SavedItemFolderId' element
        :param message_disposition: value of the 'MessageDisposition' attribute
        :param send_meeting_invitations: value of the 'SendMeetingInvitations' attribute
        :return: the request element
        """
        payload = create_element(
            f"m:{self.SERVICE_NAME}",
            attrs=dict(MessageDisposition=message_disposition, SendMeetingInvitations=send_meeting_invitations),
        )
        if folder:
            payload.append(
                folder_ids_element(folders=[folder], version=self.account.version, tag="m:SavedItemFolderId")
            )
        item_elems = create_element("m:Items")
        for item in items:
            # Items without an account are bound to the account of this service before serialization
            if not item.account:
                item.account = self.account
            set_xml_value(item_elems, item, version=self.account.version)
        payload.append(item_elems)
        return payload
Take a folder and a list of items. Return the result of creation as a list of tuples (success[True|False], errormessage), in the same order as the input list.
Ancestors
Class variables
var SERVICE_NAME
var element_container_name
Methods
def call(self, items, folder, message_disposition, send_meeting_invitations)
-
Expand source code
def call(self, items, folder, message_disposition, send_meeting_invitations):
    """Validate the arguments and send the items to the server.

    :param items: the items to create
    :param folder: a BaseFolder or FolderId, or None. Required in save-only mode, must be None in send-only mode,
        and defaults to the 'sent' folder of the account in send-and-save-copy mode
    :param message_disposition: one of MESSAGE_DISPOSITION_CHOICES
    :param send_meeting_invitations: one of SEND_MEETING_INVITATIONS_CHOICES
    :return: the response elements after conversion by _elems_to_objs()
    :raises InvalidEnumValue: if one of the enum arguments has an invalid value
    :raises InvalidTypeError: if 'folder' is neither None, a BaseFolder nor a FolderId
    :raises ValueError: if 'folder' belongs to another account
    :raises AttributeError: if 'folder' does not match the requirements of 'message_disposition'
    """
    if message_disposition not in MESSAGE_DISPOSITION_CHOICES:
        raise InvalidEnumValue("message_disposition", message_disposition, MESSAGE_DISPOSITION_CHOICES)
    if send_meeting_invitations not in SEND_MEETING_INVITATIONS_CHOICES:
        raise InvalidEnumValue(
            "send_meeting_invitations", send_meeting_invitations, SEND_MEETING_INVITATIONS_CHOICES
        )
    if folder is not None:
        if not isinstance(folder, (BaseFolder, FolderId)):
            raise InvalidTypeError("folder", folder, (BaseFolder, FolderId))
        # NOTE(review): FolderId instances pass the type check above - confirm that they expose an 'account'
        # attribute, otherwise this comparison fails for them.
        if folder.account != self.account:
            raise ValueError("Folder must belong to account")
    if message_disposition == SAVE_ONLY and folder is None:
        raise AttributeError("Folder must be supplied when in save-only mode")
    if message_disposition == SEND_AND_SAVE_COPY and folder is None:
        folder = self.account.sent  # 'Sent' is default EWS behaviour
    if message_disposition == SEND_ONLY and folder is not None:
        raise AttributeError("Folder must be None in send-only mode")
    return self._elems_to_objs(
        self._chunked_get_elements(
            self.get_payload,
            items=items,
            folder=folder,
            message_disposition=message_disposition,
            send_meeting_invitations=send_meeting_invitations,
        )
    )
def get_payload(self, items, folder, message_disposition, send_meeting_invitations)
-
Expand source code
def get_payload(self, items, folder, message_disposition, send_meeting_invitations):
    """Take a list of Item objects (CalendarItem, Message etc.) and return the XML for a CreateItem request.
    The items are converted to XML elements.

    MessageDisposition is only applicable to email messages, where it is required.

    SendMeetingInvitations is required for calendar items. It is also applicable to tasks, meeting request
    responses (see
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createitem-operation-meeting-request
    ) and sharing invitation accepts (see
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createitem-acceptsharinginvitation
    ). The last two are not supported yet.

    :param items: the items to serialize into the 'm:Items' container
    :param folder: if truthy, added as the 'm:SavedItemFolderId' element
    :param message_disposition: value of the 'MessageDisposition' attribute
    :param send_meeting_invitations: value of the 'SendMeetingInvitations' attribute
    :return: the request element
    """
    request_attrs = dict(MessageDisposition=message_disposition, SendMeetingInvitations=send_meeting_invitations)
    request = create_element(f"m:{self.SERVICE_NAME}", attrs=request_attrs)
    if folder:
        saved_folder_elem = folder_ids_element(
            folders=[folder], version=self.account.version, tag="m:SavedItemFolderId"
        )
        request.append(saved_folder_elem)
    items_elem = create_element("m:Items")
    for new_item in items:
        # Items without an account are bound to the account of this service before serialization
        if not new_item.account:
            new_item.account = self.account
        set_xml_value(items_elem, new_item, version=self.account.version)
    request.append(items_elem)
    return request
Take a list of Item objects (CalendarItem, Message etc.) and return the XML for a CreateItem request. The items are converted to XML elements.
MessageDisposition is only applicable to email messages, where it is required.
SendMeetingInvitations is required for calendar items. It is also applicable to tasks, meeting request responses (see https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createitem-operation-meeting-request ) and sharing invitation accepts (see https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createitem-acceptsharinginvitation ). The last two are not supported yet.
:param items: :param folder: :param message_disposition: :param send_meeting_invitations:
Inherited members
class CreateUserConfiguration (*args, **kwargs)
-
Expand source code
class CreateUserConfiguration(EWSAccountService):
    """Send a user configuration object to the server in a CreateUserConfiguration request.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createuserconfiguration-operation
    """

    SERVICE_NAME = "CreateUserConfiguration"
    returns_elements = False

    def call(self, user_configuration):
        request = self.get_payload(user_configuration=user_configuration)
        return self._get_elements(payload=request)

    def get_payload(self, user_configuration):
        """Serialize 'user_configuration' into a new request element and return the result of set_xml_value()."""
        request = create_element(f"m:{self.SERVICE_NAME}")
        return set_xml_value(request, user_configuration, version=self.protocol.version)
Ancestors
Class variables
var SERVICE_NAME
var returns_elements
Methods
def call(self, user_configuration)
-
Expand source code
def call(self, user_configuration):
    """Build the payload for 'user_configuration' and return the result of _get_elements() for it."""
    request = self.get_payload(user_configuration=user_configuration)
    return self._get_elements(payload=request)
def get_payload(self, user_configuration)
-
Expand source code
def get_payload(self, user_configuration):
    """Serialize 'user_configuration' into a new request element and return the result of set_xml_value()."""
    request = create_element(f"m:{self.SERVICE_NAME}")
    return set_xml_value(request, user_configuration, version=self.protocol.version)
Inherited members
class DeleteAttachment (*args, **kwargs)
-
Expand source code
class DeleteAttachment(EWSAccountService):
    """Delete attachments. The RootItemId elements of the response are parsed and returned.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/deleteattachment-operation
    """

    SERVICE_NAME = "DeleteAttachment"

    def call(self, items):
        response_elems = self._chunked_get_elements(self.get_payload, items=items)
        return self._elems_to_objs(response_elems)

    def _elem_to_obj(self, elem):
        return RootItemId.from_xml(elem=elem, account=self.account)

    @classmethod
    def _get_elements_in_container(cls, container):
        # Only the RootItemId elements of the container are of interest
        root_item_tag = RootItemId.response_tag()
        return container.findall(root_item_tag)

    def get_payload(self, items):
        """Serialize the attachment IDs of 'items' into a new request element."""
        request = create_element(f"m:{self.SERVICE_NAME}")
        attachment_ids = attachment_ids_element(items=items, version=self.account.version)
        return set_xml_value(request, attachment_ids, version=self.account.version)
Ancestors
Class variables
var SERVICE_NAME
Methods
def call(self, items)
-
Expand source code
def call(self, items):
    """Delete the attachments in 'items' and return the response elements converted by _elems_to_objs()."""
    response_elems = self._chunked_get_elements(self.get_payload, items=items)
    return self._elems_to_objs(response_elems)
def get_payload(self, items)
-
Expand source code
def get_payload(self, items):
    """Serialize the attachment IDs of 'items' into a new request element.

    :param items: the attachments to delete
    :return: the result of set_xml_value() for the request element
    """
    request = create_element(f"m:{self.SERVICE_NAME}")
    attachment_ids = attachment_ids_element(items=items, version=self.account.version)
    return set_xml_value(request, attachment_ids, version=self.account.version)
Inherited members
class DeleteFolder (*args, **kwargs)
-
Expand source code
class DeleteFolder(EWSAccountService):
    """Delete folders, using one of the delete types in DELETE_TYPE_CHOICES.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/deletefolder-operation
    """

    SERVICE_NAME = "DeleteFolder"
    returns_elements = False

    def call(self, folders, delete_type):
        """Validate 'delete_type' and send the request.

        :raises InvalidEnumValue: if 'delete_type' is not one of DELETE_TYPE_CHOICES
        """
        if delete_type in DELETE_TYPE_CHOICES:
            return self._chunked_get_elements(self.get_payload, items=folders, delete_type=delete_type)
        raise InvalidEnumValue("delete_type", delete_type, DELETE_TYPE_CHOICES)

    def get_payload(self, folders, delete_type):
        """Build the request element, carrying the 'DeleteType' attribute and the folder IDs."""
        request = create_element(f"m:{self.SERVICE_NAME}", attrs={"DeleteType": delete_type})
        folder_ids = folder_ids_element(folders=folders, version=self.account.version)
        request.append(folder_ids)
        return request
Ancestors
Class variables
var SERVICE_NAME
var returns_elements
Methods
def call(self, folders, delete_type)
-
Expand source code
def call(self, folders, delete_type):
    """Validate 'delete_type' and send the request.

    :param folders: the folders to delete
    :param delete_type: one of DELETE_TYPE_CHOICES
    :return: the result of _chunked_get_elements()
    :raises InvalidEnumValue: if 'delete_type' is not one of DELETE_TYPE_CHOICES
    """
    if delete_type in DELETE_TYPE_CHOICES:
        return self._chunked_get_elements(self.get_payload, items=folders, delete_type=delete_type)
    raise InvalidEnumValue("delete_type", delete_type, DELETE_TYPE_CHOICES)
def get_payload(self, folders, delete_type)
-
Expand source code
def get_payload(self, folders, delete_type):
    """Build the request element, carrying the 'DeleteType' attribute and the folder IDs.

    :param folders: the folders whose IDs are added to the request
    :param delete_type: value of the 'DeleteType' attribute
    :return: the request element
    """
    request = create_element(f"m:{self.SERVICE_NAME}", attrs={"DeleteType": delete_type})
    folder_ids = folder_ids_element(folders=folders, version=self.account.version)
    request.append(folder_ids)
    return request
Inherited members
class DeleteInboxRule (*args, **kwargs)
-
Expand source code
class DeleteInboxRule(UpdateInboxRules):
    """UpdateInboxRules variant that issues a delete-rule operation for the ID of the rule.

    MSDN:
    https://learn.microsoft.com/en-us/exchange/client-developer/web-service-reference/updateinboxrules-operation#updateinboxrules-delete-rule-request-example
    """

    def _get_operation(self, rule):
        # Only the ID of the rule is needed to delete it
        delete_op = DeleteRuleOperation(id=rule.id)
        return Operations(delete_rule_operation=delete_op)
Ancestors
Inherited members
class DeleteItem (*args, **kwargs)
-
Expand source code
class DeleteItem(EWSAccountService):
    """Take a list of (id, changekey) tuples or Item objects and delete the items.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/deleteitem-operation
    """

    SERVICE_NAME = "DeleteItem"
    returns_elements = False

    def call(self, items, delete_type, send_meeting_cancellations, affected_task_occurrences, suppress_read_receipts):
        """Validate the enum arguments and send the request.

        :raises InvalidEnumValue: if one of the enum arguments has an invalid value
        """
        # Checked in this order, so the first invalid argument is the one that gets reported
        enum_checks = (
            ("delete_type", delete_type, DELETE_TYPE_CHOICES),
            ("send_meeting_cancellations", send_meeting_cancellations, SEND_MEETING_CANCELLATIONS_CHOICES),
            ("affected_task_occurrences", affected_task_occurrences, AFFECTED_TASK_OCCURRENCES_CHOICES),
        )
        for arg_name, arg_value, valid_choices in enum_checks:
            if arg_value not in valid_choices:
                raise InvalidEnumValue(arg_name, arg_value, valid_choices)
        return self._chunked_get_elements(
            self.get_payload,
            items=items,
            delete_type=delete_type,
            send_meeting_cancellations=send_meeting_cancellations,
            affected_task_occurrences=affected_task_occurrences,
            suppress_read_receipts=suppress_read_receipts,
        )

    def get_payload(
        self, items, delete_type, send_meeting_cancellations, affected_task_occurrences, suppress_read_receipts
    ):
        """Take a list of (id, changekey) tuples or Item objects and return the XML for a DeleteItem request."""
        request_attrs = {
            "DeleteType": delete_type,
            "SendMeetingCancellations": send_meeting_cancellations,
            "AffectedTaskOccurrences": affected_task_occurrences,
        }
        # The attribute is only sent to servers with a build of at least EXCHANGE_2013_SP1
        if self.account.version.build >= EXCHANGE_2013_SP1:
            request_attrs["SuppressReadReceipts"] = suppress_read_receipts
        request = create_element(f"m:{self.SERVICE_NAME}", attrs=request_attrs)
        request.append(item_ids_element(items=items, version=self.account.version))
        return request
Take a list of (id, changekey) tuples or Item objects. Return result of deletion as a list of tuples (success[True|False], errormessage), in the same order as the input list.
Ancestors
Class variables
var SERVICE_NAME
var returns_elements
Methods
def call(self,
items,
delete_type,
send_meeting_cancellations,
affected_task_occurrences,
suppress_read_receipts)-
Expand source code
def call(self, items, delete_type, send_meeting_cancellations, affected_task_occurrences, suppress_read_receipts):
    """Validate the enum arguments and send the request.

    :param items: the items to delete
    :param delete_type: one of DELETE_TYPE_CHOICES
    :param send_meeting_cancellations: one of SEND_MEETING_CANCELLATIONS_CHOICES
    :param affected_task_occurrences: one of AFFECTED_TASK_OCCURRENCES_CHOICES
    :param suppress_read_receipts: passed on to the payload builder unchanged
    :return: the result of _chunked_get_elements()
    :raises InvalidEnumValue: if one of the enum arguments has an invalid value
    """
    # Checked in this order, so the first invalid argument is the one that gets reported
    enum_checks = (
        ("delete_type", delete_type, DELETE_TYPE_CHOICES),
        ("send_meeting_cancellations", send_meeting_cancellations, SEND_MEETING_CANCELLATIONS_CHOICES),
        ("affected_task_occurrences", affected_task_occurrences, AFFECTED_TASK_OCCURRENCES_CHOICES),
    )
    for arg_name, arg_value, valid_choices in enum_checks:
        if arg_value not in valid_choices:
            raise InvalidEnumValue(arg_name, arg_value, valid_choices)
    return self._chunked_get_elements(
        self.get_payload,
        items=items,
        delete_type=delete_type,
        send_meeting_cancellations=send_meeting_cancellations,
        affected_task_occurrences=affected_task_occurrences,
        suppress_read_receipts=suppress_read_receipts,
    )
def get_payload(self,
items,
delete_type,
send_meeting_cancellations,
affected_task_occurrences,
suppress_read_receipts)-
Expand source code
def get_payload(
    self, items, delete_type, send_meeting_cancellations, affected_task_occurrences, suppress_read_receipts
):
    """Take a list of (id, changekey) tuples or Item objects and return the XML for a DeleteItem request.

    :param items: the items whose IDs are added to the request
    :param delete_type: value of the 'DeleteType' attribute
    :param send_meeting_cancellations: value of the 'SendMeetingCancellations' attribute
    :param affected_task_occurrences: value of the 'AffectedTaskOccurrences' attribute
    :param suppress_read_receipts: value of the 'SuppressReadReceipts' attribute, if it is sent at all
    :return: the request element
    """
    request_attrs = {
        "DeleteType": delete_type,
        "SendMeetingCancellations": send_meeting_cancellations,
        "AffectedTaskOccurrences": affected_task_occurrences,
    }
    # The attribute is only sent to servers with a build of at least EXCHANGE_2013_SP1
    if self.account.version.build >= EXCHANGE_2013_SP1:
        request_attrs["SuppressReadReceipts"] = suppress_read_receipts
    request = create_element(f"m:{self.SERVICE_NAME}", attrs=request_attrs)
    request.append(item_ids_element(items=items, version=self.account.version))
    return request
Inherited members
class DeleteUserConfiguration (*args, **kwargs)
-
Expand source code
class DeleteUserConfiguration(EWSAccountService):
    """Send a user configuration name to the server in a DeleteUserConfiguration request.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/deleteuserconfiguration-operation
    """

    SERVICE_NAME = "DeleteUserConfiguration"
    returns_elements = False

    def call(self, user_configuration_name):
        request = self.get_payload(user_configuration_name=user_configuration_name)
        return self._get_elements(payload=request)

    def get_payload(self, user_configuration_name):
        """Serialize 'user_configuration_name' into a new request element and return the result of set_xml_value()."""
        request = create_element(f"m:{self.SERVICE_NAME}")
        return set_xml_value(request, user_configuration_name, version=self.account.version)
Ancestors
Class variables
var SERVICE_NAME
var returns_elements
Methods
def call(self, user_configuration_name)
-
Expand source code
def call(self, user_configuration_name):
    """Build the payload for 'user_configuration_name' and return the result of _get_elements() for it."""
    request = self.get_payload(user_configuration_name=user_configuration_name)
    return self._get_elements(payload=request)
def get_payload(self, user_configuration_name)
-
Expand source code
def get_payload(self, user_configuration_name):
    """Serialize 'user_configuration_name' into a new request element and return the result of set_xml_value()."""
    request = create_element(f"m:{self.SERVICE_NAME}")
    return set_xml_value(request, user_configuration_name, version=self.account.version)
Inherited members
class EWSService (protocol, chunk_size=None, timeout=None)
-
Expand source code
class EWSService(SupportedVersionClassMixIn, metaclass=abc.ABCMeta):
    """Base class for all EWS services.

    Subclasses are expected to provide a .call() method that defines the service arguments and a .get_payload()
    method that builds the request XML. This class supplies the shared machinery: wrapping the payload in a SOAP
    envelope, sending it, negotiating the API version, retrying on transient errors, and turning response messages
    into elements or exception instances.
    """

    CHUNK_SIZE = 100  # A default chunk size for all services. This is the number of items we send in a single request

    SERVICE_NAME = None  # The name of the SOAP service
    element_container_name = None  # The name of the XML element wrapping the collection of returned items
    returns_elements = True  # If False, the service does not return response elements, just the ResponseCode status
    # Return exception instance instead of raising exceptions for the following errors when contained in an element
    ERRORS_TO_CATCH_IN_RESPONSE = (
        EWSWarning,
        ErrorCannotDeleteObject,
        ErrorInvalidChangeKey,
        ErrorItemNotFound,
        ErrorItemSave,
        ErrorInvalidIdMalformed,
        ErrorMessageSizeExceeded,
        ErrorCannotDeleteTaskOccurrence,
        ErrorMimeContentConversionFailed,
        ErrorRecurrenceHasNoOccurrence,
        ErrorCorruptData,
        ErrorItemCorrupt,
        ErrorMailRecipientNotFound,
    )
    # Similarly, define the warnings we want to return un-raised
    WARNINGS_TO_CATCH_IN_RESPONSE = ErrorBatchProcessingStopped
    # Define the warnings we want to ignore, to let response processing proceed
    WARNINGS_TO_IGNORE_IN_RESPONSE = ()
    # The exception type to raise when all attempted API versions failed
    NO_VALID_SERVER_VERSIONS = ErrorInvalidServerVersion

    # Only the 's', 'm' and 't' prefixes of the global namespace map are declared on the SOAP envelope
    NS_MAP = {k: v for k, v in ns_translation.items() if k in ("s", "m", "t")}

    def __init__(self, protocol, chunk_size=None, timeout=None):
        """Validate the arguments and set up per-instance request state.

        :param protocol: the protocol object used to send requests
        :param chunk_size: number of items per request in chunked calls. A falsy value selects CHUNK_SIZE
        :param timeout: optional per-service timeout. A falsy value makes requests use protocol.TIMEOUT
        :raises InvalidTypeError: if the effective chunk size is not an int
        :raises ValueError: if the effective chunk size is less than 1
        :raises NotImplementedError: if the server build is older than 'supported_from'
        """
        self.chunk_size = chunk_size or self.CHUNK_SIZE
        if not isinstance(self.chunk_size, int):
            raise InvalidTypeError("chunk_size", chunk_size, int)
        if self.chunk_size < 1:
            raise ValueError(f"'chunk_size' {self.chunk_size} must be a positive number")
        if self.supported_from and protocol.version.build < self.supported_from:
            raise NotImplementedError(
                f"Service {self.SERVICE_NAME!r} only supports server versions from {self.supported_from or '*'} to "
                f"{self.deprecated_from or '*'} (server has {protocol.version})"
            )
        self.protocol = protocol
        # Allow a service to override the default protocol timeout. Useful for streaming services
        self.timeout = timeout
        # Controls whether the HTTP request should be streaming or fetch everything at once
        self.streaming = False
        # Streaming connection variables
        self._streaming_session = None
        self._streaming_response = None

    def __del__(self):
        """Best-effort cleanup of a lingering streaming session and response."""
        # pylint: disable=bare-except
        try:
            if self.streaming:
                # Make sure to clean up lingering resources
                self.stop_streaming()
        except Exception:  # nosec
            # __del__ should never fail
            pass

    # The following two methods are the minimum required to be implemented by subclasses, but the name and number of
    # kwargs differs between services. Therefore, we cannot make these methods abstract.

    # @abc.abstractmethod
    # def call(self, **kwargs):
    #     """Defines the arguments required by the service. Arguments are basic Python types or EWSElement objects.
    #     Returns either XML objects or EWSElement objects.
    #     """
    #     pass

    # @abc.abstractmethod
    # def get_payload(self, **kwargs):
    #     """Using the arguments from .call(), return the payload expected by the service, as an XML object. The XML
    #     object should consist of a SERVICE_NAME element and everything within that.
    #     """
    #     pass

    def get(self, expect_result=True, **kwargs):
        """Like .call(), but expects exactly one result from the server, or zero when expect_result=False, or either
        zero or one when expect_result=None. Returns either one object or None.

        :param expect_result: None, True, or False
        :param kwargs: Same as arguments for .call()
        :return: Same as .call(), but returns either None or exactly one item
        :raises ValueError: if the number of results does not match 'expect_result'
        """
        res = list(self.call(**kwargs))
        # Raise any errors
        for r in res:
            if isinstance(r, Exception):
                raise r
        if expect_result is None and not res:
            # Allow empty result
            return None
        if expect_result is False:
            if res:
                raise ValueError(f"Expected result length 0, but got {res}")
            return None
        if len(res) != 1:
            raise ValueError(f"Expected result length 1, but got {res}")
        return res[0]

    def parse(self, xml):
        """Used mostly for testing, when we want to parse static XML data."""
        resp = DummyResponse(content=xml, streaming=self.streaming)
        _, body = self._get_soap_parts(response=resp)
        return self._elems_to_objs(self._get_elements_in_response(response=self._get_soap_messages(body=body)))

    def wrap(self, content, api_version=None):
        """Generate the necessary boilerplate XML for a raw SOAP request. The XML is specific to the server version.
        ExchangeImpersonation allows to act as the user we want to impersonate.

        RequestServerVersion element on MSDN:
        https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/requestserverversion

        ExchangeImpersonation element on MSDN:
        https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/exchangeimpersonation

        TimeZoneContext element on MSDN:
        https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/timezonecontext

        :param content: the request payload, appended to the SOAP Body element
        :param api_version: if truthy, sent as the 'Version' attribute of a RequestServerVersion header element
        :return: the serialized envelope, as produced by xml_to_str()
        """
        envelope = create_element("s:Envelope", nsmap=self.NS_MAP)
        header = create_element("s:Header")
        if api_version:
            request_server_version = create_element("t:RequestServerVersion", attrs=dict(Version=api_version))
            header.append(request_server_version)
        identity = self._account_to_impersonate
        if identity:
            add_xml_child(header, "t:ExchangeImpersonation", identity)
        timezone = self._timezone
        if timezone:
            timezone_context = create_element("t:TimeZoneContext")
            timezone_definition = create_element("t:TimeZoneDefinition", attrs=dict(Id=timezone.ms_id))
            timezone_context.append(timezone_definition)
            header.append(timezone_context)
        # Only include the Header element if it has at least one child
        if len(header):
            envelope.append(header)
        body = create_element("s:Body")
        body.append(content)
        envelope.append(body)
        return xml_to_str(envelope, encoding=DEFAULT_ENCODING, xml_declaration=True)

    def _elems_to_objs(self, elems):
        """Takes a generator of XML elements and exceptions. Returns the equivalent Python objects (or exceptions)."""
        for elem in elems:
            # Allow None here. Some services don't return an ID if the target folder is outside the mailbox.
            if isinstance(elem, (Exception, type(None))):
                yield elem
                continue
            yield self._elem_to_obj(elem)

    def _elem_to_obj(self, elem):
        """Convert a single XML element to a single Python object. Services that return elements must override this."""
        if not self.returns_elements:
            raise RuntimeError("Incorrect call to method when 'returns_elements' is False")
        raise NotImplementedError()

    @property
    def _version_hint(self):
        """The version stored on the protocol configuration, used as the first API version to try."""
        # We may be here due to version guessing in Protocol.version, so we can't use the self.protocol.version property
        return self.protocol.config.version

    @_version_hint.setter
    def _version_hint(self, value):
        self.protocol.config.version = value

    def _extra_headers(self):
        """Return extra HTTP headers for the request. Adds X-AnchorMailbox when impersonating an SMTP address."""
        headers = {}
        identity = self._account_to_impersonate
        if identity and identity.primary_smtp_address:
            # See
            # https://blogs.msdn.microsoft.com/webdav_101/2015/05/11/best-practices-ews-authentication-and-access-issues/
            headers["X-AnchorMailbox"] = identity.primary_smtp_address
        return headers

    @property
    def _account_to_impersonate(self):
        """The identity on the protocol credentials when they are OAuth2 credentials, otherwise None."""
        if self.protocol and isinstance(self.protocol.credentials, BaseOAuth2Credentials):
            return self.protocol.credentials.identity
        return None

    @property
    def _timezone(self):
        """The timezone to send in a TimeZoneContext header. Always None in this base class."""
        return None

    def _response_generator(self, payload):
        """Send the payload to the server, and return the response.

        :param payload: payload as an XML object
        :return: the response, as XML objects
        """
        response = self._get_response_xml(payload=payload)
        return self._get_elements_in_response(response=response)

    def _chunked_get_elements(self, payload_func, items, **kwargs):
        """Yield elements in a response. Like ._get_elements(), but chop items into suitable chunks and send multiple
        requests.

        :param payload_func: A reference to .get_payload()
        :param items: An iterable of items (messages, folders, etc.) to process
        :param kwargs: Same as arguments for .call(), except for the 'items' argument
        :return: Same as ._get_elements()
        """
        # If the input for a service is a QuerySet, it can be difficult to remove exceptions before now
        filtered_items = filter(lambda item: not isinstance(item, Exception), items)
        for i, chunk in enumerate(chunkify(filtered_items, self.chunk_size), start=1):
            log.debug("Processing chunk %s containing %s items", i, len(chunk))
            yield from self._get_elements(payload=payload_func(chunk, **kwargs))

    def stop_streaming(self):
        """Close the saved streaming response and release the saved streaming session, if any.

        :raises RuntimeError: if the service is not in streaming mode
        """
        if not self.streaming:
            raise RuntimeError("Attempt to stop a non-streaming service")
        if self._streaming_response:
            self._streaming_response.close()  # Release memory
            self._streaming_response = None
        if self._streaming_session:
            self.protocol.release_session(self._streaming_session)
            self._streaming_session = None

    def _get_elements(self, payload):
        """Send the payload to be sent and parsed. Handles and re-raise exceptions that are not meant to be returned
        to the caller as exception objects. Retry the request according to the retry policy.
        """
        wait = self.protocol.RETRY_WAIT
        while True:
            try:
                # Create a generator over the response elements so exceptions in response elements are also raised
                # here and can be handled.
                yield from self._response_generator(payload=payload)
                # TODO: Restore session pool size on succeeding request?
                return
            except TokenExpiredError:
                # Retry immediately
                continue
            except ErrorServerBusy as e:
                # Fall back to our own wait time if the server did not supply a back-off value
                if not e.back_off:
                    e.back_off = wait
                self._handle_backoff(e)
            except ErrorExceededConnectionCount as e:
                # ErrorExceededConnectionCount indicates that the connecting user has too many open TCP connections to
                # the server. Decrease our session pool size and retry immediately.
                try:
                    self.protocol.decrease_poolsize()
                    continue
                except SessionPoolMinSizeReached:
                    # We're already as low as we can go. Let the user handle this.
                    raise e
            except ErrorTimeoutExpired as e:
                # ErrorTimeoutExpired can be caused by a busy server, or by an overly large request. If it's the latter,
                # we don't want to continue hammering the server with this request indefinitely. Instead, lower the
                # connection count, if possible, and retry the request.
                if self.protocol.session_pool_size <= 1:
                    # We're already as low as we can go. We can no longer use the session count to put less load
                    # on the server. If this is a chunked request we could lower the chunk size, but we don't have a
                    # way of doing that from this part of the code yet. Let the user handle this.
                    raise e
                self._handle_backoff(ErrorServerBusy(f"Reraised from {e.__class__.__name__}({e})", back_off=wait))
            except (ErrorTooManyObjectsOpened, ErrorInternalServerTransientError) as e:
                # ErrorTooManyObjectsOpened means there are too many connections to the Exchange database. This is very
                # often a symptom of sending too many requests.
                self._handle_backoff(ErrorServerBusy(f"Reraised from {e.__class__.__name__}({e})", back_off=wait))
            finally:
                # Runs after every attempt, whether it returned, retried or raised
                wait *= 2  # Increase delay for every retry
                if self.streaming:
                    self.stop_streaming()

    def _handle_response_cookies(self, session):
        """Code to react on response cookies. Does nothing in this base class."""

    def _get_response(self, payload, api_version):
        """Send the actual HTTP request and get the response."""
        if self.streaming:
            # Make sure to clean up lingering resources
            self.stop_streaming()
        session = self.protocol.get_session()
        r, session = post_ratelimited(
            protocol=self.protocol,
            session=session,
            url=self.protocol.service_endpoint,
            headers=self._extra_headers(),
            data=self.wrap(
                content=payload,
                api_version=api_version,
            ),
            stream=self.streaming,
            timeout=self.timeout or self.protocol.TIMEOUT,
        )
        self._handle_response_cookies(session)
        if self.streaming:
            # We can only release the session when we have fully consumed the response. Save session and response
            # objects for later.
            self._streaming_session, self._streaming_response = session, r
        else:
            self.protocol.release_session(session)
        return r

    @classmethod
    def supported_api_versions(cls):
        """Return API versions supported by the service, sorted from newest to oldest"""
        return sorted({v.api_version for v in Version.all_versions() if cls.supports_version(v)}, reverse=True)

    def _api_versions_to_try(self):
        """Return a tuple of API versions to attempt, starting with the current version hint."""
        # Put the hint first in the list, and then all other versions except the hint, from newest to oldest
        return (self._version_hint.api_version,) + tuple(
            v for v in self.supported_api_versions() if v != self._version_hint.api_version
        )

    def _get_response_xml(self, payload):
        """Send the payload to the server and return relevant elements from the result. Several things happen here:
          * Wraps the payload in SOAP headers and sends it to the server
          * Negotiates the Exchange API version and stores it in the protocol object
          * Handles connection errors and possibly re-raises them as ErrorServerBusy
          * Raises SOAP errors
          * Raises EWS errors or passes them on to the caller

        :param payload: The request payload, as an XML object
        :return: A generator of XML objects or None if the service does not return a result
        """
        # Microsoft really doesn't want to make our lives easy. The server may report one version in our initial version
        # guessing tango, but then the server may decide that any arbitrary legacy backend server may actually process
        # the request for an account. Prepare to handle version-related errors and set the server version per-account.
        log.debug("Calling service %s", self.SERVICE_NAME)
        for api_version in self._api_versions_to_try():
            log.debug("Trying API version %s", api_version)
            r = self._get_response(payload=payload, api_version=api_version)
            if self.streaming:
                # Let 'requests' decode raw data automatically
                r.raw.decode_content = True
            try:
                header, body = self._get_soap_parts(response=r)
            except Exception:
                r.close()  # Release memory
                raise
            # The body may contain error messages from Exchange, but we still want to collect version info
            if header is not None:
                self._update_api_version(api_version=api_version, header=header)
            try:
                return self._get_soap_messages(body=body)
            except (
                ErrorInvalidServerVersion,
                ErrorIncorrectSchemaVersion,
                ErrorInvalidRequest,
                ErrorInvalidSchemaVersionForMailboxVersion,
            ):
                # The guessed server version is wrong. Try the next version
                log.debug("API version %s was invalid", api_version)
                continue
            finally:
                if not self.streaming:
                    # In streaming mode, we may not have accessed the raw stream yet. Caller must handle this.
                    r.close()  # Release memory
        raise self.NO_VALID_SERVER_VERSIONS(f"Tried versions {self._api_versions_to_try()} but all were invalid")

    def _handle_backoff(self, e):
        """Take a request from the server to back off and checks the retry policy for what to do. Re-raise the
        exception if conditions are not met.

        :param e: An ErrorServerBusy instance
        :return:
        """
        log.debug("Got ErrorServerBusy (back off %s seconds)", e.back_off)
        # ErrorServerBusy is very often a symptom of sending too many requests. Scale back connections if possible.
        with suppress(SessionPoolMinSizeReached):
            self.protocol.decrease_poolsize()
        if self.protocol.retry_policy.fail_fast:
            raise e
        self.protocol.retry_policy.back_off(e.back_off)
        # We'll warn about this later if we actually need to sleep

    def _update_api_version(self, api_version, header):
        """Parse the server version contained in SOAP headers and update the version hint stored by the caller, if
        necessary.
        """
        try:
            head_version = Version.from_soap_header(requested_api_version=api_version, header=header)
        except TransportError as te:
            log.debug("Failed to update version info (%s)", te)
            return
        if self._version_hint == head_version:
            # Nothing to do
            return
        log.debug("Found new version (%s -> %s)", self._version_hint, head_version)
        # The api_version that worked was different from our hint, or we never got a build version. Store the working
        # version.
        self._version_hint = head_version

    @classmethod
    def _response_tag(cls):
        """Return the name of the element containing the service response."""
        return f"{{{MNS}}}{cls.SERVICE_NAME}Response"

    @staticmethod
    def _response_messages_tag():
        """Return the name of the element containing service response messages."""
        return f"{{{MNS}}}ResponseMessages"

    @classmethod
    def _response_message_tag(cls):
        """Return the name of the element of a single response message."""
        return f"{{{MNS}}}{cls.SERVICE_NAME}ResponseMessage"

    @classmethod
    def _get_soap_parts(cls, response):
        """Split the SOAP response into its headers and body elements.

        :return: a (header, body) tuple. 'header' is None when the response has no Header element
        :raises SOAPError: if the response cannot be parsed as XML
        :raises MalformedResponseError: if the response has no Body element
        """
        try:
            root = to_xml(response.iter_content())
        except ParseError as e:
            raise SOAPError(f"Bad SOAP response: {e}")
        header = root.find(f"{{{SOAPNS}}}Header")
        if header is None:
            # This is normal when the response contains SOAP-level errors
            log.debug("No header in XML response")
        body = root.find(f"{{{SOAPNS}}}Body")
        if body is None:
            raise MalformedResponseError("No Body element in SOAP response")
        return header, body

    def _get_soap_messages(self, body):
        """Return the elements in the response containing the response messages. Raises any SOAP exceptions."""
        response = body.find(self._response_tag())
        if response is None:
            fault = body.find(f"{{{SOAPNS}}}Fault")
            if fault is None:
                raise SOAPError(f"Unknown SOAP response (expected {self._response_tag()} or Fault): {xml_to_str(body)}")
            self._raise_soap_errors(fault=fault)  # Will throw SOAPError or custom EWS error
        response_messages = response.find(self._response_messages_tag())
        if response_messages is None:
            # Result isn't delivered in a list of FooResponseMessages, but directly in the FooResponse. Consumers expect
            # a list, so return a list
            return [response]
        return response_messages.findall(self._response_message_tag())

    @classmethod
    def _raise_soap_errors(cls, fault):
        """Parse error messages contained in a SOAP Fault element and raise as exceptions defined in this package.
        This method always raises.
        """
        # Fault: See http://www.w3.org/TR/2000/NOTE-SOAP-20000508/#_Toc478383507
        fault_code = get_xml_attr(fault, "faultcode")
        fault_string = get_xml_attr(fault, "faultstring")
        fault_actor = get_xml_attr(fault, "faultactor")
        detail = fault.find("detail")
        if detail is not None:
            code = get_xml_attr(detail, f"{{{ENS}}}ResponseCode")
            if code:
                code = code.strip()
            msg = get_xml_attr(detail, f"{{{ENS}}}Message")
            if msg:
                msg = msg.strip()
            msg_xml = detail.find(f"{{{TNS}}}MessageXml")  # Crazy. Here, it's in the TNS namespace
            if code == "ErrorServerBusy":
                back_off = None
                # A missing MessageXml or Value element, or an empty value, just leaves back_off as None
                with suppress(TypeError, AttributeError):
                    value = msg_xml.find(f"{{{TNS}}}Value")
                    if value.get("Name") == "BackOffMilliseconds":
                        back_off = int(value.text) / 1000.0  # Convert to seconds
                raise ErrorServerBusy(msg, back_off=back_off)
            if code == "ErrorSchemaValidation" and msg_xml is not None:
                line_number = get_xml_attr(msg_xml, f"{{{TNS}}}LineNumber")
                line_position = get_xml_attr(msg_xml, f"{{{TNS}}}LinePosition")
                violation = get_xml_attr(msg_xml, f"{{{TNS}}}Violation")
                if violation:
                    msg = f"{msg} {violation}"
                if line_number or line_position:
                    msg = f"{msg} (line: {line_number} position: {line_position})"
            try:
                raise vars(errors)[code](msg)
            except KeyError:
                # Unknown error code. Replace 'detail' with a text summary for the generic SOAPError below
                detail = f"{cls.SERVICE_NAME}: code: {code} msg: {msg} ({xml_to_str(detail)})"
        # Try the fault code as an error class name before falling back to a generic SOAPError
        with suppress(KeyError):
            raise vars(errors)[fault_code](fault_string)
        raise SOAPError(f"SOAP error code: {fault_code} string: {fault_string} actor: {fault_actor} detail: {detail}")

    def _get_element_container(self, message, name=None):
        """Return the XML element in a response element that contains the elements we want the service to return. For
        example, in a GetFolder response, 'message' is the GetFolderResponseMessage element, and we return the 'Folders'
        element:

        <m:GetFolderResponseMessage ResponseClass="Success">
          <m:ResponseCode>NoError</m:ResponseCode>
          <m:Folders>
            <t:Folder>
              <t:FolderId Id="AQApA=" ChangeKey="AQAAAB" />
              [...]
            </t:Folder>
          </m:Folders>
        </m:GetFolderResponseMessage>

        Some service responses don't have a containing element for the returned elements ('name' is None). In
        that case, we return the 'SomeServiceResponseMessage' element.

        If the response contains a warning or an error message, we raise the relevant exception, unless the error class
        is contained in WARNINGS_TO_CATCH_IN_RESPONSE or ERRORS_TO_CATCH_IN_RESPONSE, in which case we return the
        exception instance.
        """
        # ResponseClass is an XML attribute of various SomeServiceResponseMessage elements: Possible values are:
        # Success, Warning, Error. See e.g.
        # https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/finditemresponsemessage
        response_class = message.get("ResponseClass")
        # ResponseCode, MessageText: See
        # https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/responsecode
        response_code = get_xml_attr(message, f"{{{MNS}}}ResponseCode")
        if response_class == "Success" and response_code == "NoError":
            if not name:
                return message
            container = message.find(name)
            if container is None:
                raise MalformedResponseError(f"No {name} elements in ResponseMessage ({xml_to_str(message)})")
            return container
        # A 'NoError' code on a response class other than 'Success' is reported as a bare True status
        if response_code == "NoError":
            return True
        # Raise any non-acceptable errors in the container, or return the container or the acceptable exception instance
        msg_text = get_xml_attr(message, f"{{{MNS}}}MessageText")
        msg_xml = message.find(f"{{{MNS}}}MessageXml")
        rule_errors = message.find(f"{{{MNS}}}RuleOperationErrors")
        if response_class == "Warning":
            try:
                raise self._get_exception(code=response_code, text=msg_text, msg_xml=msg_xml)
            except self.WARNINGS_TO_CATCH_IN_RESPONSE as e:
                return e
            except self.WARNINGS_TO_IGNORE_IN_RESPONSE as e:
                # Log the ignored warning and continue processing the container as for a successful response
                log.warning(str(e))
                container = message.find(name)
                if container is None:
                    raise MalformedResponseError(f"No {name} elements in ResponseMessage ({xml_to_str(message)})")
                return container
        # response_class == 'Error', or 'Success' and not 'NoError'
        try:
            raise self._get_exception(code=response_code, text=msg_text, msg_xml=msg_xml, rule_errors=rule_errors)
        except self.ERRORS_TO_CATCH_IN_RESPONSE as e:
            return e

    @staticmethod
    def _get_exception(code, text, msg_xml=None, rule_errors=None):
        """Parse error messages contained in EWS responses and return them as instances of exceptions defined in this
        package. The caller is responsible for raising the returned instance.
        """
        if not code:
            return TransportError(f"Empty ResponseCode in ResponseMessage (MessageText: {text}, MessageXml: {msg_xml})")
        if msg_xml is not None:
            # If this is an ErrorInvalidPropertyRequest error, the xml may contain a specific FieldURI
            for elem_cls in (FieldURI, IndexedFieldURI, ExtendedFieldURI, ExceptionFieldURI):
                elem = msg_xml.find(elem_cls.response_tag())
                if elem is not None:
                    field_uri = elem_cls.from_xml(elem, account=None)
                    text += f" (field: {field_uri})"
                    break

            # If this is an ErrorInvalidValueForProperty error, the xml may contain the name and value of the property
            if code == "ErrorInvalidValueForProperty":
                msg_parts = {}
                for elem in msg_xml.findall(f"{{{TNS}}}Value"):
                    key, val = elem.get("Name"), elem.text
                    if key:
                        msg_parts[key] = val
                if msg_parts:
                    text += f" ({', '.join(f'{k}: {v}' for k, v in msg_parts.items())})"

            # If this is an ErrorInternalServerError error, the xml may contain a more specific error code
            inner_code, inner_text = None, None
            for value_elem in msg_xml.findall(f"{{{TNS}}}Value"):
                name = value_elem.get("Name")
                if name == "InnerErrorResponseCode":
                    inner_code = value_elem.text
                elif name == "InnerErrorMessageText":
                    inner_text = value_elem.text
            if inner_code:
                try:
                    # Return the error as the inner error code
                    return vars(errors)[inner_code](f"{inner_text} (raised from: {code}({text!r}))")
                except KeyError:
                    # Inner code is unknown to us. Just append to the original text
                    text += f" (inner error: {inner_code}({inner_text!r}))"
        if rule_errors is not None:
            # Append one note per validation error found in the rule operation errors
            for rule_error in rule_errors.findall(f"{{{TNS}}}RuleOperationError"):
                for error in rule_error.find(f"{{{TNS}}}ValidationErrors").findall(f"{{{TNS}}}Error"):
                    field_uri = get_xml_attr(error, f"{{{TNS}}}FieldURI")
                    error_code = get_xml_attr(error, f"{{{TNS}}}ErrorCode")
                    error_message = get_xml_attr(error, f"{{{TNS}}}ErrorMessage")
                    text += f" ({error_code} on field {field_uri}: {error_message})"
        try:
            # Return the error corresponding to the ResponseCode
            return vars(errors)[code](text)
        except KeyError:
            # Should not happen
            return TransportError(
                f"Unknown ResponseCode in ResponseMessage: {code} (MessageText: {text}, MessageXml: {msg_xml})"
            )

    def _get_elements_in_response(self, response):
        """Take a list of 'SomeServiceResponseMessage' elements and return the elements in each response message that
        we want the service to return. With e.g. 'CreateItem', we get a list of 'CreateItemResponseMessage' elements
        and return the 'Message' elements.

        <m:CreateItemResponseMessage ResponseClass="Success">
          <m:ResponseCode>NoError</m:ResponseCode>
          <m:Items>
            <t:Message>
              <t:ItemId Id="AQApA=" ChangeKey="AQAAAB"/>
            </t:Message>
          </m:Items>
        </m:CreateItemResponseMessage>
        <m:CreateItemResponseMessage ResponseClass="Success">
          <m:ResponseCode>NoError</m:ResponseCode>
          <m:Items>
            <t:Message>
              <t:ItemId Id="AQApB=" ChangeKey="AQAAAC"/>
            </t:Message>
          </m:Items>
        </m:CreateItemResponseMessage>

        :param response: a list of 'SomeServiceResponseMessage' XML objects
        :return: a generator of items as returned by '_get_elements_in_container()'
        """
        for msg in response:
            container_or_exc = self._get_element_container(message=msg, name=self.element_container_name)
            # Status booleans and caught exception instances are passed through as-is
            if isinstance(container_or_exc, (bool, Exception)):
                yield container_or_exc
            else:
                for c in self._get_elements_in_container(container=container_or_exc):
                    yield c

    @classmethod
    def _get_elements_in_container(cls, container):
        """Return a list of response elements from an XML response element container. With e.g.
        'CreateItem', 'Items' is the container element, and we return the 'Message' child elements:

          <m:Items>
            <t:Message>
              <t:ItemId Id="AQApA=" ChangeKey="AQAAAB"/>
            </t:Message>
          </m:Items>

        If the service does not return response elements, return True to indicate the status. Errors have already been
        raised.
        """
        if cls.returns_elements:
            return list(container)
        return [True]
Base class for all EWS services.
Ancestors
Subclasses
- EWSAccountService
- ConvertId
- ExpandDL
- GetMailTips
- GetRoomLists
- GetRooms
- GetSearchableMailboxes
- GetServerTimeZones
- GetUserAvailability
- GetUserSettings
- ResolveNames
- SendNotification
Class variables
var CHUNK_SIZE
var ERRORS_TO_CATCH_IN_RESPONSE
var NO_VALID_SERVER_VERSIONS
-
Global error type within this module.
var NS_MAP
var SERVICE_NAME
var WARNINGS_TO_CATCH_IN_RESPONSE
-
Global error type within this module.
var WARNINGS_TO_IGNORE_IN_RESPONSE
var element_container_name
var returns_elements
Static methods
def supported_api_versions()
-
Return API versions supported by the service, sorted from newest to oldest
Methods
def get(self, expect_result=True, **kwargs)
-
Expand source code
def get(self, expect_result=True, **kwargs):
    """Call the service and reduce the outcome to a single value.

    Works like .call(), except that the number of results is checked against 'expect_result': True demands exactly
    one result, False demands none, and None accepts zero or one.

    :param expect_result: None, True, or False
    :param kwargs: Same as arguments for .call()
    :return: the single result, or None when there is no result and none was required
    :raises ValueError: if the number of results does not match 'expect_result'
    """
    results = list(self.call(**kwargs))
    # The first exception instance among the results, if any, is raised
    first_error = next((item for item in results if isinstance(item, Exception)), None)
    if first_error is not None:
        raise first_error
    if not results and expect_result is None:
        # An empty result is acceptable here
        return None
    if expect_result is False:
        if not results:
            return None
        raise ValueError(f"Expected result length 0, but got {results}")
    if len(results) == 1:
        return results[0]
    raise ValueError(f"Expected result length 1, but got {results}")
Like .call(), but expects exactly one result from the server, or zero when expect_result=False, or either zero or one when expect_result=None. Returns either one object or None.
:param expect_result: None, True, or False :param kwargs: Same as arguments for .call() :return: Same as .call(), but returns either None or exactly one item
def parse(self, xml)
-
Expand source code
def parse(self, xml):
    """Parse static XML data as if it were a server response. Used mostly for testing.

    :param xml: the raw response content handed to DummyResponse
    :return: the result of ._elems_to_objs() on the elements found in the response
    """
    dummy = DummyResponse(content=xml, streaming=self.streaming)
    # The SOAP header is not needed when parsing static data
    _header, soap_body = self._get_soap_parts(response=dummy)
    messages = self._get_soap_messages(body=soap_body)
    elements = self._get_elements_in_response(response=messages)
    return self._elems_to_objs(elements)
Used mostly for testing, when we want to parse static XML data.
def stop_streaming(self)
-
Expand source code
def stop_streaming(self):
    """Close the saved streaming response and hand the saved session back to the protocol.

    :raises RuntimeError: if the service is not in streaming mode
    """
    if not self.streaming:
        raise RuntimeError("Attempt to stop a non-streaming service")
    response = self._streaming_response
    if response:
        # Closing the response releases memory
        response.close()
        self._streaming_response = None
    session = self._streaming_session
    if session:
        self.protocol.release_session(session)
        self._streaming_session = None
def wrap(self, content, api_version=None)
-
Expand source code
def wrap(self, content, api_version=None):
    """Wrap a request payload in a SOAP envelope and serialize it.

    The header carries up to three optional elements, in this order: RequestServerVersion, ExchangeImpersonation
    and TimeZoneContext. The Header element is only included if at least one of them is present.

    RequestServerVersion element on MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/requestserverversion

    ExchangeImpersonation element on MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/exchangeimpersonation

    TimeZoneContext element on MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/timezonecontext

    :param content: the request payload, appended to the SOAP Body element
    :param api_version: if truthy, sent as the 'Version' attribute of the RequestServerVersion element
    :return: the serialized envelope, as produced by xml_to_str()
    """
    soap_envelope = create_element("s:Envelope", nsmap=self.NS_MAP)
    soap_header = create_element("s:Header")
    if api_version:
        soap_header.append(create_element("t:RequestServerVersion", attrs={"Version": api_version}))
    impersonated = self._account_to_impersonate
    if impersonated:
        add_xml_child(soap_header, "t:ExchangeImpersonation", impersonated)
    tz = self._timezone
    if tz:
        tz_context = create_element("t:TimeZoneContext")
        tz_context.append(create_element("t:TimeZoneDefinition", attrs={"Id": tz.ms_id}))
        soap_header.append(tz_context)
    # Skip the header entirely when nothing was added to it
    if len(soap_header):
        soap_envelope.append(soap_header)
    soap_body = create_element("s:Body")
    soap_body.append(content)
    soap_envelope.append(soap_body)
    return xml_to_str(soap_envelope, encoding=DEFAULT_ENCODING, xml_declaration=True)
Generate the necessary boilerplate XML for a raw SOAP request. The XML is specific to the server version. ExchangeImpersonation allows to act as the user we want to impersonate.
RequestServerVersion element on MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/requestserverversion
ExchangeImpersonation element on MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/exchangeimpersonation
TimeZoneContext element on MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/timezonecontext
:param content: :param api_version:
class EmptyFolder (*args, **kwargs)
-
Expand source code
class EmptyFolder(EWSAccountService):
    """Service class for the EWS EmptyFolder operation.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/emptyfolder-operation
    """

    SERVICE_NAME = "EmptyFolder"
    # This service only reports a status; no response elements are returned
    returns_elements = False

    def call(self, folders, delete_type, delete_sub_folders):
        """Validate 'delete_type' and send chunked requests for 'folders'."""
        if delete_type in DELETE_TYPE_CHOICES:
            options = {"delete_type": delete_type, "delete_sub_folders": delete_sub_folders}
            return self._chunked_get_elements(self.get_payload, items=folders, **options)
        raise InvalidEnumValue("delete_type", delete_type, DELETE_TYPE_CHOICES)

    def get_payload(self, folders, delete_type, delete_sub_folders):
        """Return the service element, carrying the delete options as attributes and the folder IDs as a child."""
        request_attrs = {"DeleteType": delete_type, "DeleteSubFolders": delete_sub_folders}
        request = create_element(f"m:{self.SERVICE_NAME}", attrs=request_attrs)
        request.append(folder_ids_element(folders=folders, version=self.account.version))
        return request
Ancestors
Class variables
var SERVICE_NAME
var returns_elements
Methods
def call(self, folders, delete_type, delete_sub_folders)
-
Expand source code
def call(self, folders, delete_type, delete_sub_folders):
    """Validate 'delete_type' and send chunked requests for 'folders'.

    :param folders: the folders to process, passed as 'items' to ._chunked_get_elements()
    :param delete_type: must be one of DELETE_TYPE_CHOICES
    :param delete_sub_folders: forwarded unchanged to .get_payload()
    :return: whatever ._chunked_get_elements() returns
    :raises InvalidEnumValue: if 'delete_type' is not in DELETE_TYPE_CHOICES
    """
    if delete_type in DELETE_TYPE_CHOICES:
        options = {"delete_type": delete_type, "delete_sub_folders": delete_sub_folders}
        return self._chunked_get_elements(self.get_payload, items=folders, **options)
    raise InvalidEnumValue("delete_type", delete_type, DELETE_TYPE_CHOICES)
def get_payload(self, folders, delete_type, delete_sub_folders)
-
Expand source code
def get_payload(self, folders, delete_type, delete_sub_folders):
    """Return the service element, carrying the delete options as attributes and the folder IDs as a child.

    :param folders: the folders handed to folder_ids_element()
    :param delete_type: value of the 'DeleteType' attribute
    :param delete_sub_folders: value of the 'DeleteSubFolders' attribute
    :return: the request payload, as an XML element
    """
    request_attrs = {"DeleteType": delete_type, "DeleteSubFolders": delete_sub_folders}
    request = create_element(f"m:{self.SERVICE_NAME}", attrs=request_attrs)
    id_list = folder_ids_element(folders=folders, version=self.account.version)
    request.append(id_list)
    return request
Inherited members
class ExpandDL (protocol, chunk_size=None, timeout=None)
-
Expand source code
class ExpandDL(EWSService):
    """Service class for the EWS ExpandDL operation.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/expanddl-operation
    """

    SERVICE_NAME = "ExpandDL"
    element_container_name = f"{{{MNS}}}DLExpansion"
    WARNINGS_TO_IGNORE_IN_RESPONSE = ErrorNameResolutionMultipleResults

    def call(self, distribution_list):
        """Send one request for 'distribution_list' and convert the returned elements.

        :param distribution_list: the object serialized into the request payload

        :return: the result of self._elems_to_objs() applied to the response elements
        """
        request = self.get_payload(distribution_list=distribution_list)
        response_elems = self._get_elements(payload=request)
        return self._elems_to_objs(response_elems)

    def _elem_to_obj(self, elem):
        """Parse a single response element as a Mailbox that is not bound to an account."""
        return Mailbox.from_xml(elem, account=None)

    def get_payload(self, distribution_list):
        """Build the request element with 'distribution_list' written into it via set_xml_value()."""
        payload = create_element(f"m:{self.SERVICE_NAME}")
        return set_xml_value(payload, distribution_list, version=self.protocol.version)
MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/expanddl-operation
Ancestors
Class variables
var SERVICE_NAME
var WARNINGS_TO_IGNORE_IN_RESPONSE
-
Global error type within this module.
var element_container_name
Methods
def call(self, distribution_list)
-
Expand source code
def call(self, distribution_list):
    """Send one request for 'distribution_list' and convert the returned elements.

    :param distribution_list: the object serialized into the request payload

    :return: the result of self._elems_to_objs() applied to the response elements
    """
    request = self.get_payload(distribution_list=distribution_list)
    response_elems = self._get_elements(payload=request)
    return self._elems_to_objs(response_elems)
def get_payload(self, distribution_list)
-
Expand source code
def get_payload(self, distribution_list):
    """Build the request element with 'distribution_list' written into it via set_xml_value().

    :param distribution_list: the object serialized into the request payload

    :return: the value returned by set_xml_value() for the request element
    """
    payload = create_element(f"m:{self.SERVICE_NAME}")
    return set_xml_value(payload, distribution_list, version=self.protocol.version)
Inherited members
class ExportItems (*args, **kwargs)
-
Expand source code
class ExportItems(EWSAccountService):
    """Service class for the EWS ExportItems operation.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/exportitems-operation
    """

    ERRORS_TO_CATCH_IN_RESPONSE = ResponseMessageError
    SERVICE_NAME = "ExportItems"
    element_container_name = f"{{{MNS}}}Data"

    def call(self, items):
        """Request an export of 'items' and convert the returned elements.

        :param items: the items whose IDs are sent in the request

        :return: the result of self._elems_to_objs() applied to the response elements
        """
        response_elems = self._chunked_get_elements(self.get_payload, items=items)
        return self._elems_to_objs(response_elems)

    def _elem_to_obj(self, elem):
        """Return only the text content of the 'Data' element; nothing else is of interest."""
        return elem.text

    def get_payload(self, items):
        """Build the request element containing the item IDs element."""
        payload = create_element(f"m:{self.SERVICE_NAME}")
        item_ids = item_ids_element(items=items, version=self.account.version)
        payload.append(item_ids)
        return payload

    @classmethod
    def _get_elements_in_container(cls, container):
        """Return the container itself as the only element.

        Overridden because ExportItemsResponseMessage is formatted a bit differently.
        """
        return [container]
Ancestors
Class variables
var ERRORS_TO_CATCH_IN_RESPONSE
-
Global error type within this module.
var SERVICE_NAME
var element_container_name
Methods
def call(self, items)
-
Expand source code
def call(self, items):
    """Request an export of 'items' and convert the returned elements.

    :param items: the items whose IDs are sent in the request

    :return: the result of self._elems_to_objs() applied to the response elements
    """
    response_elems = self._chunked_get_elements(self.get_payload, items=items)
    return self._elems_to_objs(response_elems)
def get_payload(self, items)
-
Expand source code
def get_payload(self, items):
    """Build the request element containing the item IDs element.

    :param items: the items whose IDs are appended to the payload

    :return: the request payload element
    """
    payload = create_element(f"m:{self.SERVICE_NAME}")
    item_ids = item_ids_element(items=items, version=self.account.version)
    payload.append(item_ids)
    return payload
Inherited members
class FindFolder (*args, **kwargs)
-
Expand source code
class FindFolder(EWSPagingService):
    """Service class for the EWS FindFolder operation.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/findfolder-operation
    """

    SERVICE_NAME = "FindFolder"
    element_container_name = f"{{{TNS}}}Folders"
    paging_container_name = f"{{{MNS}}}RootFolder"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # A hack: call() stores the common root here so _elem_to_obj() can use it while parsing
        self.root = None

    def call(self, folders, additional_fields, restriction, shape, depth, max_items, offset):
        """Find sub-folders of a folder.

        :param folders: the folders to act on
        :param additional_fields: the extra fields that should be returned with the folder, as FieldPath objects
        :param restriction: Restriction object that defines the filters for the query
        :param shape: The set of attributes to return
        :param depth: How deep in the folder structure to search for folders
        :param max_items: The maximum number of items to return
        :param offset: the offset relative to the first item in the item collection. Usually 0.

        :return: the result of self._elems_to_objs() applied to the paged response elements
        :raises InvalidEnumValue: if 'shape' or 'depth' is not one of the allowed choices
        :raises ValueError: if the folders do not all share exactly one root
        """
        if shape not in SHAPE_CHOICES:
            raise InvalidEnumValue("shape", shape, SHAPE_CHOICES)
        if depth not in FOLDER_TRAVERSAL_CHOICES:
            raise InvalidEnumValue("depth", depth, FOLDER_TRAVERSAL_CHOICES)
        distinct_roots = {folder.root for folder in folders}
        if len(distinct_roots) != 1:
            raise ValueError(f"All folders must have the same root hierarchy ({distinct_roots})")
        # Exactly one root is left at this point; remember it for _elem_to_obj()
        (self.root,) = distinct_roots
        paged_elems = self._paged_call(
            payload_func=self.get_payload,
            max_items=max_items,
            folders=folders,
            additional_fields=additional_fields,
            restriction=restriction,
            shape=shape,
            depth=depth,
            page_size=self.page_size,
            offset=offset,
        )
        return self._elems_to_objs(paged_elems)

    def _elem_to_obj(self, elem):
        """Parse a response element into a folder, using the root stored by call()."""
        return Folder.from_xml_with_root(elem=elem, root=self.root)

    def get_payload(self, folders, additional_fields, restriction, shape, depth, page_size, offset=0):
        """Build the request element for one page of results.

        Children are appended in this order: folder shape, paging view (Exchange 2010 and later only),
        restriction (if any), parent folder IDs.

        :raises NotImplementedError: if a non-zero 'offset' is requested on a server older than Exchange 2010
        """
        account_version = self.account.version
        payload = create_element(f"m:{self.SERVICE_NAME}", attrs={"Traversal": depth})
        folder_shape = shape_element(
            tag="m:FolderShape", shape=shape, additional_fields=additional_fields, version=account_version
        )
        payload.append(folder_shape)
        if account_version.build >= EXCHANGE_2010:
            view_attrs = {"MaxEntriesReturned": page_size, "Offset": offset, "BasePoint": "Beginning"}
            payload.append(create_element("m:IndexedPageFolderView", attrs=view_attrs))
        elif offset != 0:
            raise NotImplementedError("'offset' is only supported for Exchange 2010 servers and later")
        if restriction:
            payload.append(restriction.to_xml(version=account_version))
        parent_ids = folder_ids_element(folders=folders, version=self.protocol.version, tag="m:ParentFolderIds")
        payload.append(parent_ids)
        return payload
Ancestors
Class variables
var SERVICE_NAME
var element_container_name
var paging_container_name
Methods
def call(self, folders, additional_fields, restriction, shape, depth, max_items, offset)
-
Expand source code
def call(self, folders, additional_fields, restriction, shape, depth, max_items, offset):
    """Find sub-folders of a folder.

    :param folders: the folders to act on
    :param additional_fields: the extra fields that should be returned with the folder, as FieldPath objects
    :param restriction: Restriction object that defines the filters for the query
    :param shape: The set of attributes to return
    :param depth: How deep in the folder structure to search for folders
    :param max_items: The maximum number of items to return
    :param offset: the offset relative to the first item in the item collection. Usually 0.

    :return: the result of self._elems_to_objs() applied to the paged response elements
    :raises InvalidEnumValue: if 'shape' or 'depth' is not one of the allowed choices
    :raises ValueError: if the folders do not all share exactly one root
    """
    if shape not in SHAPE_CHOICES:
        raise InvalidEnumValue("shape", shape, SHAPE_CHOICES)
    if depth not in FOLDER_TRAVERSAL_CHOICES:
        raise InvalidEnumValue("depth", depth, FOLDER_TRAVERSAL_CHOICES)
    distinct_roots = {folder.root for folder in folders}
    if len(distinct_roots) != 1:
        raise ValueError(f"All folders must have the same root hierarchy ({distinct_roots})")
    # Exactly one root is left at this point; store it on the instance for later parsing
    (self.root,) = distinct_roots
    paged_elems = self._paged_call(
        payload_func=self.get_payload,
        max_items=max_items,
        folders=folders,
        additional_fields=additional_fields,
        restriction=restriction,
        shape=shape,
        depth=depth,
        page_size=self.page_size,
        offset=offset,
    )
    return self._elems_to_objs(paged_elems)
Find sub-folders of a folder.
:param folders: the folders to act on :param additional_fields: the extra fields that should be returned with the folder, as FieldPath objects :param restriction: Restriction object that defines the filters for the query :param shape: The set of attributes to return :param depth: How deep in the folder structure to search for folders :param max_items: The maximum number of items to return :param offset: the offset relative to the first item in the item collection. Usually 0.
:return: XML elements for the matching folders
def get_payload(self, folders, additional_fields, restriction, shape, depth, page_size, offset=0)
-
Expand source code
def get_payload(self, folders, additional_fields, restriction, shape, depth, page_size, offset=0):
    """Build the request element for one page of results.

    Children are appended in this order: folder shape, paging view (Exchange 2010 and later only),
    restriction (if any), parent folder IDs.

    :param folders: the parent folders to search in
    :param additional_fields: extra fields to request in the folder shape
    :param restriction: optional object providing to_xml(); skipped when falsy
    :param shape: the base shape to request
    :param depth: value of the 'Traversal' attribute
    :param page_size: value of the 'MaxEntriesReturned' paging attribute
    :param offset: value of the 'Offset' paging attribute

    :return: the request payload element
    :raises NotImplementedError: if a non-zero 'offset' is requested on a server older than Exchange 2010
    """
    account_version = self.account.version
    payload = create_element(f"m:{self.SERVICE_NAME}", attrs={"Traversal": depth})
    folder_shape = shape_element(
        tag="m:FolderShape", shape=shape, additional_fields=additional_fields, version=account_version
    )
    payload.append(folder_shape)
    if account_version.build >= EXCHANGE_2010:
        view_attrs = {"MaxEntriesReturned": page_size, "Offset": offset, "BasePoint": "Beginning"}
        payload.append(create_element("m:IndexedPageFolderView", attrs=view_attrs))
    elif offset != 0:
        raise NotImplementedError("'offset' is only supported for Exchange 2010 servers and later")
    if restriction:
        payload.append(restriction.to_xml(version=account_version))
    parent_ids = folder_ids_element(folders=folders, version=self.protocol.version, tag="m:ParentFolderIds")
    payload.append(parent_ids)
    return payload
Inherited members
class FindItem (*args, **kwargs)
-
Expand source code
class FindItem(EWSPagingService):
    """Service class for the EWS FindItem operation.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/finditem-operation
    """

    SERVICE_NAME = "FindItem"
    element_container_name = f"{{{TNS}}}Items"
    paging_container_name = f"{{{MNS}}}RootFolder"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # A hack: call() stores these so _elem_to_obj() knows how the response was requested
        self.additional_fields = None
        self.shape = None

    def call(
        self,
        folders,
        additional_fields,
        restriction,
        order_fields,
        shape,
        query_string,
        depth,
        calendar_view,
        max_items,
        offset,
    ):
        """Find items in an account.

        :param folders: the folders to act on
        :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects
        :param restriction: a Restriction object that defines the filters for the query
        :param order_fields: the fields to sort the results by
        :param shape: The set of attributes to return
        :param query_string: a QueryString object
        :param depth: How deep in the folder structure to search for items
        :param calendar_view: If set, returns recurring calendar items unfolded
        :param max_items: the max number of items to return
        :param offset: the offset relative to the first item in the item collection. Usually 0.

        :return: the result of self._elems_to_objs() applied to the paged response elements
        :raises InvalidEnumValue: if 'shape' or 'depth' is not one of the allowed choices
        """
        if shape not in SHAPE_CHOICES:
            raise InvalidEnumValue("shape", shape, SHAPE_CHOICES)
        if depth not in ITEM_TRAVERSAL_CHOICES:
            raise InvalidEnumValue("depth", depth, ITEM_TRAVERSAL_CHOICES)
        self.additional_fields, self.shape = additional_fields, shape
        paged_elems = self._paged_call(
            payload_func=self.get_payload,
            max_items=max_items,
            folders=folders,
            additional_fields=additional_fields,
            restriction=restriction,
            order_fields=order_fields,
            query_string=query_string,
            shape=shape,
            depth=depth,
            calendar_view=calendar_view,
            page_size=self.page_size,
            offset=offset,
        )
        return self._elems_to_objs(paged_elems)

    def _elem_to_obj(self, elem):
        """Parse a response element, taking the ID-only path when only IDs were requested."""
        ids_only = self.shape == ID_ONLY and self.additional_fields is None
        if ids_only:
            return Item.id_from_xml(elem)
        item_model = BaseFolder.item_model_from_tag(elem.tag)
        return item_model.from_xml(elem=elem, account=self.account)

    def get_payload(
        self,
        folders,
        additional_fields,
        restriction,
        order_fields,
        query_string,
        shape,
        depth,
        calendar_view,
        page_size,
        offset=0,
    ):
        """Build the request element for one page of results.

        Children are appended in this order: item shape, view (calendar view if given, otherwise an indexed
        page view), restriction, sort order, parent folder IDs, query string. Optional parts are skipped
        when falsy.
        """
        account_version = self.account.version
        payload = create_element(f"m:{self.SERVICE_NAME}", attrs={"Traversal": depth})
        item_shape = shape_element(
            tag="m:ItemShape", shape=shape, additional_fields=additional_fields, version=account_version
        )
        payload.append(item_shape)
        if calendar_view is not None:
            view_elem = calendar_view.to_xml(version=account_version)
        else:
            view_attrs = {"MaxEntriesReturned": page_size, "Offset": offset, "BasePoint": "Beginning"}
            view_elem = create_element("m:IndexedPageItemView", attrs=view_attrs)
        payload.append(view_elem)
        if restriction:
            payload.append(restriction.to_xml(version=account_version))
        if order_fields:
            sort_order = create_element("m:SortOrder")
            payload.append(set_xml_value(sort_order, order_fields, version=account_version))
        parent_ids = folder_ids_element(folders=folders, version=self.protocol.version, tag="m:ParentFolderIds")
        payload.append(parent_ids)
        if query_string:
            payload.append(query_string.to_xml(version=account_version))
        return payload
MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/finditem-operation
Ancestors
Class variables
var SERVICE_NAME
var element_container_name
var paging_container_name
Methods
def call(self,
folders,
additional_fields,
restriction,
order_fields,
shape,
query_string,
depth,
calendar_view,
max_items,
offset)-
Expand source code
def call(
    self,
    folders,
    additional_fields,
    restriction,
    order_fields,
    shape,
    query_string,
    depth,
    calendar_view,
    max_items,
    offset,
):
    """Find items in an account.

    :param folders: the folders to act on
    :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects
    :param restriction: a Restriction object that defines the filters for the query
    :param order_fields: the fields to sort the results by
    :param shape: The set of attributes to return
    :param query_string: a QueryString object
    :param depth: How deep in the folder structure to search for items
    :param calendar_view: If set, returns recurring calendar items unfolded
    :param max_items: the max number of items to return
    :param offset: the offset relative to the first item in the item collection. Usually 0.

    :return: the result of self._elems_to_objs() applied to the paged response elements
    :raises InvalidEnumValue: if 'shape' or 'depth' is not one of the allowed choices
    """
    if shape not in SHAPE_CHOICES:
        raise InvalidEnumValue("shape", shape, SHAPE_CHOICES)
    if depth not in ITEM_TRAVERSAL_CHOICES:
        raise InvalidEnumValue("depth", depth, ITEM_TRAVERSAL_CHOICES)
    # Stored on the instance so that element parsing knows how the response was requested
    self.additional_fields, self.shape = additional_fields, shape
    paged_elems = self._paged_call(
        payload_func=self.get_payload,
        max_items=max_items,
        folders=folders,
        additional_fields=additional_fields,
        restriction=restriction,
        order_fields=order_fields,
        query_string=query_string,
        shape=shape,
        depth=depth,
        calendar_view=calendar_view,
        page_size=self.page_size,
        offset=offset,
    )
    return self._elems_to_objs(paged_elems)
Find items in an account.
:param folders: the folders to act on :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects :param restriction: a Restriction object that defines the filters for the query :param order_fields: the fields to sort the results by :param shape: The set of attributes to return :param query_string: a QueryString object :param depth: How deep in the folder structure to search for items :param calendar_view: If set, returns recurring calendar items unfolded :param max_items: the max number of items to return :param offset: the offset relative to the first item in the item collection. Usually 0.
:return: XML elements for the matching items
def get_payload(self,
folders,
additional_fields,
restriction,
order_fields,
query_string,
shape,
depth,
calendar_view,
page_size,
offset=0)-
Expand source code
def get_payload(
    self,
    folders,
    additional_fields,
    restriction,
    order_fields,
    query_string,
    shape,
    depth,
    calendar_view,
    page_size,
    offset=0,
):
    """Build the request element for one page of results.

    Children are appended in this order: item shape, view (calendar view if given, otherwise an indexed
    page view), restriction, sort order, parent folder IDs, query string. Optional parts are skipped
    when falsy.

    :param folders: the parent folders to search in
    :param additional_fields: extra fields to request in the item shape
    :param restriction: optional object providing to_xml()
    :param order_fields: optional sort fields, written into an 'm:SortOrder' element
    :param query_string: optional object providing to_xml()
    :param shape: the base shape to request
    :param depth: value of the 'Traversal' attribute
    :param calendar_view: optional object providing to_xml(); replaces the indexed page view when given
    :param page_size: value of the 'MaxEntriesReturned' paging attribute
    :param offset: value of the 'Offset' paging attribute

    :return: the request payload element
    """
    account_version = self.account.version
    payload = create_element(f"m:{self.SERVICE_NAME}", attrs={"Traversal": depth})
    item_shape = shape_element(
        tag="m:ItemShape", shape=shape, additional_fields=additional_fields, version=account_version
    )
    payload.append(item_shape)
    if calendar_view is not None:
        view_elem = calendar_view.to_xml(version=account_version)
    else:
        view_attrs = {"MaxEntriesReturned": page_size, "Offset": offset, "BasePoint": "Beginning"}
        view_elem = create_element("m:IndexedPageItemView", attrs=view_attrs)
    payload.append(view_elem)
    if restriction:
        payload.append(restriction.to_xml(version=account_version))
    if order_fields:
        sort_order = create_element("m:SortOrder")
        payload.append(set_xml_value(sort_order, order_fields, version=account_version))
    parent_ids = folder_ids_element(folders=folders, version=self.protocol.version, tag="m:ParentFolderIds")
    payload.append(parent_ids)
    if query_string:
        payload.append(query_string.to_xml(version=account_version))
    return payload
Inherited members
class FindPeople (*args, **kwargs)
-
Expand source code
class FindPeople(EWSPagingService):
    """Service class for the EWS FindPeople operation.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/findpeople-operation
    """

    SERVICE_NAME = "FindPeople"
    element_container_name = f"{{{MNS}}}People"
    supported_from = EXCHANGE_2013

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # A hack: call() stores these so _elem_to_obj() knows how the response was requested
        self.additional_fields = None
        self.shape = None

    def call(self, folder, additional_fields, restriction, order_fields, shape, query_string, depth, max_items, offset):
        """Find items in an account. This service can only be called on a single folder.

        :param folder: the Folder object to query
        :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects
        :param restriction: a Restriction object that defines the filters for the query
        :param order_fields: the fields to sort the results by
        :param shape: The set of attributes to return
        :param query_string: a QueryString object
        :param depth: How deep in the folder structure to search for items
        :param max_items: the max number of items to return
        :param offset: the offset relative to the first item in the item collection. Usually 0.

        :return: the result of self._elems_to_objs() applied to the paged response elements
        :raises InvalidEnumValue: if 'shape' or 'depth' is not one of the allowed choices
        """
        if shape not in SHAPE_CHOICES:
            raise InvalidEnumValue("shape", shape, SHAPE_CHOICES)
        if depth not in ITEM_TRAVERSAL_CHOICES:
            raise InvalidEnumValue("depth", depth, ITEM_TRAVERSAL_CHOICES)
        self.additional_fields, self.shape = additional_fields, shape
        paged_elems = self._paged_call(
            payload_func=self.get_payload,
            max_items=max_items,
            folders=[folder],  # self._paged_call() wants a list, even though only one folder is supported
            additional_fields=additional_fields,
            restriction=restriction,
            order_fields=order_fields,
            query_string=query_string,
            shape=shape,
            depth=depth,
            page_size=self.page_size,
            offset=offset,
        )
        return self._elems_to_objs(paged_elems)

    def _elem_to_obj(self, elem):
        """Parse a response element, taking the ID-only path when only IDs were requested."""
        ids_only = self.shape == ID_ONLY and self.additional_fields is None
        if ids_only:
            return Persona.id_from_xml(elem)
        return Persona.from_xml(elem, account=self.account)

    def get_payload(
        self, folders, additional_fields, restriction, order_fields, query_string, shape, depth, page_size, offset=0
    ):
        """Build the request element for one page of results.

        Only a single folder is actually supported, but self._paged_call() hands over a list.
        Children are appended in this order: persona shape, indexed page view, restriction, sort order,
        parent folder ID, query string. Optional parts are skipped when falsy.
        """
        account_version = self.account.version
        payload = create_element(f"m:{self.SERVICE_NAME}", attrs={"Traversal": depth})
        persona_shape = shape_element(
            tag="m:PersonaShape", shape=shape, additional_fields=additional_fields, version=account_version
        )
        payload.append(persona_shape)
        view_attrs = {"MaxEntriesReturned": page_size, "Offset": offset, "BasePoint": "Beginning"}
        payload.append(create_element("m:IndexedPageItemView", attrs=view_attrs))
        if restriction:
            payload.append(restriction.to_xml(version=account_version))
        if order_fields:
            sort_order = create_element("m:SortOrder")
            payload.append(set_xml_value(sort_order, order_fields, version=account_version))
        parent_id = folder_ids_element(folders=folders, version=account_version, tag="m:ParentFolderId")
        payload.append(parent_id)
        if query_string:
            payload.append(query_string.to_xml(version=account_version))
        return payload

    @staticmethod
    def _get_paging_values(elem):
        """Find paging values.

        The paging element from FindPeople is different from other paging containers.

        :return: a tuple of (total item count, next offset); the next offset is always None
        """

        def child_as_int(local_name):
            # All three counters are children of 'elem' in the messages namespace
            return int(elem.find(f"{{{MNS}}}{local_name}").text)

        total_count = child_as_int("TotalNumberOfPeopleInView")
        first_matching_row = child_as_int("FirstMatchingRowIndex")
        first_loaded_row = child_as_int("FirstLoadedRowIndex")
        log.debug(
            "Got page with total items %s, first matching %s, first loaded %s ",
            total_count,
            first_matching_row,
            first_loaded_row,
        )
        # Fetching more pages is not supported for this service, so there is never a next offset
        return total_count, None
Ancestors
Class variables
var SERVICE_NAME
var element_container_name
var supported_from
Methods
def call(self,
folder,
additional_fields,
restriction,
order_fields,
shape,
query_string,
depth,
max_items,
offset)-
Expand source code
def call(self, folder, additional_fields, restriction, order_fields, shape, query_string, depth, max_items, offset):
    """Find items in an account. This service can only be called on a single folder.

    :param folder: the Folder object to query
    :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects
    :param restriction: a Restriction object that defines the filters for the query
    :param order_fields: the fields to sort the results by
    :param shape: The set of attributes to return
    :param query_string: a QueryString object
    :param depth: How deep in the folder structure to search for items
    :param max_items: the max number of items to return
    :param offset: the offset relative to the first item in the item collection. Usually 0.

    :return: the result of self._elems_to_objs() applied to the paged response elements
    :raises InvalidEnumValue: if 'shape' or 'depth' is not one of the allowed choices
    """
    if shape not in SHAPE_CHOICES:
        raise InvalidEnumValue("shape", shape, SHAPE_CHOICES)
    if depth not in ITEM_TRAVERSAL_CHOICES:
        raise InvalidEnumValue("depth", depth, ITEM_TRAVERSAL_CHOICES)
    # Stored on the instance so that element parsing knows how the response was requested
    self.additional_fields, self.shape = additional_fields, shape
    paged_elems = self._paged_call(
        payload_func=self.get_payload,
        max_items=max_items,
        folders=[folder],  # self._paged_call() wants a list, even though only one folder is supported
        additional_fields=additional_fields,
        restriction=restriction,
        order_fields=order_fields,
        query_string=query_string,
        shape=shape,
        depth=depth,
        page_size=self.page_size,
        offset=offset,
    )
    return self._elems_to_objs(paged_elems)
Find items in an account. This service can only be called on a single folder.
:param folder: the Folder object to query :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects :param restriction: a Restriction object that defines the filters for the query :param order_fields: the fields to sort the results by :param shape: The set of attributes to return :param query_string: a QueryString object :param depth: How deep in the folder structure to search for items :param max_items: the max number of items to return :param offset: the offset relative to the first item in the item collection. Usually 0.
:return: XML elements for the matching items
def get_payload(self,
folders,
additional_fields,
restriction,
order_fields,
query_string,
shape,
depth,
page_size,
offset=0)-
Expand source code
def get_payload(
    self, folders, additional_fields, restriction, order_fields, query_string, shape, depth, page_size, offset=0
):
    """Build the request element for one page of results.

    Only a single folder is actually supported, but self._paged_call() hands over a list.
    Children are appended in this order: persona shape, indexed page view, restriction, sort order,
    parent folder ID, query string. Optional parts are skipped when falsy.

    :param folders: a list holding the folder to query
    :param additional_fields: extra fields to request in the persona shape
    :param restriction: optional object providing to_xml()
    :param order_fields: optional sort fields, written into an 'm:SortOrder' element
    :param query_string: optional object providing to_xml()
    :param shape: the base shape to request
    :param depth: value of the 'Traversal' attribute
    :param page_size: value of the 'MaxEntriesReturned' paging attribute
    :param offset: value of the 'Offset' paging attribute

    :return: the request payload element
    """
    account_version = self.account.version
    payload = create_element(f"m:{self.SERVICE_NAME}", attrs={"Traversal": depth})
    persona_shape = shape_element(
        tag="m:PersonaShape", shape=shape, additional_fields=additional_fields, version=account_version
    )
    payload.append(persona_shape)
    view_attrs = {"MaxEntriesReturned": page_size, "Offset": offset, "BasePoint": "Beginning"}
    payload.append(create_element("m:IndexedPageItemView", attrs=view_attrs))
    if restriction:
        payload.append(restriction.to_xml(version=account_version))
    if order_fields:
        sort_order = create_element("m:SortOrder")
        payload.append(set_xml_value(sort_order, order_fields, version=account_version))
    parent_id = folder_ids_element(folders=folders, version=account_version, tag="m:ParentFolderId")
    payload.append(parent_id)
    if query_string:
        payload.append(query_string.to_xml(version=account_version))
    return payload
Inherited members
class GetAttachment (*args, **kwargs)
-
Expand source code
class GetAttachment(EWSAccountService):
    """Service class for the EWS GetAttachment operation, with an optional streaming mode.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getattachment-operation
    """

    SERVICE_NAME = "GetAttachment"
    element_container_name = f"{{{MNS}}}Attachments"
    # Maps the tag of a response element to the attachment class used to parse it
    cls_map = {
        FileAttachment.response_tag(): FileAttachment,
        ItemAttachment.response_tag(): ItemAttachment,
    }

    def call(self, items, include_mime_content, body_type, filter_html_content, additional_fields):
        """Fetch attachments and convert the returned elements.

        :param items: the attachments (or their IDs) to fetch
        :param include_mime_content: if truthy, ask for the MIME content
        :param body_type: optional; when set it must be one of BODY_TYPE_CHOICES
        :param filter_html_content: optional tri-state; None leaves the option out of the request
        :param additional_fields: optional extra fields to request

        :return: the result of self._elems_to_objs() applied to the response elements
        :raises InvalidEnumValue: if 'body_type' is set but not in BODY_TYPE_CHOICES
        """
        if body_type and body_type not in BODY_TYPE_CHOICES:
            raise InvalidEnumValue("body_type", body_type, BODY_TYPE_CHOICES)
        response_elems = self._chunked_get_elements(
            self.get_payload,
            items=items,
            include_mime_content=include_mime_content,
            body_type=body_type,
            filter_html_content=filter_html_content,
            additional_fields=additional_fields,
        )
        return self._elems_to_objs(response_elems)

    def _elem_to_obj(self, elem):
        """Parse a response element with the attachment class registered for its tag."""
        attachment_cls = self.cls_map[elem.tag]
        return attachment_cls.from_xml(elem=elem, account=self.account)

    def get_payload(self, items, include_mime_content, body_type, filter_html_content, additional_fields):
        """Build the request element: an optional attachment shape followed by the attachment IDs.

        The shape element is only added to the payload when at least one option put a child into it.
        """
        payload = create_element(f"m:{self.SERVICE_NAME}")
        shape_elem = create_element("m:AttachmentShape")
        if include_mime_content:
            add_xml_child(shape_elem, "t:IncludeMimeContent", "true")
        if body_type:
            add_xml_child(shape_elem, "t:BodyType", body_type)
        if filter_html_content is not None:
            html_filter_flag = "true" if filter_html_content else "false"
            add_xml_child(shape_elem, "t:FilterHtmlContent", html_filter_flag)
        if additional_fields:
            additional_properties = create_element("t:AdditionalProperties")
            expanded_fields = []
            for field_path in additional_fields:
                expanded_fields.extend(field_path.expand(version=self.account.version))

            def sort_key(field_path):
                # Order by field URI (empty string if the field has none), then by path
                return getattr(field_path.field, "field_uri", ""), field_path.path

            set_xml_value(
                additional_properties,
                sorted(expanded_fields, key=sort_key),
                version=self.account.version,
            )
            shape_elem.append(additional_properties)
        if len(shape_elem) > 0:
            payload.append(shape_elem)
        payload.append(attachment_ids_element(items=items, version=self.account.version))
        return payload

    def _update_api_version(self, api_version, header):
        """Delegate to the parent implementation, except in streaming mode."""
        # TODO: We're skipping this part in streaming mode because StreamingBase64Parser cannot parse the SOAP header
        if not self.streaming:
            super()._update_api_version(api_version, header)

    def _get_soap_parts(self, response):
        """Split the response into header and body; in streaming mode, pass the response through untouched."""
        if self.streaming:
            # Pass the response unaltered. We want to use our custom streaming parser
            return None, response
        return super()._get_soap_parts(response)

    def _get_soap_messages(self, body):
        """Parse the body; in streaming mode, feed the raw response to the streaming base64 parser."""
        if not self.streaming:
            return super()._get_soap_messages(body)
        # In streaming mode, 'body' is actually the raw response passed on by '_get_soap_parts'
        raw_response = body
        content_field = FileAttachment.get_field_by_fieldname("_content")
        stream_parser = StreamingBase64Parser()
        content_handler = StreamingContentHandler(
            parser=stream_parser, ns=content_field.namespace, element_name=content_field.field_uri
        )
        stream_parser.setContentHandler(content_handler)
        return stream_parser.parse(raw_response)

    def stream_file_content(self, attachment_id):
        """Yield the content of a single file attachment as it is parsed from the response.

        Streaming mode is switched on for the duration of the request and always switched off again.

        :param attachment_id: the attachment to stream; the streaming parser handles one attachment only
        """
        request = self.get_payload(
            items=[attachment_id],
            include_mime_content=False,
            body_type=None,
            filter_html_content=None,
            additional_fields=None,
        )
        self.streaming = True
        try:
            yield from self._get_response_xml(payload=request)
        except ElementNotFound as not_found:
            # When the returned XML does not contain a Content element, ElementNotFound is thrown by parser.parse().
            # Let the non-streaming SOAP parser parse the response and hook into the normal exception handling.
            # Wrap in DummyResponse because _get_soap_parts() expects an iter_content() method.
            fallback_response = DummyResponse(content=not_found.data)
            _, soap_body = super()._get_soap_parts(response=fallback_response)
            # TODO: We're skipping ._update_api_version() here because we don't have access to the 'api_version' used.
            soap_messages = super()._get_soap_messages(body=soap_body)
            for parsed in self._get_elements_in_response(response=soap_messages):
                if isinstance(parsed, Exception):
                    raise parsed
            # The returned content did not contain any EWS exceptions. Give up and re-raise the original exception.
            raise not_found
        finally:
            self.stop_streaming()
            self.streaming = False
Ancestors
Class variables
var SERVICE_NAME
var cls_map
var element_container_name
Methods
def call(self, items, include_mime_content, body_type, filter_html_content, additional_fields)
-
Expand source code
def call(self, items, include_mime_content, body_type, filter_html_content, additional_fields):
    """Fetch attachments and convert the returned elements.

    :param items: the attachments (or their IDs) to fetch
    :param include_mime_content: if truthy, ask for the MIME content
    :param body_type: optional; when set it must be one of BODY_TYPE_CHOICES
    :param filter_html_content: optional tri-state; None leaves the option out of the request
    :param additional_fields: optional extra fields to request

    :return: the result of self._elems_to_objs() applied to the response elements
    :raises InvalidEnumValue: if 'body_type' is set but not in BODY_TYPE_CHOICES
    """
    if body_type and body_type not in BODY_TYPE_CHOICES:
        raise InvalidEnumValue("body_type", body_type, BODY_TYPE_CHOICES)
    response_elems = self._chunked_get_elements(
        self.get_payload,
        items=items,
        include_mime_content=include_mime_content,
        body_type=body_type,
        filter_html_content=filter_html_content,
        additional_fields=additional_fields,
    )
    return self._elems_to_objs(response_elems)
def get_payload(self, items, include_mime_content, body_type, filter_html_content, additional_fields)
-
Expand source code
def get_payload(self, items, include_mime_content, body_type, filter_html_content, additional_fields):
    """Build the request element: an optional attachment shape followed by the attachment IDs.

    The shape element is only added to the payload when at least one option put a child into it.

    :param items: the attachments (or their IDs) to fetch
    :param include_mime_content: if truthy, adds 't:IncludeMimeContent' set to "true"
    :param body_type: if truthy, adds 't:BodyType' with this value
    :param filter_html_content: if not None, adds 't:FilterHtmlContent' as "true" or "false"
    :param additional_fields: if truthy, the fields are expanded, sorted and added as 't:AdditionalProperties'

    :return: the request payload element
    """
    payload = create_element(f"m:{self.SERVICE_NAME}")
    shape_elem = create_element("m:AttachmentShape")
    if include_mime_content:
        add_xml_child(shape_elem, "t:IncludeMimeContent", "true")
    if body_type:
        add_xml_child(shape_elem, "t:BodyType", body_type)
    if filter_html_content is not None:
        html_filter_flag = "true" if filter_html_content else "false"
        add_xml_child(shape_elem, "t:FilterHtmlContent", html_filter_flag)
    if additional_fields:
        additional_properties = create_element("t:AdditionalProperties")
        expanded_fields = []
        for field_path in additional_fields:
            expanded_fields.extend(field_path.expand(version=self.account.version))

        def sort_key(field_path):
            # Order by field URI (empty string if the field has none), then by path
            return getattr(field_path.field, "field_uri", ""), field_path.path

        set_xml_value(
            additional_properties,
            sorted(expanded_fields, key=sort_key),
            version=self.account.version,
        )
        shape_elem.append(additional_properties)
    if len(shape_elem) > 0:
        payload.append(shape_elem)
    payload.append(attachment_ids_element(items=items, version=self.account.version))
    return payload
def stream_file_content(self, attachment_id)
-
Expand source code
def stream_file_content(self, attachment_id):
    """Yield the content of a single file attachment as it is parsed from the response.

    Streaming mode is switched on for the duration of the request and always switched off again.

    :param attachment_id: the attachment to stream; the streaming parser handles one attachment only

    :raises ElementNotFound: re-raised when the fallback parse of the response yields no exception of its own
    """
    request = self.get_payload(
        items=[attachment_id],
        include_mime_content=False,
        body_type=None,
        filter_html_content=None,
        additional_fields=None,
    )
    self.streaming = True
    try:
        yield from self._get_response_xml(payload=request)
    except ElementNotFound as not_found:
        # When the returned XML does not contain a Content element, ElementNotFound is thrown by parser.parse().
        # Let the non-streaming SOAP parser parse the response and hook into the normal exception handling.
        # Wrap in DummyResponse because _get_soap_parts() expects an iter_content() method.
        fallback_response = DummyResponse(content=not_found.data)
        _, soap_body = super()._get_soap_parts(response=fallback_response)
        # TODO: We're skipping ._update_api_version() here because we don't have access to the 'api_version' used.
        soap_messages = super()._get_soap_messages(body=soap_body)
        for parsed in self._get_elements_in_response(response=soap_messages):
            if isinstance(parsed, Exception):
                raise parsed
        # The returned content did not contain any EWS exceptions. Give up and re-raise the original exception.
        raise not_found
    finally:
        self.stop_streaming()
        self.streaming = False
Inherited members
class GetDelegate (*args, **kwargs)
-
Expand source code
class GetDelegate(EWSAccountService):
    """Service class for the EWS GetDelegate operation.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getdelegate-operation
    """

    SERVICE_NAME = "GetDelegate"
    ERRORS_TO_CATCH_IN_RESPONSE = ()
    supported_from = EXCHANGE_2007_SP1

    def call(self, user_ids, include_permissions):
        """Fetch delegates of the account's primary mailbox and convert the returned elements.

        :param user_ids: the users to look up; a falsy value is replaced by the placeholder [None]
        :param include_permissions: value sent as the 'IncludePermissions' request attribute

        :return: the result of self._elems_to_objs() applied to the response elements
        """
        requested_ids = user_ids or [None]
        own_mailbox = DLMailbox(email_address=self.account.primary_smtp_address)
        response_elems = self._chunked_get_elements(
            self.get_payload,
            items=requested_ids,
            mailbox=own_mailbox,
            include_permissions=include_permissions,
        )
        return self._elems_to_objs(response_elems)

    def _elem_to_obj(self, elem):
        """Parse a single response element as a DelegateUser bound to this account."""
        return DelegateUser.from_xml(elem=elem, account=self.account)

    def get_payload(self, user_ids, mailbox, include_permissions):
        """Build the request element: the mailbox, then a 'm:UserIds' element unless 'user_ids' is [None]."""
        protocol_version = self.protocol.version
        payload = create_element(f"m:{self.SERVICE_NAME}", attrs={"IncludePermissions": include_permissions})
        set_xml_value(payload, mailbox, version=protocol_version)
        # [None] is the placeholder call() passes when no specific users were requested
        if user_ids != [None]:
            user_ids_elem = create_element("m:UserIds")
            for entry in user_ids:
                # Plain strings are taken to be SMTP addresses and wrapped in a UserId
                user_id = UserId(primary_smtp_address=entry) if isinstance(entry, str) else entry
                set_xml_value(user_ids_elem, user_id, version=protocol_version)
            set_xml_value(payload, user_ids_elem, version=protocol_version)
        return payload

    @classmethod
    def _get_elements_in_container(cls, container):
        """Return all DelegateUser response elements found directly in the container."""
        return container.findall(DelegateUser.response_tag())

    @classmethod
    def _response_message_tag(cls):
        """Return the namespaced tag of this service's response message."""
        return f"{{{MNS}}}DelegateUserResponseMessageType"
Ancestors
Class variables
var ERRORS_TO_CATCH_IN_RESPONSE
var SERVICE_NAME
var supported_from
Methods
def call(self, user_ids, include_permissions)
-
Expand source code
def call(self, user_ids, include_permissions):
    """Fetch delegates of the account's primary mailbox and convert the returned elements.

    :param user_ids: the users to look up; a falsy value is replaced by the placeholder [None]
    :param include_permissions: value sent as the 'IncludePermissions' request attribute

    :return: the result of self._elems_to_objs() applied to the response elements
    """
    requested_ids = user_ids or [None]
    own_mailbox = DLMailbox(email_address=self.account.primary_smtp_address)
    response_elems = self._chunked_get_elements(
        self.get_payload,
        items=requested_ids,
        mailbox=own_mailbox,
        include_permissions=include_permissions,
    )
    return self._elems_to_objs(response_elems)
def get_payload(self, user_ids, mailbox, include_permissions)
-
Expand source code
def get_payload(self, user_ids, mailbox, include_permissions):
    """Build the request element: the mailbox, then a 'm:UserIds' element unless 'user_ids' is [None].

    :param user_ids: the users to look up, or [None] to leave the 'm:UserIds' element out
    :param mailbox: the mailbox object written into the payload
    :param include_permissions: value of the 'IncludePermissions' attribute

    :return: the request payload element
    """
    protocol_version = self.protocol.version
    payload = create_element(f"m:{self.SERVICE_NAME}", attrs={"IncludePermissions": include_permissions})
    set_xml_value(payload, mailbox, version=protocol_version)
    # [None] is the placeholder used when no specific users were requested
    if user_ids != [None]:
        user_ids_elem = create_element("m:UserIds")
        for entry in user_ids:
            # Plain strings are taken to be SMTP addresses and wrapped in a UserId
            user_id = UserId(primary_smtp_address=entry) if isinstance(entry, str) else entry
            set_xml_value(user_ids_elem, user_id, version=protocol_version)
        set_xml_value(payload, user_ids_elem, version=protocol_version)
    return payload
Inherited members
class GetEvents (*args, **kwargs)
-
Expand source code
class GetEvents(EWSAccountService):
    """Request the notifications of a subscription, identified by subscription ID and watermark.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getevents-operation
    """

    SERVICE_NAME = "GetEvents"
    prefer_affinity = True

    def call(self, subscription_id, watermark):
        """Send the request and return the converted response elements.

        :param subscription_id: written to the 'SubscriptionId' child of the request
        :param watermark: written to the 'Watermark' child of the request
        :return: the result of _elems_to_objs() applied to the response elements
        """
        payload = self.get_payload(subscription_id=subscription_id, watermark=watermark)
        elements = self._get_elements(payload=payload)
        return self._elems_to_objs(elements)

    def _elem_to_obj(self, elem):
        """Parse one response element into a Notification (no account is attached)."""
        return Notification.from_xml(elem=elem, account=None)

    @classmethod
    def _get_elements_in_container(cls, container):
        """Return all Notification response elements found directly in 'container'."""
        notification_tag = Notification.response_tag()
        return container.findall(notification_tag)

    def get_payload(self, subscription_id, watermark):
        """Build the request element with its 'SubscriptionId' and 'Watermark' children, in that order."""
        payload = create_element(f"m:{self.SERVICE_NAME}")
        children = (("m:SubscriptionId", subscription_id), ("m:Watermark", watermark))
        for tag, value in children:
            add_xml_child(payload, tag, value)
        return payload
MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getevents-operation
Ancestors
Class variables
var SERVICE_NAME
var prefer_affinity
Methods
def call(self, subscription_id, watermark)
-
Expand source code
def call(self, subscription_id, watermark):
    """Send the request and return the converted response elements.

    :param subscription_id: forwarded to get_payload()
    :param watermark: forwarded to get_payload()
    :return: the result of _elems_to_objs() applied to the response elements
    """
    payload = self.get_payload(subscription_id=subscription_id, watermark=watermark)
    elements = self._get_elements(payload=payload)
    return self._elems_to_objs(elements)
def get_payload(self, subscription_id, watermark)
-
Expand source code
def get_payload(self, subscription_id, watermark):
    """Build the request element with its 'SubscriptionId' and 'Watermark' children, in that order."""
    payload = create_element(f"m:{self.SERVICE_NAME}")
    children = (("m:SubscriptionId", subscription_id), ("m:Watermark", watermark))
    for tag, value in children:
        add_xml_child(payload, tag, value)
    return payload
Inherited members
class GetFolder (*args, **kwargs)
-
Expand source code
class GetFolder(EWSAccountService):
    """Fetch folder information for a list of folders.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getfolder-operation
    """

    SERVICE_NAME = "GetFolder"
    element_container_name = f"{{{MNS}}}Folders"
    ERRORS_TO_CATCH_IN_RESPONSE = EWSAccountService.ERRORS_TO_CATCH_IN_RESPONSE + (
        ErrorAccessDenied,
        ErrorFolderNotFound,
        ErrorNoPublicFolderReplicaAvailable,
        ErrorInvalidOperation,
        ErrorItemNotFound,
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # A hack to communicate parsing args to _elems_to_objs()
        self.folders = []

    def call(self, folders, additional_fields, shape):
        """Take a list of folders and return the full information for each of them.

        :param folders: an iterable of Folder objects
        :param additional_fields: the extra fields that should be returned with the folder, as FieldPath objects
        :param shape: The set of attributes to return
        :return: a generator of parsed folders (or exceptions), paired with the requested folders in order
        """
        # We can't easily find the correct folder class from the returned XML. Instead, return objects with the same
        # class as the folder instance it was requested with.
        # Materialise the input, in case 'folders' is a generator. It is iterated twice: once to build the
        # request and once in _elems_to_objs().
        self.folders = list(folders)
        elements = self._chunked_get_elements(
            self.get_payload,
            items=self.folders,
            additional_fields=additional_fields,
            shape=shape,
        )
        return self._elems_to_objs(elements)

    def _elems_to_objs(self, elems):
        """Pair each response element with the folder it was requested for and parse it.

        Exceptions in 'elems' are passed through unchanged.
        """
        for requested, elem in zip(self.folders, elems):
            yield elem if isinstance(elem, Exception) else parse_folder_elem(elem=elem, folder=requested)

    def get_payload(self, folders, additional_fields, shape):
        """Build the request element: a 'FolderShape' child followed by the folder IDs element."""
        payload = create_element(f"m:{self.SERVICE_NAME}")
        folder_shape = shape_element(
            tag="m:FolderShape", shape=shape, additional_fields=additional_fields, version=self.account.version
        )
        payload.append(folder_shape)
        folder_ids = folder_ids_element(folders=folders, version=self.account.version)
        payload.append(folder_ids)
        return payload
MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getfolder-operation
Ancestors
Class variables
var ERRORS_TO_CATCH_IN_RESPONSE
var SERVICE_NAME
var element_container_name
Methods
def call(self, folders, additional_fields, shape)
-
Expand source code
def call(self, folders, additional_fields, shape):
    """Take a list of folders and return the full information for each of them.

    :param folders: an iterable of Folder objects
    :param additional_fields: the extra fields that should be returned with the folder, as FieldPath objects
    :param shape: The set of attributes to return
    :return: the result of _elems_to_objs() applied to the chunked response elements, in stable order
    """
    # We can't easily find the correct folder class from the returned XML. Instead, return objects with the same
    # class as the folder instance it was requested with.
    # Materialise the input, in case 'folders' is a generator. We're iterating twice.
    self.folders = list(folders)
    elements = self._chunked_get_elements(
        self.get_payload,
        items=self.folders,
        additional_fields=additional_fields,
        shape=shape,
    )
    return self._elems_to_objs(elements)
Take a list of folders and return the full information for each of them.
:param folders: a list of Folder objects :param additional_fields: the extra fields that should be returned with the folder, as FieldPath objects :param shape: The set of attributes to return
:return: XML elements for the folders, in stable order
def get_payload(self, folders, additional_fields, shape)
-
Expand source code
def get_payload(self, folders, additional_fields, shape):
    """Build the request element: a 'FolderShape' child followed by the folder IDs element."""
    payload = create_element(f"m:{self.SERVICE_NAME}")
    folder_shape = shape_element(
        tag="m:FolderShape", shape=shape, additional_fields=additional_fields, version=self.account.version
    )
    payload.append(folder_shape)
    folder_ids = folder_ids_element(folders=folders, version=self.account.version)
    payload.append(folder_ids)
    return payload
Inherited members
class GetInboxRules (*args, **kwargs)
-
Expand source code
class GetInboxRules(EWSAccountService):
    """
    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getinboxrules-operation

    The GetInboxRules operation uses Exchange Web Services to retrieve Inbox rules in the identified user's mailbox.
    """

    SERVICE_NAME = "GetInboxRules"
    element_container_name = InboxRules.response_tag()
    ERRORS_TO_CATCH_IN_RESPONSE = EWSAccountService.ERRORS_TO_CATCH_IN_RESPONSE + (ErrorInvalidOperation,)

    def call(self, mailbox: Optional[str] = None) -> Generator[Union[Rule, Exception, None], Any, None]:
        """Request the inbox rules of 'mailbox'.

        :param mailbox: SMTP address of the mailbox; a falsy value selects the account's primary SMTP address
        :return: the result of _elems_to_objs() applied to the response elements
        """
        target = mailbox or self.account.primary_smtp_address
        response_elements = self._get_elements(payload=self.get_payload(mailbox=target))
        return self._elems_to_objs(response_elements)

    def _elem_to_obj(self, elem):
        """Parse one response element into a Rule bound to this account."""
        return Rule.from_xml(elem=elem, account=self.account)

    def get_payload(self, mailbox):
        """Build the request element with a single 'MailboxSmtpAddress' child."""
        request = create_element(f"m:{self.SERVICE_NAME}")
        add_xml_child(request, "m:MailboxSmtpAddress", mailbox)
        return request

    def _get_element_container(self, message, name=None):
        """Return the rules container, or an empty list for a successful response without one.

        All other cases are delegated to the parent implementation.
        """
        if name:
            succeeded = message.get("ResponseClass") == "Success"
            no_error = get_xml_attr(message, f"{{{MNS}}}ResponseCode") == "NoError"
            # A successful response that lacks the container is treated as "no rules" rather than as an error.
            if succeeded and no_error and message.find(name) is None:
                return []
        return super()._get_element_container(message, name)
The GetInboxRules operation uses Exchange Web Services to retrieve Inbox rules in the identified user's mailbox.
Ancestors
Class variables
var ERRORS_TO_CATCH_IN_RESPONSE
var SERVICE_NAME
var element_container_name
Methods
def call(self, mailbox: str | None = None) ‑> Generator[Rule | Exception | None, Any, None]
-
Expand source code
def call(self, mailbox: Optional[str] = None) -> Generator[Union[Rule, Exception, None], Any, None]:
    """Request the inbox rules of 'mailbox'.

    :param mailbox: SMTP address of the mailbox; a falsy value selects the account's primary SMTP address
    :return: the result of _elems_to_objs() applied to the response elements
    """
    target = mailbox or self.account.primary_smtp_address
    response_elements = self._get_elements(payload=self.get_payload(mailbox=target))
    return self._elems_to_objs(response_elements)
def get_payload(self, mailbox)
-
Expand source code
def get_payload(self, mailbox):
    """Build the request element with a single 'MailboxSmtpAddress' child holding 'mailbox'."""
    request = create_element(f"m:{self.SERVICE_NAME}")
    add_xml_child(request, "m:MailboxSmtpAddress", mailbox)
    return request
Inherited members
class GetItem (*args, **kwargs)
-
Expand source code
class GetItem(EWSAccountService):
    """Fetch items by ID.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getitem-operation
    """

    SERVICE_NAME = "GetItem"
    element_container_name = f"{{{MNS}}}Items"

    def call(self, items, additional_fields, shape):
        """Return all items in an account that correspond to a list of IDs, in stable order.

        :param items: a list of (id, changekey) tuples or Item objects
        :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects
        :param shape: The shape of returned objects
        :return: the result of _elems_to_objs() applied to the chunked response elements, in stable order
        """
        elements = self._chunked_get_elements(
            self.get_payload,
            items=items,
            additional_fields=additional_fields,
            shape=shape,
        )
        return self._elems_to_objs(elements)

    def _elem_to_obj(self, elem):
        """Parse one response element with the item model that matches its tag."""
        item_model = BaseFolder.item_model_from_tag(elem.tag)
        return item_model.from_xml(elem=elem, account=self.account)

    def get_payload(self, items, additional_fields, shape):
        """Build the request element: an 'ItemShape' child followed by the item IDs element."""
        payload = create_element(f"m:{self.SERVICE_NAME}")
        item_shape = shape_element(
            tag="m:ItemShape", shape=shape, additional_fields=additional_fields, version=self.account.version
        )
        payload.append(item_shape)
        item_ids = item_ids_element(items=items, version=self.account.version)
        payload.append(item_ids)
        return payload
MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getitem-operation
Ancestors
Class variables
var SERVICE_NAME
var element_container_name
Methods
def call(self, items, additional_fields, shape)
-
Expand source code
def call(self, items, additional_fields, shape):
    """Return all items in an account that correspond to a list of IDs, in stable order.

    :param items: a list of (id, changekey) tuples or Item objects
    :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects
    :param shape: The shape of returned objects
    :return: the result of _elems_to_objs() applied to the chunked response elements, in stable order
    """
    elements = self._chunked_get_elements(
        self.get_payload,
        items=items,
        additional_fields=additional_fields,
        shape=shape,
    )
    return self._elems_to_objs(elements)
Return all items in an account that correspond to a list of IDs, in stable order.
:param items: a list of (id, changekey) tuples or Item objects :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects :param shape: The shape of returned objects
:return: XML elements for the items, in stable order
def get_payload(self, items, additional_fields, shape)
-
Expand source code
def get_payload(self, items, additional_fields, shape):
    """Build the request element: an 'ItemShape' child followed by the item IDs element."""
    payload = create_element(f"m:{self.SERVICE_NAME}")
    item_shape = shape_element(
        tag="m:ItemShape", shape=shape, additional_fields=additional_fields, version=self.account.version
    )
    payload.append(item_shape)
    item_ids = item_ids_element(items=items, version=self.account.version)
    payload.append(item_ids)
    return payload
Inherited members
class GetMailTips (protocol, chunk_size=None, timeout=None)
-
Expand source code
class GetMailTips(EWSService):
    """Request mail tips for a list of recipients.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getmailtips-operation
    """

    SERVICE_NAME = "GetMailTips"

    def call(self, sending_as, recipients, mail_tips_requested):
        """Run the request in chunks of recipients and convert the response elements.

        :param sending_as: forwarded to get_payload() for every chunk
        :param recipients: the items that are split into chunks
        :param mail_tips_requested: forwarded to get_payload() for every chunk
        :return: the result of _elems_to_objs() applied to the chunked response elements
        """
        elements = self._chunked_get_elements(
            self.get_payload,
            items=recipients,
            sending_as=sending_as,
            mail_tips_requested=mail_tips_requested,
        )
        return self._elems_to_objs(elements)

    def _elem_to_obj(self, elem):
        """Parse one response element into a MailTips object (no account is attached)."""
        return MailTips.from_xml(elem=elem, account=None)

    def get_payload(self, recipients, sending_as, mail_tips_requested):
        """Build the request element: sender, a 'Recipients' container, then the optional requested tips."""
        payload = create_element(f"m:{self.SERVICE_NAME}")
        set_xml_value(payload, sending_as, version=self.protocol.version)
        recipients_container = create_element("m:Recipients")
        for addressee in recipients:
            set_xml_value(recipients_container, addressee, version=self.protocol.version)
        payload.append(recipients_container)
        # A falsy 'mail_tips_requested' is simply left out of the request.
        if mail_tips_requested:
            set_xml_value(payload, mail_tips_requested, version=self.protocol.version)
        return payload

    def _get_elements_in_response(self, response):
        """Yield, for every response message, the container found under the MailTips response tag."""
        for message in response:
            mail_tips_container = self._get_element_container(message=message, name=MailTips.response_tag())
            yield mail_tips_container

    @classmethod
    def _response_message_tag(cls):
        """Return the qualified tag name of this service's response message element."""
        return f"{{{MNS}}}MailTipsResponseMessageType"
Ancestors
Class variables
var SERVICE_NAME
Methods
def call(self, sending_as, recipients, mail_tips_requested)
-
Expand source code
def call(self, sending_as, recipients, mail_tips_requested):
    """Run the request in chunks of recipients and convert the response elements.

    :param sending_as: forwarded to get_payload() for every chunk
    :param recipients: the items that are split into chunks
    :param mail_tips_requested: forwarded to get_payload() for every chunk
    :return: the result of _elems_to_objs() applied to the chunked response elements
    """
    elements = self._chunked_get_elements(
        self.get_payload,
        items=recipients,
        sending_as=sending_as,
        mail_tips_requested=mail_tips_requested,
    )
    return self._elems_to_objs(elements)
def get_payload(self, recipients, sending_as, mail_tips_requested)
-
Expand source code
def get_payload(self, recipients, sending_as, mail_tips_requested):
    """Build the request element: sender, a 'Recipients' container, then the optional requested tips."""
    payload = create_element(f"m:{self.SERVICE_NAME}")
    set_xml_value(payload, sending_as, version=self.protocol.version)
    recipients_container = create_element("m:Recipients")
    for addressee in recipients:
        set_xml_value(recipients_container, addressee, version=self.protocol.version)
    payload.append(recipients_container)
    # A falsy 'mail_tips_requested' is simply left out of the request.
    if mail_tips_requested:
        set_xml_value(payload, mail_tips_requested, version=self.protocol.version)
    return payload
Inherited members
class GetPersona (*args, **kwargs)
-
Expand source code
class GetPersona(EWSAccountService):
    """Fetch personas by ID, one request per persona.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getpersona-operation
    """

    SERVICE_NAME = "GetPersona"

    def call(self, personas):
        """Yield the converted response elements for each persona in 'personas', in input order."""
        # GetPersona only accepts one persona ID per request. Crazy.
        for persona in personas:
            payload = self.get_payload(persona=persona)
            elements = self._get_elements(payload=payload)
            yield from self._elems_to_objs(elements)

    def _elem_to_obj(self, elem):
        """Parse one response element into a Persona (no account is attached)."""
        return Persona.from_xml(elem=elem, account=None)

    def get_payload(self, persona):
        """Build the request element holding the ID of a single persona."""
        request = create_element(f"m:{self.SERVICE_NAME}")
        persona_id = to_item_id(persona, PersonaId)
        return set_xml_value(request, persona_id, version=self.protocol.version)

    @classmethod
    def _get_elements_in_container(cls, container):
        """Return all Persona elements (in the messages namespace) found directly in 'container'."""
        persona_tag = f"{{{MNS}}}{Persona.ELEMENT_NAME}"
        return container.findall(persona_tag)

    @classmethod
    def _response_tag(cls):
        """Return the qualified tag name of this service's response element."""
        return f"{{{MNS}}}{cls.SERVICE_NAME}ResponseMessage"
Ancestors
Class variables
var SERVICE_NAME
Methods
def call(self, personas)
-
Expand source code
def call(self, personas):
    """Yield the converted response elements for each persona in 'personas', in input order."""
    # GetPersona only accepts one persona ID per request. Crazy.
    for persona in personas:
        payload = self.get_payload(persona=persona)
        elements = self._get_elements(payload=payload)
        yield from self._elems_to_objs(elements)
def get_payload(self, persona)
-
Expand source code
def get_payload(self, persona):
    """Build the request element holding the ID of a single persona."""
    request = create_element(f"m:{self.SERVICE_NAME}")
    persona_id = to_item_id(persona, PersonaId)
    return set_xml_value(request, persona_id, version=self.protocol.version)
Inherited members
class GetRoomLists (protocol, chunk_size=None, timeout=None)
-
Expand source code
class GetRoomLists(EWSService):
    """Request the room lists.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getroomlists-operation
    """

    SERVICE_NAME = "GetRoomLists"
    element_container_name = f"{{{MNS}}}RoomLists"
    supported_from = EXCHANGE_2010

    def call(self):
        """Send the (argument-less) request and return the converted response elements."""
        payload = self.get_payload()
        elements = self._get_elements(payload=payload)
        return self._elems_to_objs(elements)

    def _elem_to_obj(self, elem):
        """Parse one response element into a RoomList (no account is attached)."""
        return RoomList.from_xml(elem=elem, account=None)

    def get_payload(self):
        """Build the request element; it has no attributes and no children."""
        request_tag = f"m:{self.SERVICE_NAME}"
        return create_element(request_tag)
Ancestors
Class variables
var SERVICE_NAME
var element_container_name
var supported_from
Methods
def call(self)
-
Expand source code
def call(self):
    """Send the (argument-less) request and return the converted response elements."""
    payload = self.get_payload()
    elements = self._get_elements(payload=payload)
    return self._elems_to_objs(elements)
def get_payload(self)
-
Expand source code
def get_payload(self):
    """Build the request element; it has no attributes and no children."""
    request_tag = f"m:{self.SERVICE_NAME}"
    return create_element(request_tag)
Inherited members
class GetRooms (protocol, chunk_size=None, timeout=None)
-
Expand source code
class GetRooms(EWSService):
    """Request the rooms of a room list.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getrooms-operation
    """

    SERVICE_NAME = "GetRooms"
    element_container_name = f"{{{MNS}}}Rooms"
    supported_from = EXCHANGE_2010

    def call(self, room_list):
        """Send the request for 'room_list' and return the converted response elements."""
        payload = self.get_payload(room_list=room_list)
        elements = self._get_elements(payload=payload)
        return self._elems_to_objs(elements)

    def _elem_to_obj(self, elem):
        """Parse one response element into a Room (no account is attached)."""
        return Room.from_xml(elem=elem, account=None)

    def get_payload(self, room_list):
        """Build the request element with 'room_list' as its content."""
        request = create_element(f"m:{self.SERVICE_NAME}")
        return set_xml_value(request, room_list, version=self.protocol.version)
MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getrooms-operation
Ancestors
Class variables
var SERVICE_NAME
var element_container_name
var supported_from
Methods
def call(self, room_list)
-
Expand source code
def call(self, room_list):
    """Send the request for 'room_list' and return the converted response elements."""
    payload = self.get_payload(room_list=room_list)
    elements = self._get_elements(payload=payload)
    return self._elems_to_objs(elements)
def get_payload(self, room_list)
-
Expand source code
def get_payload(self, room_list):
    """Build the request element with 'room_list' as its content."""
    request = create_element(f"m:{self.SERVICE_NAME}")
    return set_xml_value(request, room_list, version=self.protocol.version)
Inherited members
class GetSearchableMailboxes (protocol, chunk_size=None, timeout=None)
-
Expand source code
class GetSearchableMailboxes(EWSService):
    """Request the searchable mailboxes, together with the mailboxes that failed.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getsearchablemailboxes-operation
    """

    SERVICE_NAME = "GetSearchableMailboxes"
    element_container_name = f"{{{MNS}}}SearchableMailboxes"
    failed_mailboxes_container_name = f"{{{MNS}}}FailedMailboxes"
    supported_from = EXCHANGE_2013
    # Maps a response element tag to the class that parses it
    cls_map = {
        SearchableMailbox.response_tag(): SearchableMailbox,
        FailedMailbox.response_tag(): FailedMailbox,
    }

    def call(self, search_filter, expand_group_membership):
        """Send the request and return the converted response elements.

        :param search_filter: forwarded to get_payload()
        :param expand_group_membership: forwarded to get_payload()
        :return: the result of _elems_to_objs() applied to the response elements
        """
        payload = self.get_payload(
            search_filter=search_filter,
            expand_group_membership=expand_group_membership,
        )
        elements = self._get_elements(payload=payload)
        return self._elems_to_objs(elements)

    def _elem_to_obj(self, elem):
        """Parse one response element with the class registered for its tag in 'cls_map'."""
        parser_cls = self.cls_map[elem.tag]
        return parser_cls.from_xml(elem=elem, account=None)

    def get_payload(self, search_filter, expand_group_membership):
        """Build the request element.

        'SearchFilter' is added only for a truthy filter, and 'ExpandGroupMembership' only when the flag is not
        None.
        """
        payload = create_element(f"m:{self.SERVICE_NAME}")
        if search_filter:
            add_xml_child(payload, "m:SearchFilter", search_filter)
        if expand_group_membership is not None:
            flag_text = "true" if expand_group_membership else "false"
            add_xml_child(payload, "m:ExpandGroupMembership", flag_text)
        return payload

    def _get_elements_in_response(self, response):
        """Yield the elements of both the searchable and the failed mailboxes container of every message."""
        container_names = (self.element_container_name, self.failed_mailboxes_container_name)
        for message in response:
            for container_name in container_names:
                try:
                    container = self._get_element_container(message=message, name=container_name)
                except MalformedResponseError:
                    # Responses may contain no mailboxes of either kind. _get_element_container() does not accept
                    # this.
                    continue
                else:
                    yield from self._get_elements_in_container(container=container)
Ancestors
Class variables
var SERVICE_NAME
var cls_map
var element_container_name
var failed_mailboxes_container_name
var supported_from
Methods
def call(self, search_filter, expand_group_membership)
-
Expand source code
def call(self, search_filter, expand_group_membership):
    """Send the request and return the converted response elements.

    :param search_filter: forwarded to get_payload()
    :param expand_group_membership: forwarded to get_payload()
    :return: the result of _elems_to_objs() applied to the response elements
    """
    payload = self.get_payload(
        search_filter=search_filter,
        expand_group_membership=expand_group_membership,
    )
    elements = self._get_elements(payload=payload)
    return self._elems_to_objs(elements)
def get_payload(self, search_filter, expand_group_membership)
-
Expand source code
def get_payload(self, search_filter, expand_group_membership):
    """Build the request element.

    'SearchFilter' is added only for a truthy filter, and 'ExpandGroupMembership' only when the flag is not None.
    """
    payload = create_element(f"m:{self.SERVICE_NAME}")
    if search_filter:
        add_xml_child(payload, "m:SearchFilter", search_filter)
    if expand_group_membership is not None:
        flag_text = "true" if expand_group_membership else "false"
        add_xml_child(payload, "m:ExpandGroupMembership", flag_text)
    return payload
Inherited members
class GetServerTimeZones (protocol, chunk_size=None, timeout=None)
-
Expand source code
class GetServerTimeZones(EWSService):
    """Request time zone definitions from the server.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getservertimezones-operation
    """

    SERVICE_NAME = "GetServerTimeZones"
    element_container_name = f"{{{MNS}}}TimeZoneDefinitions"
    supported_from = EXCHANGE_2010

    def call(self, timezones=None, return_full_timezone_data=False):
        """Send the request and return the converted response elements.

        :param timezones: optional iterable of objects with an 'ms_id' attribute; None or empty adds no IDs
        :param return_full_timezone_data: value of the 'ReturnFullTimeZoneData' request attribute
        :return: the result of _elems_to_objs() applied to the response elements
        """
        payload = self.get_payload(timezones=timezones, return_full_timezone_data=return_full_timezone_data)
        elements = self._get_elements(payload=payload)
        return self._elems_to_objs(elements)

    def get_payload(self, timezones, return_full_timezone_data):
        """Build the request element, with an 'Ids' child only when at least one timezone is given."""
        payload = create_element(
            f"m:{self.SERVICE_NAME}",
            attrs=dict(ReturnFullTimeZoneData=return_full_timezone_data),
        )
        if timezones is None:
            return payload
        # peek() reports emptiness and hands back an iterable to continue with.
        is_empty, timezones = peek(timezones)
        if is_empty:
            return payload
        ids_container = create_element("m:Ids")
        for tz in timezones:
            ids_container.append(set_xml_value(create_element("t:Id"), tz.ms_id))
        payload.append(ids_container)
        return payload

    def _elem_to_obj(self, elem):
        """Parse one response element into a TimeZoneDefinition (no account is attached)."""
        return TimeZoneDefinition.from_xml(elem=elem, account=None)
Ancestors
Class variables
var SERVICE_NAME
var element_container_name
var supported_from
Methods
def call(self, timezones=None, return_full_timezone_data=False)
-
Expand source code
def call(self, timezones=None, return_full_timezone_data=False):
    """Send the request and return the converted response elements.

    :param timezones: forwarded to get_payload()
    :param return_full_timezone_data: forwarded to get_payload()
    :return: the result of _elems_to_objs() applied to the response elements
    """
    payload = self.get_payload(timezones=timezones, return_full_timezone_data=return_full_timezone_data)
    elements = self._get_elements(payload=payload)
    return self._elems_to_objs(elements)
def get_payload(self, timezones, return_full_timezone_data)
-
Expand source code
def get_payload(self, timezones, return_full_timezone_data):
    """Build the request element, with an 'Ids' child only when at least one timezone is given.

    :param timezones: None, or an iterable of objects with an 'ms_id' attribute
    :param return_full_timezone_data: value of the 'ReturnFullTimeZoneData' request attribute
    :return: the populated request element
    """
    payload = create_element(
        f"m:{self.SERVICE_NAME}",
        attrs=dict(ReturnFullTimeZoneData=return_full_timezone_data),
    )
    if timezones is None:
        return payload
    # peek() reports emptiness and hands back an iterable to continue with.
    is_empty, timezones = peek(timezones)
    if is_empty:
        return payload
    ids_container = create_element("m:Ids")
    for tz in timezones:
        ids_container.append(set_xml_value(create_element("t:Id"), tz.ms_id))
    payload.append(ids_container)
    return payload
Inherited members
class GetStreamingEvents (*args, **kwargs)
-
Expand source code
class GetStreamingEvents(EWSAccountService):
    """Read notifications for one or more subscriptions from a streaming response.

    The response is consumed as a sequence of complete XML documents. Each document is parsed like a normal
    response, and reading stops once a message reports ConnectionStatus 'Closed'.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getstreamingevents-operation
    """

    SERVICE_NAME = "GetStreamingEvents"
    element_container_name = f"{{{MNS}}}Notifications"
    prefer_affinity = True

    # Connection status values
    OK = "OK"
    CLOSED = "Closed"

    def __init__(self, *args, **kwargs):
        # These values are set each time call() is consumed
        self.connection_status = None
        super().__init__(*args, **kwargs)
        self.streaming = True

    def call(self, subscription_ids, connection_timeout):
        """Validate the timeout, set the service timeout, and return the converted response elements.

        :param subscription_ids: iterable of subscription IDs to include in the request
        :param connection_timeout: positive integer; multiplied by 60 below, so presumably minutes
        :return: the result of _elems_to_objs() applied to the response elements
        :raises InvalidTypeError: if 'connection_timeout' is not an int
        :raises ValueError: if 'connection_timeout' is less than 1
        """
        if not isinstance(connection_timeout, int):
            raise InvalidTypeError("connection_timeout", connection_timeout, int)
        if connection_timeout < 1:
            raise ValueError(f"'connection_timeout' {connection_timeout} must be a positive integer")
        # Add 60 seconds to the timeout, to allow us to always get the final message containing ConnectionStatus=Closed
        self.timeout = connection_timeout * 60 + 60
        return self._elems_to_objs(
            self._get_elements(
                payload=self.get_payload(
                    subscription_ids=subscription_ids,
                    connection_timeout=connection_timeout,
                )
            )
        )

    def _elem_to_obj(self, elem):
        """Parse one response element into a Notification (no account is attached)."""
        return Notification.from_xml(elem=elem, account=None)

    @classmethod
    def _get_soap_parts(cls, response):
        # Pass the response unaltered. We want to use our custom document yielder
        return None, response

    def _get_soap_messages(self, body):
        """Yield the SOAP messages of every complete XML document read from the raw response."""
        # 'body' is actually the raw response passed on by '_get_soap_parts'. We want to continuously read the content,
        # looking for complete XML documents. When we have a full document, we want to parse it as if it was a normal
        # XML response.
        r = body
        for i, doc in enumerate(DocumentYielder(r.iter_content()), start=1):
            xml_log.debug("Response XML (docs counter: %(i)s): %(xml_response)s", dict(i=i, xml_response=doc))
            response = DummyResponse(content=doc)
            try:
                # Use the parent implementation here; the override on this class returns the response unparsed.
                _, body = super()._get_soap_parts(response=response)
            except Exception:
                r.close()  # Release memory
                raise
            # TODO: We're skipping ._update_api_version() here because we don't have access to the 'api_version' used.
            # TODO: We should be doing a lot of error handling for ._get_soap_messages().
            yield from super()._get_soap_messages(body=body)
            # connection_status is updated by _get_element_container() while the messages above are consumed.
            if self.connection_status == self.CLOSED:
                # Don't wait for the TCP connection to timeout
                break

    def _get_element_container(self, message, name=None):
        """Record the connection status of 'message' and return its notifications container.

        Returns an empty list when the message has no 'name' element and the parent call raised no error. An
        EWSError raised by the parent is re-raised with the failing subscription IDs, if any, appended to its
        'value'.
        """
        error_ids_elem = message.find(f"{{{MNS}}}ErrorSubscriptionIds")
        error_ids = [] if error_ids_elem is None else get_xml_attrs(error_ids_elem, f"{{{MNS}}}SubscriptionId")
        self.connection_status = get_xml_attr(message, f"{{{MNS}}}ConnectionStatus")  # Either 'OK' or 'Closed'
        log.debug("Connection status is: %s", self.connection_status)
        # Upstream normally expects to find a 'name' tag but our response does not always have it. We still want to
        # call upstream, to have exceptions raised. Return an empty list if there is no 'name' tag and no errors.
        if message.find(name) is None:
            name = None
        try:
            res = super()._get_element_container(message=message, name=name)
        except EWSError as e:
            # When the request contains a combination of good and failing subscription IDs, notifications for the good
            # subscriptions seem to never be returned even though the XML spec allows it. This means there's no point in
            # trying to collect any notifications here and delivering a combination of errors and return values.
            if error_ids:
                e.value += f" (subscription IDs: {error_ids})"
            raise e
        return [] if name is None else res

    def get_payload(self, subscription_ids, connection_timeout):
        """Build the request element: a 'SubscriptionIds' container followed by 'ConnectionTimeout'."""
        payload = create_element(f"m:{self.SERVICE_NAME}")
        subscriptions_elem = create_element("m:SubscriptionIds")
        for subscription_id in subscription_ids:
            add_xml_child(subscriptions_elem, "t:SubscriptionId", subscription_id)
        payload.append(subscriptions_elem)
        add_xml_child(payload, "m:ConnectionTimeout", connection_timeout)
        return payload
Ancestors
Class variables
var CLOSED
var OK
var SERVICE_NAME
var element_container_name
var prefer_affinity
Methods
def call(self, subscription_ids, connection_timeout)
-
Expand source code
def call(self, subscription_ids, connection_timeout):
    """Validate the timeout, set the service timeout, and return the converted response elements.

    :param subscription_ids: iterable of subscription IDs, forwarded to get_payload()
    :param connection_timeout: positive integer; multiplied by 60 below, so presumably minutes
    :return: the result of _elems_to_objs() applied to the response elements
    :raises InvalidTypeError: if 'connection_timeout' is not an int
    :raises ValueError: if 'connection_timeout' is less than 1
    """
    if not isinstance(connection_timeout, int):
        raise InvalidTypeError("connection_timeout", connection_timeout, int)
    if connection_timeout < 1:
        raise ValueError(f"'connection_timeout' {connection_timeout} must be a positive integer")
    # Add 60 seconds to the timeout, to allow us to always get the final message containing ConnectionStatus=Closed
    grace_seconds = 60
    self.timeout = connection_timeout * 60 + grace_seconds
    payload = self.get_payload(
        subscription_ids=subscription_ids,
        connection_timeout=connection_timeout,
    )
    elements = self._get_elements(payload=payload)
    return self._elems_to_objs(elements)
def get_payload(self, subscription_ids, connection_timeout)
-
Expand source code
def get_payload(self, subscription_ids, connection_timeout):
    """Build the request element: a 'SubscriptionIds' container followed by 'ConnectionTimeout'."""
    payload = create_element(f"m:{self.SERVICE_NAME}")
    ids_container = create_element("m:SubscriptionIds")
    for sub_id in subscription_ids:
        add_xml_child(ids_container, "t:SubscriptionId", sub_id)
    payload.append(ids_container)
    add_xml_child(payload, "m:ConnectionTimeout", connection_timeout)
    return payload
Inherited members
class GetUserAvailability (*args, **kwargs)
-
Expand source code
class GetUserAvailability(EWSService):
    """Get detailed availability information for a list of users.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getuseravailability-operation
    """

    SERVICE_NAME = "GetUserAvailability"

    # Minimal stand-in for an account, exposing only 'default_timezone'. Built once here rather than once per
    # parsed element, because creating a namedtuple class is comparatively expensive.
    _FAKE_ACCOUNT_CLS = namedtuple("Account", ["default_timezone"])

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Set by call(); read when parsing the response
        self.tzinfo = None

    def call(self, tzinfo, mailbox_data, timezone, free_busy_view_options):
        """Run the request in chunks of mailbox data and convert the response elements.

        :param tzinfo: stored on the service and used as the default timezone when parsing the response
        :param mailbox_data: the items that are split into chunks
        :param timezone: forwarded to get_payload() for every chunk
        :param free_busy_view_options: forwarded to get_payload() for every chunk
        :return: the result of _elems_to_objs() applied to the chunked response elements
        """
        # TODO: Also supports SuggestionsViewOptions, see
        #  https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/suggestionsviewoptions
        self.tzinfo = tzinfo
        return self._elems_to_objs(
            self._chunked_get_elements(
                self.get_payload,
                items=mailbox_data,
                timezone=timezone,
                free_busy_view_options=free_busy_view_options,
            )
        )

    @property
    def _timezone(self):
        """Return the timezone passed to the most recent call()."""
        return self.tzinfo

    def _elem_to_obj(self, elem):
        """Parse one response element into a FreeBusyView, using a stand-in account that carries the timezone."""
        fake_account = self._FAKE_ACCOUNT_CLS(default_timezone=self.tzinfo)
        return FreeBusyView.from_xml(elem=elem, account=fake_account)

    def get_payload(self, mailbox_data, timezone, free_busy_view_options):
        """Build the request element: timezone, a 'MailboxDataArray' container, then the view options."""
        payload = create_element(f"m:{self.SERVICE_NAME}Request")
        set_xml_value(payload, timezone, version=self.protocol.version)
        mailbox_data_array = create_element("m:MailboxDataArray")
        set_xml_value(mailbox_data_array, mailbox_data, version=self.protocol.version)
        payload.append(mailbox_data_array)
        return set_xml_value(payload, free_busy_view_options, version=self.protocol.version)

    @staticmethod
    def _response_messages_tag():
        """Return the qualified tag name of the element that holds the response messages."""
        return f"{{{MNS}}}FreeBusyResponseArray"

    @classmethod
    def _response_message_tag(cls):
        """Return the qualified tag name of a single response message."""
        return f"{{{MNS}}}FreeBusyResponse"

    def _get_elements_in_response(self, response):
        """Yield, per response message, either the exception from the status check or the message's view element."""
        for msg in response:
            # The status lives in a nested 'ResponseMessage' element; the payload sits next to it in 'msg'.
            container_or_exc = self._get_element_container(message=msg.find(f"{{{MNS}}}ResponseMessage"))
            if isinstance(container_or_exc, Exception):
                yield container_or_exc
            else:
                yield from self._get_elements_in_container(container=msg)

    @classmethod
    def _get_elements_in_container(cls, container):
        """Return a one-element list with the 'FreeBusyView' child of 'container' (None if it is missing)."""
        return [container.find(f"{{{MNS}}}FreeBusyView")]
Get detailed availability information for a list of users. MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getuseravailability-operation
Ancestors
Class variables
var SERVICE_NAME
Methods
def call(self, tzinfo, mailbox_data, timezone, free_busy_view_options)
-
Expand source code
def call(self, tzinfo, mailbox_data, timezone, free_busy_view_options):
    """Run the request in chunks of mailbox data and convert the response elements.

    :param tzinfo: stored on the service as 'self.tzinfo' before the request is made
    :param mailbox_data: the items that are split into chunks
    :param timezone: forwarded to get_payload() for every chunk
    :param free_busy_view_options: forwarded to get_payload() for every chunk
    :return: the result of _elems_to_objs() applied to the chunked response elements
    """
    # TODO: Also supports SuggestionsViewOptions, see
    #  https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/suggestionsviewoptions
    self.tzinfo = tzinfo
    elements = self._chunked_get_elements(
        self.get_payload,
        items=mailbox_data,
        timezone=timezone,
        free_busy_view_options=free_busy_view_options,
    )
    return self._elems_to_objs(elements)
def get_payload(self, mailbox_data, timezone, free_busy_view_options)
-
Expand source code
def get_payload(self, mailbox_data, timezone, free_busy_view_options):
    """Build the request element: timezone, a 'MailboxDataArray' container, then the view options."""
    request = create_element(f"m:{self.SERVICE_NAME}Request")
    set_xml_value(request, timezone, version=self.protocol.version)
    data_array = create_element("m:MailboxDataArray")
    set_xml_value(data_array, mailbox_data, version=self.protocol.version)
    request.append(data_array)
    return set_xml_value(request, free_busy_view_options, version=self.protocol.version)
Inherited members
class GetUserConfiguration (*args, **kwargs)
-
Expand source code
class GetUserConfiguration(EWSAccountService):
    """Request a user configuration object.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getuserconfiguration-operation
    """

    SERVICE_NAME = "GetUserConfiguration"

    def call(self, user_configuration_name, properties):
        """Validate 'properties', send the request and return the converted response elements.

        :param user_configuration_name: forwarded to get_payload()
        :param properties: must be one of PROPERTIES_CHOICES
        :return: the result of _elems_to_objs() applied to the response elements
        :raises InvalidEnumValue: if 'properties' is not in PROPERTIES_CHOICES
        """
        if properties not in PROPERTIES_CHOICES:
            raise InvalidEnumValue("properties", properties, PROPERTIES_CHOICES)
        payload = self.get_payload(user_configuration_name=user_configuration_name, properties=properties)
        elements = self._get_elements(payload=payload)
        return self._elems_to_objs(elements)

    def _elem_to_obj(self, elem):
        """Parse one response element into a UserConfiguration bound to this account."""
        return UserConfiguration.from_xml(elem=elem, account=self.account)

    @classmethod
    def _get_elements_in_container(cls, container):
        """Return all UserConfiguration response elements found directly in 'container'."""
        config_tag = UserConfiguration.response_tag()
        return container.findall(config_tag)

    def get_payload(self, user_configuration_name, properties):
        """Build the request element: the configuration name followed by 'UserConfigurationProperties'."""
        payload = create_element(f"m:{self.SERVICE_NAME}")
        set_xml_value(payload, user_configuration_name, version=self.account.version)
        properties_elem = set_xml_value(
            create_element("m:UserConfigurationProperties"), properties, version=self.account.version
        )
        payload.append(properties_elem)
        return payload
Ancestors
Class variables
var SERVICE_NAME
Methods
def call(self, user_configuration_name, properties)
-
Expand source code
def call(self, user_configuration_name, properties):
    """Validate 'properties', send the request and return the converted response elements.

    :param user_configuration_name: forwarded to get_payload()
    :param properties: must be one of PROPERTIES_CHOICES
    :return: the result of _elems_to_objs() applied to the response elements
    :raises InvalidEnumValue: if 'properties' is not in PROPERTIES_CHOICES
    """
    if properties not in PROPERTIES_CHOICES:
        raise InvalidEnumValue("properties", properties, PROPERTIES_CHOICES)
    payload = self.get_payload(user_configuration_name=user_configuration_name, properties=properties)
    elements = self._get_elements(payload=payload)
    return self._elems_to_objs(elements)
def get_payload(self, user_configuration_name, properties)
-
Expand source code
def get_payload(self, user_configuration_name, properties):
    """Build the request element: the configuration name followed by 'UserConfigurationProperties'."""
    payload = create_element(f"m:{self.SERVICE_NAME}")
    set_xml_value(payload, user_configuration_name, version=self.account.version)
    properties_elem = set_xml_value(
        create_element("m:UserConfigurationProperties"), properties, version=self.account.version
    )
    payload.append(properties_elem)
    return payload
Inherited members
class GetUserOofSettings (*args, **kwargs)
-
Expand source code
class GetUserOofSettings(EWSAccountService):
    """Get automatic reply settings for the specified mailbox.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getuseroofsettings-operation
    """

    SERVICE_NAME = "GetUserOofSettings"
    element_container_name = f"{{{TNS}}}OofSettings"

    def call(self, mailbox):
        """Send the request for 'mailbox' and return the parsed response objects."""
        request = self.get_payload(mailbox=mailbox)
        response_elems = self._get_elements(payload=request)
        return self._elems_to_objs(response_elems)

    def _elem_to_obj(self, elem):
        """Convert the response element to an OofSettings object bound to this account."""
        return OofSettings.from_xml(elem=elem, account=self.account)

    def get_payload(self, mailbox):
        """Build the '<SERVICE_NAME>Request' element holding the mailbox as an AvailabilityMailbox."""
        root = create_element(f"m:{self.SERVICE_NAME}Request")
        availability_mailbox = AvailabilityMailbox.from_mailbox(mailbox)
        return set_xml_value(root, availability_mailbox, version=self.account.version)

    @classmethod
    def _get_elements_in_container(cls, container):
        # This service only returns one result, directly in 'container'
        return [container]

    def _get_element_container(self, message, name=None):
        # This service returns the result container outside the response message. The parent implementation is
        # still invoked on the nested response message, but its return value is intentionally discarded
        # (presumably it is called so that errors in the response message are raised - confirm in the base class).
        response_message = message.find(self._response_message_tag())
        super()._get_element_container(message=response_message, name=None)
        return message.find(name)

    @classmethod
    def _response_message_tag(cls):
        """Return the fully qualified tag of the nested 'ResponseMessage' element."""
        return f"{{{MNS}}}ResponseMessage"
Get automatic reply settings for the specified mailbox. MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getuseroofsettings-operation
Ancestors
Class variables
var SERVICE_NAME
var element_container_name
Methods
def call(self, mailbox)
-
Expand source code
def call(self, mailbox):
    """Send the request for 'mailbox' and return the parsed response objects."""
    request = self.get_payload(mailbox=mailbox)
    response_elems = self._get_elements(payload=request)
    return self._elems_to_objs(response_elems)
def get_payload(self, mailbox)
-
Expand source code
def get_payload(self, mailbox):
    """Build the '<SERVICE_NAME>Request' element holding the mailbox as an AvailabilityMailbox."""
    root = create_element(f"m:{self.SERVICE_NAME}Request")
    availability_mailbox = AvailabilityMailbox.from_mailbox(mailbox)
    return set_xml_value(root, availability_mailbox, version=self.account.version)
Inherited members
class GetUserSettings (protocol, chunk_size=None, timeout=None)
-
Expand source code
class GetUserSettings(EWSService):
    """Take a list of users and requested Autodiscover settings for these users. Returns the requested settings
    values.

    MSDN:
    https://learn.microsoft.com/en-us/exchange/client-developer/web-service-reference/getusersettings-operation-soap
    """

    SERVICE_NAME = "GetUserSettings"
    # Only the namespace prefixes used by this service's SOAP envelope
    NS_MAP = {k: v for k, v in ns_translation.items() if k in ("s", "t", "a", "wsa", "xsi")}
    element_container_name = f"{{{ANS}}}UserResponses"
    supported_from = EXCHANGE_2010

    def call(self, users, settings):
        """Send the request for the given users and settings and return the parsed response objects."""
        request = self.get_payload(users=users, settings=settings)
        return self._elems_to_objs(self._get_elements(request))

    def wrap(self, content, api_version=None):
        """Wrap 'content' in a SOAP envelope and return it serialized with an XML declaration.

        The header gets, in order: an optional 'a:RequestedServerVersion', 'wsa:Action', 'wsa:To' and an optional
        't:ExchangeImpersonation' child.
        """
        action_uri = f"http://schemas.microsoft.com/exchange/2010/Autodiscover/Autodiscover/{self.SERVICE_NAME}"

        soap_envelope = create_element("s:Envelope", nsmap=self.NS_MAP)
        soap_header = create_element("s:Header")
        if api_version:
            add_xml_child(soap_header, "a:RequestedServerVersion", api_version)
        add_xml_child(soap_header, "wsa:Action", action_uri)
        add_xml_child(soap_header, "wsa:To", self.protocol.service_endpoint)
        impersonation = self._account_to_impersonate
        if impersonation:
            add_xml_child(soap_header, "t:ExchangeImpersonation", impersonation)
        soap_envelope.append(soap_header)

        soap_body = create_element("s:Body")
        soap_body.append(content)
        soap_envelope.append(soap_body)
        return xml_to_str(soap_envelope, encoding=DEFAULT_ENCODING, xml_declaration=True)

    def _elem_to_obj(self, elem):
        """Convert one response element to a UserResponse (no account is attached)."""
        return UserResponse.from_xml(elem=elem, account=None)

    def get_payload(self, users, settings):
        """Build the request message: an 'a:Request' holding 'a:Users' followed by 'a:RequestedSettings'.

        :raises KeyError: if a setting is not a key of UserResponse.SETTINGS_MAP
        """
        users_container = create_element("a:Users")
        for user in users:
            mailbox_elem = create_element("a:Mailbox")
            set_xml_value(mailbox_elem, user)
            add_xml_child(users_container, "a:User", mailbox_elem)

        settings_container = create_element("a:RequestedSettings")
        for setting in settings:
            add_xml_child(settings_container, "a:Setting", UserResponse.SETTINGS_MAP[setting])

        request_elem = create_element("a:Request")
        request_elem.append(users_container)
        request_elem.append(settings_container)

        root = create_element(f"a:{self.SERVICE_NAME}RequestMessage")
        root.append(request_elem)
        return root

    @classmethod
    def _response_tag(cls):
        """Return the name of the element containing the service response."""
        return f"{{{ANS}}}{cls.SERVICE_NAME}ResponseMessage"

    def _get_element_container(self, message, name=None):
        """Return the container element named 'name', or an acceptable exception instance.

        :raises MalformedResponseError: if the response reports no error but holds no 'name' element
        """
        response = message.find(f"{{{ANS}}}Response")
        # ErrorCode: See
        # https://learn.microsoft.com/en-us/exchange/client-developer/web-service-reference/errorcode-soap
        # There are two 'ErrorCode' elements in the response; one is a child of the 'Response' element, the other is
        # a child of the 'UserResponse' element. Let's handle both with the same code.
        parsed = UserResponse.parse_elem(response)
        if isinstance(parsed, Exception):
            raise parsed
        if parsed.error_code != "NoError":
            # Raise any non-acceptable errors in the container, or return the acceptable exception instance
            try:
                raise self._get_exception(code=parsed.error_code, text=parsed.error_message)
            except self.ERRORS_TO_CATCH_IN_RESPONSE as e:
                return e
        container = response.find(name)
        if container is None:
            raise MalformedResponseError(f"No {name} elements in ResponseMessage ({xml_to_str(response)})")
        return container
Take a list of users and requested Autodiscover settings for these users. Returns the requested settings values.
Ancestors
Class variables
var NS_MAP
var SERVICE_NAME
var element_container_name
var supported_from
Methods
def call(self, users, settings)
-
Expand source code
def call(self, users, settings):
    """Send the request for the given users and settings and return the parsed response objects."""
    request = self.get_payload(users=users, settings=settings)
    return self._elems_to_objs(self._get_elements(request))
def get_payload(self, users, settings)
-
Expand source code
def get_payload(self, users, settings):
    """Build the request message: an 'a:Request' holding 'a:Users' followed by 'a:RequestedSettings'.

    :raises KeyError: if a setting is not a key of UserResponse.SETTINGS_MAP
    """
    users_container = create_element("a:Users")
    for user in users:
        mailbox_elem = create_element("a:Mailbox")
        set_xml_value(mailbox_elem, user)
        add_xml_child(users_container, "a:User", mailbox_elem)

    settings_container = create_element("a:RequestedSettings")
    for setting in settings:
        add_xml_child(settings_container, "a:Setting", UserResponse.SETTINGS_MAP[setting])

    request_elem = create_element("a:Request")
    request_elem.append(users_container)
    request_elem.append(settings_container)

    root = create_element(f"a:{self.SERVICE_NAME}RequestMessage")
    root.append(request_elem)
    return root
Inherited members
class MarkAsJunk (*args, **kwargs)
-
Expand source code
class MarkAsJunk(EWSAccountService):
    """Implementation of the MarkAsJunk EWS operation.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/markasjunk-operation
    """

    SERVICE_NAME = "MarkAsJunk"

    def call(self, items, is_junk, move_item):
        """Send the items in chunks and return the parsed response objects.

        :param items: the items to include in the request
        :param is_junk: value of the 'IsJunk' request attribute
        :param move_item: value of the 'MoveItem' request attribute
        """
        response_elems = self._chunked_get_elements(
            self.get_payload, items=items, is_junk=is_junk, move_item=move_item
        )
        return self._elems_to_objs(response_elems)

    def _elem_to_obj(self, elem):
        """Extract the ID from a MovedItemId response element."""
        return MovedItemId.id_from_xml(elem)

    @classmethod
    def _get_elements_in_container(cls, container):
        """Return all MovedItemId elements found directly in 'container'."""
        tag = MovedItemId.response_tag()
        return container.findall(tag)

    def get_payload(self, items, is_junk, move_item):
        # Takes a list of items and returns either success or raises an error message
        root = create_element(f"m:{self.SERVICE_NAME}", attrs={"IsJunk": is_junk, "MoveItem": move_item})
        ids_elem = item_ids_element(items=items, version=self.account.version)
        root.append(ids_elem)
        return root
Ancestors
Class variables
var SERVICE_NAME
Methods
def call(self, items, is_junk, move_item)
-
Expand source code
def call(self, items, is_junk, move_item):
    """Send the items in chunks and return the parsed response objects.

    :param items: the items to include in the request
    :param is_junk: value of the 'IsJunk' request attribute
    :param move_item: value of the 'MoveItem' request attribute
    """
    response_elems = self._chunked_get_elements(
        self.get_payload, items=items, is_junk=is_junk, move_item=move_item
    )
    return self._elems_to_objs(response_elems)
def get_payload(self, items, is_junk, move_item)
-
Expand source code
def get_payload(self, items, is_junk, move_item):
    # Takes a list of items and returns either success or raises an error message
    root = create_element(f"m:{self.SERVICE_NAME}", attrs={"IsJunk": is_junk, "MoveItem": move_item})
    ids_elem = item_ids_element(items=items, version=self.account.version)
    root.append(ids_elem)
    return root
Inherited members
class MoveFolder (*args, **kwargs)
-
Expand source code
class MoveFolder(EWSAccountService):
    """Implementation of the MoveFolder EWS operation.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/movefolder-operation
    """

    SERVICE_NAME = "MoveFolder"
    element_container_name = f"{{{MNS}}}Folders"

    def call(self, folders, to_folder):
        """Move 'folders' to 'to_folder' in chunks and return the parsed response objects.

        :raises InvalidTypeError: if 'to_folder' is neither a BaseFolder nor a FolderId
        """
        allowed_types = (BaseFolder, FolderId)
        if not isinstance(to_folder, allowed_types):
            raise InvalidTypeError("to_folder", to_folder, allowed_types)
        response_elems = self._chunked_get_elements(self.get_payload, items=folders, to_folder=to_folder)
        return self._elems_to_objs(response_elems)

    def _elem_to_obj(self, elem):
        """Parse the FolderId child of a response element."""
        folder_id_elem = elem.find(FolderId.response_tag())
        return FolderId.from_xml(elem=folder_id_elem, account=self.account)

    def get_payload(self, folders, to_folder):
        # Takes a list of folders and returns their new folder IDs
        root = create_element(f"m:{self.SERVICE_NAME}")
        # The target folder comes first, then the folders to move
        target_elem = folder_ids_element(folders=[to_folder], version=self.account.version, tag="m:ToFolderId")
        root.append(target_elem)
        sources_elem = folder_ids_element(folders=folders, version=self.account.version)
        root.append(sources_elem)
        return root
Ancestors
Class variables
var SERVICE_NAME
var element_container_name
Methods
def call(self, folders, to_folder)
-
Expand source code
def call(self, folders, to_folder):
    """Move 'folders' to 'to_folder' in chunks and return the parsed response objects.

    :raises InvalidTypeError: if 'to_folder' is neither a BaseFolder nor a FolderId
    """
    allowed_types = (BaseFolder, FolderId)
    if not isinstance(to_folder, allowed_types):
        raise InvalidTypeError("to_folder", to_folder, allowed_types)
    response_elems = self._chunked_get_elements(self.get_payload, items=folders, to_folder=to_folder)
    return self._elems_to_objs(response_elems)
def get_payload(self, folders, to_folder)
-
Expand source code
def get_payload(self, folders, to_folder):
    # Takes a list of folders and returns their new folder IDs
    root = create_element(f"m:{self.SERVICE_NAME}")
    # The target folder comes first, then the folders to move
    target_elem = folder_ids_element(folders=[to_folder], version=self.account.version, tag="m:ToFolderId")
    root.append(target_elem)
    sources_elem = folder_ids_element(folders=folders, version=self.account.version)
    root.append(sources_elem)
    return root
Inherited members
class MoveItem (*args, **kwargs)
-
Expand source code
class MoveItem(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/moveitem-operation"""

    SERVICE_NAME = "MoveItem"
    element_container_name = f"{{{MNS}}}Items"

    def call(self, items, to_folder):
        """Move 'items' to 'to_folder' in chunks and return the parsed response objects.

        :raises InvalidTypeError: if 'to_folder' is neither a BaseFolder nor a FolderId
        """
        allowed_types = (BaseFolder, FolderId)
        if not isinstance(to_folder, allowed_types):
            raise InvalidTypeError("to_folder", to_folder, allowed_types)
        response_elems = self._chunked_get_elements(self.get_payload, items=items, to_folder=to_folder)
        return self._elems_to_objs(response_elems)

    def _elem_to_obj(self, elem):
        """Extract the item ID from a response element."""
        return Item.id_from_xml(elem)

    def get_payload(self, items, to_folder):
        # Takes a list of items and returns their new item IDs
        root = create_element(f"m:{self.SERVICE_NAME}")
        # The target folder comes first, then the items to move
        target_elem = folder_ids_element(folders=[to_folder], version=self.account.version, tag="m:ToFolderId")
        root.append(target_elem)
        ids_elem = item_ids_element(items=items, version=self.account.version)
        root.append(ids_elem)
        return root
MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/moveitem-operation
Ancestors
Subclasses
Class variables
var SERVICE_NAME
var element_container_name
Methods
def call(self, items, to_folder)
-
Expand source code
def call(self, items, to_folder):
    """Move 'items' to 'to_folder' in chunks and return the parsed response objects.

    :raises InvalidTypeError: if 'to_folder' is neither a BaseFolder nor a FolderId
    """
    allowed_types = (BaseFolder, FolderId)
    if not isinstance(to_folder, allowed_types):
        raise InvalidTypeError("to_folder", to_folder, allowed_types)
    response_elems = self._chunked_get_elements(self.get_payload, items=items, to_folder=to_folder)
    return self._elems_to_objs(response_elems)
def get_payload(self, items, to_folder)
-
Expand source code
def get_payload(self, items, to_folder):
    # Takes a list of items and returns their new item IDs
    root = create_element(f"m:{self.SERVICE_NAME}")
    # The target folder comes first, then the items to move
    target_elem = folder_ids_element(folders=[to_folder], version=self.account.version, tag="m:ToFolderId")
    root.append(target_elem)
    ids_elem = item_ids_element(items=items, version=self.account.version)
    root.append(ids_elem)
    return root
Inherited members
class ResolveNames (*args, **kwargs)
-
Expand source code
class ResolveNames(EWSService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/resolvenames-operation"""

    SERVICE_NAME = "ResolveNames"
    element_container_name = f"{{{MNS}}}ResolutionSet"
    ERRORS_TO_CATCH_IN_RESPONSE = ErrorNameResolutionNoResults
    WARNINGS_TO_IGNORE_IN_RESPONSE = ErrorNameResolutionMultipleResults
    # According to the 'Remarks' section of the MSDN documentation referenced above, at most 100 candidates are
    # returned for a lookup.
    # Note: paging information is returned as attrs on the 'ResolutionSet' element, but this service does not
    # support the 'IndexedPageItemView' element, so it's not really a paging service.
    candidates_limit = 100

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.return_full_contact_data = False  # A hack to communicate parsing args to _elems_to_objs()

    def call(
        self,
        unresolved_entries,
        parent_folders=None,
        return_full_contact_data=False,
        search_scope=None,
        contact_data_shape=None,
    ):
        """Validate the arguments, send the entries in chunks and return the parsed response objects.

        :param unresolved_entries: the entries to resolve
        :param parent_folders: optional folders serialized as 'm:ParentFolderIds'
        :param return_full_contact_data: if true, results are (Mailbox, Contact) tuples instead of Mailbox objects
        :param search_scope: optional, must be one of SEARCH_SCOPE_CHOICES
        :param contact_data_shape: optional, must be one of SHAPE_CHOICES
        :raises ValueError: if the configured chunk size exceeds 'candidates_limit'
        :raises InvalidEnumValue: if 'search_scope' or 'contact_data_shape' is not a valid choice
        """
        # Compare against the class attribute instead of a second hard-coded 100, so the limit is defined once
        if self.chunk_size > self.candidates_limit:
            raise ValueError(
                f"Chunk size {self.chunk_size} is too high. {self.SERVICE_NAME} supports returning at most "
                f"{self.candidates_limit} candidates for a lookup",
            )
        if search_scope and search_scope not in SEARCH_SCOPE_CHOICES:
            raise InvalidEnumValue("search_scope", search_scope, SEARCH_SCOPE_CHOICES)
        if contact_data_shape and contact_data_shape not in SHAPE_CHOICES:
            raise InvalidEnumValue("contact_data_shape", contact_data_shape, SHAPE_CHOICES)
        self.return_full_contact_data = return_full_contact_data
        return self._elems_to_objs(
            self._chunked_get_elements(
                self.get_payload,
                items=unresolved_entries,
                parent_folders=parent_folders,
                return_full_contact_data=return_full_contact_data,
                search_scope=search_scope,
                contact_data_shape=contact_data_shape,
            )
        )

    def _get_element_container(self, message, name=None):
        """Return the container (or an acceptable exception) and warn if the candidate list was truncated."""
        container_or_exc = super()._get_element_container(message=message, name=name)
        if isinstance(container_or_exc, Exception):
            return container_or_exc
        # The attribute is an xs:boolean, whose true values are "true" and "1" ("0" means false)
        is_last_page = container_or_exc.get("IncludesLastItemInRange").lower() in ("true", "1")
        log.debug("Includes last item in range: %s", is_last_page)
        if not is_last_page:
            warnings.warn(
                f"The {self.__class__.__name__} service returns at most {self.candidates_limit} candidates and does "
                f"not support paging. You have reached this limit and have not received the exhaustive list of "
                f"candidates."
            )
        return container_or_exc

    def _elem_to_obj(self, elem):
        """Convert a resolution element to a Mailbox, or to a (Mailbox, Contact) tuple for full contact data.

        In the tuple form, a member is None when the corresponding child element is missing.
        """
        if self.return_full_contact_data:
            mailbox_elem = elem.find(Mailbox.response_tag())
            contact_elem = elem.find(Contact.response_tag())
            return (
                None if mailbox_elem is None else Mailbox.from_xml(elem=mailbox_elem, account=None),
                None if contact_elem is None else Contact.from_xml(elem=contact_elem, account=None),
            )
        return Mailbox.from_xml(elem=elem.find(Mailbox.response_tag()), account=None)

    def get_payload(
        self, unresolved_entries, parent_folders, return_full_contact_data, search_scope, contact_data_shape
    ):
        """Build the request element with its attributes, optional parent folders and one child per entry.

        :raises NotImplementedError: if 'contact_data_shape' is set and the server is older than Exchange 2010 SP2
        """
        attrs = dict(ReturnFullContactData=return_full_contact_data)
        if search_scope:
            attrs["SearchScope"] = search_scope
        if contact_data_shape:
            if self.protocol.version.build < EXCHANGE_2010_SP2:
                raise NotImplementedError(
                    "'contact_data_shape' is only supported for Exchange 2010 SP2 servers and later"
                )
            attrs["ContactDataShape"] = contact_data_shape
        payload = create_element(f"m:{self.SERVICE_NAME}", attrs=attrs)
        if parent_folders:
            payload.append(
                folder_ids_element(folders=parent_folders, version=self.protocol.version, tag="m:ParentFolderIds")
            )
        for entry in unresolved_entries:
            add_xml_child(payload, "m:UnresolvedEntry", entry)
        return payload
Ancestors
Class variables
var ERRORS_TO_CATCH_IN_RESPONSE
-
Global error type within this module.
var SERVICE_NAME
var WARNINGS_TO_IGNORE_IN_RESPONSE
-
Global error type within this module.
var candidates_limit
var element_container_name
Methods
def call(self,
unresolved_entries,
parent_folders=None,
return_full_contact_data=False,
search_scope=None,
contact_data_shape=None)-
Expand source code
def call(
    self,
    unresolved_entries,
    parent_folders=None,
    return_full_contact_data=False,
    search_scope=None,
    contact_data_shape=None,
):
    """Validate the arguments, send the entries in chunks and return the parsed response objects.

    :param unresolved_entries: the entries to resolve
    :param parent_folders: optional folders serialized as 'm:ParentFolderIds'
    :param return_full_contact_data: if true, results are (Mailbox, Contact) tuples instead of Mailbox objects
    :param search_scope: optional, must be one of SEARCH_SCOPE_CHOICES
    :param contact_data_shape: optional, must be one of SHAPE_CHOICES
    :raises ValueError: if the configured chunk size exceeds 'candidates_limit'
    :raises InvalidEnumValue: if 'search_scope' or 'contact_data_shape' is not a valid choice
    """
    # Compare against the class attribute instead of a second hard-coded 100, so the limit is defined once
    if self.chunk_size > self.candidates_limit:
        raise ValueError(
            f"Chunk size {self.chunk_size} is too high. {self.SERVICE_NAME} supports returning at most "
            f"{self.candidates_limit} candidates for a lookup",
        )
    if search_scope and search_scope not in SEARCH_SCOPE_CHOICES:
        raise InvalidEnumValue("search_scope", search_scope, SEARCH_SCOPE_CHOICES)
    if contact_data_shape and contact_data_shape not in SHAPE_CHOICES:
        raise InvalidEnumValue("contact_data_shape", contact_data_shape, SHAPE_CHOICES)
    self.return_full_contact_data = return_full_contact_data
    return self._elems_to_objs(
        self._chunked_get_elements(
            self.get_payload,
            items=unresolved_entries,
            parent_folders=parent_folders,
            return_full_contact_data=return_full_contact_data,
            search_scope=search_scope,
            contact_data_shape=contact_data_shape,
        )
    )
def get_payload(self,
unresolved_entries,
parent_folders,
return_full_contact_data,
search_scope,
contact_data_shape)-
Expand source code
def get_payload(
    self, unresolved_entries, parent_folders, return_full_contact_data, search_scope, contact_data_shape
):
    """Build the request element with its attributes, optional parent folders and one child per entry.

    :raises NotImplementedError: if 'contact_data_shape' is set and the server is older than Exchange 2010 SP2
    """
    # Insertion order is kept: ReturnFullContactData, then SearchScope, then ContactDataShape
    attributes = {"ReturnFullContactData": return_full_contact_data}
    if search_scope:
        attributes["SearchScope"] = search_scope
    if contact_data_shape:
        server_build = self.protocol.version.build
        if server_build < EXCHANGE_2010_SP2:
            raise NotImplementedError("'contact_data_shape' is only supported for Exchange 2010 SP2 servers and later")
        attributes["ContactDataShape"] = contact_data_shape

    root = create_element(f"m:{self.SERVICE_NAME}", attrs=attributes)
    if parent_folders:
        parents_elem = folder_ids_element(
            folders=parent_folders, version=self.protocol.version, tag="m:ParentFolderIds"
        )
        root.append(parents_elem)
    for unresolved_entry in unresolved_entries:
        add_xml_child(root, "m:UnresolvedEntry", unresolved_entry)
    return root
Inherited members
class SendItem (*args, **kwargs)
-
Expand source code
class SendItem(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/senditem-operation"""

    SERVICE_NAME = "SendItem"
    returns_elements = False

    def call(self, items, saved_item_folder):
        """Send 'items' in chunks, optionally saving them to 'saved_item_folder'.

        :param saved_item_folder: a BaseFolder or FolderId, or a falsy value to not save the items
        :raises InvalidTypeError: if 'saved_item_folder' is truthy but neither a BaseFolder nor a FolderId
        """
        allowed_types = (BaseFolder, FolderId)
        if saved_item_folder and not isinstance(saved_item_folder, allowed_types):
            raise InvalidTypeError("saved_item_folder", saved_item_folder, allowed_types)
        return self._chunked_get_elements(self.get_payload, items=items, saved_item_folder=saved_item_folder)

    def get_payload(self, items, saved_item_folder):
        """Build the request element: the item IDs, plus 'm:SavedItemFolderId' when a folder is given."""
        save_copy = bool(saved_item_folder)
        root = create_element(f"m:{self.SERVICE_NAME}", attrs={"SaveItemToFolder": save_copy})
        root.append(item_ids_element(items=items, version=self.account.version))
        if not saved_item_folder:
            return root
        folder_elem = folder_ids_element(
            folders=[saved_item_folder], version=self.account.version, tag="m:SavedItemFolderId"
        )
        root.append(folder_elem)
        return root
MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/senditem-operation
Ancestors
Class variables
var SERVICE_NAME
var returns_elements
Methods
def call(self, items, saved_item_folder)
-
Expand source code
def call(self, items, saved_item_folder):
    """Send 'items' in chunks, optionally saving them to 'saved_item_folder'.

    :param saved_item_folder: a BaseFolder or FolderId, or a falsy value to not save the items
    :raises InvalidTypeError: if 'saved_item_folder' is truthy but neither a BaseFolder nor a FolderId
    """
    allowed_types = (BaseFolder, FolderId)
    if saved_item_folder and not isinstance(saved_item_folder, allowed_types):
        raise InvalidTypeError("saved_item_folder", saved_item_folder, allowed_types)
    return self._chunked_get_elements(self.get_payload, items=items, saved_item_folder=saved_item_folder)
def get_payload(self, items, saved_item_folder)
-
Expand source code
def get_payload(self, items, saved_item_folder):
    """Build the request element: the item IDs, plus 'm:SavedItemFolderId' when a folder is given."""
    save_copy = bool(saved_item_folder)
    root = create_element(f"m:{self.SERVICE_NAME}", attrs={"SaveItemToFolder": save_copy})
    root.append(item_ids_element(items=items, version=self.account.version))
    if not saved_item_folder:
        return root
    folder_elem = folder_ids_element(
        folders=[saved_item_folder], version=self.account.version, tag="m:SavedItemFolderId"
    )
    root.append(folder_elem)
    return root
Inherited members
class SendNotification (protocol, chunk_size=None, timeout=None)
-
Expand source code
class SendNotification(EWSService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/sendnotification

    This service is implemented backwards compared to other services. We use it to parse the XML body of push
    notifications we receive on the callback URL defined in a push subscription, and to create responses to these
    push notifications.
    """

    SERVICE_NAME = "SendNotification"
    # The subscription status values accepted by get_payload()
    OK = "OK"
    UNSUBSCRIBE = "Unsubscribe"
    STATUS_CHOICES = (OK, UNSUBSCRIBE)

    def ok_payload(self):
        """Return the wrapped response payload carrying the 'OK' status."""
        content = self.get_payload(status=self.OK)
        return self.wrap(content=content)

    def unsubscribe_payload(self):
        """Return the wrapped response payload carrying the 'Unsubscribe' status."""
        content = self.get_payload(status=self.UNSUBSCRIBE)
        return self.wrap(content=content)

    def _elem_to_obj(self, elem):
        """Convert a notification element to a Notification object (no account is attached)."""
        return Notification.from_xml(elem=elem, account=None)

    @classmethod
    def _response_tag(cls):
        """Return the name of the element containing the service response."""
        return f"{{{MNS}}}{cls.SERVICE_NAME}"

    @classmethod
    def _get_elements_in_container(cls, container):
        """Return all Notification elements found directly in 'container'."""
        tag = Notification.response_tag()
        return container.findall(tag)

    def get_payload(self, status):
        """Build the '<SERVICE_NAME>Result' element holding the subscription status.

        :raises InvalidEnumValue: if 'status' is not one of STATUS_CHOICES
        """
        if status not in self.STATUS_CHOICES:
            raise InvalidEnumValue("status", status, self.STATUS_CHOICES)
        result_elem = create_element(f"m:{self.SERVICE_NAME}Result")
        add_xml_child(result_elem, "m:SubscriptionStatus", status)
        return result_elem
MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/sendnotification
This service is implemented backwards compared to other services. We use it to parse the XML body of push notifications we receive on the callback URL defined in a push subscription, and to create responses to these push notifications.
Ancestors
Class variables
var OK
var SERVICE_NAME
var STATUS_CHOICES
var UNSUBSCRIBE
Methods
def get_payload(self, status)
-
Expand source code
def get_payload(self, status):
    """Build the '<SERVICE_NAME>Result' element holding the subscription status.

    :raises InvalidEnumValue: if 'status' is not one of STATUS_CHOICES
    """
    if status not in self.STATUS_CHOICES:
        raise InvalidEnumValue("status", status, self.STATUS_CHOICES)
    result_elem = create_element(f"m:{self.SERVICE_NAME}Result")
    add_xml_child(result_elem, "m:SubscriptionStatus", status)
    return result_elem
def ok_payload(self)
-
Expand source code
def ok_payload(self):
    """Return the wrapped response payload carrying the 'OK' status."""
    content = self.get_payload(status=self.OK)
    return self.wrap(content=content)
def unsubscribe_payload(self)
-
Expand source code
def unsubscribe_payload(self):
    """Return the wrapped response payload carrying the 'Unsubscribe' status."""
    content = self.get_payload(status=self.UNSUBSCRIBE)
    return self.wrap(content=content)
Inherited members
class SetInboxRule (*args, **kwargs)
-
Expand source code
class SetInboxRule(UpdateInboxRules):
    """UpdateInboxRules variant that submits a 'set rule' operation.

    MSDN:
    https://learn.microsoft.com/en-us/exchange/client-developer/web-service-reference/updateinboxrules-operation#updateinboxrules-set-rule-request-example
    """

    def _get_operation(self, rule):
        """Wrap 'rule' in a SetRuleOperation and return the enclosing Operations object."""
        set_operation = SetRuleOperation(rule=rule)
        return Operations(set_rule_operation=set_operation)
Ancestors
Inherited members
class SetUserOofSettings (*args, **kwargs)
-
Expand source code
class SetUserOofSettings(EWSAccountService):
    """Set automatic replies for the specified mailbox.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/setuseroofsettings-operation
    """

    SERVICE_NAME = "SetUserOofSettings"
    returns_elements = False

    def call(self, oof_settings, mailbox):
        """Validate the argument types and send the request.

        :raises InvalidTypeError: if 'oof_settings' is not an OofSettings or 'mailbox' is not a Mailbox
        """
        # Checked in this order so that a bad 'oof_settings' is reported first
        for arg_name, arg_value, arg_type in (
            ("oof_settings", oof_settings, OofSettings),
            ("mailbox", mailbox, Mailbox),
        ):
            if not isinstance(arg_value, arg_type):
                raise InvalidTypeError(arg_name, arg_value, arg_type)
        request = self.get_payload(oof_settings=oof_settings, mailbox=mailbox)
        return self._get_elements(payload=request)

    def get_payload(self, oof_settings, mailbox):
        """Build the '<SERVICE_NAME>Request' element: the mailbox first, then the OOF settings."""
        root = create_element(f"m:{self.SERVICE_NAME}Request")
        availability_mailbox = AvailabilityMailbox.from_mailbox(mailbox)
        set_xml_value(root, availability_mailbox, version=self.account.version)
        return set_xml_value(root, oof_settings, version=self.account.version)

    def _get_element_container(self, message, name=None):
        """Delegate to the parent implementation using the nested 'ResponseMessage' element."""
        response_message = message.find(self._response_message_tag())
        return super()._get_element_container(message=response_message, name=name)

    @classmethod
    def _response_message_tag(cls):
        """Return the fully qualified tag of the nested 'ResponseMessage' element."""
        return f"{{{MNS}}}ResponseMessage"
Set automatic replies for the specified mailbox. MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/setuseroofsettings-operation
Ancestors
Class variables
var SERVICE_NAME
var returns_elements
Methods
def call(self, oof_settings, mailbox)
-
Expand source code
def call(self, oof_settings, mailbox):
    """Validate the argument types and send the request.

    :raises InvalidTypeError: if 'oof_settings' is not an OofSettings or 'mailbox' is not a Mailbox
    """
    # Checked in this order so that a bad 'oof_settings' is reported first
    for arg_name, arg_value, arg_type in (
        ("oof_settings", oof_settings, OofSettings),
        ("mailbox", mailbox, Mailbox),
    ):
        if not isinstance(arg_value, arg_type):
            raise InvalidTypeError(arg_name, arg_value, arg_type)
    request = self.get_payload(oof_settings=oof_settings, mailbox=mailbox)
    return self._get_elements(payload=request)
def get_payload(self, oof_settings, mailbox)
-
Expand source code
def get_payload(self, oof_settings, mailbox):
    """Build the '<SERVICE_NAME>Request' element: the mailbox first, then the OOF settings."""
    root = create_element(f"m:{self.SERVICE_NAME}Request")
    availability_mailbox = AvailabilityMailbox.from_mailbox(mailbox)
    set_xml_value(root, availability_mailbox, version=self.account.version)
    return set_xml_value(root, oof_settings, version=self.account.version)
Inherited members
class SubscribeToPull (*args, **kwargs)
-
Expand source code
class SubscribeToPull(Subscribe):
    """Pull-mode variant of the Subscribe service.

    https://learn.microsoft.com/en-us/exchange/client-developer/web-service-reference/pullsubscriptionrequest
    """

    subscription_request_elem_tag = "m:PullSubscriptionRequest"
    prefer_affinity = True

    def call(self, folders, event_types, watermark, timeout):
        """Yield the results of the subscription request.

        :param folders: the folders to subscribe to
        :param event_types: the event types to subscribe to
        :param watermark: optional watermark; only added to the request when truthy
        :param timeout: subscription timeout, in minutes
        """
        request_args = dict(folders=folders, event_types=event_types, timeout=timeout, watermark=watermark)
        yield from self._partial_call(payload_func=self.get_payload, **request_args)

    def get_payload(self, folders, event_types, watermark, timeout):
        """Build the request: the common subscription element plus an optional watermark and the timeout."""
        subscription_elem = self._partial_payload(folders=folders, event_types=event_types)
        if watermark:
            add_xml_child(subscription_elem, "m:Watermark", watermark)
        add_xml_child(subscription_elem, "t:Timeout", timeout)  # In minutes
        root = create_element(f"m:{self.SERVICE_NAME}")
        root.append(subscription_elem)
        return root
Pull-mode variant of the Subscribe service. (Description inherited from the parent class: "Base class for subscription classes.")
MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/subscribe-operation
Ancestors
Class variables
var prefer_affinity
var subscription_request_elem_tag
Methods
def call(self, folders, event_types, watermark, timeout)
-
Expand source code
def call(self, folders, event_types, watermark, timeout):
    """Yield the results of the subscription request.

    :param folders: the folders to subscribe to
    :param event_types: the event types to subscribe to
    :param watermark: optional watermark; only added to the request when truthy
    :param timeout: subscription timeout, in minutes
    """
    request_args = dict(folders=folders, event_types=event_types, timeout=timeout, watermark=watermark)
    yield from self._partial_call(payload_func=self.get_payload, **request_args)
def get_payload(self, folders, event_types, watermark, timeout)
-
Expand source code
def get_payload(self, folders, event_types, watermark, timeout):
    """Build the request: the common subscription element plus an optional watermark and the timeout."""
    subscription_elem = self._partial_payload(folders=folders, event_types=event_types)
    if watermark:
        add_xml_child(subscription_elem, "m:Watermark", watermark)
    add_xml_child(subscription_elem, "t:Timeout", timeout)  # In minutes
    root = create_element(f"m:{self.SERVICE_NAME}")
    root.append(subscription_elem)
    return root
Inherited members
class SubscribeToPush (*args, **kwargs)
-
Expand source code
class SubscribeToPush(Subscribe):
    """Push-mode variant of the Subscribe service.

    https://learn.microsoft.com/en-us/exchange/client-developer/web-service-reference/pushsubscriptionrequest
    """

    subscription_request_elem_tag = "m:PushSubscriptionRequest"

    def call(self, folders, event_types, watermark, status_frequency, url):
        """Yield the results of the subscription request.

        :param folders: the folders to subscribe to
        :param event_types: the event types to subscribe to
        :param watermark: optional watermark; only added to the request when truthy
        :param status_frequency: status frequency, in minutes
        :param url: value of the 't:URL' request element
        """
        request_args = dict(
            folders=folders,
            event_types=event_types,
            status_frequency=status_frequency,
            url=url,
            watermark=watermark,
        )
        yield from self._partial_call(payload_func=self.get_payload, **request_args)

    def get_payload(self, folders, event_types, watermark, status_frequency, url):
        """Build the request: the common subscription element plus watermark, status frequency and URL."""
        subscription_elem = self._partial_payload(folders=folders, event_types=event_types)
        if watermark:
            add_xml_child(subscription_elem, "m:Watermark", watermark)
        add_xml_child(subscription_elem, "t:StatusFrequency", status_frequency)  # In minutes
        add_xml_child(subscription_elem, "t:URL", url)
        root = create_element(f"m:{self.SERVICE_NAME}")
        root.append(subscription_elem)
        return root
Push-mode variant of the Subscribe service. (Description inherited from the parent class: "Base class for subscription classes.")
MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/subscribe-operation
Ancestors
Class variables
var subscription_request_elem_tag
Methods
def call(self, folders, event_types, watermark, status_frequency, url)
-
Expand source code
def call(self, folders, event_types, watermark, status_frequency, url):
    """Yield the results of the subscription request.

    :param folders: the folders to subscribe to
    :param event_types: the event types to subscribe to
    :param watermark: optional watermark; only added to the request when truthy
    :param status_frequency: status frequency, in minutes
    :param url: value of the 't:URL' request element
    """
    request_args = dict(
        folders=folders,
        event_types=event_types,
        status_frequency=status_frequency,
        url=url,
        watermark=watermark,
    )
    yield from self._partial_call(payload_func=self.get_payload, **request_args)
def get_payload(self, folders, event_types, watermark, status_frequency, url)
-
Expand source code
def get_payload(self, folders, event_types, watermark, status_frequency, url):
    """Build the request: the common subscription element plus watermark, status frequency and URL."""
    subscription_elem = self._partial_payload(folders=folders, event_types=event_types)
    if watermark:
        add_xml_child(subscription_elem, "m:Watermark", watermark)
    add_xml_child(subscription_elem, "t:StatusFrequency", status_frequency)  # In minutes
    add_xml_child(subscription_elem, "t:URL", url)
    root = create_element(f"m:{self.SERVICE_NAME}")
    root.append(subscription_elem)
    return root
Inherited members
class SubscribeToStreaming (*args, **kwargs)
-
Expand source code
class SubscribeToStreaming(Subscribe):
    """Streaming-mode variant of the Subscribe service.

    https://learn.microsoft.com/en-us/exchange/client-developer/web-service-reference/streamingsubscriptionrequest
    """

    subscription_request_elem_tag = "m:StreamingSubscriptionRequest"
    prefer_affinity = True

    def call(self, folders, event_types):
        """Yield the results of the subscription request for the given folders and event types."""
        yield from self._partial_call(payload_func=self.get_payload, folders=folders, event_types=event_types)

    def _elem_to_obj(self, elem):
        """Return the text content of the element."""
        return elem.text

    @classmethod
    def _get_elements_in_container(cls, container):
        """Return a one-element list holding the 'SubscriptionId' child of 'container'."""
        subscription_id_elem = container.find(f"{{{MNS}}}SubscriptionId")
        return [subscription_id_elem]

    def get_payload(self, folders, event_types):
        """Build the request: the service element wrapping the common subscription element."""
        root = create_element(f"m:{self.SERVICE_NAME}")
        subscription_elem = self._partial_payload(folders=folders, event_types=event_types)
        root.append(subscription_elem)
        return root
Streaming-mode variant of the Subscribe service. (Description inherited from the parent class: "Base class for subscription classes.")
MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/subscribe-operation
Ancestors
Class variables
var prefer_affinity
var subscription_request_elem_tag
Methods
def call(self, folders, event_types)
-
Expand source code
def call(self, folders, event_types):
    """Yield the results of the subscription request for the given folders and event types."""
    request_args = dict(folders=folders, event_types=event_types)
    yield from self._partial_call(payload_func=self.get_payload, **request_args)
def get_payload(self, folders, event_types)
-
Expand source code
def get_payload(self, folders, event_types):
    """Build the request: the service element wrapping the common subscription element."""
    root = create_element(f"m:{self.SERVICE_NAME}")
    subscription_elem = self._partial_payload(folders=folders, event_types=event_types)
    root.append(subscription_elem)
    return root
Inherited members
class SyncFolderHierarchy (*args, **kwargs)
-
Expand source code
class SyncFolderHierarchy(SyncFolder):
    """Implementation of the SyncFolderHierarchy EWS operation.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/syncfolderhierarchy-operation
    """

    SERVICE_NAME = "SyncFolderHierarchy"
    shape_tag = "m:FolderShape"
    last_in_range_name = f"{{{MNS}}}IncludesLastFolderInRange"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.folder = None  # A hack to communicate parsing args to _elems_to_objs()

    def call(self, folder, shape, additional_fields, sync_state):
        """Remember the sync state and folder on the instance, send the request and return the parsed changes."""
        self.sync_state = sync_state
        self.folder = folder
        request = self.get_payload(
            folder=folder,
            shape=shape,
            additional_fields=additional_fields,
            sync_state=sync_state,
        )
        return self._elems_to_objs(self._get_elements(payload=request))

    def _elem_to_obj(self, elem):
        """Convert a change element to a (change_type, folder) tuple.

        For deletions, the second member is a FolderId. Otherwise it is the parsed folder.
        """
        change_type = self.change_types_map[elem.tag]
        if change_type == self.DELETE:
            folder_id_elem = elem.find(FolderId.response_tag())
            return change_type, FolderId.from_xml(elem=folder_id_elem, account=self.account)
        # We can't find() the element because we don't know which tag to look for. The change element can
        # contain multiple folder types, each with their own tag.
        first_child = elem[0]
        return change_type, parse_folder_elem(elem=first_child, folder=self.folder)

    def get_payload(self, folder, shape, additional_fields, sync_state):
        """Build the request element by delegating to the shared payload builder."""
        payload_args = dict(folder=folder, shape=shape, additional_fields=additional_fields, sync_state=sync_state)
        return self._partial_get_payload(**payload_args)
Ancestors
Class variables
var SERVICE_NAME
var last_in_range_name
var shape_tag
Methods
def call(self, folder, shape, additional_fields, sync_state)
-
Expand source code
def call(self, folder, shape, additional_fields, sync_state):
    """Remember the sync state and folder on the instance, send the request and return the parsed changes."""
    self.sync_state = sync_state
    self.folder = folder
    request = self.get_payload(
        folder=folder,
        shape=shape,
        additional_fields=additional_fields,
        sync_state=sync_state,
    )
    return self._elems_to_objs(self._get_elements(payload=request))
def get_payload(self, folder, shape, additional_fields, sync_state)
-
Expand source code
def get_payload(self, folder, shape, additional_fields, sync_state):
    """Build the request element by delegating to the shared payload builder."""
    payload_args = dict(folder=folder, shape=shape, additional_fields=additional_fields, sync_state=sync_state)
    return self._partial_get_payload(**payload_args)
Inherited members
class SyncFolderItems (*args, **kwargs)
-
Expand source code
class SyncFolderItems(SyncFolder):
    """Implement the EWS "SyncFolderItems" operation.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/syncfolderitems-operation
    """

    SERVICE_NAME = "SyncFolderItems"
    SYNC_SCOPES = (
        "NormalItems",
        "NormalAndAssociatedItems",
    )
    # Extra change type
    READ_FLAG_CHANGE = "read_flag_change"
    CHANGE_TYPES = SyncFolder.CHANGE_TYPES + (READ_FLAG_CHANGE,)
    shape_tag = "m:ItemShape"
    last_in_range_name = f"{{{MNS}}}IncludesLastItemInRange"
    # Build a *copy* of the parent's map. Assigning the parent's dict and then adding a key to it would also add
    # the 'ReadFlagChange' entry to SyncFolder.change_types_map and thereby to every other SyncFolder subclass.
    change_types_map = {
        **SyncFolder.change_types_map,
        f"{{{TNS}}}ReadFlagChange": READ_FLAG_CHANGE,
    }

    def call(self, folder, shape, additional_fields, sync_state, ignore, max_changes_returned, sync_scope):
        """Validate the arguments, send the request and return the parsed response elements.

        :param max_changes_returned: A positive integer. None is replaced by 100.
        :param sync_scope: None, or one of the values in SYNC_SCOPES.

        :raises InvalidTypeError: If 'max_changes_returned' is not an int.
        :raises ValueError: If 'max_changes_returned' is zero or negative.
        :raises InvalidEnumValue: If 'sync_scope' is neither None nor a member of SYNC_SCOPES.
        """
        self.sync_state = sync_state
        if max_changes_returned is None:
            max_changes_returned = 100
        if not isinstance(max_changes_returned, int):
            raise InvalidTypeError("max_changes_returned", max_changes_returned, int)
        if max_changes_returned <= 0:
            raise ValueError(f"'max_changes_returned' {max_changes_returned} must be a positive integer")
        if sync_scope is not None and sync_scope not in self.SYNC_SCOPES:
            raise InvalidEnumValue("sync_scope", sync_scope, self.SYNC_SCOPES)
        payload = self.get_payload(
            folder=folder,
            shape=shape,
            additional_fields=additional_fields,
            sync_state=sync_state,
            ignore=ignore,
            max_changes_returned=max_changes_returned,
            sync_scope=sync_scope,
        )
        return self._elems_to_objs(self._get_elements(payload=payload))

    def _elem_to_obj(self, elem):
        """Convert one change element to a (change_type, item) tuple.

        'item' is an (ItemId, is_read) tuple for READ_FLAG_CHANGE, an ItemId for DELETE, and for all other change
        types the object built by the item model matching the tag of the first child element.
        """
        change_type = self.change_types_map[elem.tag]
        if change_type == self.READ_FLAG_CHANGE:
            item = (
                ItemId.from_xml(elem=elem.find(ItemId.response_tag()), account=self.account),
                xml_text_to_value(elem.find(f"{{{TNS}}}IsRead").text, bool),
            )
        elif change_type == self.DELETE:
            item = ItemId.from_xml(elem=elem.find(ItemId.response_tag()), account=self.account)
        else:
            # We can't find() the element because we don't know which tag to look for. The change element can
            # contain multiple item types, each with their own tag.
            item_elem = elem[0]
            item = BaseFolder.item_model_from_tag(item_elem.tag).from_xml(elem=item_elem, account=self.account)
        return change_type, item

    def get_payload(self, folder, shape, additional_fields, sync_state, ignore, max_changes_returned, sync_scope):
        """Return the request payload.

        Starts from the element built by _partial_get_payload() and appends, in this order: an 'm:Ignore' element
        (only when 'ignore' is not None and peek() reports it as non-empty), 'm:MaxChangesReturned', and
        'm:SyncScope' (only when 'sync_scope' is truthy).
        """
        sync_folder_items = self._partial_get_payload(
            folder=folder, shape=shape, additional_fields=additional_fields, sync_state=sync_state
        )
        # peek() returns a replacement iterable along with the emptiness flag, so 'ignore' is rebound here.
        is_empty, ignore = (True, None) if ignore is None else peek(ignore)
        if not is_empty:
            sync_folder_items.append(item_ids_element(items=ignore, version=self.account.version, tag="m:Ignore"))
        add_xml_child(sync_folder_items, "m:MaxChangesReturned", max_changes_returned)
        if sync_scope:
            add_xml_child(sync_folder_items, "m:SyncScope", sync_scope)
        return sync_folder_items
Ancestors
Class variables
var CHANGE_TYPES
var READ_FLAG_CHANGE
var SERVICE_NAME
var SYNC_SCOPES
var change_types_map
var last_in_range_name
var shape_tag
Methods
def call(self,
folder,
shape,
additional_fields,
sync_state,
ignore,
max_changes_returned,
sync_scope)-
Expand source code
def call(self, folder, shape, additional_fields, sync_state, ignore, max_changes_returned, sync_scope):
    """Validate the arguments, send the request and return the parsed response elements.

    :param max_changes_returned: A positive integer. None is replaced by 100.
    :param sync_scope: None, or one of the values in SYNC_SCOPES.

    :raises InvalidTypeError: If 'max_changes_returned' is not an int.
    :raises ValueError: If 'max_changes_returned' is zero or negative.
    :raises InvalidEnumValue: If 'sync_scope' is neither None nor a member of SYNC_SCOPES.
    """
    self.sync_state = sync_state
    if max_changes_returned is None:
        max_changes_returned = 100
    if not isinstance(max_changes_returned, int):
        raise InvalidTypeError("max_changes_returned", max_changes_returned, int)
    if max_changes_returned < 1:
        raise ValueError(f"'max_changes_returned' {max_changes_returned} must be a positive integer")
    if not (sync_scope is None or sync_scope in self.SYNC_SCOPES):
        raise InvalidEnumValue("sync_scope", sync_scope, self.SYNC_SCOPES)
    payload = self.get_payload(
        folder=folder,
        shape=shape,
        additional_fields=additional_fields,
        sync_state=sync_state,
        ignore=ignore,
        max_changes_returned=max_changes_returned,
        sync_scope=sync_scope,
    )
    elems = self._get_elements(payload=payload)
    return self._elems_to_objs(elems)
def get_payload(self,
folder,
shape,
additional_fields,
sync_state,
ignore,
max_changes_returned,
sync_scope)-
Expand source code
def get_payload(self, folder, shape, additional_fields, sync_state, ignore, max_changes_returned, sync_scope):
    """Return the request payload.

    Starts from the element built by _partial_get_payload() and appends, in this order: an 'm:Ignore' element
    (only when 'ignore' is not None and peek() reports it as non-empty), 'm:MaxChangesReturned', and
    'm:SyncScope' (only when 'sync_scope' is truthy).
    """
    payload = self._partial_get_payload(
        folder=folder, shape=shape, additional_fields=additional_fields, sync_state=sync_state
    )
    if ignore is None:
        is_empty = True
    else:
        # peek() returns a replacement iterable along with the emptiness flag, so 'ignore' is rebound here.
        is_empty, ignore = peek(ignore)
    if not is_empty:
        ignore_elem = item_ids_element(items=ignore, version=self.account.version, tag="m:Ignore")
        payload.append(ignore_elem)
    add_xml_child(payload, "m:MaxChangesReturned", max_changes_returned)
    if sync_scope:
        add_xml_child(payload, "m:SyncScope", sync_scope)
    return payload
Inherited members
class Unsubscribe (*args, **kwargs)
-
Expand source code
class Unsubscribe(EWSAccountService):
    """Unsubscribing is only valid for pull and streaming notifications.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/unsubscribe-operation
    """

    SERVICE_NAME = "Unsubscribe"
    returns_elements = False
    prefer_affinity = True

    def call(self, subscription_id):
        """Send an Unsubscribe request for 'subscription_id' and return the result of _get_elements()."""
        request = self.get_payload(subscription_id=subscription_id)
        return self._get_elements(payload=request)

    def get_payload(self, subscription_id):
        """Return the request element, with 'subscription_id' as its single 'm:SubscriptionId' child."""
        unsubscribe = create_element(f"m:{self.SERVICE_NAME}")
        add_xml_child(unsubscribe, "m:SubscriptionId", subscription_id)
        return unsubscribe
Unsubscribing is only valid for pull and streaming notifications.
Ancestors
Class variables
var SERVICE_NAME
var prefer_affinity
var returns_elements
Methods
def call(self, subscription_id)
-
Expand source code
def call(self, subscription_id):
    """Send an Unsubscribe request for 'subscription_id' and return the result of _get_elements()."""
    request = self.get_payload(subscription_id=subscription_id)
    return self._get_elements(payload=request)
def get_payload(self, subscription_id)
-
Expand source code
def get_payload(self, subscription_id):
    """Return the request element, with 'subscription_id' as its single 'm:SubscriptionId' child."""
    unsubscribe = create_element(f"m:{self.SERVICE_NAME}")
    add_xml_child(unsubscribe, "m:SubscriptionId", subscription_id)
    return unsubscribe
Inherited members
class UpdateFolder (*args, **kwargs)
-
Expand source code
class UpdateFolder(BaseUpdateService):
    """Implement the EWS "UpdateFolder" operation.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/updatefolder-operation
    """

    SERVICE_NAME = "UpdateFolder"
    SET_FIELD_ELEMENT_NAME = "t:SetFolderField"
    DELETE_FIELD_ELEMENT_NAME = "t:DeleteFolderField"
    CHANGE_ELEMENT_NAME = "t:FolderChange"
    CHANGES_ELEMENT_NAME = "m:FolderChanges"
    element_container_name = f"{{{MNS}}}Folders"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Not a request argument: call() stores the requested folders here so _elems_to_objs() can pair each
        # response element with the folder instance it was requested with.
        self.folders = []

    def call(self, folders):
        """Send the update request(s) and return a generator of parsed folders (or exceptions).

        :param folders: An iterable of (Folder, fieldnames) tuples.
        """
        # The folder class can't easily be derived from the returned XML, so results are built from the folder
        # instances in the request. The input is materialized because it is iterated twice and may be a generator.
        requested = list(folders)
        self.folders = requested
        elems = self._chunked_get_elements(self.get_payload, items=requested)
        return self._elems_to_objs(elems)

    def _elems_to_objs(self, elems):
        """Yield one result per (requested folder, response element) pair; exceptions are passed through as-is."""
        for (requested_folder, _fieldnames), elem in zip(self.folders, elems):
            if isinstance(elem, Exception):
                yield elem
            else:
                yield parse_folder_elem(elem=elem, folder=requested_folder)

    @staticmethod
    def _target_elem(target):
        """Return 'target' converted with to_item_id() using the FolderId class."""
        return to_item_id(target, FolderId)

    def get_payload(self, folders):
        """Return the request element containing the changes element for 'folders'.

        :param folders: A list of (Folder, fieldnames) tuples where 'Folder' is an instance of a subclass of Folder
            and 'fieldnames' are the attribute names that were updated.
        """
        update_folder = create_element(f"m:{self.SERVICE_NAME}")
        changes = self._changes_elem(target_changes=folders)
        update_folder.append(changes)
        return update_folder
Ancestors
Class variables
var CHANGES_ELEMENT_NAME
var CHANGE_ELEMENT_NAME
var DELETE_FIELD_ELEMENT_NAME
var SERVICE_NAME
var SET_FIELD_ELEMENT_NAME
var element_container_name
Methods
def call(self, folders)
-
Expand source code
def call(self, folders):
    """Send the update request(s) and return the parsed results.

    :param folders: An iterable of (Folder, fieldnames) tuples.
    """
    # The folder class can't easily be derived from the returned XML, so results are built from the folder
    # instances in the request. The input is materialized because it is iterated twice and may be a generator.
    requested = list(folders)
    self.folders = requested
    elems = self._chunked_get_elements(self.get_payload, items=requested)
    return self._elems_to_objs(elems)
def get_payload(self, folders)
-
Expand source code
def get_payload(self, folders):
    """Return the request element containing the changes element for 'folders'.

    :param folders: A list of (Folder, fieldnames) tuples where 'Folder' is an instance of a subclass of Folder
        and 'fieldnames' are the attribute names that were updated.
    """
    update_folder = create_element(f"m:{self.SERVICE_NAME}")
    changes = self._changes_elem(target_changes=folders)
    update_folder.append(changes)
    return update_folder
Inherited members
class UpdateItem (*args, **kwargs)
-
Expand source code
class UpdateItem(BaseUpdateService):
    """Implement the EWS "UpdateItem" operation.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/updateitem-operation
    """

    SERVICE_NAME = "UpdateItem"
    SET_FIELD_ELEMENT_NAME = "t:SetItemField"
    DELETE_FIELD_ELEMENT_NAME = "t:DeleteItemField"
    CHANGE_ELEMENT_NAME = "t:ItemChange"
    CHANGES_ELEMENT_NAME = "m:ItemChanges"
    element_container_name = f"{{{MNS}}}Items"

    def call(
        self,
        items,
        conflict_resolution,
        message_disposition,
        send_meeting_invitations_or_cancellations,
        suppress_read_receipts,
    ):
        """Validate the enum-like arguments, send the update request(s) and return the parsed results.

        :raises InvalidEnumValue: If 'conflict_resolution', 'message_disposition' or
            'send_meeting_invitations_or_cancellations' is not one of its allowed choices.
        :raises ValueError: If 'message_disposition' is SEND_ONLY.
        """
        if conflict_resolution not in CONFLICT_RESOLUTION_CHOICES:
            raise InvalidEnumValue("conflict_resolution", conflict_resolution, CONFLICT_RESOLUTION_CHOICES)
        if message_disposition not in MESSAGE_DISPOSITION_CHOICES:
            raise InvalidEnumValue("message_disposition", message_disposition, MESSAGE_DISPOSITION_CHOICES)
        if send_meeting_invitations_or_cancellations not in SEND_MEETING_INVITATIONS_AND_CANCELLATIONS_CHOICES:
            raise InvalidEnumValue(
                "send_meeting_invitations_or_cancellations",
                send_meeting_invitations_or_cancellations,
                SEND_MEETING_INVITATIONS_AND_CANCELLATIONS_CHOICES,
            )
        if message_disposition == SEND_ONLY:
            raise ValueError("Cannot send-only existing objects. Use SendItem service instead")
        elems = self._chunked_get_elements(
            self.get_payload,
            items=items,
            conflict_resolution=conflict_resolution,
            message_disposition=message_disposition,
            send_meeting_invitations_or_cancellations=send_meeting_invitations_or_cancellations,
            suppress_read_receipts=suppress_read_receipts,
        )
        return self._elems_to_objs(elems)

    def _elem_to_obj(self, elem):
        """Return the result of Item.id_from_xml() for 'elem'."""
        return Item.id_from_xml(elem)

    def _update_elems(self, target, fieldnames):
        """Yield the update elements produced by the parent class.

        For targets whose class is exactly CalendarItem, the timezone field belonging to 'start' and/or 'end' is
        added to the updated field names when that field is being updated.
        """
        effective_names = list(fieldnames)  # Work on a copy so the caller's sequence is never modified
        if target.__class__ == CalendarItem:
            # For CalendarItem items where we update 'start' or 'end', we want to update internal timezone fields
            target.clean_timezone_fields(version=self.account.version)  # Possibly also sets timezone values
            for name in ("start", "end"):
                if name not in effective_names:
                    continue
                tz_name = target.tz_field_for_field_name(name).name
                if tz_name not in effective_names:
                    effective_names.append(tz_name)
        yield from super()._update_elems(target=target, fieldnames=effective_names)

    def _get_value(self, target, field):
        """Return the value to send for 'field' on 'target'.

        The parent's value is returned unchanged, except for the 'start' and 'end' fields of targets whose class is
        exactly CalendarItem; those are sent in the timezone stored in the matching timezone field.
        """
        value = super()._get_value(target, field)
        if not (target.__class__ == CalendarItem and field.name in ("start", "end")):
            return value
        # For CalendarItem items where we update 'start' or 'end', we want to send values in the local timezone
        if type(value) is EWSDate:
            # EWS always expects a datetime
            return target.date_to_datetime(field_name=field.name)
        tz_name = target.tz_field_for_field_name(field.name).name
        return value.astimezone(getattr(target, tz_name))

    @staticmethod
    def _target_elem(target):
        """Return 'target' converted with to_item_id() using the ItemId class."""
        return to_item_id(target, ItemId)

    def get_payload(
        self,
        items,
        conflict_resolution,
        message_disposition,
        send_meeting_invitations_or_cancellations,
        suppress_read_receipts,
    ):
        """Return the request element, with the request options as attributes and the item changes as child.

        :param items: A list of (Item, fieldnames) tuples where 'Item' is an instance of a subclass of Item and
            'fieldnames' are the attribute names that were updated.
        """
        attrs = {
            "ConflictResolution": conflict_resolution,
            "MessageDisposition": message_disposition,
            "SendMeetingInvitationsOrCancellations": send_meeting_invitations_or_cancellations,
        }
        # The SuppressReadReceipts attribute is only added for server builds at or above EXCHANGE_2013_SP1
        if self.account.version.build >= EXCHANGE_2013_SP1:
            attrs["SuppressReadReceipts"] = suppress_read_receipts
        update_item = create_element(f"m:{self.SERVICE_NAME}", attrs=attrs)
        changes = self._changes_elem(target_changes=items)
        update_item.append(changes)
        return update_item
Ancestors
Class variables
var CHANGES_ELEMENT_NAME
var CHANGE_ELEMENT_NAME
var DELETE_FIELD_ELEMENT_NAME
var SERVICE_NAME
var SET_FIELD_ELEMENT_NAME
var element_container_name
Methods
def call(self,
items,
conflict_resolution,
message_disposition,
send_meeting_invitations_or_cancellations,
suppress_read_receipts)-
Expand source code
def call(
    self,
    items,
    conflict_resolution,
    message_disposition,
    send_meeting_invitations_or_cancellations,
    suppress_read_receipts,
):
    """Validate the enum-like arguments, send the update request(s) and return the parsed results.

    :raises InvalidEnumValue: If 'conflict_resolution', 'message_disposition' or
        'send_meeting_invitations_or_cancellations' is not one of its allowed choices.
    :raises ValueError: If 'message_disposition' is SEND_ONLY.
    """
    if conflict_resolution not in CONFLICT_RESOLUTION_CHOICES:
        raise InvalidEnumValue("conflict_resolution", conflict_resolution, CONFLICT_RESOLUTION_CHOICES)
    if message_disposition not in MESSAGE_DISPOSITION_CHOICES:
        raise InvalidEnumValue("message_disposition", message_disposition, MESSAGE_DISPOSITION_CHOICES)
    if send_meeting_invitations_or_cancellations not in SEND_MEETING_INVITATIONS_AND_CANCELLATIONS_CHOICES:
        raise InvalidEnumValue(
            "send_meeting_invitations_or_cancellations",
            send_meeting_invitations_or_cancellations,
            SEND_MEETING_INVITATIONS_AND_CANCELLATIONS_CHOICES,
        )
    if message_disposition == SEND_ONLY:
        raise ValueError("Cannot send-only existing objects. Use SendItem service instead")
    elems = self._chunked_get_elements(
        self.get_payload,
        items=items,
        conflict_resolution=conflict_resolution,
        message_disposition=message_disposition,
        send_meeting_invitations_or_cancellations=send_meeting_invitations_or_cancellations,
        suppress_read_receipts=suppress_read_receipts,
    )
    return self._elems_to_objs(elems)
def get_payload(self,
items,
conflict_resolution,
message_disposition,
send_meeting_invitations_or_cancellations,
suppress_read_receipts)-
Expand source code
def get_payload(
    self,
    items,
    conflict_resolution,
    message_disposition,
    send_meeting_invitations_or_cancellations,
    suppress_read_receipts,
):
    """Return the request element, with the request options as attributes and the item changes as child.

    :param items: A list of (Item, fieldnames) tuples where 'Item' is an instance of a subclass of Item and
        'fieldnames' are the attribute names that were updated.
    """
    attrs = {
        "ConflictResolution": conflict_resolution,
        "MessageDisposition": message_disposition,
        "SendMeetingInvitationsOrCancellations": send_meeting_invitations_or_cancellations,
    }
    # The SuppressReadReceipts attribute is only added for server builds at or above EXCHANGE_2013_SP1
    if self.account.version.build >= EXCHANGE_2013_SP1:
        attrs["SuppressReadReceipts"] = suppress_read_receipts
    update_item = create_element(f"m:{self.SERVICE_NAME}", attrs=attrs)
    changes = self._changes_elem(target_changes=items)
    update_item.append(changes)
    return update_item
Inherited members
class UpdateUserConfiguration (*args, **kwargs)
-
Expand source code
class UpdateUserConfiguration(EWSAccountService):
    """Implement the EWS "UpdateUserConfiguration" operation.

    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/updateuserconfiguration-operation
    """

    SERVICE_NAME = "UpdateUserConfiguration"
    returns_elements = False

    def call(self, user_configuration):
        """Send the request for 'user_configuration' and return the result of _get_elements()."""
        request = self.get_payload(user_configuration=user_configuration)
        return self._get_elements(payload=request)

    def get_payload(self, user_configuration):
        """Return the result of set_xml_value() applied to a fresh request element and 'user_configuration'."""
        request = create_element(f"m:{self.SERVICE_NAME}")
        return set_xml_value(request, user_configuration, version=self.account.version)
Ancestors
Class variables
var SERVICE_NAME
var returns_elements
Methods
def call(self, user_configuration)
-
Expand source code
def call(self, user_configuration):
    """Send the request for 'user_configuration' and return the result of _get_elements()."""
    request = self.get_payload(user_configuration=user_configuration)
    return self._get_elements(payload=request)
def get_payload(self, user_configuration)
-
Expand source code
def get_payload(self, user_configuration):
    """Return the result of set_xml_value() applied to a fresh request element and 'user_configuration'."""
    request = create_element(f"m:{self.SERVICE_NAME}")
    return set_xml_value(request, user_configuration, version=self.account.version)
Inherited members
class UploadItems (*args, **kwargs)
-
Expand source code
class UploadItems(EWSAccountService):
    """Implement the EWS "UploadItems" operation.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/uploaditems-operation
    """

    SERVICE_NAME = "UploadItems"
    element_container_name = f"{{{MNS}}}ItemId"

    def call(self, items):
        """Send the upload request(s) for 'items' and return the parsed results."""
        # The chunking helper receives the iterable under the 'items' keyword, not 'data'
        elems = self._chunked_get_elements(self.get_payload, items=items)
        return self._elems_to_objs(elems)

    def get_payload(self, items):
        """Upload given items to given account.

        'items' is an iterable of tuples where the first element is a Folder instance representing the ParentFolder
        that the item will be placed in and the second element is a tuple containing an optional ItemId, an optional
        Item.is_associated boolean, and a Data string returned from an ExportItems call.

        :param items:
        """
        upload_items = create_element(f"m:{self.SERVICE_NAME}")
        container = create_element("m:Items")
        upload_items.append(container)
        version = self.account.version
        for parent_folder, (item_id, is_associated, data_str) in items:
            # TODO: The full spec also allows the "UpdateOrCreate" create action.
            attrs = {"CreateAction": "Update" if item_id else "CreateNew"}
            if is_associated is not None:
                attrs["IsAssociated"] = is_associated
            item_elem = create_element("t:Item", attrs=attrs)
            parent_id = ParentFolderId(parent_folder.id, parent_folder.changekey)
            set_xml_value(item_elem, parent_id, version=version)
            if item_id:
                set_xml_value(item_elem, to_item_id(item_id, ItemId), version=version)
            add_xml_child(item_elem, "t:Data", data_str)
            container.append(item_elem)
        return upload_items

    def _elem_to_obj(self, elem):
        """Return the (id, changekey) attribute values of 'elem' as a tuple."""
        item_id = elem.get(ItemId.ID_ATTR)
        changekey = elem.get(ItemId.CHANGEKEY_ATTR)
        return item_id, changekey

    @classmethod
    def _get_elements_in_container(cls, container):
        """Return the container itself as the only element, wrapped in a list."""
        return [container]
Ancestors
Class variables
var SERVICE_NAME
var element_container_name
Methods
def call(self, items)
-
Expand source code
def call(self, items):
    """Send the upload request(s) for 'items' and return the parsed results."""
    # The chunking helper receives the iterable under the 'items' keyword, not 'data'
    elems = self._chunked_get_elements(self.get_payload, items=items)
    return self._elems_to_objs(elems)
def get_payload(self, items)
-
Expand source code
def get_payload(self, items):
    """Upload given items to given account.

    'items' is an iterable of tuples where the first element is a Folder instance representing the ParentFolder
    that the item will be placed in and the second element is a tuple containing an optional ItemId, an optional
    Item.is_associated boolean, and a Data string returned from an ExportItems call.

    :param items:
    """
    upload_items = create_element(f"m:{self.SERVICE_NAME}")
    container = create_element("m:Items")
    upload_items.append(container)
    version = self.account.version
    for parent_folder, (item_id, is_associated, data_str) in items:
        # TODO: The full spec also allows the "UpdateOrCreate" create action.
        attrs = {"CreateAction": "Update" if item_id else "CreateNew"}
        if is_associated is not None:
            attrs["IsAssociated"] = is_associated
        item_elem = create_element("t:Item", attrs=attrs)
        parent_id = ParentFolderId(parent_folder.id, parent_folder.changekey)
        set_xml_value(item_elem, parent_id, version=version)
        if item_id:
            set_xml_value(item_elem, to_item_id(item_id, ItemId), version=version)
        add_xml_child(item_elem, "t:Data", data_str)
        container.append(item_elem)
    return upload_items
Upload given items to given account.
'items' is an iterable of tuples where the first element is a Folder instance representing the ParentFolder that the item will be placed in and the second element is a tuple containing an optional ItemId, an optional Item.is_associated boolean, and a Data string returned from an ExportItems call.
:param items:
Inherited members