Microsoft Azure Event Grid Client Library for Python
Azure Event Grid is a fully-managed intelligent event routing service that allows for uniform event consumption using a publish-subscribe model.
Source code | Package (PyPI) | Package (Conda) | API reference documentation | Product documentation | Samples | Changelog
Install the Azure Event Grid client library for Python with pip:
pip install azure-eventgrid
If you use Azure CLI, replace <resource-group-name> and <resource-name> with your own unique names.
az eventgrid topic create --location <location> --resource-group <resource-group-name> --name <resource-name>
az eventgrid domain create --location <location> --resource-group <resource-group-name> --name <resource-name>
In order to interact with the Event Grid service, you will need to create an instance of a client. An endpoint and credential are necessary to instantiate the client object.
Azure Event Grid provides integration with Azure Active Directory (Azure AD) for identity-based authentication of requests. With Azure AD, you can use role-based access control (RBAC) to grant access to your Azure Event Grid resources to users, groups, or applications.
To send events to a topic or domain with a TokenCredential, the authenticated identity should have the "EventGrid Data Sender" role assigned.
With the azure-identity package, you can seamlessly authorize requests in both development and production environments. To learn more about Azure Active Directory, see the azure-identity README.
For example, you can use DefaultAzureCredential to construct a client which will authenticate using Azure Active Directory:
import os
from azure.identity import DefaultAzureCredential
from azure.eventgrid import EventGridPublisherClient, EventGridEvent
default_az_credential = DefaultAzureCredential()
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]
client = EventGridPublisherClient(endpoint, default_az_credential)
You can find the topic endpoint within the Event Grid Topic resource on the Azure portal. This will look like:
"https://<event-grid-topic-name>.<topic-location>.eventgrid.azure.net/api/events"
To use an access key as the credential parameter, pass the key as a string into an instance of AzureKeyCredential.
Note: The access key can be found in the Azure portal in the "Access Keys" menu of the Event Grid Topic resource. Access keys can also be obtained via the Azure CLI or the azure-mgmt-eventgrid library. A guide for getting access keys can be found here.
import os
from azure.eventgrid import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
topic_key = os.environ["EVENTGRID_TOPIC_KEY"]
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]
credential_key = AzureKeyCredential(topic_key)
client = EventGridPublisherClient(endpoint, credential_key)
Note: A client may also be authenticated via SAS signature, using the AzureSasCredential. A sample demonstrating this is available here (async_version).
Note: The generate_sas method can be used to generate a shared access signature. A sample demonstrating this can be seen here.
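As a rough sketch of how the two notes above fit together, the following assumes the same EVENTGRID_TOPIC_ENDPOINT and EVENTGRID_TOPIC_KEY values as the access-key example. The helper names sas_expiry and build_sas_client are hypothetical and not part of the library, and the one-hour lifetime is an arbitrary choice. The Azure imports sit inside the function so the expiry helper can be used on its own.

```python
from datetime import datetime, timedelta, timezone


def sas_expiry(hours=1, now=None):
    """Return a naive UTC datetime `hours` after `now` (default: current UTC time)."""
    if now is None:
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    return now + timedelta(hours=hours)


def build_sas_client(endpoint, topic_key, hours=1):
    """Hypothetical helper: create a publisher client from a freshly generated SAS."""
    # Imported lazily so that sas_expiry works without the Azure packages installed.
    from azure.core.credentials import AzureSasCredential
    from azure.eventgrid import EventGridPublisherClient, generate_sas

    # generate_sas returns the signature as a string.
    signature = generate_sas(endpoint, topic_key, sas_expiry(hours))
    return EventGridPublisherClient(endpoint, AzureSasCredential(signature))
```

A signature stops working once its expiration time has passed, so a long-running publisher would need to create a new one periodically.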
A topic is a channel within the EventGrid service to send events. The event schema that a topic accepts is decided at topic creation time. If events of a schema type are sent to a topic that requires a different schema type, errors will be raised.
An event domain is a management tool for large numbers of Event Grid topics related to the same application. Domains allow you to publish events to thousands of topics and give you authorization and authentication control over each topic. For more information, visit Event domain overview.
When you create an event domain, a publishing endpoint for this domain is made available to you. This process is similar to creating an Event Grid Topic. The only difference is that, when publishing to a domain, you must specify the topic within the domain that you'd like the event to be delivered to.
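To illustrate the difference, here is a minimal sketch of a dict-form Event Grid event that names its domain topic. The helper domain_event and the default data version are assumptions made for this example; the field names follow the Event Grid event schema.

```python
import uuid
from datetime import datetime, timezone


def domain_event(topic, subject, event_type, data, data_version="1.0"):
    """Hypothetical helper: build an Event Grid schema event routed to `topic`."""
    return {
        "id": str(uuid.uuid4()),
        # For a domain, "topic" selects the topic within the domain.
        "topic": topic,
        "subject": subject,
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "data": data,
        "dataVersion": data_version,
    }


event = domain_event("MyCustomDomainTopic1", "Door1", "Azure.Sdk.Demo", {"team": "azure-sdk"})
```

The resulting dict can be passed to the send method of a client created with the domain endpoint.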
An event is the smallest amount of information that fully describes something that happened in the system. When a custom topic or domain is created, you must specify the schema that will be used when publishing events.
Event Grid supports multiple schemas for encoding events.
While you may configure your topic to use a custom schema, it is more common to use the already-defined Event Grid schema. See the specifications and requirements here.
Another option is to use the CloudEvents v1.0 schema. CloudEvents is a Cloud Native Computing Foundation project which produces a specification for describing event data in a common way. The service summary of CloudEvents can be found here.
EventGridPublisherClient provides operations to send event data to a topic hostname specified during client initialization.
Regardless of the schema that your topic or domain is configured to use, EventGridPublisherClient will be used to publish events to it. Use the send method to publish events.
The following formats of events are allowed to be sent:
A list or a single instance of strongly typed EventGridEvents.
A dict representation of a serialized EventGridEvent object.
A list or a single instance of strongly typed CloudEvents.
A dict representation of a serialized CloudEvent object.
A dict representation of any Custom Schema.
Please have a look at the samples for detailed examples.
Note: It is important to know if your topic supports CloudEvents or EventGridEvents before publishing. If you send to a topic that does not support the schema of the event you are sending, send() will throw an exception.
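One way to catch a schema mismatch before sending is to inspect the dict form of an event. The check below is a simplified sketch, not the library's own logic: the function guess_schema is hypothetical, and the Event Grid field list reflects what this example treats as required.

```python
# Required attributes in the CloudEvents v1.0 specification.
CLOUD_EVENT_REQUIRED = ("id", "source", "specversion", "type")
# Fields this sketch expects on a dict-form EventGridEvent (an assumption).
EVENT_GRID_REQUIRED = ("id", "subject", "data", "eventType", "eventTime", "dataVersion")


def guess_schema(event):
    """Return 'CloudEvent', 'EventGridEvent' or 'Custom' for a dict-form event."""
    if all(key in event for key in CLOUD_EVENT_REQUIRED):
        return "CloudEvent"
    if all(key in event for key in EVENT_GRID_REQUIRED):
        return "EventGridEvent"
    return "Custom"
```

Comparing the result with the schema chosen at topic creation time gives an early, local error instead of a failed request.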
A system topic in Event Grid represents one or more events published by Azure services such as Azure Storage or Azure Event Hubs. For example, a system topic may represent all blob events or only blob creation and blob deletion events published for a specific storage account.
The names of the various event types for the system events published to Azure Event Grid are available in azure.eventgrid.SystemEventNames.
For a complete list of recognizable system topics, visit System Topics.
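As an illustration of handling system events by type, the sketch below dispatches on the blob-created and blob-deleted event types of Azure Storage. The function describe_blob_event is hypothetical, and reading a "url" field from the event data is an assumption about the payload. Plain strings are used so the example runs without the library; the corresponding SystemEventNames values could be substituted.

```python
# Event type strings published by Azure Storage for blob events.
BLOB_ACTIONS = {
    "Microsoft.Storage.BlobCreated": "created",
    "Microsoft.Storage.BlobDeleted": "deleted",
}


def describe_blob_event(event):
    """Return a short description of a blob event dict, or None for other events."""
    # Event Grid schema uses "eventType"; CloudEvents schema uses "type".
    event_type = event.get("eventType") or event.get("type")
    action = BLOB_ACTIONS.get(event_type)
    if action is None:
        return None
    url = (event.get("data") or {}).get("url", "<unknown>")
    return f"{action}: {url}"
```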
For more information about the key concepts on Event Grid, see Concepts in Azure Event Grid.
Event Grid on Kubernetes with Azure Arc is an offering that allows you to run Event Grid on your own Kubernetes cluster. This capability is enabled by the use of Azure Arc enabled Kubernetes. Through Azure Arc enabled Kubernetes, a supported Kubernetes cluster connects to Azure. Once connected, you are able to install Event Grid on it. Learn more about it here.
Starting with v4.7.0, this package also supports publishing a CNCF cloud event from https://pypi.org/project/cloudevents/. You can pass a CloudEvent object from this library to the send API.
from cloudevents.http import CloudEvent
event = CloudEvent(...)
client.send(event)
The following sections provide several code snippets covering some of the most common Event Grid tasks.
This example publishes an Event Grid event.
import os
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient, EventGridEvent
key = os.environ["EG_ACCESS_KEY"]
endpoint = os.environ["EG_TOPIC_HOSTNAME"]
event = EventGridEvent(
    data={"team": "azure-sdk"},
    subject="Door1",
    event_type="Azure.Sdk.Demo",
    data_version="2.0"
)
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)
client.send(event)
This example publishes a Cloud event.
import os
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient
key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]
event = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team": "azure-sdk"}
)
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)
client.send(event)
It is possible to send events as a batch when sending multiple events to a topic or a domain. This example sends a list of CloudEvents using the send method.
WARNING: When sending a list of multiple events at one time, iterating over and sending each event will not result in optimal performance. For best performance, it is highly recommended to send a list of events.
import os
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient
key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]
event0 = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team": "azure-sdk"}
)
event1 = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team2": "azure-eventgrid"}
)
events = [event0, event1]
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)
client.send(events)
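When the number of events is large, one option is to send them in fixed-size lists rather than one by one. The sketch below is an assumption-laden illustration: batched and send_in_batches are hypothetical helpers, the default batch size of 100 is arbitrary, and send stands for any callable that accepts a list, such as the client's send method.

```python
from itertools import islice


def batched(events, size):
    """Yield successive lists of at most `size` events."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(events)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def send_in_batches(send, events, size=100):
    """Call `send` once per batch and return the number of calls made."""
    calls = 0
    for batch in batched(events, size):
        send(batch)
        calls += 1
    return calls
```

With a client in hand, a call such as send_in_batches(client.send, events) would issue one request per batch instead of one per event.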
In addition to the strongly typed objects, a dict representation of the respective serialized models can be used to publish CloudEvent(s) or EventGridEvent(s).
Use a dict-like representation to send to a topic with a custom schema, as shown below.
import os
import uuid
import datetime as dt
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient
key = os.environ["CUSTOM_SCHEMA_ACCESS_KEY"]
endpoint = os.environ["CUSTOM_SCHEMA_TOPIC_HOSTNAME"]
# A custom-schema event is a plain dict; every value must be JSON serializable.
event = {
    "customSubject": "sample",
    "customEventType": "sample.event",
    "customDataVersion": "2.0",
    "customId": str(uuid.uuid4()),
    "customEventTime": dt.datetime.now(dt.timezone.utc).isoformat(),
    "customData": "sample data"
}
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)
client.send(event)
This example consumes a message received from a storage queue and deserializes it to a CloudEvent object.
from azure.core.messaging import CloudEvent
from azure.storage.queue import QueueServiceClient, BinaryBase64DecodePolicy
import os
import json
# all types of CloudEvents below produce same DeserializedEvent
connection_str = os.environ['STORAGE_QUEUE_CONN_STR']
queue_name = os.environ['STORAGE_QUEUE_NAME']
with QueueServiceClient.from_connection_string(connection_str) as qsc:
    payload = qsc.get_queue_client(
        queue=queue_name,
        message_decode_policy=BinaryBase64DecodePolicy()
    ).peek_messages()
    # Deserialize the payload into a list of typed events.
    events = [CloudEvent.from_dict(json.loads(msg.content)) for msg in payload]
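To make the decoding step concrete without a queue, the following sketch performs the same base64 and JSON decoding by hand and checks for the attributes CloudEvents v1.0 requires. The function name decode_cloud_event_payload is hypothetical, and the sample event is made up for the example.

```python
import base64
import json

# Required attributes in the CloudEvents v1.0 specification.
REQUIRED_ATTRIBUTES = ("id", "source", "specversion", "type")


def decode_cloud_event_payload(content):
    """Decode a base64-encoded JSON message into a CloudEvent-shaped dict."""
    event = json.loads(base64.b64decode(content))
    missing = [key for key in REQUIRED_ATTRIBUTES if key not in event]
    if missing:
        raise ValueError(f"not a CloudEvent, missing attributes: {missing}")
    return event


sample = {"id": "1", "source": "/demo", "specversion": "1.0", "type": "sdk.demo", "data": {"test": "hello"}}
encoded = base64.b64encode(json.dumps(sample).encode("utf-8"))
decoded = decode_cloud_event_payload(encoded)
```

The decoded dict is the kind of input that CloudEvent.from_dict receives in the example above.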
This example consumes a payload message received from Service Bus and deserializes it to an EventGridEvent object.
from azure.eventgrid import EventGridEvent
from azure.servicebus import ServiceBusClient
import os
import json
# all types of EventGridEvents below produce same DeserializedEvent
connection_str = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connection_str) as sb_client:
    payload = sb_client.get_queue_receiver(queue_name).receive_messages()
    # Deserialize the payload into a list of typed events.
    events = [EventGridEvent.from_dict(json.loads(next(msg.body).decode('utf-8'))) for msg in payload]
You can use OpenTelemetry for Python as usual with EventGrid since it's compatible with azure-core tracing integration.
Here is an example of using OpenTelemetry to trace sending a CloudEvent.
First, set OpenTelemetry as the enabled tracing plugin for EventGrid.
from azure.core.settings import settings
from azure.core.tracing.ext.opentelemetry_span import OpenTelemetrySpan
settings.tracing_implementation = OpenTelemetrySpan
From here on, use OpenTelemetry as usual. See OpenTelemetry for details.
This example uses a simple console exporter to export the traces. Any exporter can be used here, including azure-monitor-opentelemetry-exporter, jaeger, zipkin, etc.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor # this requires opentelemetry >= 1.0.0
# Simple console exporter
exporter = ConsoleSpanExporter()
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(exporter)
)
Once the tracer and exporter are set, please follow the example below to start collecting traces while using the send method from the EventGridPublisherClient to send a CloudEvent object.
import os
from azure.eventgrid import EventGridPublisherClient
from azure.core.messaging import CloudEvent
from azure.core.credentials import AzureKeyCredential
hostname = os.environ['CLOUD_TOPIC_HOSTNAME']
key = AzureKeyCredential(os.environ['CLOUD_ACCESS_KEY'])
cloud_event = CloudEvent(
    source='demo',
    type='sdk.demo',
    data={'test': 'hello'},
)
with tracer.start_as_current_span(name="MyApplication"):
    client = EventGridPublisherClient(hostname, key)
    client.send(cloud_event)
Enable the azure.eventgrid logger to collect traces from the library.
The Event Grid client library will raise exceptions defined in Azure Core.
This library uses the standard logging library for logging. Basic information about HTTP sessions (URLs, headers, etc.) is logged at INFO level.
Optional keyword arguments can be passed in at the client and per-operation level. The azure-core reference documentation describes available configurations for retries, logging, transport protocols, and more.
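A minimal sketch of turning on the library's logger with the standard logging module follows. The helper name enable_eventgrid_logging and the log format are assumptions chosen for this example.

```python
import logging
import sys


def enable_eventgrid_logging(level=logging.DEBUG, stream=sys.stdout):
    """Attach a stream handler to the azure.eventgrid logger and return the logger."""
    logger = logging.getLogger("azure.eventgrid")
    logger.setLevel(level)
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
    logger.addHandler(handler)
    return logger


logger = enable_eventgrid_logging()
```

Azure SDK clients generally also accept a logging_enable=True keyword argument, on the client or on a single operation, for more detailed HTTP logging; see the azure-core reference documentation for the exact behavior.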
The following section provides several code snippets illustrating common patterns used in the Event Grid Python API.
These code samples show common scenarios for the Azure Event Grid client library.
Generate Shared Access Signature: sample_generate_sas.py
Authenticate the client: sample_authentication.py (async_version)
Publish events to a topic using SAS: sample_publish_events_to_a_topic_using_sas_credential_async.py (async_version)
Publish Event Grid Events to a topic: sample_publish_eg_events_to_a_topic.py (async_version)
Publish EventGrid Events to a domain topic: sample_publish_eg_events_to_a_domain_topic.py (async_version)
Publish a Cloud Event: sample_publish_events_using_cloud_events_1.0_schema.py (async_version)
Publish a Custom Schema: sample_publish_custom_schema_to_a_topic.py (async_version)
The following samples cover publishing and consuming dict representations of EventGridEvents and CloudEvents.
Publish EventGridEvent as dict like representation: sample_publish_eg_event_using_dict.py (async_version)
Publish CloudEvent as dict like representation: sample_publish_cloud_event_using_dict.py (async_version)
Consume a Custom Payload of raw cloudevent data: sample_consume_custom_payload.py
More samples can be found here.
For more extensive documentation on Azure Event Grid, see the Event Grid documentation on docs.microsoft.com.
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit cla.microsoft.com.
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.
SystemEventNames related to Azure Resource Notifications.
SystemEventNames related to Azure Communication Services and Azure Resource Notifications.
SystemEventNames related to Azure App Configuration and Azure EventGrid.
SystemEventNames related to Azure Container Services.
publish_cloud_events, receive_cloud_events, acknowledge_cloud_events, release_cloud_events, and reject_cloud_events operations.
SystemEventNames related to Storage Tasks, Azure Communication Services and Azure HealthcareApis.
SystemEventNames related to Azure Communication Services, DataBox and ApiManagementGateway APIs.
SystemEventNames enums for APIManagement were incorrectly capitalized, changed Api to API.
msrest dependency and six dependency
isodate dependency
SystemEventNames related to health care APIs.
SystemEventNames related to health care APIs.
send API will raise on exceptions.
SystemEvents.
Microsoft.ContainerService.NewKubernetesVersionAvailable to SystemEvents.
from_json method which now accepts storage QueueMessage, eventhub's EventData or ServiceBusMessage or simply json bytes to return an EventGridEvent
msrest dependency to 0.6.21 to align with mgmt package.
EventGridPublisherClient now supports Azure Active Directory (AAD) for authentication.
New Features
SystemEventNames enum.
Bug Fixes
ServiceBusDeadletterMessagesAvailableWithNoListenerEventName with the right value.
New Features
SystemEventNames enum.
Bug Fixes
repr on EventGridEvent to show more meaningful text.
New Features
AcsChatThreadParticipantRemovedEventName, AcsChatThreadParticipantAddedEventName and AcsRecordingFileStatusUpdatedEventName.
Note: This is the first stable release of our efforts to create a user-friendly and Pythonic client library for Azure EventGrid. Users migrating from v1.x are advised to view the migration guide.
New Features
azure-eventgrid package now supports azure.core.messaging.CloudEvent which honors the CNCF CloudEvent spec.
azure.eventgrid.SystemEventNames can be used to get the event model type mapping for system events.
EventGridPublisherClient for the publish flow for EventGrid Events, CloudEvents and Custom schema events.
Breaking Changes
azure.eventgrid.models namespace along with all the models in it are now removed.
azure.eventgrid.SystemEventNames provides the list of available events name for easy switching.
azure.eventgrid.event_grid_client.EventGridClient is now removed in favor of azure.eventgrid.EventGridPublisherClient.
azure.eventgrid.event_grid_client.EventGridClientConfiguration is now removed.
Disclaimer: v2.0.0 is functionally equivalent to v4.0.0. Users are advised to use v4.0.0 instead of this.
Breaking Changes
~azure.eventgrid.CloudEvent is now removed in favor of ~azure.core.messaging.CloudEvent.
SystemEventNames related to Azure Communication Service starting with ACS**** are renamed to Acs*** to honor pascal case.
Features
SystemEvents - ServiceBusDeadletterMessagesAvailablePeriodicNotificationsEventData and ServiceBusActiveMessagesAvailablePeriodicNotificationsEventData
Breaking Changes
EventGridSharedAccessSignatureCredential is deprecated in favor of AzureSasCredential.
azure.eventgrid.models namespace along with all the models in it are now removed. azure.eventgrid.SystemEventNames can be used to get the event model type mapping.
topic_hostname is renamed to endpoint in the EventGridPublisherClient.
azure.eventgrid.generate_shared_access_signature method is now renamed to generate_sas.
EventGridConsumer is now removed. Please see the samples to see how events can be deserialized.
CustomEvent model is removed. Dictionaries must be used to send a custom schema.
Bug Fixes
EventGridEvent has two additional required positional parameters namely, data and data_version.
EventGridPublisherClient now appropriately throws a ValueError if an invalid credential is passed during initialization.
Bug Fixes
TypeError is raised when bytes are passed to an EventGridEvent.
Features
CloudEvents.
EventGridPublisherClient for the publish flow for EventGrid Events, CloudEvents and CustomEvents.
EventGridConsumer for the consume flow of the events.
General Breaking changes
This version uses a next-generation code generator that might introduce breaking changes.
NameOfEnum.stringvalue. Format syntax should be preferred.
msrestazure.azure_operation.AzureOperationPoller to msrest.polling.LROPoller. External API is the same.
msrest.polling.LROPoller, regardless of the optional parameters used.
raw=True. Instead of returning the initial call result as ClientRawResponse, without polling, now this returns an LROPoller. After polling, the final resource will be returned as a ClientRawResponse.
polling parameter. The default behavior is Polling=True which will poll using ARM algorithm. When Polling=False, the response of the initial call will be returned without polling.
polling parameter accepts instances of subclasses of msrest.polling.PollingMethod.
add_done_callback will no longer raise if called after polling is finished, but will instead execute the callback right away.
Features