Module exchangelib.services.get_streaming_events
Expand source code
import logging
from ..errors import EWSError, InvalidTypeError
from ..properties import Notification
from ..util import MNS, DocumentYielder, DummyResponse, create_element, get_xml_attr, get_xml_attrs
from .common import EWSAccountService, add_xml_child
log = logging.getLogger(__name__)
xml_log = logging.getLogger(f"{__name__}.xml")
class GetStreamingEvents(EWSAccountService):
"""MSDN:
https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getstreamingevents-operation
"""
SERVICE_NAME = "GetStreamingEvents"
element_container_name = f"{{{MNS}}}Notifications"
prefer_affinity = True
# Connection status values
OK = "OK"
CLOSED = "Closed"
def __init__(self, *args, **kwargs):
# These values are set each time call() is consumed
self.connection_status = None
super().__init__(*args, **kwargs)
self.streaming = True
def call(self, subscription_ids, connection_timeout):
if not isinstance(connection_timeout, int):
raise InvalidTypeError("connection_timeout", connection_timeout, int)
if connection_timeout < 1:
raise ValueError(f"'connection_timeout' {connection_timeout} must be a positive integer")
# Add 60 seconds to the timeout, to allow us to always get the final message containing ConnectionStatus=Closed
self.timeout = connection_timeout * 60 + 60
return self._elems_to_objs(
self._get_elements(
payload=self.get_payload(
subscription_ids=subscription_ids,
connection_timeout=connection_timeout,
)
)
)
def _elem_to_obj(self, elem):
return Notification.from_xml(elem=elem, account=None)
@classmethod
def _get_soap_parts(cls, response, **parse_opts):
# Pass the response unaltered. We want to use our custom document yielder
return None, response
def _get_soap_messages(self, body, **parse_opts):
# 'body' is actually the raw response passed on by '_get_soap_parts'. We want to continuously read the content,
# looking for complete XML documents. When we have a full document, we want to parse it as if it was a normal
# XML response.
r = body
for i, doc in enumerate(DocumentYielder(r.iter_content()), start=1):
xml_log.debug("Response XML (docs counter: %(i)s): %(xml_response)s", dict(i=i, xml_response=doc))
response = DummyResponse(content=doc)
try:
_, body = super()._get_soap_parts(response=response, **parse_opts)
except Exception:
r.close() # Release memory
raise
# TODO: We're skipping ._update_api_version() here because we don't have access to the 'api_version' used.
# TODO: We should be doing a lot of error handling for ._get_soap_messages().
yield from super()._get_soap_messages(body=body, **parse_opts)
if self.connection_status == self.CLOSED:
# Don't wait for the TCP connection to timeout
break
def _get_element_container(self, message, name=None):
error_ids_elem = message.find(f"{{{MNS}}}ErrorSubscriptionIds")
error_ids = [] if error_ids_elem is None else get_xml_attrs(error_ids_elem, f"{{{MNS}}}SubscriptionId")
self.connection_status = get_xml_attr(message, f"{{{MNS}}}ConnectionStatus") # Either 'OK' or 'Closed'
log.debug("Connection status is: %s", self.connection_status)
# Upstream normally expects to find a 'name' tag but our response does not always have it. We still want to
# call upstream, to have exceptions raised. Return an empty list if there is no 'name' tag and no errors.
if message.find(name) is None:
name = None
try:
res = super()._get_element_container(message=message, name=name)
except EWSError as e:
# When the request contains a combination of good and failing subscription IDs, notifications for the good
# subscriptions seem to never be returned even though the XML spec allows it. This means there's no point in
# trying to collect any notifications here and delivering a combination of errors and return values.
if error_ids:
e.value += f" (subscription IDs: {error_ids})"
raise e
return [] if name is None else res
def get_payload(self, subscription_ids, connection_timeout):
payload = create_element(f"m:{self.SERVICE_NAME}")
subscriptions_elem = create_element("m:SubscriptionIds")
for subscription_id in subscription_ids:
add_xml_child(subscriptions_elem, "t:SubscriptionId", subscription_id)
if not len(subscriptions_elem):
raise ValueError("'subscription_ids' must not be empty")
payload.append(subscriptions_elem)
add_xml_child(payload, "m:ConnectionTimeout", connection_timeout)
return payload
Classes
class GetStreamingEvents (*args, **kwargs)
-
Expand source code
class GetStreamingEvents(EWSAccountService): """MSDN: https://docs.microsoft.com/en-us/exchange/client-developer/web-service-reference/getstreamingevents-operation """ SERVICE_NAME = "GetStreamingEvents" element_container_name = f"{{{MNS}}}Notifications" prefer_affinity = True # Connection status values OK = "OK" CLOSED = "Closed" def __init__(self, *args, **kwargs): # These values are set each time call() is consumed self.connection_status = None super().__init__(*args, **kwargs) self.streaming = True def call(self, subscription_ids, connection_timeout): if not isinstance(connection_timeout, int): raise InvalidTypeError("connection_timeout", connection_timeout, int) if connection_timeout < 1: raise ValueError(f"'connection_timeout' {connection_timeout} must be a positive integer") # Add 60 seconds to the timeout, to allow us to always get the final message containing ConnectionStatus=Closed self.timeout = connection_timeout * 60 + 60 return self._elems_to_objs( self._get_elements( payload=self.get_payload( subscription_ids=subscription_ids, connection_timeout=connection_timeout, ) ) ) def _elem_to_obj(self, elem): return Notification.from_xml(elem=elem, account=None) @classmethod def _get_soap_parts(cls, response, **parse_opts): # Pass the response unaltered. We want to use our custom document yielder return None, response def _get_soap_messages(self, body, **parse_opts): # 'body' is actually the raw response passed on by '_get_soap_parts'. We want to continuously read the content, # looking for complete XML documents. When we have a full document, we want to parse it as if it was a normal # XML response. r = body for i, doc in enumerate(DocumentYielder(r.iter_content()), start=1): xml_log.debug("Response XML (docs counter: %(i)s): %(xml_response)s", dict(i=i, xml_response=doc)) response = DummyResponse(content=doc) try: _, body = super()._get_soap_parts(response=response, **parse_opts) except Exception: r.close() # Release memory raise # TODO: We're skipping ._update_api_version() here because we don't have access to the 'api_version' used. # TODO: We should be doing a lot of error handling for ._get_soap_messages(). yield from super()._get_soap_messages(body=body, **parse_opts) if self.connection_status == self.CLOSED: # Don't wait for the TCP connection to timeout break def _get_element_container(self, message, name=None): error_ids_elem = message.find(f"{{{MNS}}}ErrorSubscriptionIds") error_ids = [] if error_ids_elem is None else get_xml_attrs(error_ids_elem, f"{{{MNS}}}SubscriptionId") self.connection_status = get_xml_attr(message, f"{{{MNS}}}ConnectionStatus") # Either 'OK' or 'Closed' log.debug("Connection status is: %s", self.connection_status) # Upstream normally expects to find a 'name' tag but our response does not always have it. We still want to # call upstream, to have exceptions raised. Return an empty list if there is no 'name' tag and no errors. if message.find(name) is None: name = None try: res = super()._get_element_container(message=message, name=name) except EWSError as e: # When the request contains a combination of good and failing subscription IDs, notifications for the good # subscriptions seem to never be returned even though the XML spec allows it. This means there's no point in # trying to collect any notifications here and delivering a combination of errors and return values. if error_ids: e.value += f" (subscription IDs: {error_ids})" raise e return [] if name is None else res def get_payload(self, subscription_ids, connection_timeout): payload = create_element(f"m:{self.SERVICE_NAME}") subscriptions_elem = create_element("m:SubscriptionIds") for subscription_id in subscription_ids: add_xml_child(subscriptions_elem, "t:SubscriptionId", subscription_id) if not len(subscriptions_elem): raise ValueError("'subscription_ids' must not be empty") payload.append(subscriptions_elem) add_xml_child(payload, "m:ConnectionTimeout", connection_timeout) return payload
Ancestors
Class variables
var CLOSED
var OK
var SERVICE_NAME
var element_container_name
var prefer_affinity
Methods
def call(self, subscription_ids, connection_timeout)
-
Expand source code
def call(self, subscription_ids, connection_timeout): if not isinstance(connection_timeout, int): raise InvalidTypeError("connection_timeout", connection_timeout, int) if connection_timeout < 1: raise ValueError(f"'connection_timeout' {connection_timeout} must be a positive integer") # Add 60 seconds to the timeout, to allow us to always get the final message containing ConnectionStatus=Closed self.timeout = connection_timeout * 60 + 60 return self._elems_to_objs( self._get_elements( payload=self.get_payload( subscription_ids=subscription_ids, connection_timeout=connection_timeout, ) ) )
def get_payload(self, subscription_ids, connection_timeout)
-
Expand source code
def get_payload(self, subscription_ids, connection_timeout): payload = create_element(f"m:{self.SERVICE_NAME}") subscriptions_elem = create_element("m:SubscriptionIds") for subscription_id in subscription_ids: add_xml_child(subscriptions_elem, "t:SubscriptionId", subscription_id) if not len(subscriptions_elem): raise ValueError("'subscription_ids' must not be empty") payload.append(subscriptions_elem) add_xml_child(payload, "m:ConnectionTimeout", connection_timeout) return payload
Inherited members