Module exchangelib.services

Implement a selection of EWS services (operations).

Exchange is very picky about things like the order of XML elements in SOAP requests, so we need to generate XML automatically instead of taking advantage of Python SOAP libraries and the WSDL file.

Exchange EWS operations overview: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/ews-operations-in-exchange

Expand source code
"""Implement a selection of EWS services (operations).

Exchange is very picky about things like the order of XML elements in SOAP requests, so we need to generate XML
automatically instead of taking advantage of Python SOAP libraries and the WSDL file.

Exchange EWS operations overview:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/ews-operations-in-exchange
"""

from .archive_item import ArchiveItem
from .common import EWSService
from .convert_id import ConvertId
from .copy_item import CopyItem
from .create_attachment import CreateAttachment
from .create_folder import CreateFolder
from .create_item import CreateItem
from .create_user_configuration import CreateUserConfiguration
from .delete_attachment import DeleteAttachment
from .delete_folder import DeleteFolder
from .delete_item import DeleteItem
from .delete_user_configuration import DeleteUserConfiguration
from .empty_folder import EmptyFolder
from .expand_dl import ExpandDL
from .export_items import ExportItems
from .find_folder import FindFolder
from .find_item import FindItem
from .find_people import FindPeople
from .get_attachment import GetAttachment
from .get_delegate import GetDelegate
from .get_events import GetEvents
from .get_folder import GetFolder
from .get_item import GetItem
from .get_mail_tips import GetMailTips
from .get_persona import GetPersona
from .get_room_lists import GetRoomLists
from .get_rooms import GetRooms
from .get_searchable_mailboxes import GetSearchableMailboxes
from .get_server_time_zones import GetServerTimeZones
from .get_streaming_events import GetStreamingEvents
from .get_user_availability import GetUserAvailability
from .get_user_configuration import GetUserConfiguration
from .get_user_oof_settings import GetUserOofSettings
from .get_user_settings import GetUserSettings
from .inbox_rules import CreateInboxRule, DeleteInboxRule, GetInboxRules, SetInboxRule
from .mark_as_junk import MarkAsJunk
from .move_folder import MoveFolder
from .move_item import MoveItem
from .resolve_names import ResolveNames
from .send_item import SendItem
from .send_notification import SendNotification
from .set_user_oof_settings import SetUserOofSettings
from .subscribe import SubscribeToPull, SubscribeToPush, SubscribeToStreaming
from .sync_folder_hierarchy import SyncFolderHierarchy
from .sync_folder_items import SyncFolderItems
from .unsubscribe import Unsubscribe
from .update_folder import UpdateFolder
from .update_item import UpdateItem
from .update_user_configuration import UpdateUserConfiguration
from .upload_items import UploadItems

__all__ = [
    "ArchiveItem",
    "ConvertId",
    "CopyItem",
    "CreateAttachment",
    "CreateFolder",
    "CreateItem",
    "CreateUserConfiguration",
    "DeleteAttachment",
    "DeleteFolder",
    "DeleteUserConfiguration",
    "DeleteItem",
    "EmptyFolder",
    "EWSService",
    "ExpandDL",
    "ExportItems",
    "FindFolder",
    "FindItem",
    "FindPeople",
    "GetAttachment",
    "GetDelegate",
    "GetEvents",
    "GetFolder",
    "GetItem",
    "GetMailTips",
    "GetPersona",
    "GetRoomLists",
    "GetRooms",
    "GetSearchableMailboxes",
    "GetServerTimeZones",
    "GetStreamingEvents",
    "GetUserAvailability",
    "GetUserConfiguration",
    "GetUserOofSettings",
    "GetUserSettings",
    "MarkAsJunk",
    "MoveFolder",
    "MoveItem",
    "ResolveNames",
    "SendItem",
    "SendNotification",
    "SetUserOofSettings",
    "SubscribeToPull",
    "SubscribeToPush",
    "SubscribeToStreaming",
    "SyncFolderHierarchy",
    "SyncFolderItems",
    "Unsubscribe",
    "UpdateFolder",
    "UpdateItem",
    "UpdateUserConfiguration",
    "UploadItems",
    "GetInboxRules",
    "CreateInboxRule",
    "SetInboxRule",
    "DeleteInboxRule",
]

Sub-modules

exchangelib.services.archive_item
exchangelib.services.common
exchangelib.services.convert_id
exchangelib.services.copy_item
exchangelib.services.create_attachment
exchangelib.services.create_folder
exchangelib.services.create_item
exchangelib.services.create_user_configuration
exchangelib.services.delete_attachment
exchangelib.services.delete_folder
exchangelib.services.delete_item
exchangelib.services.delete_user_configuration
exchangelib.services.empty_folder
exchangelib.services.expand_dl
exchangelib.services.export_items
exchangelib.services.find_folder
exchangelib.services.find_item
exchangelib.services.find_people
exchangelib.services.get_attachment
exchangelib.services.get_delegate
exchangelib.services.get_events
exchangelib.services.get_folder
exchangelib.services.get_item
exchangelib.services.get_mail_tips
exchangelib.services.get_persona
exchangelib.services.get_room_lists
exchangelib.services.get_rooms
exchangelib.services.get_searchable_mailboxes
exchangelib.services.get_server_time_zones
exchangelib.services.get_streaming_events
exchangelib.services.get_user_availability
exchangelib.services.get_user_configuration
exchangelib.services.get_user_oof_settings
exchangelib.services.get_user_settings
exchangelib.services.inbox_rules
exchangelib.services.mark_as_junk
exchangelib.services.move_folder
exchangelib.services.move_item
exchangelib.services.resolve_names
exchangelib.services.send_item
exchangelib.services.send_notification
exchangelib.services.set_user_oof_settings
exchangelib.services.subscribe

The 'Subscribe' service has three different modes - pull, push and streaming - with different signatures. Implement as three distinct classes.

exchangelib.services.sync_folder_hierarchy
exchangelib.services.sync_folder_items
exchangelib.services.unsubscribe
exchangelib.services.update_folder
exchangelib.services.update_item
exchangelib.services.update_user_configuration
exchangelib.services.upload_items

Classes

class ArchiveItem (*args, **kwargs)
Expand source code
class ArchiveItem(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/archiveitem-operation"""

    SERVICE_NAME = "ArchiveItem"
    element_container_name = f"{{{MNS}}}Items"
    supported_from = EXCHANGE_2013

    def call(self, items, to_folder):
        """Move a list of items to a specific folder in the archive mailbox.

        :param items: a list of (id, changekey) tuples or Item objects
        :param to_folder:

        :return: None
        """
        return self._elems_to_objs(self._chunked_get_elements(self.get_payload, items=items, to_folder=to_folder))

    def _elem_to_obj(self, elem):
        return Item.id_from_xml(elem)

    def get_payload(self, items, to_folder):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        payload.append(
            folder_ids_element(folders=[to_folder], version=self.account.version, tag="m:ArchiveSourceFolderId")
        )
        payload.append(item_ids_element(items=items, version=self.account.version))
        return payload

Ancestors

Class variables

var SERVICE_NAME
var element_container_name
var supported_from

Methods

def call(self, items, to_folder)

Move a list of items to a specific folder in the archive mailbox.

:param items: a list of (id, changekey) tuples or Item objects :param to_folder:

:return: None

Expand source code
def call(self, items, to_folder):
    """Move a list of items to a specific folder in the archive mailbox.

    :param items: a list of (id, changekey) tuples or Item objects
    :param to_folder:

    :return: None
    """
    return self._elems_to_objs(self._chunked_get_elements(self.get_payload, items=items, to_folder=to_folder))
def get_payload(self, items, to_folder)
Expand source code
def get_payload(self, items, to_folder):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    payload.append(
        folder_ids_element(folders=[to_folder], version=self.account.version, tag="m:ArchiveSourceFolderId")
    )
    payload.append(item_ids_element(items=items, version=self.account.version))
    return payload

Inherited members

class ConvertId (protocol, chunk_size=None, timeout=None)

Take a list of IDs to convert. Returns a list of converted IDs or exception instances, in the same order as the input list.

MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/convertid-operation

Expand source code
class ConvertId(EWSService):
    """Take a list of IDs to convert. Returns a list of converted IDs or exception instances, in the same order as the
    input list.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/convertid-operation
    """

    SERVICE_NAME = "ConvertId"
    cls_map = {cls.response_tag(): cls for cls in (AlternateId, AlternatePublicFolderId, AlternatePublicFolderItemId)}

    def call(self, items, destination_format):
        if destination_format not in ID_FORMATS:
            raise InvalidEnumValue("destination_format", destination_format, ID_FORMATS)
        return self._elems_to_objs(
            self._chunked_get_elements(self.get_payload, items=items, destination_format=destination_format)
        )

    def _elem_to_obj(self, elem):
        return self.cls_map[elem.tag].from_xml(elem, account=None)

    def get_payload(self, items, destination_format):
        supported_item_classes = AlternateId, AlternatePublicFolderId, AlternatePublicFolderItemId
        payload = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(DestinationFormat=destination_format))
        item_ids = create_element("m:SourceIds")
        for item in items:
            if not isinstance(item, supported_item_classes):
                raise InvalidTypeError("item", item, supported_item_classes)
            set_xml_value(item_ids, item, version=None)
        payload.append(item_ids)
        return payload

    @classmethod
    def _get_elements_in_container(cls, container):
        # We may have other elements in here, e.g. 'ResponseCode'. Filter away those.
        return (
            container.findall(AlternateId.response_tag())
            + container.findall(AlternatePublicFolderId.response_tag())
            + container.findall(AlternatePublicFolderItemId.response_tag())
        )

Ancestors

Class variables

var SERVICE_NAME
var cls_map

Methods

def call(self, items, destination_format)
Expand source code
def call(self, items, destination_format):
    if destination_format not in ID_FORMATS:
        raise InvalidEnumValue("destination_format", destination_format, ID_FORMATS)
    return self._elems_to_objs(
        self._chunked_get_elements(self.get_payload, items=items, destination_format=destination_format)
    )
def get_payload(self, items, destination_format)
Expand source code
def get_payload(self, items, destination_format):
    supported_item_classes = AlternateId, AlternatePublicFolderId, AlternatePublicFolderItemId
    payload = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(DestinationFormat=destination_format))
    item_ids = create_element("m:SourceIds")
    for item in items:
        if not isinstance(item, supported_item_classes):
            raise InvalidTypeError("item", item, supported_item_classes)
        set_xml_value(item_ids, item, version=None)
    payload.append(item_ids)
    return payload

Inherited members

class CopyItem (*args, **kwargs)
Expand source code
class CopyItem(move_item.MoveItem):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/copyitem-operation"""

    SERVICE_NAME = "CopyItem"

Ancestors

Class variables

var SERVICE_NAME

Inherited members

class CreateAttachment (*args, **kwargs)
Expand source code
class CreateAttachment(EWSAccountService):
    """MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createattachment-operation
    """

    SERVICE_NAME = "CreateAttachment"
    element_container_name = f"{{{MNS}}}Attachments"
    cls_map = {cls.response_tag(): cls for cls in (FileAttachment, ItemAttachment)}

    def call(self, parent_item, items):
        return self._elems_to_objs(self._chunked_get_elements(self.get_payload, items=items, parent_item=parent_item))

    def _elem_to_obj(self, elem):
        return self.cls_map[elem.tag].from_xml(elem=elem, account=self.account)

    def get_payload(self, items, parent_item):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        version = self.account.version
        if isinstance(parent_item, BaseItem):
            # to_item_id() would convert this to a normal ItemId, but the service wants a ParentItemId
            parent_item = ParentItemId(parent_item.id, parent_item.changekey)
        set_xml_value(payload, to_item_id(parent_item, ParentItemId), version=self.account.version)
        attachments = create_element("m:Attachments")
        for item in items:
            set_xml_value(attachments, item, version=version)
        payload.append(attachments)
        return payload

Ancestors

Class variables

var SERVICE_NAME
var cls_map
var element_container_name

Methods

def call(self, parent_item, items)
Expand source code
def call(self, parent_item, items):
    return self._elems_to_objs(self._chunked_get_elements(self.get_payload, items=items, parent_item=parent_item))
def get_payload(self, items, parent_item)
Expand source code
def get_payload(self, items, parent_item):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    version = self.account.version
    if isinstance(parent_item, BaseItem):
        # to_item_id() would convert this to a normal ItemId, but the service wants a ParentItemId
        parent_item = ParentItemId(parent_item.id, parent_item.changekey)
    set_xml_value(payload, to_item_id(parent_item, ParentItemId), version=self.account.version)
    attachments = create_element("m:Attachments")
    for item in items:
        set_xml_value(attachments, item, version=version)
    payload.append(attachments)
    return payload

Inherited members

class CreateFolder (*args, **kwargs)
Expand source code
class CreateFolder(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createfolder-operation"""

    SERVICE_NAME = "CreateFolder"
    element_container_name = f"{{{MNS}}}Folders"
    ERRORS_TO_CATCH_IN_RESPONSE = EWSAccountService.ERRORS_TO_CATCH_IN_RESPONSE + (ErrorFolderExists,)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.folders = []  # A hack to communicate parsing args to _elems_to_objs()

    def call(self, parent_folder, folders):
        # We can't easily find the correct folder class from the returned XML. Instead, return objects with the same
        # class as the folder instance it was requested with.
        self.folders = list(folders)  # Convert to a list, in case 'folders' is a generator. We're iterating twice.
        return self._elems_to_objs(
            self._chunked_get_elements(
                self.get_payload,
                items=self.folders,
                parent_folder=parent_folder,
            )
        )

    def _elems_to_objs(self, elems):
        for folder, elem in zip(self.folders, elems):
            if isinstance(elem, Exception):
                yield elem
                continue
            yield parse_folder_elem(elem=elem, folder=folder, account=self.account)

    def get_payload(self, folders, parent_folder):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        payload.append(
            folder_ids_element(folders=[parent_folder], version=self.account.version, tag="m:ParentFolderId")
        )
        folder_elems = create_element("m:Folders")
        for folder in folders:
            set_xml_value(folder_elems, folder, version=self.account.version)
        payload.append(folder_elems)
        return payload

Ancestors

Class variables

var ERRORS_TO_CATCH_IN_RESPONSE
var SERVICE_NAME
var element_container_name

Methods

def call(self, parent_folder, folders)
Expand source code
def call(self, parent_folder, folders):
    # We can't easily find the correct folder class from the returned XML. Instead, return objects with the same
    # class as the folder instance it was requested with.
    self.folders = list(folders)  # Convert to a list, in case 'folders' is a generator. We're iterating twice.
    return self._elems_to_objs(
        self._chunked_get_elements(
            self.get_payload,
            items=self.folders,
            parent_folder=parent_folder,
        )
    )
def get_payload(self, folders, parent_folder)
Expand source code
def get_payload(self, folders, parent_folder):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    payload.append(
        folder_ids_element(folders=[parent_folder], version=self.account.version, tag="m:ParentFolderId")
    )
    folder_elems = create_element("m:Folders")
    for folder in folders:
        set_xml_value(folder_elems, folder, version=self.account.version)
    payload.append(folder_elems)
    return payload

Inherited members

class CreateInboxRule (*args, **kwargs)
Expand source code
class CreateInboxRule(UpdateInboxRules):
    """
    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/updateinboxrules-operation#updateinboxrules-create-rule-request-example
    """

    def _get_operation(self, rule):
        return Operations(create_rule_operation=CreateRuleOperation(rule=rule))

Ancestors

Inherited members

class CreateItem (*args, **kwargs)

Take a folder and a list of items. Return the result of creation as a list of tuples (success[True|False], errormessage), in the same order as the input list.

MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createitem-operation

Expand source code
class CreateItem(EWSAccountService):
    """Take a folder and a list of items. Return the result of creation as a list of tuples (success[True|False],
    errormessage), in the same order as the input list.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createitem-operation
    """

    SERVICE_NAME = "CreateItem"
    element_container_name = f"{{{MNS}}}Items"

    def call(self, items, folder, message_disposition, send_meeting_invitations):
        if message_disposition not in MESSAGE_DISPOSITION_CHOICES:
            raise InvalidEnumValue("message_disposition", message_disposition, MESSAGE_DISPOSITION_CHOICES)
        if send_meeting_invitations not in SEND_MEETING_INVITATIONS_CHOICES:
            raise InvalidEnumValue(
                "send_meeting_invitations", send_meeting_invitations, SEND_MEETING_INVITATIONS_CHOICES
            )
        if folder is not None:
            if not isinstance(folder, (BaseFolder, FolderId)):
                raise InvalidTypeError("folder", folder, (BaseFolder, FolderId))
            if folder.account != self.account:
                raise ValueError("Folder must belong to account")
        if message_disposition == SAVE_ONLY and folder is None:
            raise AttributeError("Folder must be supplied when in save-only mode")
        if message_disposition == SEND_AND_SAVE_COPY and folder is None:
            folder = self.account.sent  # 'Sent' is default EWS behaviour
        if message_disposition == SEND_ONLY and folder is not None:
            raise AttributeError("Folder must be None in send-ony mode")
        return self._elems_to_objs(
            self._chunked_get_elements(
                self.get_payload,
                items=items,
                folder=folder,
                message_disposition=message_disposition,
                send_meeting_invitations=send_meeting_invitations,
            )
        )

    def _elem_to_obj(self, elem):
        if isinstance(elem, bool):
            return elem
        return BulkCreateResult.from_xml(elem=elem, account=self.account)

    @classmethod
    def _get_elements_in_container(cls, container):
        res = super()._get_elements_in_container(container)
        return res or [True]

    def get_payload(self, items, folder, message_disposition, send_meeting_invitations):
        """Take a list of Item objects (CalendarItem, Message etc.) and return the XML for a CreateItem request.
        convert items to XML Elements.

        MessageDisposition is only applicable to email messages, where it is required.

        SendMeetingInvitations is required for calendar items. It is also applicable to tasks, meeting request
        responses (see
        https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createitem-operation-meeting-request
        ) and sharing
        invitation accepts (see
        https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createitem-acceptsharinginvitation
        ). The last two are not supported yet.

        :param items:
        :param folder:
        :param message_disposition:
        :param send_meeting_invitations:
        """
        payload = create_element(
            f"m:{self.SERVICE_NAME}",
            attrs=dict(MessageDisposition=message_disposition, SendMeetingInvitations=send_meeting_invitations),
        )
        if folder:
            payload.append(
                folder_ids_element(folders=[folder], version=self.account.version, tag="m:SavedItemFolderId")
            )
        item_elems = create_element("m:Items")
        for item in items:
            if not item.account:
                item.account = self.account
            set_xml_value(item_elems, item, version=self.account.version)
        payload.append(item_elems)
        return payload

Ancestors

Class variables

var SERVICE_NAME
var element_container_name

Methods

def call(self, items, folder, message_disposition, send_meeting_invitations)
Expand source code
def call(self, items, folder, message_disposition, send_meeting_invitations):
    if message_disposition not in MESSAGE_DISPOSITION_CHOICES:
        raise InvalidEnumValue("message_disposition", message_disposition, MESSAGE_DISPOSITION_CHOICES)
    if send_meeting_invitations not in SEND_MEETING_INVITATIONS_CHOICES:
        raise InvalidEnumValue(
            "send_meeting_invitations", send_meeting_invitations, SEND_MEETING_INVITATIONS_CHOICES
        )
    if folder is not None:
        if not isinstance(folder, (BaseFolder, FolderId)):
            raise InvalidTypeError("folder", folder, (BaseFolder, FolderId))
        if folder.account != self.account:
            raise ValueError("Folder must belong to account")
    if message_disposition == SAVE_ONLY and folder is None:
        raise AttributeError("Folder must be supplied when in save-only mode")
    if message_disposition == SEND_AND_SAVE_COPY and folder is None:
        folder = self.account.sent  # 'Sent' is default EWS behaviour
    if message_disposition == SEND_ONLY and folder is not None:
        raise AttributeError("Folder must be None in send-ony mode")
    return self._elems_to_objs(
        self._chunked_get_elements(
            self.get_payload,
            items=items,
            folder=folder,
            message_disposition=message_disposition,
            send_meeting_invitations=send_meeting_invitations,
        )
    )
def get_payload(self, items, folder, message_disposition, send_meeting_invitations)

Take a list of Item objects (CalendarItem, Message etc.) and return the XML for a CreateItem request. convert items to XML Elements.

MessageDisposition is only applicable to email messages, where it is required.

SendMeetingInvitations is required for calendar items. It is also applicable to tasks, meeting request responses (see https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createitem-operation-meeting-request ) and sharing invitation accepts (see https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createitem-acceptsharinginvitation ). The last two are not supported yet.

:param items: :param folder: :param message_disposition: :param send_meeting_invitations:

Expand source code
def get_payload(self, items, folder, message_disposition, send_meeting_invitations):
    """Take a list of Item objects (CalendarItem, Message etc.) and return the XML for a CreateItem request.
    convert items to XML Elements.

    MessageDisposition is only applicable to email messages, where it is required.

    SendMeetingInvitations is required for calendar items. It is also applicable to tasks, meeting request
    responses (see
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createitem-operation-meeting-request
    ) and sharing
    invitation accepts (see
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createitem-acceptsharinginvitation
    ). The last two are not supported yet.

    :param items:
    :param folder:
    :param message_disposition:
    :param send_meeting_invitations:
    """
    payload = create_element(
        f"m:{self.SERVICE_NAME}",
        attrs=dict(MessageDisposition=message_disposition, SendMeetingInvitations=send_meeting_invitations),
    )
    if folder:
        payload.append(
            folder_ids_element(folders=[folder], version=self.account.version, tag="m:SavedItemFolderId")
        )
    item_elems = create_element("m:Items")
    for item in items:
        if not item.account:
            item.account = self.account
        set_xml_value(item_elems, item, version=self.account.version)
    payload.append(item_elems)
    return payload

Inherited members

class CreateUserConfiguration (*args, **kwargs)
Expand source code
class CreateUserConfiguration(EWSAccountService):
    """MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/createuserconfiguration-operation
    """

    SERVICE_NAME = "CreateUserConfiguration"
    returns_elements = False

    def call(self, user_configuration):
        return self._get_elements(payload=self.get_payload(user_configuration=user_configuration))

    def get_payload(self, user_configuration):
        return set_xml_value(
            create_element(f"m:{self.SERVICE_NAME}"), user_configuration, version=self.protocol.version
        )

Ancestors

Class variables

var SERVICE_NAME
var returns_elements

Methods

def call(self, user_configuration)
Expand source code
def call(self, user_configuration):
    return self._get_elements(payload=self.get_payload(user_configuration=user_configuration))
def get_payload(self, user_configuration)
Expand source code
def get_payload(self, user_configuration):
    return set_xml_value(
        create_element(f"m:{self.SERVICE_NAME}"), user_configuration, version=self.protocol.version
    )

Inherited members

class DeleteAttachment (*args, **kwargs)
Expand source code
class DeleteAttachment(EWSAccountService):
    """MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/deleteattachment-operation
    """

    SERVICE_NAME = "DeleteAttachment"

    def call(self, items):
        return self._elems_to_objs(self._chunked_get_elements(self.get_payload, items=items))

    def _elem_to_obj(self, elem):
        return RootItemId.from_xml(elem=elem, account=self.account)

    @classmethod
    def _get_elements_in_container(cls, container):
        return container.findall(RootItemId.response_tag())

    def get_payload(self, items):
        return set_xml_value(
            create_element(f"m:{self.SERVICE_NAME}"),
            attachment_ids_element(items=items, version=self.account.version),
            version=self.account.version,
        )

Ancestors

Class variables

var SERVICE_NAME

Methods

def call(self, items)
Expand source code
def call(self, items):
    return self._elems_to_objs(self._chunked_get_elements(self.get_payload, items=items))
def get_payload(self, items)
Expand source code
def get_payload(self, items):
    return set_xml_value(
        create_element(f"m:{self.SERVICE_NAME}"),
        attachment_ids_element(items=items, version=self.account.version),
        version=self.account.version,
    )

Inherited members

class DeleteFolder (*args, **kwargs)
Expand source code
class DeleteFolder(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/deletefolder-operation"""

    SERVICE_NAME = "DeleteFolder"
    returns_elements = False

    def call(self, folders, delete_type):
        if delete_type not in DELETE_TYPE_CHOICES:
            raise InvalidEnumValue("delete_type", delete_type, DELETE_TYPE_CHOICES)
        return self._chunked_get_elements(self.get_payload, items=folders, delete_type=delete_type)

    def get_payload(self, folders, delete_type):
        payload = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(DeleteType=delete_type))
        payload.append(folder_ids_element(folders=folders, version=self.account.version))
        return payload

Ancestors

Class variables

var SERVICE_NAME
var returns_elements

Methods

def call(self, folders, delete_type)
Expand source code
def call(self, folders, delete_type):
    if delete_type not in DELETE_TYPE_CHOICES:
        raise InvalidEnumValue("delete_type", delete_type, DELETE_TYPE_CHOICES)
    return self._chunked_get_elements(self.get_payload, items=folders, delete_type=delete_type)
def get_payload(self, folders, delete_type)
Expand source code
def get_payload(self, folders, delete_type):
    payload = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(DeleteType=delete_type))
    payload.append(folder_ids_element(folders=folders, version=self.account.version))
    return payload

Inherited members

class DeleteInboxRule (*args, **kwargs)
Expand source code
class DeleteInboxRule(UpdateInboxRules):
    """
    MSDN:
    https://learn.microsoft.com/en-us/exchange/client-developer/web-service-reference/updateinboxrules-operation#updateinboxrules-delete-rule-request-example
    """

    def _get_operation(self, rule):
        return Operations(delete_rule_operation=DeleteRuleOperation(id=rule.id))

Ancestors

Inherited members

class DeleteItem (*args, **kwargs)

Take a folder and a list of (id, changekey) tuples. Return result of deletion as a list of tuples (success[True|False], errormessage), in the same order as the input list.

MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/deleteitem-operation

Expand source code
class DeleteItem(EWSAccountService):
    """Take a folder and a list of (id, changekey) tuples. Return result of deletion as a list of tuples
    (success[True|False], errormessage), in the same order as the input list.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/deleteitem-operation
    """

    SERVICE_NAME = "DeleteItem"
    returns_elements = False

    def call(self, items, delete_type, send_meeting_cancellations, affected_task_occurrences, suppress_read_receipts):
        if delete_type not in DELETE_TYPE_CHOICES:
            raise InvalidEnumValue("delete_type", delete_type, DELETE_TYPE_CHOICES)
        if send_meeting_cancellations not in SEND_MEETING_CANCELLATIONS_CHOICES:
            raise InvalidEnumValue(
                "send_meeting_cancellations", send_meeting_cancellations, SEND_MEETING_CANCELLATIONS_CHOICES
            )
        if affected_task_occurrences not in AFFECTED_TASK_OCCURRENCES_CHOICES:
            raise InvalidEnumValue(
                "affected_task_occurrences", affected_task_occurrences, AFFECTED_TASK_OCCURRENCES_CHOICES
            )
        return self._chunked_get_elements(
            self.get_payload,
            items=items,
            delete_type=delete_type,
            send_meeting_cancellations=send_meeting_cancellations,
            affected_task_occurrences=affected_task_occurrences,
            suppress_read_receipts=suppress_read_receipts,
        )

    def get_payload(
        self, items, delete_type, send_meeting_cancellations, affected_task_occurrences, suppress_read_receipts
    ):
        # Takes a list of (id, changekey) tuples or Item objects and returns the XML for a DeleteItem request.
        attrs = dict(
            DeleteType=delete_type,
            SendMeetingCancellations=send_meeting_cancellations,
            AffectedTaskOccurrences=affected_task_occurrences,
        )
        if self.account.version.build >= EXCHANGE_2013_SP1:
            attrs["SuppressReadReceipts"] = suppress_read_receipts
        payload = create_element(f"m:{self.SERVICE_NAME}", attrs=attrs)
        payload.append(item_ids_element(items=items, version=self.account.version))
        return payload

Ancestors

Class variables

var SERVICE_NAME
var returns_elements

Methods

def call(self, items, delete_type, send_meeting_cancellations, affected_task_occurrences, suppress_read_receipts)
Expand source code
def call(self, items, delete_type, send_meeting_cancellations, affected_task_occurrences, suppress_read_receipts):
    if delete_type not in DELETE_TYPE_CHOICES:
        raise InvalidEnumValue("delete_type", delete_type, DELETE_TYPE_CHOICES)
    if send_meeting_cancellations not in SEND_MEETING_CANCELLATIONS_CHOICES:
        raise InvalidEnumValue(
            "send_meeting_cancellations", send_meeting_cancellations, SEND_MEETING_CANCELLATIONS_CHOICES
        )
    if affected_task_occurrences not in AFFECTED_TASK_OCCURRENCES_CHOICES:
        raise InvalidEnumValue(
            "affected_task_occurrences", affected_task_occurrences, AFFECTED_TASK_OCCURRENCES_CHOICES
        )
    return self._chunked_get_elements(
        self.get_payload,
        items=items,
        delete_type=delete_type,
        send_meeting_cancellations=send_meeting_cancellations,
        affected_task_occurrences=affected_task_occurrences,
        suppress_read_receipts=suppress_read_receipts,
    )
def get_payload(self, items, delete_type, send_meeting_cancellations, affected_task_occurrences, suppress_read_receipts)
Expand source code
def get_payload(
    self, items, delete_type, send_meeting_cancellations, affected_task_occurrences, suppress_read_receipts
):
    # Takes a list of (id, changekey) tuples or Item objects and returns the XML for a DeleteItem request.
    attrs = dict(
        DeleteType=delete_type,
        SendMeetingCancellations=send_meeting_cancellations,
        AffectedTaskOccurrences=affected_task_occurrences,
    )
    if self.account.version.build >= EXCHANGE_2013_SP1:
        attrs["SuppressReadReceipts"] = suppress_read_receipts
    payload = create_element(f"m:{self.SERVICE_NAME}", attrs=attrs)
    payload.append(item_ids_element(items=items, version=self.account.version))
    return payload

Inherited members

class DeleteUserConfiguration (*args, **kwargs)
Expand source code
class DeleteUserConfiguration(EWSAccountService):
    """MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/deleteuserconfiguration-operation
    """

    SERVICE_NAME = "DeleteUserConfiguration"
    returns_elements = False

    def call(self, user_configuration_name):
        return self._get_elements(payload=self.get_payload(user_configuration_name=user_configuration_name))

    def get_payload(self, user_configuration_name):
        return set_xml_value(
            create_element(f"m:{self.SERVICE_NAME}"), user_configuration_name, version=self.account.version
        )

Ancestors

Class variables

var SERVICE_NAME
var returns_elements

Methods

def call(self, user_configuration_name)
Expand source code
def call(self, user_configuration_name):
    return self._get_elements(payload=self.get_payload(user_configuration_name=user_configuration_name))
def get_payload(self, user_configuration_name)
Expand source code
def get_payload(self, user_configuration_name):
    return set_xml_value(
        create_element(f"m:{self.SERVICE_NAME}"), user_configuration_name, version=self.account.version
    )

Inherited members

class EWSService (protocol, chunk_size=None, timeout=None)

Base class for all EWS services.

Expand source code
class EWSService(SupportedVersionClassMixIn, metaclass=abc.ABCMeta):
    """Base class for all EWS services."""

    CHUNK_SIZE = 100  # A default chunk size for all services. This is the number of items we send in a single request

    SERVICE_NAME = None  # The name of the SOAP service
    element_container_name = None  # The name of the XML element wrapping the collection of returned items
    returns_elements = True  # If False, the service does not return response elements, just the ResponseCode status
    # Return exception instance instead of raising exceptions for the following errors when contained in an element
    ERRORS_TO_CATCH_IN_RESPONSE = (
        EWSWarning,
        ErrorCannotDeleteObject,
        ErrorInvalidChangeKey,
        ErrorItemNotFound,
        ErrorItemSave,
        ErrorInvalidIdMalformed,
        ErrorMessageSizeExceeded,
        ErrorCannotDeleteTaskOccurrence,
        ErrorMimeContentConversionFailed,
        ErrorRecurrenceHasNoOccurrence,
        ErrorCorruptData,
        ErrorItemCorrupt,
        ErrorMailRecipientNotFound,
    )
    # Similarly, define the warnings we want to return un-raised
    WARNINGS_TO_CATCH_IN_RESPONSE = ErrorBatchProcessingStopped
    # Define the warnings we want to ignore, to let response processing proceed
    WARNINGS_TO_IGNORE_IN_RESPONSE = ()
    # The exception type to raise when all attempted API versions failed
    NO_VALID_SERVER_VERSIONS = ErrorInvalidServerVersion

    NS_MAP = {k: v for k, v in ns_translation.items() if k in ("s", "m", "t")}

    def __init__(self, protocol, chunk_size=None, timeout=None):
        self.chunk_size = chunk_size or self.CHUNK_SIZE
        if not isinstance(self.chunk_size, int):
            raise InvalidTypeError("chunk_size", chunk_size, int)
        if self.chunk_size < 1:
            raise ValueError(f"'chunk_size' {self.chunk_size} must be a positive number")
        if self.supported_from and protocol.version.build < self.supported_from:
            raise NotImplementedError(
                f"Service {self.SERVICE_NAME!r} only supports server versions from {self.supported_from or '*'} to "
                f"{self.deprecated_from or '*'} (server has {protocol.version})"
            )
        self.protocol = protocol
        # Allow a service to override the default protocol timeout. Useful for streaming services
        self.timeout = timeout
        # Controls whether the HTTP request should be streaming or fetch everything at once
        self.streaming = False
        # Streaming connection variables
        self._streaming_session = None
        self._streaming_response = None

    def __del__(self):
        # pylint: disable=bare-except
        try:
            if self.streaming:
                # Make sure to clean up lingering resources
                self.stop_streaming()
        except Exception:  # nosec
            # __del__ should never fail
            pass

    # The following two methods are the minimum required to be implemented by subclasses, but the name and number of
    # kwargs differs between services. Therefore, we cannot make these methods abstract.

    # @abc.abstractmethod
    # def call(self, **kwargs):
    #     """Defines the arguments required by the service. Arguments are basic Python types or EWSElement objects.
    #     Returns either XML objects or EWSElement objects.
    #     """"
    #     pass

    # @abc.abstractmethod
    # def get_payload(self, **kwargs):
    #     """Using the arguments from .call(), return the payload expected by the service, as an XML object. The XML
    #     object should consist of a SERVICE_NAME element and everything within that.
    #     """
    #     pass

    def get(self, expect_result=True, **kwargs):
        """Like .call(), but expects exactly one result from the server, or zero when expect_result=False, or either
        zero or one when expect_result=None. Returns either one object or None.

        :param expect_result: None, True, or False
        :param kwargs: Same as arguments for .call()
        :return: Same as .call(), but returns either None or exactly one item
        """
        res = list(self.call(**kwargs))
        # Raise any errors
        for r in res:
            if isinstance(r, Exception):
                raise r
        if expect_result is None and not res:
            # Allow empty result
            return None
        if expect_result is False:
            if res:
                raise ValueError(f"Expected result length 0, but got {res}")
            return None
        if len(res) != 1:
            raise ValueError(f"Expected result length 1, but got {res}")
        return res[0]

    def parse(self, xml):
        """Used mostly for testing, when we want to parse static XML data."""
        resp = DummyResponse(content=xml, streaming=self.streaming)
        _, body = self._get_soap_parts(response=resp)
        return self._elems_to_objs(self._get_elements_in_response(response=self._get_soap_messages(body=body)))

    def wrap(self, content, api_version=None):
        """Generate the necessary boilerplate XML for a raw SOAP request. The XML is specific to the server version.
        ExchangeImpersonation allows to act as the user we want to impersonate.

        RequestServerVersion element on MSDN:
        https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/requestserverversion

        ExchangeImpersonation element on MSDN:
        https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/exchangeimpersonation

        TimeZoneContent element on MSDN:
        https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/timezonecontext

        :param content:
        :param api_version:
        """
        envelope = create_element("s:Envelope", nsmap=self.NS_MAP)
        header = create_element("s:Header")
        if api_version:
            request_server_version = create_element("t:RequestServerVersion", attrs=dict(Version=api_version))
            header.append(request_server_version)
        identity = self._account_to_impersonate
        if identity:
            add_xml_child(header, "t:ExchangeImpersonation", identity)
        timezone = self._timezone
        if timezone:
            timezone_context = create_element("t:TimeZoneContext")
            timezone_definition = create_element("t:TimeZoneDefinition", attrs=dict(Id=timezone.ms_id))
            timezone_context.append(timezone_definition)
            header.append(timezone_context)
        if len(header):
            envelope.append(header)
        body = create_element("s:Body")
        body.append(content)
        envelope.append(body)
        return xml_to_str(envelope, encoding=DEFAULT_ENCODING, xml_declaration=True)

    def _elems_to_objs(self, elems):
        """Takes a generator of XML elements and exceptions. Returns the equivalent Python objects (or exceptions)."""
        for elem in elems:
            # Allow None here. Some services don't return an ID if the target folder is outside the mailbox.
            if isinstance(elem, (Exception, type(None))):
                yield elem
                continue
            yield self._elem_to_obj(elem)

    def _elem_to_obj(self, elem):
        if not self.returns_elements:
            raise RuntimeError("Incorrect call to method when 'returns_elements' is False")
        raise NotImplementedError()

    @property
    def _version_hint(self):
        # We may be here due to version guessing in Protocol.version, so we can't use the self.protocol.version property
        return self.protocol.config.version

    @_version_hint.setter
    def _version_hint(self, value):
        self.protocol.config.version = value

    def _extra_headers(self):
        headers = {}
        identity = self._account_to_impersonate
        if identity and identity.primary_smtp_address:
            # See
            # https://blogs.msdn.microsoft.com/webdav_101/2015/05/11/best-practices-ews-authentication-and-access-issues/
            headers["X-AnchorMailbox"] = identity.primary_smtp_address
        return headers

    @property
    def _account_to_impersonate(self):
        if self.protocol and isinstance(self.protocol.credentials, BaseOAuth2Credentials):
            return self.protocol.credentials.identity
        return None

    @property
    def _timezone(self):
        return None

    def _response_generator(self, payload):
        """Send the payload to the server, and return the response.

        :param payload: payload as an XML object
        :return: the response, as XML objects
        """
        response = self._get_response_xml(payload=payload)
        return self._get_elements_in_response(response=response)

    def _chunked_get_elements(self, payload_func, items, **kwargs):
        """Yield elements in a response. Like ._get_elements(), but chop items into suitable chunks and send multiple
        requests.

        :param payload_func: A reference to .payload()
        :param items: An iterable of items (messages, folders, etc.) to process
        :param kwargs: Same as arguments for .call(), except for the 'items' argument
        :return: Same as ._get_elements()
        """
        # If the input for a service is a QuerySet, it can be difficult to remove exceptions before now
        filtered_items = filter(lambda item: not isinstance(item, Exception), items)
        for i, chunk in enumerate(chunkify(filtered_items, self.chunk_size), start=1):
            log.debug("Processing chunk %s containing %s items", i, len(chunk))
            yield from self._get_elements(payload=payload_func(chunk, **kwargs))

    def stop_streaming(self):
        if not self.streaming:
            raise RuntimeError("Attempt to stop a non-streaming service")
        if self._streaming_response:
            self._streaming_response.close()  # Release memory
            self._streaming_response = None
        if self._streaming_session:
            self.protocol.release_session(self._streaming_session)
            self._streaming_session = None

    def _get_elements(self, payload):
        """Send the payload to be sent and parsed. Handles and re-raise exceptions that are not meant to be returned
        to the caller as exception objects. Retry the request according to the retry policy.
        """
        wait = self.protocol.RETRY_WAIT
        while True:
            try:
                # Create a generator over the response elements so exceptions in response elements are also raised
                # here and can be handled.
                yield from self._response_generator(payload=payload)
                # TODO: Restore session pool size on succeeding request?
                return
            except TokenExpiredError:
                # Retry immediately
                continue
            except ErrorServerBusy as e:
                if not e.back_off:
                    e.back_off = wait
                self._handle_backoff(e)
            except ErrorExceededConnectionCount as e:
                # ErrorExceededConnectionCount indicates that the connecting user has too many open TCP connections to
                # the server. Decrease our session pool size and retry immediately.
                try:
                    self.protocol.decrease_poolsize()
                    continue
                except SessionPoolMinSizeReached:
                    # We're already as low as we can go. Let the user handle this.
                    raise e
            except ErrorTimeoutExpired as e:
                # ErrorTimeoutExpired can be caused by a busy server, or by an overly large request. If it's the latter,
                # we don't want to continue hammering the server with this request indefinitely. Instead, lower the
                # connection count, if possible, and retry the request.
                if self.protocol.session_pool_size <= 1:
                    # We're already as low as we can go. We can no longer use the session count to put less load
                    # on the server. If this is a chunked request we could lower the chunk size, but we don't have a
                    # way of doing that from this part of the code yet. Let the user handle this.
                    raise e
                self._handle_backoff(ErrorServerBusy(f"Reraised from {e.__class__.__name__}({e})", back_off=wait))
            except (ErrorTooManyObjectsOpened, ErrorInternalServerTransientError) as e:
                # ErrorTooManyObjectsOpened means there are too many connections to the Exchange database. This is very
                # often a symptom of sending too many requests.
                self._handle_backoff(ErrorServerBusy(f"Reraised from {e.__class__.__name__}({e})", back_off=wait))
            finally:
                wait *= 2  # Increase delay for every retry
                if self.streaming:
                    self.stop_streaming()

    def _handle_response_cookies(self, session):
        pass

    def _get_response(self, payload, api_version):
        """Send the actual HTTP request and get the response."""
        if self.streaming:
            # Make sure to clean up lingering resources
            self.stop_streaming()
        session = self.protocol.get_session()
        r, session = post_ratelimited(
            protocol=self.protocol,
            session=session,
            url=self.protocol.service_endpoint,
            headers=self._extra_headers(),
            data=self.wrap(
                content=payload,
                api_version=api_version,
            ),
            stream=self.streaming,
            timeout=self.timeout or self.protocol.TIMEOUT,
        )
        self._handle_response_cookies(session)
        if self.streaming:
            # We con only release the session when we have fully consumed the response. Save session and response
            # objects for later.
            self._streaming_session, self._streaming_response = session, r
        else:
            self.protocol.release_session(session)
        return r

    @classmethod
    def supported_api_versions(cls):
        """Return API versions supported by the service, sorted from newest to oldest"""
        return sorted({v.api_version for v in Version.all_versions() if cls.supports_version(v)}, reverse=True)

    def _api_versions_to_try(self):
        # Put the hint first in the list, and then all other versions except the hint, from newest to oldest
        return (self._version_hint.api_version,) + tuple(
            v for v in self.supported_api_versions() if v != self._version_hint.api_version
        )

    def _get_response_xml(self, payload, **parse_opts):
        """Send the payload to the server and return relevant elements from the result. Several things happen here:
          * The payload is wrapped in SOAP headers and sent to the server
          * The Exchange API version is negotiated and stored in the protocol object
          * Connection errors are handled and possibly reraised as ErrorServerBusy
          * SOAP errors are raised
          * EWS errors are raised, or passed on to the caller

        :param payload: The request payload, as an XML object
        :return: A generator of XML objects or None if the service does not return a result
        """
        # Microsoft really doesn't want to make our lives easy. The server may report one version in our initial version
        # guessing tango, but then the server may decide that any arbitrary legacy backend server may actually process
        # the request for an account. Prepare to handle version-related errors and set the server version per-account.
        log.debug("Calling service %s", self.SERVICE_NAME)
        for api_version in self._api_versions_to_try():
            log.debug("Trying API version %s", api_version)
            r = self._get_response(payload=payload, api_version=api_version)
            if self.streaming:
                # Let 'requests' decode raw data automatically
                r.raw.decode_content = True
            try:
                header, body = self._get_soap_parts(response=r, **parse_opts)
            except Exception:
                r.close()  # Release memory
                raise
            # The body may contain error messages from Exchange, but we still want to collect version info
            if header is not None:
                self._update_api_version(api_version=api_version, header=header, **parse_opts)
            try:
                return self._get_soap_messages(body=body, **parse_opts)
            except (
                ErrorInvalidServerVersion,
                ErrorIncorrectSchemaVersion,
                ErrorInvalidRequest,
                ErrorInvalidSchemaVersionForMailboxVersion,
            ):
                # The guessed server version is wrong. Try the next version
                log.debug("API version %s was invalid", api_version)
                continue
            finally:
                if not self.streaming:
                    # In streaming mode, we may not have accessed the raw stream yet. Caller must handle this.
                    r.close()  # Release memory

        raise self.NO_VALID_SERVER_VERSIONS(f"Tried versions {self._api_versions_to_try()} but all were invalid")

    def _handle_backoff(self, e):
        """Take a request from the server to back off and checks the retry policy for what to do. Re-raise the
        exception if conditions are not met.

        :param e: An ErrorServerBusy instance
        :return:
        """
        log.debug("Got ErrorServerBusy (back off %s seconds)", e.back_off)
        # ErrorServerBusy is very often a symptom of sending too many requests. Scale back connections if possible.
        with suppress(SessionPoolMinSizeReached):
            self.protocol.decrease_poolsize()
        if self.protocol.retry_policy.fail_fast:
            raise e
        self.protocol.retry_policy.back_off(e.back_off)
        # We'll warn about this later if we actually need to sleep

    def _update_api_version(self, api_version, header, **parse_opts):
        """Parse the server version contained in SOAP headers and update the version hint stored by the caller, if
        necessary.
        """
        try:
            head_version = Version.from_soap_header(requested_api_version=api_version, header=header)
        except TransportError as te:
            log.debug("Failed to update version info (%s)", te)
            return
        if self._version_hint == head_version:
            # Nothing to do
            return
        log.debug("Found new version (%s -> %s)", self._version_hint, head_version)
        # The api_version that worked was different from our hint, or we never got a build version. Store the working
        # version.
        self._version_hint = head_version

    @classmethod
    def _response_tag(cls):
        """Return the name of the element containing the service response."""
        return f"{{{MNS}}}{cls.SERVICE_NAME}Response"

    @staticmethod
    def _response_messages_tag():
        """Return the name of the element containing service response messages."""
        return f"{{{MNS}}}ResponseMessages"

    @classmethod
    def _response_message_tag(cls):
        """Return the name of the element of a single response message."""
        return f"{{{MNS}}}{cls.SERVICE_NAME}ResponseMessage"

    @classmethod
    def _get_soap_parts(cls, response, **parse_opts):
        """Split the SOAP response into its headers and body elements."""
        try:
            root = to_xml(response.iter_content())
        except ParseError as e:
            raise SOAPError(f"Bad SOAP response: {e}")
        header = root.find(f"{{{SOAPNS}}}Header")
        if header is None:
            # This is normal when the response contains SOAP-level errors
            log.debug("No header in XML response")
        body = root.find(f"{{{SOAPNS}}}Body")
        if body is None:
            raise MalformedResponseError("No Body element in SOAP response")
        return header, body

    def _get_soap_messages(self, body, **parse_opts):
        """Return the elements in the response containing the response messages. Raises any SOAP exceptions."""
        response = body.find(self._response_tag())
        if response is None:
            fault = body.find(f"{{{SOAPNS}}}Fault")
            if fault is None:
                raise SOAPError(f"Unknown SOAP response (expected {self._response_tag()} or Fault): {xml_to_str(body)}")
            self._raise_soap_errors(fault=fault)  # Will throw SOAPError or custom EWS error
        response_messages = response.find(self._response_messages_tag())
        if response_messages is None:
            # Result isn't delivered in a list of FooResponseMessages, but directly in the FooResponse. Consumers expect
            # a list, so return a list
            return [response]
        return response_messages.findall(self._response_message_tag())

    @classmethod
    def _raise_soap_errors(cls, fault):
        """Parse error messages contained in SOAP headers and raise as exceptions defined in this package."""
        # Fault: See http://www.w3.org/TR/2000/NOTE-SOAP-20000508/#_Toc478383507
        fault_code = get_xml_attr(fault, "faultcode")
        fault_string = get_xml_attr(fault, "faultstring")
        fault_actor = get_xml_attr(fault, "faultactor")
        detail = fault.find("detail")
        if detail is not None:
            code = get_xml_attr(detail, f"{{{ENS}}}ResponseCode")
            if code:
                code = code.strip()
            msg = get_xml_attr(detail, f"{{{ENS}}}Message")
            if msg:
                msg = msg.strip()
            msg_xml = detail.find(f"{{{TNS}}}MessageXml")  # Crazy. Here, it's in the TNS namespace
            if code == "ErrorServerBusy":
                back_off = None
                with suppress(TypeError, AttributeError):
                    value = msg_xml.find(f"{{{TNS}}}Value")
                    if value.get("Name") == "BackOffMilliseconds":
                        back_off = int(value.text) / 1000.0  # Convert to seconds
                raise ErrorServerBusy(msg, back_off=back_off)
            if code == "ErrorSchemaValidation" and msg_xml is not None:
                line_number = get_xml_attr(msg_xml, f"{{{TNS}}}LineNumber")
                line_position = get_xml_attr(msg_xml, f"{{{TNS}}}LinePosition")
                violation = get_xml_attr(msg_xml, f"{{{TNS}}}Violation")
                if violation:
                    msg = f"{msg} {violation}"
                if line_number or line_position:
                    msg = f"{msg} (line: {line_number} position: {line_position})"
            try:
                raise vars(errors)[code](msg)
            except KeyError:
                detail = f"{cls.SERVICE_NAME}: code: {code} msg: {msg} ({xml_to_str(detail)})"
        with suppress(KeyError):
            raise vars(errors)[fault_code](fault_string)
        raise SOAPError(f"SOAP error code: {fault_code} string: {fault_string} actor: {fault_actor} detail: {detail}")

    def _get_element_container(self, message, name=None):
        """Return the XML element in a response element that contains the elements we want the service to return. For
        example, in a GetFolder response, 'message' is the GetFolderResponseMessage element, and we return the 'Folders'
        element:

        <m:GetFolderResponseMessage ResponseClass="Success">
          <m:ResponseCode>NoError</m:ResponseCode>
          <m:Folders>
            <t:Folder>
              <t:FolderId Id="AQApA=" ChangeKey="AQAAAB" />
              [...]
            </t:Folder>
          </m:Folders>
        </m:GetFolderResponseMessage>

        Some service responses don't have a containing element for the returned elements ('name' is None). In
        that case, we return the 'SomeServiceResponseMessage' element.

        If the response contains a warning or an error message, we raise the relevant exception, unless the error class
        is contained in WARNINGS_TO_CATCH_IN_RESPONSE or ERRORS_TO_CATCH_IN_RESPONSE, in which case we return the
        exception instance.
        """
        # ResponseClass is an XML attribute of various SomeServiceResponseMessage elements: Possible values are:
        # Success, Warning, Error. See e.g.
        # https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/finditemresponsemessage
        response_class = message.get("ResponseClass")
        # ResponseCode, MessageText: See
        # https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/responsecode
        response_code = get_xml_attr(message, f"{{{MNS}}}ResponseCode")
        if response_class == "Success" and response_code == "NoError":
            if not name:
                return message
            container = message.find(name)
            if container is None:
                raise MalformedResponseError(f"No {name} elements in ResponseMessage ({xml_to_str(message)})")
            return container
        if response_code == "NoError":
            return True
        # Raise any non-acceptable errors in the container, or return the container or the acceptable exception instance
        msg_text = get_xml_attr(message, f"{{{MNS}}}MessageText")
        msg_xml = message.find(f"{{{MNS}}}MessageXml")
        if response_class == "Warning":
            try:
                raise self._get_exception(code=response_code, text=msg_text, msg_xml=msg_xml)
            except self.WARNINGS_TO_CATCH_IN_RESPONSE as e:
                return e
            except self.WARNINGS_TO_IGNORE_IN_RESPONSE as e:
                log.warning(str(e))
                container = message.find(name)
                if container is None:
                    raise MalformedResponseError(f"No {name} elements in ResponseMessage ({xml_to_str(message)})")
                return container
        # response_class == 'Error', or 'Success' and not 'NoError'
        try:
            raise self._get_exception(code=response_code, text=msg_text, msg_xml=msg_xml)
        except self.ERRORS_TO_CATCH_IN_RESPONSE as e:
            return e

    @staticmethod
    def _get_exception(code, text, msg_xml):
        """Parse error messages contained in EWS responses and raise as exceptions defined in this package."""
        if not code:
            return TransportError(f"Empty ResponseCode in ResponseMessage (MessageText: {text}, MessageXml: {msg_xml})")
        if msg_xml is not None:
            # If this is an ErrorInvalidPropertyRequest error, the xml may contain a specific FieldURI
            for elem_cls in (FieldURI, IndexedFieldURI, ExtendedFieldURI, ExceptionFieldURI):
                elem = msg_xml.find(elem_cls.response_tag())
                if elem is not None:
                    field_uri = elem_cls.from_xml(elem, account=None)
                    text += f" (field: {field_uri})"
                    break

            # If this is an ErrorInvalidValueForProperty error, the xml may contain the name and value of the property
            if code == "ErrorInvalidValueForProperty":
                msg_parts = {}
                for elem in msg_xml.findall(f"{{{TNS}}}Value"):
                    key, val = elem.get("Name"), elem.text
                    if key:
                        msg_parts[key] = val
                if msg_parts:
                    text += f" ({', '.join(f'{k}: {v}' for k, v in msg_parts.items())})"

            # If this is an ErrorInternalServerError error, the xml may contain a more specific error code
            inner_code, inner_text = None, None
            for value_elem in msg_xml.findall(f"{{{TNS}}}Value"):
                name = value_elem.get("Name")
                if name == "InnerErrorResponseCode":
                    inner_code = value_elem.text
                elif name == "InnerErrorMessageText":
                    inner_text = value_elem.text
            if inner_code:
                try:
                    # Raise the error as the inner error code
                    return vars(errors)[inner_code](f"{inner_text} (raised from: {code}({text!r}))")
                except KeyError:
                    # Inner code is unknown to us. Just append to the original text
                    text += f" (inner error: {inner_code}({inner_text!r}))"
        try:
            # Raise the error corresponding to the ResponseCode
            return vars(errors)[code](text)
        except KeyError:
            # Should not happen
            return TransportError(
                f"Unknown ResponseCode in ResponseMessage: {code} (MessageText: {text}, MessageXml: {msg_xml})"
            )

    def _get_elements_in_response(self, response):
        """Take a list of 'SomeServiceResponseMessage' elements and return the elements in each response message that
        we want the service to return. With e.g. 'CreateItem', we get a list of 'CreateItemResponseMessage' elements
        and return the 'Message' elements.

        <m:CreateItemResponseMessage ResponseClass="Success">
          <m:ResponseCode>NoError</m:ResponseCode>
          <m:Items>
            <t:Message>
              <t:ItemId Id="AQApA=" ChangeKey="AQAAAB"/>
            </t:Message>
          </m:Items>
        </m:CreateItemResponseMessage>
        <m:CreateItemResponseMessage ResponseClass="Success">
          <m:ResponseCode>NoError</m:ResponseCode>
          <m:Items>
            <t:Message>
              <t:ItemId Id="AQApB=" ChangeKey="AQAAAC"/>
            </t:Message>
          </m:Items>
        </m:CreateItemResponseMessage>

        :param response: a list of 'SomeServiceResponseMessage' XML objects
        :return: a generator of items as returned by '_get_elements_in_container()
        """
        for msg in response:
            container_or_exc = self._get_element_container(message=msg, name=self.element_container_name)
            if isinstance(container_or_exc, (bool, Exception)):
                yield container_or_exc
            else:
                for c in self._get_elements_in_container(container=container_or_exc):
                    yield c

    @classmethod
    def _get_elements_in_container(cls, container):
        """Return a list of response elements from an XML response element container. With e.g.
        'CreateItem', 'Items' is the container element, and we return the 'Message' child elements:

          <m:Items>
            <t:Message>
              <t:ItemId Id="AQApA=" ChangeKey="AQAAAB"/>
            </t:Message>
          </m:Items>

        If the service does not return response elements, return True to indicate the status. Errors have already been
        raised.
        """
        if cls.returns_elements:
            return list(container)
        return [True]

Ancestors

Subclasses

Class variables

var CHUNK_SIZE
var ERRORS_TO_CATCH_IN_RESPONSE
var NO_VALID_SERVER_VERSIONS

Global error type within this module.

var NS_MAP
var SERVICE_NAME
var WARNINGS_TO_CATCH_IN_RESPONSE

Global error type within this module.

var WARNINGS_TO_IGNORE_IN_RESPONSE
var element_container_name
var returns_elements

Static methods

def supported_api_versions()

Return API versions supported by the service, sorted from newest to oldest

Expand source code
@classmethod
def supported_api_versions(cls):
    """Return API versions supported by the service, sorted from newest to oldest"""
    return sorted({v.api_version for v in Version.all_versions() if cls.supports_version(v)}, reverse=True)

Methods

def get(self, expect_result=True, **kwargs)

Like .call(), but expects exactly one result from the server, or zero when expect_result=False, or either zero or one when expect_result=None. Returns either one object or None.

:param expect_result: None, True, or False :param kwargs: Same as arguments for .call() :return: Same as .call(), but returns either None or exactly one item

Expand source code
def get(self, expect_result=True, **kwargs):
    """Like .call(), but expects exactly one result from the server, or zero when expect_result=False, or either
    zero or one when expect_result=None. Returns either one object or None.

    :param expect_result: None, True, or False
    :param kwargs: Same as arguments for .call()
    :return: Same as .call(), but returns either None or exactly one item
    """
    res = list(self.call(**kwargs))
    # Raise any errors
    for r in res:
        if isinstance(r, Exception):
            raise r
    if expect_result is None and not res:
        # Allow empty result
        return None
    if expect_result is False:
        if res:
            raise ValueError(f"Expected result length 0, but got {res}")
        return None
    if len(res) != 1:
        raise ValueError(f"Expected result length 1, but got {res}")
    return res[0]
def parse(self, xml)

Used mostly for testing, when we want to parse static XML data.

Expand source code
def parse(self, xml):
    """Used mostly for testing, when we want to parse static XML data."""
    resp = DummyResponse(content=xml, streaming=self.streaming)
    _, body = self._get_soap_parts(response=resp)
    return self._elems_to_objs(self._get_elements_in_response(response=self._get_soap_messages(body=body)))
def stop_streaming(self)
Expand source code
def stop_streaming(self):
    if not self.streaming:
        raise RuntimeError("Attempt to stop a non-streaming service")
    if self._streaming_response:
        self._streaming_response.close()  # Release memory
        self._streaming_response = None
    if self._streaming_session:
        self.protocol.release_session(self._streaming_session)
        self._streaming_session = None
def wrap(self, content, api_version=None)

Generate the necessary boilerplate XML for a raw SOAP request. The XML is specific to the server version. ExchangeImpersonation allows to act as the user we want to impersonate.

RequestServerVersion element on MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/requestserverversion

ExchangeImpersonation element on MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/exchangeimpersonation

TimeZoneContent element on MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/timezonecontext

:param content: :param api_version:

Expand source code
def wrap(self, content, api_version=None):
    """Generate the necessary boilerplate XML for a raw SOAP request. The XML is specific to the server version.
    ExchangeImpersonation allows to act as the user we want to impersonate.

    RequestServerVersion element on MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/requestserverversion

    ExchangeImpersonation element on MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/exchangeimpersonation

    TimeZoneContent element on MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/timezonecontext

    :param content:
    :param api_version:
    """
    envelope = create_element("s:Envelope", nsmap=self.NS_MAP)
    header = create_element("s:Header")
    if api_version:
        request_server_version = create_element("t:RequestServerVersion", attrs=dict(Version=api_version))
        header.append(request_server_version)
    identity = self._account_to_impersonate
    if identity:
        add_xml_child(header, "t:ExchangeImpersonation", identity)
    timezone = self._timezone
    if timezone:
        timezone_context = create_element("t:TimeZoneContext")
        timezone_definition = create_element("t:TimeZoneDefinition", attrs=dict(Id=timezone.ms_id))
        timezone_context.append(timezone_definition)
        header.append(timezone_context)
    if len(header):
        envelope.append(header)
    body = create_element("s:Body")
    body.append(content)
    envelope.append(body)
    return xml_to_str(envelope, encoding=DEFAULT_ENCODING, xml_declaration=True)
class EmptyFolder (*args, **kwargs)
Expand source code
class EmptyFolder(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/emptyfolder-operation"""

    SERVICE_NAME = "EmptyFolder"
    returns_elements = False

    def call(self, folders, delete_type, delete_sub_folders):
        if delete_type not in DELETE_TYPE_CHOICES:
            raise InvalidEnumValue("delete_type", delete_type, DELETE_TYPE_CHOICES)
        return self._chunked_get_elements(
            self.get_payload, items=folders, delete_type=delete_type, delete_sub_folders=delete_sub_folders
        )

    def get_payload(self, folders, delete_type, delete_sub_folders):
        payload = create_element(
            f"m:{self.SERVICE_NAME}", attrs=dict(DeleteType=delete_type, DeleteSubFolders=delete_sub_folders)
        )
        payload.append(folder_ids_element(folders=folders, version=self.account.version))
        return payload

Ancestors

Class variables

var SERVICE_NAME
var returns_elements

Methods

def call(self, folders, delete_type, delete_sub_folders)
Expand source code
def call(self, folders, delete_type, delete_sub_folders):
    if delete_type not in DELETE_TYPE_CHOICES:
        raise InvalidEnumValue("delete_type", delete_type, DELETE_TYPE_CHOICES)
    return self._chunked_get_elements(
        self.get_payload, items=folders, delete_type=delete_type, delete_sub_folders=delete_sub_folders
    )
def get_payload(self, folders, delete_type, delete_sub_folders)
Expand source code
def get_payload(self, folders, delete_type, delete_sub_folders):
    payload = create_element(
        f"m:{self.SERVICE_NAME}", attrs=dict(DeleteType=delete_type, DeleteSubFolders=delete_sub_folders)
    )
    payload.append(folder_ids_element(folders=folders, version=self.account.version))
    return payload

Inherited members

class ExpandDL (protocol, chunk_size=None, timeout=None)
Expand source code
class ExpandDL(EWSService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/expanddl-operation"""

    SERVICE_NAME = "ExpandDL"
    element_container_name = f"{{{MNS}}}DLExpansion"
    WARNINGS_TO_IGNORE_IN_RESPONSE = ErrorNameResolutionMultipleResults

    def call(self, distribution_list):
        return self._elems_to_objs(self._get_elements(payload=self.get_payload(distribution_list=distribution_list)))

    def _elem_to_obj(self, elem):
        return Mailbox.from_xml(elem, account=None)

    def get_payload(self, distribution_list):
        return set_xml_value(create_element(f"m:{self.SERVICE_NAME}"), distribution_list, version=self.protocol.version)

Ancestors

Class variables

var SERVICE_NAME
var WARNINGS_TO_IGNORE_IN_RESPONSE

Global error type within this module.

var element_container_name

Methods

def call(self, distribution_list)
Expand source code
def call(self, distribution_list):
    return self._elems_to_objs(self._get_elements(payload=self.get_payload(distribution_list=distribution_list)))
def get_payload(self, distribution_list)
Expand source code
def get_payload(self, distribution_list):
    return set_xml_value(create_element(f"m:{self.SERVICE_NAME}"), distribution_list, version=self.protocol.version)

Inherited members

class ExportItems (*args, **kwargs)
Expand source code
class ExportItems(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/exportitems-operation"""

    ERRORS_TO_CATCH_IN_RESPONSE = ResponseMessageError
    SERVICE_NAME = "ExportItems"
    element_container_name = f"{{{MNS}}}Data"

    def call(self, items):
        return self._elems_to_objs(self._chunked_get_elements(self.get_payload, items=items))

    def _elem_to_obj(self, elem):
        return elem.text  # All we want is the 64bit string in the 'Data' tag

    def get_payload(self, items):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        payload.append(item_ids_element(items=items, version=self.account.version))
        return payload

    # We need to override this since ExportItemsResponseMessage is formatted a bit differently.
    @classmethod
    def _get_elements_in_container(cls, container):
        return [container]

Ancestors

Class variables

var ERRORS_TO_CATCH_IN_RESPONSE

Global error type within this module.

var SERVICE_NAME
var element_container_name

Methods

def call(self, items)
Expand source code
def call(self, items):
    return self._elems_to_objs(self._chunked_get_elements(self.get_payload, items=items))
def get_payload(self, items)
Expand source code
def get_payload(self, items):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    payload.append(item_ids_element(items=items, version=self.account.version))
    return payload

Inherited members

class FindFolder (*args, **kwargs)
Expand source code
class FindFolder(EWSPagingService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/findfolder-operation"""

    SERVICE_NAME = "FindFolder"
    element_container_name = f"{{{TNS}}}Folders"
    paging_container_name = f"{{{MNS}}}RootFolder"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.root = None  # A hack to communicate parsing args to _elems_to_objs()

    def call(self, folders, additional_fields, restriction, shape, depth, max_items, offset):
        """Find sub-folders of a folder.

        :param folders: the folders to act on
        :param additional_fields: the extra fields that should be returned with the folder, as FieldPath objects
        :param restriction: Restriction object that defines the filters for the query
        :param shape: The set of attributes to return
        :param depth: How deep in the folder structure to search for folders
        :param max_items: The maximum number of items to return
        :param offset: the offset relative to the first item in the item collection. Usually 0.

        :return: XML elements for the matching folders
        """
        if shape not in SHAPE_CHOICES:
            raise InvalidEnumValue("shape", shape, SHAPE_CHOICES)
        if depth not in FOLDER_TRAVERSAL_CHOICES:
            raise InvalidEnumValue("depth", depth, FOLDER_TRAVERSAL_CHOICES)
        roots = {f.root for f in folders}
        if len(roots) != 1:
            raise ValueError(f"All folders must have the same root hierarchy ({roots})")
        self.root = roots.pop()
        return self._elems_to_objs(
            self._paged_call(
                payload_func=self.get_payload,
                max_items=max_items,
                folders=folders,
                **dict(
                    additional_fields=additional_fields,
                    restriction=restriction,
                    shape=shape,
                    depth=depth,
                    page_size=self.page_size,
                    offset=offset,
                ),
            )
        )

    def _elem_to_obj(self, elem):
        return Folder.from_xml_with_root(elem=elem, root=self.root)

    def get_payload(self, folders, additional_fields, restriction, shape, depth, page_size, offset=0):
        payload = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(Traversal=depth))
        payload.append(
            shape_element(
                tag="m:FolderShape", shape=shape, additional_fields=additional_fields, version=self.account.version
            )
        )
        if self.account.version.build >= EXCHANGE_2010:
            indexed_page_folder_view = create_element(
                "m:IndexedPageFolderView",
                attrs=dict(MaxEntriesReturned=page_size, Offset=offset, BasePoint="Beginning"),
            )
            payload.append(indexed_page_folder_view)
        else:
            if offset != 0:
                raise NotImplementedError("'offset' is only supported for Exchange 2010 servers and later")
        if restriction:
            payload.append(restriction.to_xml(version=self.account.version))
        payload.append(folder_ids_element(folders=folders, version=self.protocol.version, tag="m:ParentFolderIds"))
        return payload

Ancestors

Class variables

var SERVICE_NAME
var element_container_name
var paging_container_name

Methods

def call(self, folders, additional_fields, restriction, shape, depth, max_items, offset)

Find sub-folders of a folder.

:param folders: the folders to act on :param additional_fields: the extra fields that should be returned with the folder, as FieldPath objects :param restriction: Restriction object that defines the filters for the query :param shape: The set of attributes to return :param depth: How deep in the folder structure to search for folders :param max_items: The maximum number of items to return :param offset: the offset relative to the first item in the item collection. Usually 0.

:return: XML elements for the matching folders

Expand source code
def call(self, folders, additional_fields, restriction, shape, depth, max_items, offset):
    """Find sub-folders of a folder.

    :param folders: the folders to act on
    :param additional_fields: the extra fields that should be returned with the folder, as FieldPath objects
    :param restriction: Restriction object that defines the filters for the query
    :param shape: The set of attributes to return
    :param depth: How deep in the folder structure to search for folders
    :param max_items: The maximum number of items to return
    :param offset: the offset relative to the first item in the item collection. Usually 0.

    :return: XML elements for the matching folders
    """
    if shape not in SHAPE_CHOICES:
        raise InvalidEnumValue("shape", shape, SHAPE_CHOICES)
    if depth not in FOLDER_TRAVERSAL_CHOICES:
        raise InvalidEnumValue("depth", depth, FOLDER_TRAVERSAL_CHOICES)
    roots = {f.root for f in folders}
    if len(roots) != 1:
        raise ValueError(f"All folders must have the same root hierarchy ({roots})")
    self.root = roots.pop()
    return self._elems_to_objs(
        self._paged_call(
            payload_func=self.get_payload,
            max_items=max_items,
            folders=folders,
            **dict(
                additional_fields=additional_fields,
                restriction=restriction,
                shape=shape,
                depth=depth,
                page_size=self.page_size,
                offset=offset,
            ),
        )
    )
def get_payload(self, folders, additional_fields, restriction, shape, depth, page_size, offset=0)
Expand source code
def get_payload(self, folders, additional_fields, restriction, shape, depth, page_size, offset=0):
    payload = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(Traversal=depth))
    payload.append(
        shape_element(
            tag="m:FolderShape", shape=shape, additional_fields=additional_fields, version=self.account.version
        )
    )
    if self.account.version.build >= EXCHANGE_2010:
        indexed_page_folder_view = create_element(
            "m:IndexedPageFolderView",
            attrs=dict(MaxEntriesReturned=page_size, Offset=offset, BasePoint="Beginning"),
        )
        payload.append(indexed_page_folder_view)
    else:
        if offset != 0:
            raise NotImplementedError("'offset' is only supported for Exchange 2010 servers and later")
    if restriction:
        payload.append(restriction.to_xml(version=self.account.version))
    payload.append(folder_ids_element(folders=folders, version=self.protocol.version, tag="m:ParentFolderIds"))
    return payload

Inherited members

class FindItem (*args, **kwargs)
Expand source code
class FindItem(EWSPagingService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/finditem-operation"""

    SERVICE_NAME = "FindItem"
    element_container_name = f"{{{TNS}}}Items"
    paging_container_name = f"{{{MNS}}}RootFolder"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # A hack to communicate parsing args to _elems_to_objs()
        self.additional_fields = None
        self.shape = None

    def call(
        self,
        folders,
        additional_fields,
        restriction,
        order_fields,
        shape,
        query_string,
        depth,
        calendar_view,
        max_items,
        offset,
    ):
        """Find items in an account.

        :param folders: the folders to act on
        :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects
        :param restriction: a Restriction object for
        :param order_fields: the fields to sort the results by
        :param shape: The set of attributes to return
        :param query_string: a QueryString object
        :param depth: How deep in the folder structure to search for items
        :param calendar_view: If set, returns recurring calendar items unfolded
        :param max_items: the max number of items to return
        :param offset: the offset relative to the first item in the item collection. Usually 0.

        :return: XML elements for the matching items
        """
        if shape not in SHAPE_CHOICES:
            raise InvalidEnumValue("shape", shape, SHAPE_CHOICES)
        if depth not in ITEM_TRAVERSAL_CHOICES:
            raise InvalidEnumValue("depth", depth, ITEM_TRAVERSAL_CHOICES)
        self.additional_fields = additional_fields
        self.shape = shape
        return self._elems_to_objs(
            self._paged_call(
                payload_func=self.get_payload,
                max_items=max_items,
                folders=folders,
                **dict(
                    additional_fields=additional_fields,
                    restriction=restriction,
                    order_fields=order_fields,
                    query_string=query_string,
                    shape=shape,
                    depth=depth,
                    calendar_view=calendar_view,
                    page_size=self.page_size,
                    offset=offset,
                ),
            )
        )

    def _elem_to_obj(self, elem):
        if self.shape == ID_ONLY and self.additional_fields is None:
            return Item.id_from_xml(elem)
        return BaseFolder.item_model_from_tag(elem.tag).from_xml(elem=elem, account=self.account)

    def get_payload(
        self,
        folders,
        additional_fields,
        restriction,
        order_fields,
        query_string,
        shape,
        depth,
        calendar_view,
        page_size,
        offset=0,
    ):
        payload = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(Traversal=depth))
        payload.append(
            shape_element(
                tag="m:ItemShape", shape=shape, additional_fields=additional_fields, version=self.account.version
            )
        )
        if calendar_view is None:
            view_type = create_element(
                "m:IndexedPageItemView", attrs=dict(MaxEntriesReturned=page_size, Offset=offset, BasePoint="Beginning")
            )
        else:
            view_type = calendar_view.to_xml(version=self.account.version)
        payload.append(view_type)
        if restriction:
            payload.append(restriction.to_xml(version=self.account.version))
        if order_fields:
            payload.append(set_xml_value(create_element("m:SortOrder"), order_fields, version=self.account.version))
        payload.append(folder_ids_element(folders=folders, version=self.protocol.version, tag="m:ParentFolderIds"))
        if query_string:
            payload.append(query_string.to_xml(version=self.account.version))
        return payload

Ancestors

Class variables

var SERVICE_NAME
var element_container_name
var paging_container_name

Methods

def call(self, folders, additional_fields, restriction, order_fields, shape, query_string, depth, calendar_view, max_items, offset)

Find items in an account.

:param folders: the folders to act on :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects :param restriction: a Restriction object for :param order_fields: the fields to sort the results by :param shape: The set of attributes to return :param query_string: a QueryString object :param depth: How deep in the folder structure to search for items :param calendar_view: If set, returns recurring calendar items unfolded :param max_items: the max number of items to return :param offset: the offset relative to the first item in the item collection. Usually 0.

:return: XML elements for the matching items

Expand source code
def call(
    self,
    folders,
    additional_fields,
    restriction,
    order_fields,
    shape,
    query_string,
    depth,
    calendar_view,
    max_items,
    offset,
):
    """Find items in an account.

    :param folders: the folders to act on
    :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects
    :param restriction: a Restriction object for
    :param order_fields: the fields to sort the results by
    :param shape: The set of attributes to return
    :param query_string: a QueryString object
    :param depth: How deep in the folder structure to search for items
    :param calendar_view: If set, returns recurring calendar items unfolded
    :param max_items: the max number of items to return
    :param offset: the offset relative to the first item in the item collection. Usually 0.

    :return: XML elements for the matching items
    """
    if shape not in SHAPE_CHOICES:
        raise InvalidEnumValue("shape", shape, SHAPE_CHOICES)
    if depth not in ITEM_TRAVERSAL_CHOICES:
        raise InvalidEnumValue("depth", depth, ITEM_TRAVERSAL_CHOICES)
    self.additional_fields = additional_fields
    self.shape = shape
    return self._elems_to_objs(
        self._paged_call(
            payload_func=self.get_payload,
            max_items=max_items,
            folders=folders,
            **dict(
                additional_fields=additional_fields,
                restriction=restriction,
                order_fields=order_fields,
                query_string=query_string,
                shape=shape,
                depth=depth,
                calendar_view=calendar_view,
                page_size=self.page_size,
                offset=offset,
            ),
        )
    )
def get_payload(self, folders, additional_fields, restriction, order_fields, query_string, shape, depth, calendar_view, page_size, offset=0)
Expand source code
def get_payload(
    self,
    folders,
    additional_fields,
    restriction,
    order_fields,
    query_string,
    shape,
    depth,
    calendar_view,
    page_size,
    offset=0,
):
    payload = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(Traversal=depth))
    payload.append(
        shape_element(
            tag="m:ItemShape", shape=shape, additional_fields=additional_fields, version=self.account.version
        )
    )
    if calendar_view is None:
        view_type = create_element(
            "m:IndexedPageItemView", attrs=dict(MaxEntriesReturned=page_size, Offset=offset, BasePoint="Beginning")
        )
    else:
        view_type = calendar_view.to_xml(version=self.account.version)
    payload.append(view_type)
    if restriction:
        payload.append(restriction.to_xml(version=self.account.version))
    if order_fields:
        payload.append(set_xml_value(create_element("m:SortOrder"), order_fields, version=self.account.version))
    payload.append(folder_ids_element(folders=folders, version=self.protocol.version, tag="m:ParentFolderIds"))
    if query_string:
        payload.append(query_string.to_xml(version=self.account.version))
    return payload

Inherited members

class FindPeople (*args, **kwargs)
Expand source code
class FindPeople(EWSPagingService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/findpeople-operation"""

    SERVICE_NAME = "FindPeople"
    element_container_name = f"{{{MNS}}}People"
    supported_from = EXCHANGE_2013

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # A hack to communicate parsing args to _elems_to_objs()
        self.additional_fields = None
        self.shape = None

    def call(self, folder, additional_fields, restriction, order_fields, shape, query_string, depth, max_items, offset):
        """Find items in an account. This service can only be called on a single folder.

        :param folder: the Folder object to query
        :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects
        :param restriction: a Restriction object for
        :param order_fields: the fields to sort the results by
        :param shape: The set of attributes to return
        :param query_string: a QueryString object
        :param depth: How deep in the folder structure to search for items
        :param max_items: the max number of items to return
        :param offset: the offset relative to the first item in the item collection. Usually 0.

        :return: XML elements for the matching items
        """
        if shape not in SHAPE_CHOICES:
            raise InvalidEnumValue("shape", shape, SHAPE_CHOICES)
        if depth not in ITEM_TRAVERSAL_CHOICES:
            raise InvalidEnumValue("depth", depth, ITEM_TRAVERSAL_CHOICES)
        self.additional_fields = additional_fields
        self.shape = shape
        return self._elems_to_objs(
            self._paged_call(
                payload_func=self.get_payload,
                max_items=max_items,
                folders=[folder],  # We just need the list to satisfy self._paged_call()
                **dict(
                    additional_fields=additional_fields,
                    restriction=restriction,
                    order_fields=order_fields,
                    query_string=query_string,
                    shape=shape,
                    depth=depth,
                    page_size=self.page_size,
                    offset=offset,
                ),
            )
        )

    def _elem_to_obj(self, elem):
        if self.shape == ID_ONLY and self.additional_fields is None:
            return Persona.id_from_xml(elem)
        return Persona.from_xml(elem, account=self.account)

    def get_payload(
        self, folders, additional_fields, restriction, order_fields, query_string, shape, depth, page_size, offset=0
    ):
        # We actually only support a single folder, but self._paged_call() sends us a list
        payload = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(Traversal=depth))
        payload.append(
            shape_element(
                tag="m:PersonaShape", shape=shape, additional_fields=additional_fields, version=self.account.version
            )
        )
        payload.append(
            create_element(
                "m:IndexedPageItemView", attrs=dict(MaxEntriesReturned=page_size, Offset=offset, BasePoint="Beginning")
            )
        )
        if restriction:
            payload.append(restriction.to_xml(version=self.account.version))
        if order_fields:
            payload.append(set_xml_value(create_element("m:SortOrder"), order_fields, version=self.account.version))
        payload.append(folder_ids_element(folders=folders, version=self.account.version, tag="m:ParentFolderId"))
        if query_string:
            payload.append(query_string.to_xml(version=self.account.version))
        return payload

    @staticmethod
    def _get_paging_values(elem):
        """Find paging values. The paging element from FindPeople is different from other paging containers."""
        item_count = int(elem.find(f"{{{MNS}}}TotalNumberOfPeopleInView").text)
        first_matching = int(elem.find(f"{{{MNS}}}FirstMatchingRowIndex").text)
        first_loaded = int(elem.find(f"{{{MNS}}}FirstLoadedRowIndex").text)
        log.debug(
            "Got page with total items %s, first matching %s, first loaded %s ",
            item_count,
            first_matching,
            first_loaded,
        )
        next_offset = None  # GetPersona does not support fetching more pages
        return item_count, next_offset

Ancestors

Class variables

var SERVICE_NAME
var element_container_name
var supported_from

Methods

def call(self, folder, additional_fields, restriction, order_fields, shape, query_string, depth, max_items, offset)

Find items in an account. This service can only be called on a single folder.

:param folder: the Folder object to query :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects :param restriction: a Restriction object for :param order_fields: the fields to sort the results by :param shape: The set of attributes to return :param query_string: a QueryString object :param depth: How deep in the folder structure to search for items :param max_items: the max number of items to return :param offset: the offset relative to the first item in the item collection. Usually 0.

:return: XML elements for the matching items

Expand source code
def call(self, folder, additional_fields, restriction, order_fields, shape, query_string, depth, max_items, offset):
    """Find items in an account. This service can only be called on a single folder.

    :param folder: the Folder object to query
    :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects
    :param restriction: a Restriction object for
    :param order_fields: the fields to sort the results by
    :param shape: The set of attributes to return
    :param query_string: a QueryString object
    :param depth: How deep in the folder structure to search for items
    :param max_items: the max number of items to return
    :param offset: the offset relative to the first item in the item collection. Usually 0.

    :return: XML elements for the matching items
    """
    if shape not in SHAPE_CHOICES:
        raise InvalidEnumValue("shape", shape, SHAPE_CHOICES)
    if depth not in ITEM_TRAVERSAL_CHOICES:
        raise InvalidEnumValue("depth", depth, ITEM_TRAVERSAL_CHOICES)
    self.additional_fields = additional_fields
    self.shape = shape
    return self._elems_to_objs(
        self._paged_call(
            payload_func=self.get_payload,
            max_items=max_items,
            folders=[folder],  # We just need the list to satisfy self._paged_call()
            **dict(
                additional_fields=additional_fields,
                restriction=restriction,
                order_fields=order_fields,
                query_string=query_string,
                shape=shape,
                depth=depth,
                page_size=self.page_size,
                offset=offset,
            ),
        )
    )
def get_payload(self, folders, additional_fields, restriction, order_fields, query_string, shape, depth, page_size, offset=0)
Expand source code
def get_payload(
    self, folders, additional_fields, restriction, order_fields, query_string, shape, depth, page_size, offset=0
):
    # We actually only support a single folder, but self._paged_call() sends us a list
    payload = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(Traversal=depth))
    payload.append(
        shape_element(
            tag="m:PersonaShape", shape=shape, additional_fields=additional_fields, version=self.account.version
        )
    )
    payload.append(
        create_element(
            "m:IndexedPageItemView", attrs=dict(MaxEntriesReturned=page_size, Offset=offset, BasePoint="Beginning")
        )
    )
    if restriction:
        payload.append(restriction.to_xml(version=self.account.version))
    if order_fields:
        payload.append(set_xml_value(create_element("m:SortOrder"), order_fields, version=self.account.version))
    payload.append(folder_ids_element(folders=folders, version=self.account.version, tag="m:ParentFolderId"))
    if query_string:
        payload.append(query_string.to_xml(version=self.account.version))
    return payload

Inherited members

class GetAttachment (*args, **kwargs)
Expand source code
class GetAttachment(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getattachment-operation"""

    SERVICE_NAME = "GetAttachment"
    element_container_name = f"{{{MNS}}}Attachments"
    cls_map = {cls.response_tag(): cls for cls in (FileAttachment, ItemAttachment)}

    def call(self, items, include_mime_content, body_type, filter_html_content, additional_fields):
        if body_type and body_type not in BODY_TYPE_CHOICES:
            raise InvalidEnumValue("body_type", body_type, BODY_TYPE_CHOICES)
        return self._elems_to_objs(
            self._chunked_get_elements(
                self.get_payload,
                items=items,
                include_mime_content=include_mime_content,
                body_type=body_type,
                filter_html_content=filter_html_content,
                additional_fields=additional_fields,
            )
        )

    def _elem_to_obj(self, elem):
        return self.cls_map[elem.tag].from_xml(elem=elem, account=self.account)

    def get_payload(self, items, include_mime_content, body_type, filter_html_content, additional_fields):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        shape_elem = create_element("m:AttachmentShape")
        if include_mime_content:
            add_xml_child(shape_elem, "t:IncludeMimeContent", "true")
        if body_type:
            add_xml_child(shape_elem, "t:BodyType", body_type)
        if filter_html_content is not None:
            add_xml_child(shape_elem, "t:FilterHtmlContent", "true" if filter_html_content else "false")
        if additional_fields:
            additional_properties = create_element("t:AdditionalProperties")
            expanded_fields = chain(*(f.expand(version=self.account.version) for f in additional_fields))
            set_xml_value(
                additional_properties,
                sorted(expanded_fields, key=lambda f: (getattr(f.field, "field_uri", ""), f.path)),
                version=self.account.version,
            )
            shape_elem.append(additional_properties)
        if len(shape_elem):
            payload.append(shape_elem)
        payload.append(attachment_ids_element(items=items, version=self.account.version))
        return payload

    def _update_api_version(self, api_version, header, **parse_opts):
        if not parse_opts.get("stream_file_content", False):
            super()._update_api_version(api_version, header, **parse_opts)
        # TODO: We're skipping this part in streaming mode because StreamingBase64Parser cannot parse the SOAP header

    @classmethod
    def _get_soap_parts(cls, response, **parse_opts):
        if not parse_opts.get("stream_file_content", False):
            return super()._get_soap_parts(response, **parse_opts)

        # Pass the response unaltered. We want to use our custom streaming parser
        return None, response

    def _get_soap_messages(self, body, **parse_opts):
        if not parse_opts.get("stream_file_content", False):
            return super()._get_soap_messages(body, **parse_opts)

        # 'body' is actually the raw response passed on by '_get_soap_parts'
        r = body
        parser = StreamingBase64Parser()
        field = FileAttachment.get_field_by_fieldname("_content")
        handler = StreamingContentHandler(parser=parser, ns=field.namespace, element_name=field.field_uri)
        parser.setContentHandler(handler)
        return parser.parse(r)

    def stream_file_content(self, attachment_id):
        # The streaming XML parser can only stream content of one attachment
        payload = self.get_payload(
            items=[attachment_id],
            include_mime_content=False,
            body_type=None,
            filter_html_content=None,
            additional_fields=None,
        )
        self.streaming = True
        try:
            yield from self._get_response_xml(payload=payload, stream_file_content=True)
        except ElementNotFound as enf:
            # When the returned XML does not contain a Content element, ElementNotFound is thrown by parser.parse().
            # Let the non-streaming SOAP parser parse the response and hook into the normal exception handling.
            # Wrap in DummyResponse because _get_soap_parts() expects an iter_content() method.
            response = DummyResponse(content=enf.data)
            _, body = super()._get_soap_parts(response=response)
            res = super()._get_soap_messages(body=body)
            for e in self._get_elements_in_response(response=res):
                if isinstance(e, Exception):
                    raise e
            # The returned content did not contain any EWS exceptions. Give up and re-raise the original exception.
            raise enf
        finally:
            self.stop_streaming()
            self.streaming = False

Ancestors

Class variables

var SERVICE_NAME
var cls_map
var element_container_name

Methods

def call(self, items, include_mime_content, body_type, filter_html_content, additional_fields)
Expand source code
def call(self, items, include_mime_content, body_type, filter_html_content, additional_fields):
    if body_type and body_type not in BODY_TYPE_CHOICES:
        raise InvalidEnumValue("body_type", body_type, BODY_TYPE_CHOICES)
    return self._elems_to_objs(
        self._chunked_get_elements(
            self.get_payload,
            items=items,
            include_mime_content=include_mime_content,
            body_type=body_type,
            filter_html_content=filter_html_content,
            additional_fields=additional_fields,
        )
    )
def get_payload(self, items, include_mime_content, body_type, filter_html_content, additional_fields)
Expand source code
def get_payload(self, items, include_mime_content, body_type, filter_html_content, additional_fields):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    shape_elem = create_element("m:AttachmentShape")
    if include_mime_content:
        add_xml_child(shape_elem, "t:IncludeMimeContent", "true")
    if body_type:
        add_xml_child(shape_elem, "t:BodyType", body_type)
    if filter_html_content is not None:
        add_xml_child(shape_elem, "t:FilterHtmlContent", "true" if filter_html_content else "false")
    if additional_fields:
        additional_properties = create_element("t:AdditionalProperties")
        expanded_fields = chain(*(f.expand(version=self.account.version) for f in additional_fields))
        set_xml_value(
            additional_properties,
            sorted(expanded_fields, key=lambda f: (getattr(f.field, "field_uri", ""), f.path)),
            version=self.account.version,
        )
        shape_elem.append(additional_properties)
    if len(shape_elem):
        payload.append(shape_elem)
    payload.append(attachment_ids_element(items=items, version=self.account.version))
    return payload
def stream_file_content(self, attachment_id)
Expand source code
def stream_file_content(self, attachment_id):
    # The streaming XML parser can only stream content of one attachment
    payload = self.get_payload(
        items=[attachment_id],
        include_mime_content=False,
        body_type=None,
        filter_html_content=None,
        additional_fields=None,
    )
    self.streaming = True
    try:
        yield from self._get_response_xml(payload=payload, stream_file_content=True)
    except ElementNotFound as enf:
        # When the returned XML does not contain a Content element, ElementNotFound is thrown by parser.parse().
        # Let the non-streaming SOAP parser parse the response and hook into the normal exception handling.
        # Wrap in DummyResponse because _get_soap_parts() expects an iter_content() method.
        response = DummyResponse(content=enf.data)
        _, body = super()._get_soap_parts(response=response)
        res = super()._get_soap_messages(body=body)
        for e in self._get_elements_in_response(response=res):
            if isinstance(e, Exception):
                raise e
        # The returned content did not contain any EWS exceptions. Give up and re-raise the original exception.
        raise enf
    finally:
        self.stop_streaming()
        self.streaming = False

Inherited members

class GetDelegate (*args, **kwargs)
Expand source code
class GetDelegate(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getdelegate-operation"""

    SERVICE_NAME = "GetDelegate"
    ERRORS_TO_CATCH_IN_RESPONSE = ()
    supported_from = EXCHANGE_2007_SP1

    def call(self, user_ids, include_permissions):
        return self._elems_to_objs(
            self._chunked_get_elements(
                self.get_payload,
                items=user_ids or [None],
                mailbox=DLMailbox(email_address=self.account.primary_smtp_address),
                include_permissions=include_permissions,
            )
        )

    def _elem_to_obj(self, elem):
        return DelegateUser.from_xml(elem=elem, account=self.account)

    def get_payload(self, user_ids, mailbox, include_permissions):
        payload = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(IncludePermissions=include_permissions))
        set_xml_value(payload, mailbox, version=self.protocol.version)
        if user_ids != [None]:
            user_ids_elem = create_element("m:UserIds")
            for user_id in user_ids:
                if isinstance(user_id, str):
                    user_id = UserId(primary_smtp_address=user_id)
                set_xml_value(user_ids_elem, user_id, version=self.protocol.version)
            set_xml_value(payload, user_ids_elem, version=self.protocol.version)
        return payload

    @classmethod
    def _get_elements_in_container(cls, container):
        return container.findall(DelegateUser.response_tag())

    @classmethod
    def _response_message_tag(cls):
        return f"{{{MNS}}}DelegateUserResponseMessageType"

Ancestors

Class variables

var ERRORS_TO_CATCH_IN_RESPONSE
var SERVICE_NAME
var supported_from

Methods

def call(self, user_ids, include_permissions)
Expand source code
def call(self, user_ids, include_permissions):
    return self._elems_to_objs(
        self._chunked_get_elements(
            self.get_payload,
            items=user_ids or [None],
            mailbox=DLMailbox(email_address=self.account.primary_smtp_address),
            include_permissions=include_permissions,
        )
    )
def get_payload(self, user_ids, mailbox, include_permissions)
Expand source code
def get_payload(self, user_ids, mailbox, include_permissions):
    payload = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(IncludePermissions=include_permissions))
    set_xml_value(payload, mailbox, version=self.protocol.version)
    if user_ids != [None]:
        user_ids_elem = create_element("m:UserIds")
        for user_id in user_ids:
            if isinstance(user_id, str):
                user_id = UserId(primary_smtp_address=user_id)
            set_xml_value(user_ids_elem, user_id, version=self.protocol.version)
        set_xml_value(payload, user_ids_elem, version=self.protocol.version)
    return payload

Inherited members

class GetEvents (*args, **kwargs)
Expand source code
class GetEvents(EWSAccountService):
    """MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getevents-operation
    """

    SERVICE_NAME = "GetEvents"
    prefer_affinity = True

    def call(self, subscription_id, watermark):
        return self._elems_to_objs(
            self._get_elements(
                payload=self.get_payload(
                    subscription_id=subscription_id,
                    watermark=watermark,
                )
            )
        )

    def _elem_to_obj(self, elem):
        return Notification.from_xml(elem=elem, account=None)

    @classmethod
    def _get_elements_in_container(cls, container):
        return container.findall(Notification.response_tag())

    def get_payload(self, subscription_id, watermark):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        add_xml_child(payload, "m:SubscriptionId", subscription_id)
        add_xml_child(payload, "m:Watermark", watermark)
        return payload

Ancestors

Class variables

var SERVICE_NAME
var prefer_affinity

Methods

def call(self, subscription_id, watermark)
Expand source code
def call(self, subscription_id, watermark):
    return self._elems_to_objs(
        self._get_elements(
            payload=self.get_payload(
                subscription_id=subscription_id,
                watermark=watermark,
            )
        )
    )
def get_payload(self, subscription_id, watermark)
Expand source code
def get_payload(self, subscription_id, watermark):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    add_xml_child(payload, "m:SubscriptionId", subscription_id)
    add_xml_child(payload, "m:Watermark", watermark)
    return payload

Inherited members

class GetFolder (*args, **kwargs)
Expand source code
class GetFolder(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getfolder-operation"""

    SERVICE_NAME = "GetFolder"
    element_container_name = f"{{{MNS}}}Folders"
    ERRORS_TO_CATCH_IN_RESPONSE = EWSAccountService.ERRORS_TO_CATCH_IN_RESPONSE + (
        ErrorAccessDenied,
        ErrorFolderNotFound,
        ErrorNoPublicFolderReplicaAvailable,
        ErrorInvalidOperation,
        ErrorItemNotFound,
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.folders = []  # A hack to communicate parsing args to _elems_to_objs()

    def call(self, folders, additional_fields, shape):
        """Take a folder ID and returns the full information for that folder.

        :param folders: a list of Folder objects
        :param additional_fields: the extra fields that should be returned with the folder, as FieldPath objects
        :param shape: The set of attributes to return

        :return: XML elements for the folders, in stable order
        """
        # We can't easily find the correct folder class from the returned XML. Instead, return objects with the same
        # class as the folder instance it was requested with.
        self.folders = list(folders)  # Convert to a list, in case 'folders' is a generator. We're iterating twice.
        return self._elems_to_objs(
            self._chunked_get_elements(
                self.get_payload,
                items=self.folders,
                additional_fields=additional_fields,
                shape=shape,
            )
        )

    def _elems_to_objs(self, elems):
        for folder, elem in zip(self.folders, elems):
            if isinstance(elem, Exception):
                yield elem
                continue
            yield parse_folder_elem(elem=elem, folder=folder, account=self.account)

    def get_payload(self, folders, additional_fields, shape):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        payload.append(
            shape_element(
                tag="m:FolderShape", shape=shape, additional_fields=additional_fields, version=self.account.version
            )
        )
        payload.append(folder_ids_element(folders=folders, version=self.account.version))
        return payload

Ancestors

Class variables

var ERRORS_TO_CATCH_IN_RESPONSE
var SERVICE_NAME
var element_container_name

Methods

def call(self, folders, additional_fields, shape)

Take a folder ID and returns the full information for that folder.

:param folders: a list of Folder objects :param additional_fields: the extra fields that should be returned with the folder, as FieldPath objects :param shape: The set of attributes to return

:return: XML elements for the folders, in stable order

Expand source code
def call(self, folders, additional_fields, shape):
    """Take a folder ID and returns the full information for that folder.

    :param folders: a list of Folder objects
    :param additional_fields: the extra fields that should be returned with the folder, as FieldPath objects
    :param shape: The set of attributes to return

    :return: XML elements for the folders, in stable order
    """
    # We can't easily find the correct folder class from the returned XML. Instead, return objects with the same
    # class as the folder instance it was requested with.
    self.folders = list(folders)  # Convert to a list, in case 'folders' is a generator. We're iterating twice.
    return self._elems_to_objs(
        self._chunked_get_elements(
            self.get_payload,
            items=self.folders,
            additional_fields=additional_fields,
            shape=shape,
        )
    )
def get_payload(self, folders, additional_fields, shape)
Expand source code
def get_payload(self, folders, additional_fields, shape):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    payload.append(
        shape_element(
            tag="m:FolderShape", shape=shape, additional_fields=additional_fields, version=self.account.version
        )
    )
    payload.append(folder_ids_element(folders=folders, version=self.account.version))
    return payload

Inherited members

class GetInboxRules (*args, **kwargs)

MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getinboxrules-operation

The GetInboxRules operation uses Exchange Web Services to retrieve Inbox rules in the identified user's mailbox.

Expand source code
class GetInboxRules(EWSAccountService):
    """
    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getinboxrules-operation

    The GetInboxRules operation uses Exchange Web Services to retrieve Inbox rules in the identified user's mailbox.
    """

    SERVICE_NAME = "GetInboxRules"
    element_container_name = InboxRules.response_tag()
    ERRORS_TO_CATCH_IN_RESPONSE = EWSAccountService.ERRORS_TO_CATCH_IN_RESPONSE + (ErrorInvalidOperation,)

    def call(self, mailbox: Optional[str] = None) -> Generator[Union[Rule, Exception, None], Any, None]:
        if not mailbox:
            mailbox = self.account.primary_smtp_address
        payload = self.get_payload(mailbox=mailbox)
        elements = self._get_elements(payload=payload)
        return self._elems_to_objs(elements)

    def _elem_to_obj(self, elem):
        return Rule.from_xml(elem=elem, account=self.account)

    def get_payload(self, mailbox):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        add_xml_child(payload, "m:MailboxSmtpAddress", mailbox)
        return payload

    def _get_element_container(self, message, name=None):
        if name:
            response_class = message.get("ResponseClass")
            response_code = get_xml_attr(message, f"{{{MNS}}}ResponseCode")
            if response_class == "Success" and response_code == "NoError" and message.find(name) is None:
                return []
        return super()._get_element_container(message, name)

Ancestors

Class variables

var ERRORS_TO_CATCH_IN_RESPONSE
var SERVICE_NAME
var element_container_name

Methods

def call(self, mailbox: Optional[str] = None) ‑> Generator[Union[Rule, Exception, ForwardRef(None)], Any, None]
Expand source code
def call(self, mailbox: Optional[str] = None) -> Generator[Union[Rule, Exception, None], Any, None]:
    if not mailbox:
        mailbox = self.account.primary_smtp_address
    payload = self.get_payload(mailbox=mailbox)
    elements = self._get_elements(payload=payload)
    return self._elems_to_objs(elements)
def get_payload(self, mailbox)
Expand source code
def get_payload(self, mailbox):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    add_xml_child(payload, "m:MailboxSmtpAddress", mailbox)
    return payload

Inherited members

class GetItem (*args, **kwargs)
Expand source code
class GetItem(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getitem-operation"""

    SERVICE_NAME = "GetItem"
    element_container_name = f"{{{MNS}}}Items"

    def call(self, items, additional_fields, shape):
        """Return all items in an account that correspond to a list of ID's, in stable order.

        :param items: a list of (id, changekey) tuples or Item objects
        :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects
        :param shape: The shape of returned objects

        :return: XML elements for the items, in stable order
        """
        return self._elems_to_objs(
            self._chunked_get_elements(
                self.get_payload,
                items=items,
                additional_fields=additional_fields,
                shape=shape,
            )
        )

    def _elem_to_obj(self, elem):
        return BaseFolder.item_model_from_tag(elem.tag).from_xml(elem=elem, account=self.account)

    def get_payload(self, items, additional_fields, shape):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        payload.append(
            shape_element(
                tag="m:ItemShape", shape=shape, additional_fields=additional_fields, version=self.account.version
            )
        )
        payload.append(item_ids_element(items=items, version=self.account.version))
        return payload

Ancestors

Class variables

var SERVICE_NAME
var element_container_name

Methods

def call(self, items, additional_fields, shape)

Return all items in an account that correspond to a list of ID's, in stable order.

:param items: a list of (id, changekey) tuples or Item objects :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects :param shape: The shape of returned objects

:return: XML elements for the items, in stable order

Expand source code
def call(self, items, additional_fields, shape):
    """Return all items in an account that correspond to a list of ID's, in stable order.

    :param items: a list of (id, changekey) tuples or Item objects
    :param additional_fields: the extra fields that should be returned with the item, as FieldPath objects
    :param shape: The shape of returned objects

    :return: XML elements for the items, in stable order
    """
    return self._elems_to_objs(
        self._chunked_get_elements(
            self.get_payload,
            items=items,
            additional_fields=additional_fields,
            shape=shape,
        )
    )
def get_payload(self, items, additional_fields, shape)
Expand source code
def get_payload(self, items, additional_fields, shape):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    payload.append(
        shape_element(
            tag="m:ItemShape", shape=shape, additional_fields=additional_fields, version=self.account.version
        )
    )
    payload.append(item_ids_element(items=items, version=self.account.version))
    return payload

Inherited members

class GetMailTips (protocol, chunk_size=None, timeout=None)
Expand source code
class GetMailTips(EWSService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getmailtips-operation"""

    SERVICE_NAME = "GetMailTips"

    def call(self, sending_as, recipients, mail_tips_requested):
        return self._elems_to_objs(
            self._chunked_get_elements(
                self.get_payload,
                items=recipients,
                sending_as=sending_as,
                mail_tips_requested=mail_tips_requested,
            )
        )

    def _elem_to_obj(self, elem):
        return MailTips.from_xml(elem=elem, account=None)

    def get_payload(self, recipients, sending_as, mail_tips_requested):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        set_xml_value(payload, sending_as, version=self.protocol.version)

        recipients_elem = create_element("m:Recipients")
        for recipient in recipients:
            set_xml_value(recipients_elem, recipient, version=self.protocol.version)
        payload.append(recipients_elem)

        if mail_tips_requested:
            set_xml_value(payload, mail_tips_requested, version=self.protocol.version)
        return payload

    def _get_elements_in_response(self, response):
        for msg in response:
            yield self._get_element_container(message=msg, name=MailTips.response_tag())

    @classmethod
    def _response_message_tag(cls):
        return f"{{{MNS}}}MailTipsResponseMessageType"

Ancestors

Class variables

var SERVICE_NAME

Methods

def call(self, sending_as, recipients, mail_tips_requested)
Expand source code
def call(self, sending_as, recipients, mail_tips_requested):
    return self._elems_to_objs(
        self._chunked_get_elements(
            self.get_payload,
            items=recipients,
            sending_as=sending_as,
            mail_tips_requested=mail_tips_requested,
        )
    )
def get_payload(self, recipients, sending_as, mail_tips_requested)
Expand source code
def get_payload(self, recipients, sending_as, mail_tips_requested):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    set_xml_value(payload, sending_as, version=self.protocol.version)

    recipients_elem = create_element("m:Recipients")
    for recipient in recipients:
        set_xml_value(recipients_elem, recipient, version=self.protocol.version)
    payload.append(recipients_elem)

    if mail_tips_requested:
        set_xml_value(payload, mail_tips_requested, version=self.protocol.version)
    return payload

Inherited members

class GetPersona (*args, **kwargs)
Expand source code
class GetPersona(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getpersona-operation"""

    SERVICE_NAME = "GetPersona"

    def call(self, personas):
        # GetPersona only accepts one persona ID per request. Crazy.
        for persona in personas:
            yield from self._elems_to_objs(self._get_elements(payload=self.get_payload(persona=persona)))

    def _elem_to_obj(self, elem):
        return Persona.from_xml(elem=elem, account=None)

    def get_payload(self, persona):
        return set_xml_value(
            create_element(f"m:{self.SERVICE_NAME}"), to_item_id(persona, PersonaId), version=self.protocol.version
        )

    @classmethod
    def _get_elements_in_container(cls, container):
        return container.findall(f"{{{MNS}}}{Persona.ELEMENT_NAME}")

    @classmethod
    def _response_tag(cls):
        return f"{{{MNS}}}{cls.SERVICE_NAME}ResponseMessage"

Ancestors

Class variables

var SERVICE_NAME

Methods

def call(self, personas)
Expand source code
def call(self, personas):
    # GetPersona only accepts one persona ID per request. Crazy.
    for persona in personas:
        yield from self._elems_to_objs(self._get_elements(payload=self.get_payload(persona=persona)))
def get_payload(self, persona)
Expand source code
def get_payload(self, persona):
    return set_xml_value(
        create_element(f"m:{self.SERVICE_NAME}"), to_item_id(persona, PersonaId), version=self.protocol.version
    )

Inherited members

class GetRoomLists (protocol, chunk_size=None, timeout=None)
Expand source code
class GetRoomLists(EWSService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getroomlists-operation"""

    SERVICE_NAME = "GetRoomLists"
    element_container_name = f"{{{MNS}}}RoomLists"
    supported_from = EXCHANGE_2010

    def call(self):
        return self._elems_to_objs(self._get_elements(payload=self.get_payload()))

    def _elem_to_obj(self, elem):
        return RoomList.from_xml(elem=elem, account=None)

    def get_payload(self):
        return create_element(f"m:{self.SERVICE_NAME}")

Ancestors

Class variables

var SERVICE_NAME
var element_container_name
var supported_from

Methods

def call(self)
Expand source code
def call(self):
    return self._elems_to_objs(self._get_elements(payload=self.get_payload()))
def get_payload(self)
Expand source code
def get_payload(self):
    return create_element(f"m:{self.SERVICE_NAME}")

Inherited members

class GetRooms (protocol, chunk_size=None, timeout=None)
Expand source code
class GetRooms(EWSService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getrooms-operation"""

    SERVICE_NAME = "GetRooms"
    element_container_name = f"{{{MNS}}}Rooms"
    supported_from = EXCHANGE_2010

    def call(self, room_list):
        return self._elems_to_objs(self._get_elements(payload=self.get_payload(room_list=room_list)))

    def _elem_to_obj(self, elem):
        return Room.from_xml(elem=elem, account=None)

    def get_payload(self, room_list):
        return set_xml_value(create_element(f"m:{self.SERVICE_NAME}"), room_list, version=self.protocol.version)

Ancestors

Class variables

var SERVICE_NAME
var element_container_name
var supported_from

Methods

def call(self, room_list)
Expand source code
def call(self, room_list):
    return self._elems_to_objs(self._get_elements(payload=self.get_payload(room_list=room_list)))
def get_payload(self, room_list)
Expand source code
def get_payload(self, room_list):
    return set_xml_value(create_element(f"m:{self.SERVICE_NAME}"), room_list, version=self.protocol.version)

Inherited members

class GetSearchableMailboxes (protocol, chunk_size=None, timeout=None)
Expand source code
class GetSearchableMailboxes(EWSService):
    """MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getsearchablemailboxes-operation
    """

    SERVICE_NAME = "GetSearchableMailboxes"
    element_container_name = f"{{{MNS}}}SearchableMailboxes"
    failed_mailboxes_container_name = f"{{{MNS}}}FailedMailboxes"
    supported_from = EXCHANGE_2013
    cls_map = {cls.response_tag(): cls for cls in (SearchableMailbox, FailedMailbox)}

    def call(self, search_filter, expand_group_membership):
        return self._elems_to_objs(
            self._get_elements(
                payload=self.get_payload(
                    search_filter=search_filter,
                    expand_group_membership=expand_group_membership,
                )
            )
        )

    def _elem_to_obj(self, elem):
        return self.cls_map[elem.tag].from_xml(elem=elem, account=None)

    def get_payload(self, search_filter, expand_group_membership):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        if search_filter:
            add_xml_child(payload, "m:SearchFilter", search_filter)
        if expand_group_membership is not None:
            add_xml_child(payload, "m:ExpandGroupMembership", "true" if expand_group_membership else "false")
        return payload

    def _get_elements_in_response(self, response):
        for msg in response:
            for container_name in (self.element_container_name, self.failed_mailboxes_container_name):
                try:
                    container = self._get_element_container(message=msg, name=container_name)
                except MalformedResponseError:
                    # Responses may contain no mailboxes of either kind. _get_element_container() does not accept this.
                    continue
                yield from self._get_elements_in_container(container=container)

Ancestors

Class variables

var SERVICE_NAME
var cls_map
var element_container_name
var failed_mailboxes_container_name
var supported_from

Methods

def call(self, search_filter, expand_group_membership)
Expand source code
def call(self, search_filter, expand_group_membership):
    return self._elems_to_objs(
        self._get_elements(
            payload=self.get_payload(
                search_filter=search_filter,
                expand_group_membership=expand_group_membership,
            )
        )
    )
def get_payload(self, search_filter, expand_group_membership)
Expand source code
def get_payload(self, search_filter, expand_group_membership):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    if search_filter:
        add_xml_child(payload, "m:SearchFilter", search_filter)
    if expand_group_membership is not None:
        add_xml_child(payload, "m:ExpandGroupMembership", "true" if expand_group_membership else "false")
    return payload

Inherited members

class GetServerTimeZones (protocol, chunk_size=None, timeout=None)
Expand source code
class GetServerTimeZones(EWSService):
    """MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getservertimezones-operation
    """

    SERVICE_NAME = "GetServerTimeZones"
    element_container_name = f"{{{MNS}}}TimeZoneDefinitions"
    supported_from = EXCHANGE_2010

    def call(self, timezones=None, return_full_timezone_data=False):
        return self._elems_to_objs(
            self._get_elements(
                payload=self.get_payload(timezones=timezones, return_full_timezone_data=return_full_timezone_data)
            )
        )

    def get_payload(self, timezones, return_full_timezone_data):
        payload = create_element(
            f"m:{self.SERVICE_NAME}",
            attrs=dict(ReturnFullTimeZoneData=return_full_timezone_data),
        )
        if timezones is not None:
            is_empty, timezones = peek(timezones)
            if not is_empty:
                tz_ids = create_element("m:Ids")
                for timezone in timezones:
                    tz_id = set_xml_value(create_element("t:Id"), timezone.ms_id)
                    tz_ids.append(tz_id)
                payload.append(tz_ids)
        return payload

    def _elem_to_obj(self, elem):
        return TimeZoneDefinition.from_xml(elem=elem, account=None)

Ancestors

Class variables

var SERVICE_NAME
var element_container_name
var supported_from

Methods

def call(self, timezones=None, return_full_timezone_data=False)
Expand source code
def call(self, timezones=None, return_full_timezone_data=False):
    return self._elems_to_objs(
        self._get_elements(
            payload=self.get_payload(timezones=timezones, return_full_timezone_data=return_full_timezone_data)
        )
    )
def get_payload(self, timezones, return_full_timezone_data)
Expand source code
def get_payload(self, timezones, return_full_timezone_data):
    payload = create_element(
        f"m:{self.SERVICE_NAME}",
        attrs=dict(ReturnFullTimeZoneData=return_full_timezone_data),
    )
    if timezones is not None:
        is_empty, timezones = peek(timezones)
        if not is_empty:
            tz_ids = create_element("m:Ids")
            for timezone in timezones:
                tz_id = set_xml_value(create_element("t:Id"), timezone.ms_id)
                tz_ids.append(tz_id)
            payload.append(tz_ids)
    return payload

Inherited members

class GetStreamingEvents (*args, **kwargs)
Expand source code
class GetStreamingEvents(EWSAccountService):
    """MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getstreamingevents-operation
    """

    SERVICE_NAME = "GetStreamingEvents"
    element_container_name = f"{{{MNS}}}Notifications"
    prefer_affinity = True

    # Connection status values
    OK = "OK"
    CLOSED = "Closed"

    def __init__(self, *args, **kwargs):
        # These values are set each time call() is consumed
        self.connection_status = None
        super().__init__(*args, **kwargs)
        self.streaming = True

    def call(self, subscription_ids, connection_timeout):
        if not isinstance(connection_timeout, int):
            raise InvalidTypeError("connection_timeout", connection_timeout, int)
        if connection_timeout < 1:
            raise ValueError(f"'connection_timeout' {connection_timeout} must be a positive integer")
        # Add 60 seconds to the timeout, to allow us to always get the final message containing ConnectionStatus=Closed
        self.timeout = connection_timeout * 60 + 60
        return self._elems_to_objs(
            self._get_elements(
                payload=self.get_payload(
                    subscription_ids=subscription_ids,
                    connection_timeout=connection_timeout,
                )
            )
        )

    def _elem_to_obj(self, elem):
        return Notification.from_xml(elem=elem, account=None)

    @classmethod
    def _get_soap_parts(cls, response, **parse_opts):
        # Pass the response unaltered. We want to use our custom document yielder
        return None, response

    def _get_soap_messages(self, body, **parse_opts):
        # 'body' is actually the raw response passed on by '_get_soap_parts'. We want to continuously read the content,
        # looking for complete XML documents. When we have a full document, we want to parse it as if it was a normal
        # XML response.
        r = body
        for i, doc in enumerate(DocumentYielder(r.iter_content()), start=1):
            xml_log.debug("Response XML (docs counter: %(i)s): %(xml_response)s", dict(i=i, xml_response=doc))
            response = DummyResponse(content=doc)
            try:
                _, body = super()._get_soap_parts(response=response, **parse_opts)
            except Exception:
                r.close()  # Release memory
                raise
            # TODO: We're skipping ._update_api_version() here because we don't have access to the 'api_version' used.
            # TODO: We should be doing a lot of error handling for ._get_soap_messages().
            yield from super()._get_soap_messages(body=body, **parse_opts)
            if self.connection_status == self.CLOSED:
                # Don't wait for the TCP connection to timeout
                break

    def _get_element_container(self, message, name=None):
        error_ids_elem = message.find(f"{{{MNS}}}ErrorSubscriptionIds")
        error_ids = [] if error_ids_elem is None else get_xml_attrs(error_ids_elem, f"{{{MNS}}}SubscriptionId")
        self.connection_status = get_xml_attr(message, f"{{{MNS}}}ConnectionStatus")  # Either 'OK' or 'Closed'
        log.debug("Connection status is: %s", self.connection_status)
        # Upstream normally expects to find a 'name' tag but our response does not always have it. We still want to
        # call upstream, to have exceptions raised. Return an empty list if there is no 'name' tag and no errors.
        if message.find(name) is None:
            name = None
        try:
            res = super()._get_element_container(message=message, name=name)
        except EWSError as e:
            # When the request contains a combination of good and failing subscription IDs, notifications for the good
            # subscriptions seem to never be returned even though the XML spec allows it. This means there's no point in
            # trying to collect any notifications here and delivering a combination of errors and return values.
            if error_ids:
                e.value += f" (subscription IDs: {error_ids})"
            raise e
        return [] if name is None else res

    def get_payload(self, subscription_ids, connection_timeout):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        subscriptions_elem = create_element("m:SubscriptionIds")
        for subscription_id in subscription_ids:
            add_xml_child(subscriptions_elem, "t:SubscriptionId", subscription_id)
        if not len(subscriptions_elem):
            raise ValueError("'subscription_ids' must not be empty")

        payload.append(subscriptions_elem)
        add_xml_child(payload, "m:ConnectionTimeout", connection_timeout)
        return payload

Ancestors

Class variables

var CLOSED
var OK
var SERVICE_NAME
var element_container_name
var prefer_affinity

Methods

def call(self, subscription_ids, connection_timeout)
Expand source code
def call(self, subscription_ids, connection_timeout):
    if not isinstance(connection_timeout, int):
        raise InvalidTypeError("connection_timeout", connection_timeout, int)
    if connection_timeout < 1:
        raise ValueError(f"'connection_timeout' {connection_timeout} must be a positive integer")
    # Add 60 seconds to the timeout, to allow us to always get the final message containing ConnectionStatus=Closed
    self.timeout = connection_timeout * 60 + 60
    return self._elems_to_objs(
        self._get_elements(
            payload=self.get_payload(
                subscription_ids=subscription_ids,
                connection_timeout=connection_timeout,
            )
        )
    )
def get_payload(self, subscription_ids, connection_timeout)
Expand source code
def get_payload(self, subscription_ids, connection_timeout):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    subscriptions_elem = create_element("m:SubscriptionIds")
    for subscription_id in subscription_ids:
        add_xml_child(subscriptions_elem, "t:SubscriptionId", subscription_id)
    if not len(subscriptions_elem):
        raise ValueError("'subscription_ids' must not be empty")

    payload.append(subscriptions_elem)
    add_xml_child(payload, "m:ConnectionTimeout", connection_timeout)
    return payload

Inherited members

class GetUserAvailability (protocol, chunk_size=None, timeout=None)
Expand source code
class GetUserAvailability(EWSService):
    """Get detailed availability information for a list of users.
    MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getuseravailability-operation
    """

    SERVICE_NAME = "GetUserAvailability"

    def call(self, mailbox_data, timezone, free_busy_view_options):
        # TODO: Also supports SuggestionsViewOptions, see
        #  https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/suggestionsviewoptions
        return self._elems_to_objs(
            self._chunked_get_elements(
                self.get_payload,
                items=mailbox_data,
                timezone=timezone,
                free_busy_view_options=free_busy_view_options,
            )
        )

    def _elem_to_obj(self, elem):
        return FreeBusyView.from_xml(elem=elem, account=None)

    def get_payload(self, mailbox_data, timezone, free_busy_view_options):
        payload = create_element(f"m:{self.SERVICE_NAME}Request")
        set_xml_value(payload, timezone, version=self.protocol.version)
        mailbox_data_array = create_element("m:MailboxDataArray")
        set_xml_value(mailbox_data_array, mailbox_data, version=self.protocol.version)
        payload.append(mailbox_data_array)
        return set_xml_value(payload, free_busy_view_options, version=self.protocol.version)

    @staticmethod
    def _response_messages_tag():
        return f"{{{MNS}}}FreeBusyResponseArray"

    @classmethod
    def _response_message_tag(cls):
        return f"{{{MNS}}}FreeBusyResponse"

    def _get_elements_in_response(self, response):
        for msg in response:
            container_or_exc = self._get_element_container(message=msg.find(f"{{{MNS}}}ResponseMessage"))
            if isinstance(container_or_exc, Exception):
                yield container_or_exc
            else:
                yield from self._get_elements_in_container(container=msg)

    @classmethod
    def _get_elements_in_container(cls, container):
        return [container.find(f"{{{MNS}}}FreeBusyView")]

Ancestors

Class variables

var SERVICE_NAME

Methods

def call(self, mailbox_data, timezone, free_busy_view_options)
Expand source code
def call(self, mailbox_data, timezone, free_busy_view_options):
    # TODO: Also supports SuggestionsViewOptions, see
    #  https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/suggestionsviewoptions
    return self._elems_to_objs(
        self._chunked_get_elements(
            self.get_payload,
            items=mailbox_data,
            timezone=timezone,
            free_busy_view_options=free_busy_view_options,
        )
    )
def get_payload(self, mailbox_data, timezone, free_busy_view_options)
Expand source code
def get_payload(self, mailbox_data, timezone, free_busy_view_options):
    payload = create_element(f"m:{self.SERVICE_NAME}Request")
    set_xml_value(payload, timezone, version=self.protocol.version)
    mailbox_data_array = create_element("m:MailboxDataArray")
    set_xml_value(mailbox_data_array, mailbox_data, version=self.protocol.version)
    payload.append(mailbox_data_array)
    return set_xml_value(payload, free_busy_view_options, version=self.protocol.version)

Inherited members

class GetUserConfiguration (*args, **kwargs)
Expand source code
class GetUserConfiguration(EWSAccountService):
    """MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getuserconfiguration-operation
    """

    SERVICE_NAME = "GetUserConfiguration"

    def call(self, user_configuration_name, properties):
        if properties not in PROPERTIES_CHOICES:
            raise InvalidEnumValue("properties", properties, PROPERTIES_CHOICES)
        return self._elems_to_objs(
            self._get_elements(
                payload=self.get_payload(user_configuration_name=user_configuration_name, properties=properties)
            )
        )

    def _elem_to_obj(self, elem):
        return UserConfiguration.from_xml(elem=elem, account=self.account)

    @classmethod
    def _get_elements_in_container(cls, container):
        return container.findall(UserConfiguration.response_tag())

    def get_payload(self, user_configuration_name, properties):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        set_xml_value(payload, user_configuration_name, version=self.account.version)
        payload.append(
            set_xml_value(create_element("m:UserConfigurationProperties"), properties, version=self.account.version)
        )
        return payload

Ancestors

Class variables

var SERVICE_NAME

Methods

def call(self, user_configuration_name, properties)
Expand source code
def call(self, user_configuration_name, properties):
    if properties not in PROPERTIES_CHOICES:
        raise InvalidEnumValue("properties", properties, PROPERTIES_CHOICES)
    return self._elems_to_objs(
        self._get_elements(
            payload=self.get_payload(user_configuration_name=user_configuration_name, properties=properties)
        )
    )
def get_payload(self, user_configuration_name, properties)
Expand source code
def get_payload(self, user_configuration_name, properties):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    set_xml_value(payload, user_configuration_name, version=self.account.version)
    payload.append(
        set_xml_value(create_element("m:UserConfigurationProperties"), properties, version=self.account.version)
    )
    return payload

Inherited members

class GetUserOofSettings (*args, **kwargs)
Expand source code
class GetUserOofSettings(EWSAccountService):
    """Get automatic reply settings for the specified mailbox.
    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getuseroofsettings-operation
    """

    SERVICE_NAME = "GetUserOofSettings"
    element_container_name = f"{{{TNS}}}OofSettings"

    def call(self, mailbox):
        return self._elems_to_objs(self._get_elements(payload=self.get_payload(mailbox=mailbox)))

    def _elem_to_obj(self, elem):
        return OofSettings.from_xml(elem=elem, account=self.account)

    def get_payload(self, mailbox):
        return set_xml_value(
            create_element(f"m:{self.SERVICE_NAME}Request"),
            AvailabilityMailbox.from_mailbox(mailbox),
            version=self.account.version,
        )

    @classmethod
    def _get_elements_in_container(cls, container):
        # This service only returns one result, directly in 'container'
        return [container]

    def _get_element_container(self, message, name=None):
        # This service returns the result container outside the response message
        super()._get_element_container(message=message.find(self._response_message_tag()), name=None)
        return message.find(name)

    @classmethod
    def _response_message_tag(cls):
        return f"{{{MNS}}}ResponseMessage"

Ancestors

Class variables

var SERVICE_NAME
var element_container_name

Methods

def call(self, mailbox)
Expand source code
def call(self, mailbox):
    return self._elems_to_objs(self._get_elements(payload=self.get_payload(mailbox=mailbox)))
def get_payload(self, mailbox)
Expand source code
def get_payload(self, mailbox):
    return set_xml_value(
        create_element(f"m:{self.SERVICE_NAME}Request"),
        AvailabilityMailbox.from_mailbox(mailbox),
        version=self.account.version,
    )

Inherited members

class GetUserSettings (protocol, chunk_size=None, timeout=None)

Take a list of users and requested Autodiscover settings for these users. Returns the requested settings values.

MSDN: https://learn.microsoft.com/en-us/exchange/client-developer/web-service-reference/getusersettings-operation-soap

Expand source code
class GetUserSettings(EWSService):
    """Take a list of users and requested Autodiscover settings for these users. Returns the requested settings values.

    MSDN:
    https://learn.microsoft.com/en-us/exchange/client-developer/web-service-reference/getusersettings-operation-soap
    """

    SERVICE_NAME = "GetUserSettings"
    NS_MAP = {k: v for k, v in ns_translation.items() if k in ("s", "t", "a", "wsa", "xsi")}
    element_container_name = f"{{{ANS}}}UserResponses"
    supported_from = EXCHANGE_2010

    def call(self, users, settings):
        return self._elems_to_objs(self._get_elements(self.get_payload(users=users, settings=settings)))

    def wrap(self, content, api_version=None):
        envelope = create_element("s:Envelope", nsmap=self.NS_MAP)
        header = create_element("s:Header")
        if api_version:
            add_xml_child(header, "a:RequestedServerVersion", api_version)
        action = f"http://schemas.microsoft.com/exchange/2010/Autodiscover/Autodiscover/{self.SERVICE_NAME}"
        add_xml_child(header, "wsa:Action", action)
        add_xml_child(header, "wsa:To", self.protocol.service_endpoint)
        identity = self._account_to_impersonate
        if identity:
            add_xml_child(header, "t:ExchangeImpersonation", identity)
        envelope.append(header)
        body = create_element("s:Body")
        body.append(content)
        envelope.append(body)
        return xml_to_str(envelope, encoding=DEFAULT_ENCODING, xml_declaration=True)

    def _elem_to_obj(self, elem):
        return UserResponse.from_xml(elem=elem, account=None)

    def get_payload(self, users, settings):
        payload = create_element(f"a:{self.SERVICE_NAME}RequestMessage")
        request = create_element("a:Request")
        users_elem = create_element("a:Users")
        for user in users:
            mailbox = create_element("a:Mailbox")
            set_xml_value(mailbox, user)
            add_xml_child(users_elem, "a:User", mailbox)
        if not len(users_elem):
            raise ValueError("'users' must not be empty")
        request.append(users_elem)
        requested_settings = create_element("a:RequestedSettings")
        for setting in settings:
            add_xml_child(requested_settings, "a:Setting", UserResponse.SETTINGS_MAP[setting])
        if not len(requested_settings):
            raise ValueError("'requested_settings' must not be empty")
        request.append(requested_settings)
        payload.append(request)
        return payload

    @classmethod
    def _response_tag(cls):
        """Return the name of the element containing the service response."""
        return f"{{{ANS}}}{cls.SERVICE_NAME}ResponseMessage"

    def _get_element_container(self, message, name=None):
        response = message.find(f"{{{ANS}}}Response")
        # ErrorCode: See
        # https://learn.microsoft.com/en-us/exchange/client-developer/web-service-reference/errorcode-soap
        # There are two 'ErrorCode' elements in the response; one is a child of the 'Response' element, the other is a
        # child of the 'UserResponse' element. Let's handle both with the same code.
        res = UserResponse.parse_elem(response)
        if isinstance(res, Exception):
            raise res
        if res.error_code == "NoError":
            container = response.find(name)
            if container is None:
                raise MalformedResponseError(f"No {name} elements in ResponseMessage ({xml_to_str(response)})")
            return container
        # Raise any non-acceptable errors in the container, or return the acceptable exception instance
        try:
            raise self._get_exception(code=res.error_code, text=res.error_message, msg_xml=None)
        except self.ERRORS_TO_CATCH_IN_RESPONSE as e:
            return e

Ancestors

Class variables

var NS_MAP
var SERVICE_NAME
var element_container_name
var supported_from

Methods

def call(self, users, settings)
Expand source code
def call(self, users, settings):
    return self._elems_to_objs(self._get_elements(self.get_payload(users=users, settings=settings)))
def get_payload(self, users, settings)
Expand source code
def get_payload(self, users, settings):
    payload = create_element(f"a:{self.SERVICE_NAME}RequestMessage")
    request = create_element("a:Request")
    users_elem = create_element("a:Users")
    for user in users:
        mailbox = create_element("a:Mailbox")
        set_xml_value(mailbox, user)
        add_xml_child(users_elem, "a:User", mailbox)
    if not len(users_elem):
        raise ValueError("'users' must not be empty")
    request.append(users_elem)
    requested_settings = create_element("a:RequestedSettings")
    for setting in settings:
        add_xml_child(requested_settings, "a:Setting", UserResponse.SETTINGS_MAP[setting])
    if not len(requested_settings):
        raise ValueError("'requested_settings' must not be empty")
    request.append(requested_settings)
    payload.append(request)
    return payload

Inherited members

class MarkAsJunk (*args, **kwargs)
Expand source code
class MarkAsJunk(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/markasjunk-operation"""

    SERVICE_NAME = "MarkAsJunk"

    def call(self, items, is_junk, move_item):
        return self._elems_to_objs(
            self._chunked_get_elements(self.get_payload, items=items, is_junk=is_junk, move_item=move_item)
        )

    def _elem_to_obj(self, elem):
        return MovedItemId.id_from_xml(elem)

    @classmethod
    def _get_elements_in_container(cls, container):
        return container.findall(MovedItemId.response_tag())

    def get_payload(self, items, is_junk, move_item):
        # Takes a list of items and returns either success or raises an error message
        payload = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(IsJunk=is_junk, MoveItem=move_item))
        payload.append(item_ids_element(items=items, version=self.account.version))
        return payload

Ancestors

Class variables

var SERVICE_NAME

Methods

def call(self, items, is_junk, move_item)
Expand source code
def call(self, items, is_junk, move_item):
    return self._elems_to_objs(
        self._chunked_get_elements(self.get_payload, items=items, is_junk=is_junk, move_item=move_item)
    )
def get_payload(self, items, is_junk, move_item)
Expand source code
def get_payload(self, items, is_junk, move_item):
    # Takes a list of items and returns either success or raises an error message
    payload = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(IsJunk=is_junk, MoveItem=move_item))
    payload.append(item_ids_element(items=items, version=self.account.version))
    return payload

Inherited members

class MoveFolder (*args, **kwargs)
Expand source code
class MoveFolder(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/movefolder-operation"""

    SERVICE_NAME = "MoveFolder"
    element_container_name = f"{{{MNS}}}Folders"

    def call(self, folders, to_folder):
        if not isinstance(to_folder, (BaseFolder, FolderId)):
            raise InvalidTypeError("to_folder", to_folder, (BaseFolder, FolderId))
        return self._elems_to_objs(self._chunked_get_elements(self.get_payload, items=folders, to_folder=to_folder))

    def _elem_to_obj(self, elem):
        return FolderId.from_xml(elem=elem.find(FolderId.response_tag()), account=self.account)

    def get_payload(self, folders, to_folder):
        # Takes a list of folders and returns their new folder IDs
        payload = create_element(f"m:{self.SERVICE_NAME}")
        payload.append(folder_ids_element(folders=[to_folder], version=self.account.version, tag="m:ToFolderId"))
        payload.append(folder_ids_element(folders=folders, version=self.account.version))
        return payload

Ancestors

Class variables

var SERVICE_NAME
var element_container_name

Methods

def call(self, folders, to_folder)
Expand source code
def call(self, folders, to_folder):
    if not isinstance(to_folder, (BaseFolder, FolderId)):
        raise InvalidTypeError("to_folder", to_folder, (BaseFolder, FolderId))
    return self._elems_to_objs(self._chunked_get_elements(self.get_payload, items=folders, to_folder=to_folder))
def get_payload(self, folders, to_folder)
Expand source code
def get_payload(self, folders, to_folder):
    # Takes a list of folders and returns their new folder IDs
    payload = create_element(f"m:{self.SERVICE_NAME}")
    payload.append(folder_ids_element(folders=[to_folder], version=self.account.version, tag="m:ToFolderId"))
    payload.append(folder_ids_element(folders=folders, version=self.account.version))
    return payload

Inherited members

class MoveItem (*args, **kwargs)
Expand source code
class MoveItem(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/moveitem-operation"""

    SERVICE_NAME = "MoveItem"
    element_container_name = f"{{{MNS}}}Items"

    def call(self, items, to_folder):
        if not isinstance(to_folder, (BaseFolder, FolderId)):
            raise InvalidTypeError("to_folder", to_folder, (BaseFolder, FolderId))
        return self._elems_to_objs(self._chunked_get_elements(self.get_payload, items=items, to_folder=to_folder))

    def _elem_to_obj(self, elem):
        return Item.id_from_xml(elem)

    def get_payload(self, items, to_folder):
        # Takes a list of items and returns their new item IDs
        payload = create_element(f"m:{self.SERVICE_NAME}")
        payload.append(folder_ids_element(folders=[to_folder], version=self.account.version, tag="m:ToFolderId"))
        payload.append(item_ids_element(items=items, version=self.account.version))
        return payload

Ancestors

Subclasses

Class variables

var SERVICE_NAME
var element_container_name

Methods

def call(self, items, to_folder)
Expand source code
def call(self, items, to_folder):
    if not isinstance(to_folder, (BaseFolder, FolderId)):
        raise InvalidTypeError("to_folder", to_folder, (BaseFolder, FolderId))
    return self._elems_to_objs(self._chunked_get_elements(self.get_payload, items=items, to_folder=to_folder))
def get_payload(self, items, to_folder)
Expand source code
def get_payload(self, items, to_folder):
    # Takes a list of items and returns their new item IDs
    payload = create_element(f"m:{self.SERVICE_NAME}")
    payload.append(folder_ids_element(folders=[to_folder], version=self.account.version, tag="m:ToFolderId"))
    payload.append(item_ids_element(items=items, version=self.account.version))
    return payload

Inherited members

class ResolveNames (*args, **kwargs)
Expand source code
class ResolveNames(EWSService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/resolvenames-operation"""

    SERVICE_NAME = "ResolveNames"
    element_container_name = f"{{{MNS}}}ResolutionSet"
    ERRORS_TO_CATCH_IN_RESPONSE = ErrorNameResolutionNoResults
    WARNINGS_TO_IGNORE_IN_RESPONSE = ErrorNameResolutionMultipleResults

    # According to the 'Remarks' section of the MSDN documentation referenced above, at most 100 candidates are
    # returned for a lookup.
    # Note: paging information is returned as attrs on the 'ResolutionSet' element, but this service does not
    # support the 'IndexedPageItemView' element, so it's not really a paging service.
    candidates_limit = 100

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.return_full_contact_data = False  # A hack to communicate parsing args to _elems_to_objs()

    def call(
        self,
        unresolved_entries,
        parent_folders=None,
        return_full_contact_data=False,
        search_scope=None,
        contact_data_shape=None,
    ):
        if self.chunk_size > 100:
            raise ValueError(
                f"Chunk size {self.chunk_size} is too high. {self.SERVICE_NAME} supports returning at most 100 "
                f"candidates for a lookup",
            )
        if search_scope and search_scope not in SEARCH_SCOPE_CHOICES:
            raise InvalidEnumValue("search_scope", search_scope, SEARCH_SCOPE_CHOICES)
        if contact_data_shape and contact_data_shape not in SHAPE_CHOICES:
            raise InvalidEnumValue("contact_data_shape", contact_data_shape, SHAPE_CHOICES)
        self.return_full_contact_data = return_full_contact_data
        return self._elems_to_objs(
            self._chunked_get_elements(
                self.get_payload,
                items=unresolved_entries,
                parent_folders=parent_folders,
                return_full_contact_data=return_full_contact_data,
                search_scope=search_scope,
                contact_data_shape=contact_data_shape,
            )
        )

    def _get_element_container(self, message, name=None):
        container_or_exc = super()._get_element_container(message=message, name=name)
        if isinstance(container_or_exc, Exception):
            return container_or_exc
        is_last_page = container_or_exc.get("IncludesLastItemInRange").lower() in ("true", "0")
        log.debug("Includes last item in range: %s", is_last_page)
        if not is_last_page:
            warnings.warn(
                f"The {self.__class__.__name__} service returns at most {self.candidates_limit} candidates and does "
                f"not support paging. You have reached this limit and have not received the exhaustive list of "
                f"candidates."
            )
        return container_or_exc

    def _elem_to_obj(self, elem):
        if self.return_full_contact_data:
            mailbox_elem = elem.find(Mailbox.response_tag())
            contact_elem = elem.find(Contact.response_tag())
            return (
                None if mailbox_elem is None else Mailbox.from_xml(elem=mailbox_elem, account=None),
                None if contact_elem is None else Contact.from_xml(elem=contact_elem, account=None),
            )
        return Mailbox.from_xml(elem=elem.find(Mailbox.response_tag()), account=None)

    def get_payload(
        self, unresolved_entries, parent_folders, return_full_contact_data, search_scope, contact_data_shape
    ):
        attrs = dict(ReturnFullContactData=return_full_contact_data)
        if search_scope:
            attrs["SearchScope"] = search_scope
        if contact_data_shape:
            if self.protocol.version.build < EXCHANGE_2010_SP2:
                raise NotImplementedError(
                    "'contact_data_shape' is only supported for Exchange 2010 SP2 servers and later"
                )
            attrs["ContactDataShape"] = contact_data_shape
        payload = create_element(f"m:{self.SERVICE_NAME}", attrs=attrs)
        if parent_folders:
            payload.append(
                folder_ids_element(folders=parent_folders, version=self.protocol.version, tag="m:ParentFolderIds")
            )
        for entry in unresolved_entries:
            add_xml_child(payload, "m:UnresolvedEntry", entry)
        return payload

Ancestors

Class variables

var ERRORS_TO_CATCH_IN_RESPONSE

Global error type within this module.

var SERVICE_NAME
var WARNINGS_TO_IGNORE_IN_RESPONSE

Global error type within this module.

var candidates_limit
var element_container_name

Methods

def call(self, unresolved_entries, parent_folders=None, return_full_contact_data=False, search_scope=None, contact_data_shape=None)
Expand source code
def call(
    self,
    unresolved_entries,
    parent_folders=None,
    return_full_contact_data=False,
    search_scope=None,
    contact_data_shape=None,
):
    if self.chunk_size > 100:
        raise ValueError(
            f"Chunk size {self.chunk_size} is too high. {self.SERVICE_NAME} supports returning at most 100 "
            f"candidates for a lookup",
        )
    if search_scope and search_scope not in SEARCH_SCOPE_CHOICES:
        raise InvalidEnumValue("search_scope", search_scope, SEARCH_SCOPE_CHOICES)
    if contact_data_shape and contact_data_shape not in SHAPE_CHOICES:
        raise InvalidEnumValue("contact_data_shape", contact_data_shape, SHAPE_CHOICES)
    self.return_full_contact_data = return_full_contact_data
    return self._elems_to_objs(
        self._chunked_get_elements(
            self.get_payload,
            items=unresolved_entries,
            parent_folders=parent_folders,
            return_full_contact_data=return_full_contact_data,
            search_scope=search_scope,
            contact_data_shape=contact_data_shape,
        )
    )
def get_payload(self, unresolved_entries, parent_folders, return_full_contact_data, search_scope, contact_data_shape)
Expand source code
def get_payload(
    self, unresolved_entries, parent_folders, return_full_contact_data, search_scope, contact_data_shape
):
    attrs = dict(ReturnFullContactData=return_full_contact_data)
    if search_scope:
        attrs["SearchScope"] = search_scope
    if contact_data_shape:
        if self.protocol.version.build < EXCHANGE_2010_SP2:
            raise NotImplementedError(
                "'contact_data_shape' is only supported for Exchange 2010 SP2 servers and later"
            )
        attrs["ContactDataShape"] = contact_data_shape
    payload = create_element(f"m:{self.SERVICE_NAME}", attrs=attrs)
    if parent_folders:
        payload.append(
            folder_ids_element(folders=parent_folders, version=self.protocol.version, tag="m:ParentFolderIds")
        )
    for entry in unresolved_entries:
        add_xml_child(payload, "m:UnresolvedEntry", entry)
    return payload

Inherited members

class SendItem (*args, **kwargs)
Expand source code
class SendItem(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/senditem-operation"""

    SERVICE_NAME = "SendItem"
    returns_elements = False

    def call(self, items, saved_item_folder):
        if saved_item_folder and not isinstance(saved_item_folder, (BaseFolder, FolderId)):
            raise InvalidTypeError("saved_item_folder", saved_item_folder, (BaseFolder, FolderId))
        return self._chunked_get_elements(self.get_payload, items=items, saved_item_folder=saved_item_folder)

    def get_payload(self, items, saved_item_folder):
        payload = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(SaveItemToFolder=bool(saved_item_folder)))
        payload.append(item_ids_element(items=items, version=self.account.version))
        if saved_item_folder:
            payload.append(
                folder_ids_element(folders=[saved_item_folder], version=self.account.version, tag="m:SavedItemFolderId")
            )
        return payload

Ancestors

Class variables

var SERVICE_NAME
var returns_elements

Methods

def call(self, items, saved_item_folder)
Expand source code
def call(self, items, saved_item_folder):
    if saved_item_folder and not isinstance(saved_item_folder, (BaseFolder, FolderId)):
        raise InvalidTypeError("saved_item_folder", saved_item_folder, (BaseFolder, FolderId))
    return self._chunked_get_elements(self.get_payload, items=items, saved_item_folder=saved_item_folder)
def get_payload(self, items, saved_item_folder)
Expand source code
def get_payload(self, items, saved_item_folder):
    payload = create_element(f"m:{self.SERVICE_NAME}", attrs=dict(SaveItemToFolder=bool(saved_item_folder)))
    payload.append(item_ids_element(items=items, version=self.account.version))
    if saved_item_folder:
        payload.append(
            folder_ids_element(folders=[saved_item_folder], version=self.account.version, tag="m:SavedItemFolderId")
        )
    return payload

Inherited members

class SendNotification (protocol, chunk_size=None, timeout=None)

MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/sendnotification

This service is implemented backwards compared to other services. We use it to parse the XML body of push notifications we receive on the callback URL defined in a push subscription, and to create responses to these push notifications.

Expand source code
class SendNotification(EWSService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/sendnotification

    This service is implemented backwards compared to other services. We use it to parse the XML body of push
    notifications we receive on the callback URL defined in a push subscription, and to create responses to these
    push notifications.
    """

    SERVICE_NAME = "SendNotification"
    OK = "OK"
    UNSUBSCRIBE = "Unsubscribe"
    STATUS_CHOICES = (OK, UNSUBSCRIBE)

    def ok_payload(self):
        return self.wrap(content=self.get_payload(status=self.OK))

    def unsubscribe_payload(self):
        return self.wrap(content=self.get_payload(status=self.UNSUBSCRIBE))

    def _elem_to_obj(self, elem):
        return Notification.from_xml(elem=elem, account=None)

    @classmethod
    def _response_tag(cls):
        """Return the name of the element containing the service response."""
        return f"{{{MNS}}}{cls.SERVICE_NAME}"

    @classmethod
    def _get_elements_in_container(cls, container):
        return container.findall(Notification.response_tag())

    def get_payload(self, status):
        if status not in self.STATUS_CHOICES:
            raise InvalidEnumValue("status", status, self.STATUS_CHOICES)
        payload = create_element(f"m:{self.SERVICE_NAME}Result")
        add_xml_child(payload, "m:SubscriptionStatus", status)
        return payload

Ancestors

Class variables

var OK
var SERVICE_NAME
var STATUS_CHOICES
var UNSUBSCRIBE

Methods

def get_payload(self, status)
Expand source code
def get_payload(self, status):
    if status not in self.STATUS_CHOICES:
        raise InvalidEnumValue("status", status, self.STATUS_CHOICES)
    payload = create_element(f"m:{self.SERVICE_NAME}Result")
    add_xml_child(payload, "m:SubscriptionStatus", status)
    return payload
def ok_payload(self)
Expand source code
def ok_payload(self):
    return self.wrap(content=self.get_payload(status=self.OK))
def unsubscribe_payload(self)
Expand source code
def unsubscribe_payload(self):
    return self.wrap(content=self.get_payload(status=self.UNSUBSCRIBE))

Inherited members

class SetInboxRule (*args, **kwargs)
Expand source code
class SetInboxRule(UpdateInboxRules):
    """
    MSDN:
    https://learn.microsoft.com/en-us/exchange/client-developer/web-service-reference/updateinboxrules-operation#updateinboxrules-set-rule-request-example
    """

    def _get_operation(self, rule):
        return Operations(set_rule_operation=SetRuleOperation(rule=rule))

Ancestors

Inherited members

class SetUserOofSettings (*args, **kwargs)
Expand source code
class SetUserOofSettings(EWSAccountService):
    """Set automatic replies for the specified mailbox.
    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/setuseroofsettings-operation
    """

    SERVICE_NAME = "SetUserOofSettings"
    returns_elements = False

    def call(self, oof_settings, mailbox):
        if not isinstance(oof_settings, OofSettings):
            raise InvalidTypeError("oof_settings", oof_settings, OofSettings)
        if not isinstance(mailbox, Mailbox):
            raise InvalidTypeError("mailbox", mailbox, Mailbox)
        return self._get_elements(payload=self.get_payload(oof_settings=oof_settings, mailbox=mailbox))

    def get_payload(self, oof_settings, mailbox):
        payload = create_element(f"m:{self.SERVICE_NAME}Request")
        set_xml_value(payload, AvailabilityMailbox.from_mailbox(mailbox), version=self.account.version)
        return set_xml_value(payload, oof_settings, version=self.account.version)

    def _get_element_container(self, message, name=None):
        message = message.find(self._response_message_tag())
        return super()._get_element_container(message=message, name=name)

    @classmethod
    def _response_message_tag(cls):
        return f"{{{MNS}}}ResponseMessage"

Ancestors

Class variables

var SERVICE_NAME
var returns_elements

Methods

def call(self, oof_settings, mailbox)
Expand source code
def call(self, oof_settings, mailbox):
    if not isinstance(oof_settings, OofSettings):
        raise InvalidTypeError("oof_settings", oof_settings, OofSettings)
    if not isinstance(mailbox, Mailbox):
        raise InvalidTypeError("mailbox", mailbox, Mailbox)
    return self._get_elements(payload=self.get_payload(oof_settings=oof_settings, mailbox=mailbox))
def get_payload(self, oof_settings, mailbox)
Expand source code
def get_payload(self, oof_settings, mailbox):
    payload = create_element(f"m:{self.SERVICE_NAME}Request")
    set_xml_value(payload, AvailabilityMailbox.from_mailbox(mailbox), version=self.account.version)
    return set_xml_value(payload, oof_settings, version=self.account.version)

Inherited members

class SubscribeToPull (*args, **kwargs)
Expand source code
class SubscribeToPull(Subscribe):
    # https://learn.microsoft.com/en-us/exchange/client-developer/web-service-reference/pullsubscriptionrequest
    subscription_request_elem_tag = "m:PullSubscriptionRequest"
    prefer_affinity = True

    def call(self, folders, event_types, watermark, timeout):
        yield from self._partial_call(
            payload_func=self.get_payload,
            folders=folders,
            event_types=event_types,
            timeout=timeout,
            watermark=watermark,
        )

    def get_payload(self, folders, event_types, watermark, timeout):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        request_elem = self._partial_payload(folders=folders, event_types=event_types)
        if watermark:
            add_xml_child(request_elem, "m:Watermark", watermark)
        add_xml_child(request_elem, "t:Timeout", timeout)  # In minutes
        payload.append(request_elem)
        return payload

Ancestors

Class variables

var prefer_affinity
var subscription_request_elem_tag

Methods

def call(self, folders, event_types, watermark, timeout)
Expand source code
def call(self, folders, event_types, watermark, timeout):
    yield from self._partial_call(
        payload_func=self.get_payload,
        folders=folders,
        event_types=event_types,
        timeout=timeout,
        watermark=watermark,
    )
def get_payload(self, folders, event_types, watermark, timeout)
Expand source code
def get_payload(self, folders, event_types, watermark, timeout):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    request_elem = self._partial_payload(folders=folders, event_types=event_types)
    if watermark:
        add_xml_child(request_elem, "m:Watermark", watermark)
    add_xml_child(request_elem, "t:Timeout", timeout)  # In minutes
    payload.append(request_elem)
    return payload

Inherited members

class SubscribeToPush (*args, **kwargs)
Expand source code
class SubscribeToPush(Subscribe):
    # https://learn.microsoft.com/en-us/exchange/client-developer/web-service-reference/pushsubscriptionrequest
    subscription_request_elem_tag = "m:PushSubscriptionRequest"

    def call(self, folders, event_types, watermark, status_frequency, url):
        yield from self._partial_call(
            payload_func=self.get_payload,
            folders=folders,
            event_types=event_types,
            status_frequency=status_frequency,
            url=url,
            watermark=watermark,
        )

    def get_payload(self, folders, event_types, watermark, status_frequency, url):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        request_elem = self._partial_payload(folders=folders, event_types=event_types)
        if watermark:
            add_xml_child(request_elem, "m:Watermark", watermark)
        add_xml_child(request_elem, "t:StatusFrequency", status_frequency)  # In minutes
        add_xml_child(request_elem, "t:URL", url)
        payload.append(request_elem)
        return payload

Ancestors

Class variables

var subscription_request_elem_tag

Methods

def call(self, folders, event_types, watermark, status_frequency, url)
Expand source code
def call(self, folders, event_types, watermark, status_frequency, url):
    yield from self._partial_call(
        payload_func=self.get_payload,
        folders=folders,
        event_types=event_types,
        status_frequency=status_frequency,
        url=url,
        watermark=watermark,
    )
def get_payload(self, folders, event_types, watermark, status_frequency, url)
Expand source code
def get_payload(self, folders, event_types, watermark, status_frequency, url):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    request_elem = self._partial_payload(folders=folders, event_types=event_types)
    if watermark:
        add_xml_child(request_elem, "m:Watermark", watermark)
    add_xml_child(request_elem, "t:StatusFrequency", status_frequency)  # In minutes
    add_xml_child(request_elem, "t:URL", url)
    payload.append(request_elem)
    return payload

Inherited members

class SubscribeToStreaming (*args, **kwargs)
Expand source code
class SubscribeToStreaming(Subscribe):
    # https://learn.microsoft.com/en-us/exchange/client-developer/web-service-reference/streamingsubscriptionrequest
    subscription_request_elem_tag = "m:StreamingSubscriptionRequest"
    prefer_affinity = True

    def call(self, folders, event_types):
        yield from self._partial_call(payload_func=self.get_payload, folders=folders, event_types=event_types)

    def _elem_to_obj(self, elem):
        return elem.text

    @classmethod
    def _get_elements_in_container(cls, container):
        return [container.find(f"{{{MNS}}}SubscriptionId")]

    def get_payload(self, folders, event_types):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        payload.append(self._partial_payload(folders=folders, event_types=event_types))
        return payload

Ancestors

Class variables

var prefer_affinity
var subscription_request_elem_tag

Methods

def call(self, folders, event_types)
Expand source code
def call(self, folders, event_types):
    yield from self._partial_call(payload_func=self.get_payload, folders=folders, event_types=event_types)
def get_payload(self, folders, event_types)
Expand source code
def get_payload(self, folders, event_types):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    payload.append(self._partial_payload(folders=folders, event_types=event_types))
    return payload

Inherited members

class SyncFolderHierarchy (*args, **kwargs)
Expand source code
class SyncFolderHierarchy(SyncFolder):
    """MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/syncfolderhierarchy-operation
    """

    SERVICE_NAME = "SyncFolderHierarchy"
    shape_tag = "m:FolderShape"
    last_in_range_name = f"{{{MNS}}}IncludesLastFolderInRange"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.folder = None  # A hack to communicate parsing args to _elems_to_objs()

    def call(self, folder, shape, additional_fields, sync_state):
        self.sync_state = sync_state
        self.folder = folder
        return self._elems_to_objs(
            self._get_elements(
                payload=self.get_payload(
                    folder=folder,
                    shape=shape,
                    additional_fields=additional_fields,
                    sync_state=sync_state,
                )
            )
        )

    def _elem_to_obj(self, elem):
        change_type = self.change_types_map[elem.tag]
        if change_type == self.DELETE:
            folder = FolderId.from_xml(elem=elem.find(FolderId.response_tag()), account=self.account)
        else:
            # We can't find() the element because we don't know which tag to look for. The change element can
            # contain multiple folder types, each with their own tag.
            folder_elem = elem[0]
            folder = parse_folder_elem(elem=folder_elem, folder=self.folder, account=self.account)
        return change_type, folder

    def get_payload(self, folder, shape, additional_fields, sync_state):
        return self._partial_get_payload(
            folder=folder, shape=shape, additional_fields=additional_fields, sync_state=sync_state
        )

Ancestors

Class variables

var SERVICE_NAME
var last_in_range_name
var shape_tag

Methods

def call(self, folder, shape, additional_fields, sync_state)
Expand source code
def call(self, folder, shape, additional_fields, sync_state):
    self.sync_state = sync_state
    self.folder = folder
    return self._elems_to_objs(
        self._get_elements(
            payload=self.get_payload(
                folder=folder,
                shape=shape,
                additional_fields=additional_fields,
                sync_state=sync_state,
            )
        )
    )
def get_payload(self, folder, shape, additional_fields, sync_state)
Expand source code
def get_payload(self, folder, shape, additional_fields, sync_state):
    return self._partial_get_payload(
        folder=folder, shape=shape, additional_fields=additional_fields, sync_state=sync_state
    )

Inherited members

class SyncFolderItems (*args, **kwargs)
Expand source code
class SyncFolderItems(SyncFolder):
    """MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/syncfolderitems-operation
    """

    SERVICE_NAME = "SyncFolderItems"
    SYNC_SCOPES = (
        "NormalItems",
        "NormalAndAssociatedItems",
    )
    # Extra change type
    READ_FLAG_CHANGE = "read_flag_change"
    CHANGE_TYPES = SyncFolder.CHANGE_TYPES + (READ_FLAG_CHANGE,)
    shape_tag = "m:ItemShape"
    last_in_range_name = f"{{{MNS}}}IncludesLastItemInRange"
    change_types_map = SyncFolder.change_types_map
    change_types_map[f"{{{TNS}}}ReadFlagChange"] = READ_FLAG_CHANGE

    def call(self, folder, shape, additional_fields, sync_state, ignore, max_changes_returned, sync_scope):
        self.sync_state = sync_state
        if max_changes_returned is None:
            max_changes_returned = 100
        if not isinstance(max_changes_returned, int):
            raise InvalidTypeError("max_changes_returned", max_changes_returned, int)
        if max_changes_returned <= 0:
            raise ValueError(f"'max_changes_returned' {max_changes_returned} must be a positive integer")
        if sync_scope is not None and sync_scope not in self.SYNC_SCOPES:
            raise InvalidEnumValue("sync_scope", sync_scope, self.SYNC_SCOPES)
        return self._elems_to_objs(
            self._get_elements(
                payload=self.get_payload(
                    folder=folder,
                    shape=shape,
                    additional_fields=additional_fields,
                    sync_state=sync_state,
                    ignore=ignore,
                    max_changes_returned=max_changes_returned,
                    sync_scope=sync_scope,
                )
            )
        )

    def _elem_to_obj(self, elem):
        change_type = self.change_types_map[elem.tag]
        if change_type == self.READ_FLAG_CHANGE:
            item = (
                ItemId.from_xml(elem=elem.find(ItemId.response_tag()), account=self.account),
                xml_text_to_value(elem.find(f"{{{TNS}}}IsRead").text, bool),
            )
        elif change_type == self.DELETE:
            item = ItemId.from_xml(elem=elem.find(ItemId.response_tag()), account=self.account)
        else:
            # We can't find() the element because we don't know which tag to look for. The change element can
            # contain multiple item types, each with their own tag.
            item_elem = elem[0]
            item = BaseFolder.item_model_from_tag(item_elem.tag).from_xml(elem=item_elem, account=self.account)
        return change_type, item

    def get_payload(self, folder, shape, additional_fields, sync_state, ignore, max_changes_returned, sync_scope):
        sync_folder_items = self._partial_get_payload(
            folder=folder, shape=shape, additional_fields=additional_fields, sync_state=sync_state
        )
        is_empty, ignore = (True, None) if ignore is None else peek(ignore)
        if not is_empty:
            sync_folder_items.append(item_ids_element(items=ignore, version=self.account.version, tag="m:Ignore"))
        add_xml_child(sync_folder_items, "m:MaxChangesReturned", max_changes_returned)
        if sync_scope:
            add_xml_child(sync_folder_items, "m:SyncScope", sync_scope)
        return sync_folder_items

Ancestors

Class variables

var CHANGE_TYPES
var READ_FLAG_CHANGE
var SERVICE_NAME
var SYNC_SCOPES
var change_types_map
var last_in_range_name
var shape_tag

Methods

def call(self, folder, shape, additional_fields, sync_state, ignore, max_changes_returned, sync_scope)
Expand source code
def call(self, folder, shape, additional_fields, sync_state, ignore, max_changes_returned, sync_scope):
    self.sync_state = sync_state
    if max_changes_returned is None:
        max_changes_returned = 100
    if not isinstance(max_changes_returned, int):
        raise InvalidTypeError("max_changes_returned", max_changes_returned, int)
    if max_changes_returned <= 0:
        raise ValueError(f"'max_changes_returned' {max_changes_returned} must be a positive integer")
    if sync_scope is not None and sync_scope not in self.SYNC_SCOPES:
        raise InvalidEnumValue("sync_scope", sync_scope, self.SYNC_SCOPES)
    return self._elems_to_objs(
        self._get_elements(
            payload=self.get_payload(
                folder=folder,
                shape=shape,
                additional_fields=additional_fields,
                sync_state=sync_state,
                ignore=ignore,
                max_changes_returned=max_changes_returned,
                sync_scope=sync_scope,
            )
        )
    )
def get_payload(self, folder, shape, additional_fields, sync_state, ignore, max_changes_returned, sync_scope)
Expand source code
def get_payload(self, folder, shape, additional_fields, sync_state, ignore, max_changes_returned, sync_scope):
    sync_folder_items = self._partial_get_payload(
        folder=folder, shape=shape, additional_fields=additional_fields, sync_state=sync_state
    )
    is_empty, ignore = (True, None) if ignore is None else peek(ignore)
    if not is_empty:
        sync_folder_items.append(item_ids_element(items=ignore, version=self.account.version, tag="m:Ignore"))
    add_xml_child(sync_folder_items, "m:MaxChangesReturned", max_changes_returned)
    if sync_scope:
        add_xml_child(sync_folder_items, "m:SyncScope", sync_scope)
    return sync_folder_items

Inherited members

class Unsubscribe (*args, **kwargs)

Unsubscribing is only valid for pull and streaming notifications.

MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/unsubscribe-operation

Expand source code
class Unsubscribe(EWSAccountService):
    """Unsubscribing is only valid for pull and streaming notifications.

    MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/unsubscribe-operation
    """

    SERVICE_NAME = "Unsubscribe"
    returns_elements = False
    prefer_affinity = True

    def call(self, subscription_id):
        return self._get_elements(payload=self.get_payload(subscription_id=subscription_id))

    def get_payload(self, subscription_id):
        payload = create_element(f"m:{self.SERVICE_NAME}")
        add_xml_child(payload, "m:SubscriptionId", subscription_id)
        return payload

Ancestors

Class variables

var SERVICE_NAME
var prefer_affinity
var returns_elements

Methods

def call(self, subscription_id)
Expand source code
def call(self, subscription_id):
    return self._get_elements(payload=self.get_payload(subscription_id=subscription_id))
def get_payload(self, subscription_id)
Expand source code
def get_payload(self, subscription_id):
    payload = create_element(f"m:{self.SERVICE_NAME}")
    add_xml_child(payload, "m:SubscriptionId", subscription_id)
    return payload

Inherited members

class UpdateFolder (*args, **kwargs)
Expand source code
class UpdateFolder(BaseUpdateService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/updatefolder-operation"""

    SERVICE_NAME = "UpdateFolder"
    SET_FIELD_ELEMENT_NAME = "t:SetFolderField"
    DELETE_FIELD_ELEMENT_NAME = "t:DeleteFolderField"
    CHANGE_ELEMENT_NAME = "t:FolderChange"
    CHANGES_ELEMENT_NAME = "m:FolderChanges"
    element_container_name = f"{{{MNS}}}Folders"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.folders = []  # A hack to communicate parsing args to _elems_to_objs()

    def call(self, folders):
        # We can't easily find the correct folder class from the returned XML. Instead, return objects with the same
        # class as the folder instance it was requested with.
        self.folders = list(folders)  # Convert to a list, in case 'folders' is a generator. We're iterating twice.
        return self._elems_to_objs(self._chunked_get_elements(self.get_payload, items=self.folders))

    def _elems_to_objs(self, elems):
        for (folder, _), elem in zip(self.folders, elems):
            if isinstance(elem, Exception):
                yield elem
                continue
            yield parse_folder_elem(elem=elem, folder=folder, account=self.account)

    @staticmethod
    def _target_elem(target):
        return to_item_id(target, FolderId)

    def get_payload(self, folders):
        # Takes a list of (Folder, fieldnames) tuples where 'Folder' is an instance of a subclass of Folder and
        # 'fieldnames' are the attribute names that were updated.
        payload = create_element(f"m:{self.SERVICE_NAME}")
        payload.append(self._changes_elem(target_changes=folders))
        return payload

Ancestors

Class variables

var CHANGES_ELEMENT_NAME
var CHANGE_ELEMENT_NAME
var DELETE_FIELD_ELEMENT_NAME
var SERVICE_NAME
var SET_FIELD_ELEMENT_NAME
var element_container_name

Methods

def call(self, folders)
Expand source code
def call(self, folders):
    # We can't easily find the correct folder class from the returned XML. Instead, return objects with the same
    # class as the folder instance it was requested with.
    self.folders = list(folders)  # Convert to a list, in case 'folders' is a generator. We're iterating twice.
    return self._elems_to_objs(self._chunked_get_elements(self.get_payload, items=self.folders))
def get_payload(self, folders)
Expand source code
def get_payload(self, folders):
    # Takes a list of (Folder, fieldnames) tuples where 'Folder' is an instance of a subclass of Folder and
    # 'fieldnames' are the attribute names that were updated.
    payload = create_element(f"m:{self.SERVICE_NAME}")
    payload.append(self._changes_elem(target_changes=folders))
    return payload

Inherited members

class UpdateItem (*args, **kwargs)
Expand source code
class UpdateItem(BaseUpdateService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/updateitem-operation"""

    SERVICE_NAME = "UpdateItem"
    SET_FIELD_ELEMENT_NAME = "t:SetItemField"
    DELETE_FIELD_ELEMENT_NAME = "t:DeleteItemField"
    CHANGE_ELEMENT_NAME = "t:ItemChange"
    CHANGES_ELEMENT_NAME = "m:ItemChanges"
    element_container_name = f"{{{MNS}}}Items"

    def call(
        self,
        items,
        conflict_resolution,
        message_disposition,
        send_meeting_invitations_or_cancellations,
        suppress_read_receipts,
    ):
        if conflict_resolution not in CONFLICT_RESOLUTION_CHOICES:
            raise InvalidEnumValue("conflict_resolution", conflict_resolution, CONFLICT_RESOLUTION_CHOICES)
        if message_disposition not in MESSAGE_DISPOSITION_CHOICES:
            raise InvalidEnumValue("message_disposition", message_disposition, MESSAGE_DISPOSITION_CHOICES)
        if send_meeting_invitations_or_cancellations not in SEND_MEETING_INVITATIONS_AND_CANCELLATIONS_CHOICES:
            raise InvalidEnumValue(
                "send_meeting_invitations_or_cancellations",
                send_meeting_invitations_or_cancellations,
                SEND_MEETING_INVITATIONS_AND_CANCELLATIONS_CHOICES,
            )
        if message_disposition == SEND_ONLY:
            raise ValueError("Cannot send-only existing objects. Use SendItem service instead")
        return self._elems_to_objs(
            self._chunked_get_elements(
                self.get_payload,
                items=items,
                conflict_resolution=conflict_resolution,
                message_disposition=message_disposition,
                send_meeting_invitations_or_cancellations=send_meeting_invitations_or_cancellations,
                suppress_read_receipts=suppress_read_receipts,
            )
        )

    def _elem_to_obj(self, elem):
        return Item.id_from_xml(elem)

    def _update_elems(self, target, fieldnames):
        fieldnames_copy = list(fieldnames)

        if target.__class__ == CalendarItem:
            # For CalendarItem items where we update 'start' or 'end', we want to update internal timezone fields
            target.clean_timezone_fields(version=self.account.version)  # Possibly also sets timezone values
            for field_name in ("start", "end"):
                if field_name in fieldnames_copy:
                    tz_field_name = target.tz_field_for_field_name(field_name).name
                    if tz_field_name not in fieldnames_copy:
                        fieldnames_copy.append(tz_field_name)

        yield from super()._update_elems(target=target, fieldnames=fieldnames_copy)

    def _get_value(self, target, field):
        value = super()._get_value(target, field)

        if target.__class__ == CalendarItem:
            # For CalendarItem items where we update 'start' or 'end', we want to send values in the local timezone
            if field.name in ("start", "end"):
                if type(value) is EWSDate:
                    # EWS always expects a datetime
                    return target.date_to_datetime(field_name=field.name)
                tz_field_name = target.tz_field_for_field_name(field.name).name
                return value.astimezone(getattr(target, tz_field_name))

        return value

    @staticmethod
    def _target_elem(target):
        return to_item_id(target, ItemId)

    def get_payload(
        self,
        items,
        conflict_resolution,
        message_disposition,
        send_meeting_invitations_or_cancellations,
        suppress_read_receipts,
    ):
        # Takes a list of (Item, fieldnames) tuples where 'Item' is an instance of a subclass of Item and 'fieldnames'
        # are the attribute names that were updated.
        attrs = dict(
            ConflictResolution=conflict_resolution,
            MessageDisposition=message_disposition,
            SendMeetingInvitationsOrCancellations=send_meeting_invitations_or_cancellations,
        )
        if self.account.version.build >= EXCHANGE_2013_SP1:
            attrs["SuppressReadReceipts"] = suppress_read_receipts
        payload = create_element(f"m:{self.SERVICE_NAME}", attrs=attrs)
        payload.append(self._changes_elem(target_changes=items))
        return payload

Ancestors

Class variables

var CHANGES_ELEMENT_NAME
var CHANGE_ELEMENT_NAME
var DELETE_FIELD_ELEMENT_NAME
var SERVICE_NAME
var SET_FIELD_ELEMENT_NAME
var element_container_name

Methods

def call(self, items, conflict_resolution, message_disposition, send_meeting_invitations_or_cancellations, suppress_read_receipts)
Expand source code
def call(
    self,
    items,
    conflict_resolution,
    message_disposition,
    send_meeting_invitations_or_cancellations,
    suppress_read_receipts,
):
    if conflict_resolution not in CONFLICT_RESOLUTION_CHOICES:
        raise InvalidEnumValue("conflict_resolution", conflict_resolution, CONFLICT_RESOLUTION_CHOICES)
    if message_disposition not in MESSAGE_DISPOSITION_CHOICES:
        raise InvalidEnumValue("message_disposition", message_disposition, MESSAGE_DISPOSITION_CHOICES)
    if send_meeting_invitations_or_cancellations not in SEND_MEETING_INVITATIONS_AND_CANCELLATIONS_CHOICES:
        raise InvalidEnumValue(
            "send_meeting_invitations_or_cancellations",
            send_meeting_invitations_or_cancellations,
            SEND_MEETING_INVITATIONS_AND_CANCELLATIONS_CHOICES,
        )
    if message_disposition == SEND_ONLY:
        raise ValueError("Cannot send-only existing objects. Use SendItem service instead")
    return self._elems_to_objs(
        self._chunked_get_elements(
            self.get_payload,
            items=items,
            conflict_resolution=conflict_resolution,
            message_disposition=message_disposition,
            send_meeting_invitations_or_cancellations=send_meeting_invitations_or_cancellations,
            suppress_read_receipts=suppress_read_receipts,
        )
    )
def get_payload(self, items, conflict_resolution, message_disposition, send_meeting_invitations_or_cancellations, suppress_read_receipts)
Expand source code
def get_payload(
    self,
    items,
    conflict_resolution,
    message_disposition,
    send_meeting_invitations_or_cancellations,
    suppress_read_receipts,
):
    # Takes a list of (Item, fieldnames) tuples where 'Item' is an instance of a subclass of Item and 'fieldnames'
    # are the attribute names that were updated.
    attrs = dict(
        ConflictResolution=conflict_resolution,
        MessageDisposition=message_disposition,
        SendMeetingInvitationsOrCancellations=send_meeting_invitations_or_cancellations,
    )
    if self.account.version.build >= EXCHANGE_2013_SP1:
        attrs["SuppressReadReceipts"] = suppress_read_receipts
    payload = create_element(f"m:{self.SERVICE_NAME}", attrs=attrs)
    payload.append(self._changes_elem(target_changes=items))
    return payload

Inherited members

class UpdateUserConfiguration (*args, **kwargs)
Expand source code
class UpdateUserConfiguration(EWSAccountService):
    """MSDN:
    https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/updateuserconfiguration-operation
    """

    SERVICE_NAME = "UpdateUserConfiguration"
    returns_elements = False

    def call(self, user_configuration):
        return self._get_elements(payload=self.get_payload(user_configuration=user_configuration))

    def get_payload(self, user_configuration):
        return set_xml_value(create_element(f"m:{self.SERVICE_NAME}"), user_configuration, version=self.account.version)

Ancestors

Class variables

var SERVICE_NAME
var returns_elements

Methods

def call(self, user_configuration)
Expand source code
def call(self, user_configuration):
    return self._get_elements(payload=self.get_payload(user_configuration=user_configuration))
def get_payload(self, user_configuration)
Expand source code
def get_payload(self, user_configuration):
    return set_xml_value(create_element(f"m:{self.SERVICE_NAME}"), user_configuration, version=self.account.version)

Inherited members

class UploadItems (*args, **kwargs)
Expand source code
class UploadItems(EWSAccountService):
    """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/uploaditems-operation"""

    SERVICE_NAME = "UploadItems"
    element_container_name = f"{{{MNS}}}ItemId"

    def call(self, items):
        # _pool_requests expects 'items', not 'data'
        return self._elems_to_objs(self._chunked_get_elements(self.get_payload, items=items))

    def get_payload(self, items):
        """Upload given items to given account.

        'items' is an iterable of tuples where the first element is a Folder instance representing the ParentFolder
        that the item will be placed in and the second element is a tuple containing an optional ItemId, an optional
        Item.is_associated boolean, and a Data string returned from an ExportItems.
        call.

        :param items:
        """
        payload = create_element(f"m:{self.SERVICE_NAME}")
        items_elem = create_element("m:Items")
        payload.append(items_elem)
        for parent_folder, (item_id, is_associated, data_str) in items:
            # TODO: The full spec also allows the "UpdateOrCreate" create action.
            attrs = dict(CreateAction="Update" if item_id else "CreateNew")
            if is_associated is not None:
                attrs["IsAssociated"] = is_associated
            item = create_element("t:Item", attrs=attrs)
            set_xml_value(item, ParentFolderId(parent_folder.id, parent_folder.changekey), version=self.account.version)
            if item_id:
                set_xml_value(item, to_item_id(item_id, ItemId), version=self.account.version)
            add_xml_child(item, "t:Data", data_str)
            items_elem.append(item)
        return payload

    def _elem_to_obj(self, elem):
        return elem.get(ItemId.ID_ATTR), elem.get(ItemId.CHANGEKEY_ATTR)

    @classmethod
    def _get_elements_in_container(cls, container):
        return [container]

Ancestors

Class variables

var SERVICE_NAME
var element_container_name

Methods

def call(self, items)
Expand source code
def call(self, items):
    # _pool_requests expects 'items', not 'data'
    return self._elems_to_objs(self._chunked_get_elements(self.get_payload, items=items))
def get_payload(self, items)

Upload given items to given account.

'items' is an iterable of tuples where the first element is a Folder instance representing the ParentFolder that the item will be placed in and the second element is a tuple containing an optional ItemId, an optional Item.is_associated boolean, and a Data string returned from an ExportItems. call.

:param items:

Expand source code
def get_payload(self, items):
    """Upload given items to given account.

    'items' is an iterable of tuples where the first element is a Folder instance representing the ParentFolder
    that the item will be placed in and the second element is a tuple containing an optional ItemId, an optional
    Item.is_associated boolean, and a Data string returned from an ExportItems.
    call.

    :param items:
    """
    payload = create_element(f"m:{self.SERVICE_NAME}")
    items_elem = create_element("m:Items")
    payload.append(items_elem)
    for parent_folder, (item_id, is_associated, data_str) in items:
        # TODO: The full spec also allows the "UpdateOrCreate" create action.
        attrs = dict(CreateAction="Update" if item_id else "CreateNew")
        if is_associated is not None:
            attrs["IsAssociated"] = is_associated
        item = create_element("t:Item", attrs=attrs)
        set_xml_value(item, ParentFolderId(parent_folder.id, parent_folder.changekey), version=self.account.version)
        if item_id:
            set_xml_value(item, to_item_id(item_id, ItemId), version=self.account.version)
        add_xml_child(item, "t:Data", data_str)
        items_elem.append(item)
    return payload

Inherited members