Dataverse, meet Databricks
Microsoft Dataverse is a set of data services, tables/schemas and security features that support Dynamics 365 and PowerApps.
Its popularity is growing rapidly as it is becoming the keystone application-data repository for an increasing number of low-code PowerApps, CRM and intelligent workflow solutions.
While several integration platforms handle Dataverse-to-Databricks integration quite brilliantly (e.g., Fivetran), and their approach may offer significant advantages over the following pattern, I wanted to dust off my data engineering chops and build a native Databricks process to showcase a pattern that works not only for Dataverse, but for any REST OData endpoint.
The notebook is completely dynamic, including the schema inference. It results in a Delta table, ready for analysis.
TL;DR: The completed notebook
Step 1: Create an Azure AD App ID
We need an account to query the API for Dataverse. Follow this walkthrough to create the App Registration in your AD tenant.
Tutorial here: https://learn.microsoft.com/en-us/power-apps/developer/data-platform/walkthrough-register-app-azure-active-directory.
Note: The terminology around Dataverse can be confusing. You may see references to the Common Data Service; Dataverse is the newer term. You will also often see Dynamics 365 CRM in the documentation even if you aren’t using that app specifically. Dataverse is core to Dynamics CRM as well as PowerApps.
Step 2: Apply permissions to Dataverse for your Client ID (App):
With the Application Client ID you just created, follow the steps in the link to apply permissions to your PowerApps instance’s Dataverse dataset.
These permissions will allow Databricks to use the ClientID (and its Access Token) to query the Dataverse tables.
https://learn.microsoft.com/en-us/power-platform/admin/manage-application-users.
Note: In my case, I applied “System Administrator” while I work out the least-privileged permission set (TODO).
Step 3: In Databricks, Create a Python Notebook.
Let’s start building the sequence, cell by cell.
1. We need some imports.
import requests
import json
from pyspark.sql.functions import *
from pyspark.sql.types import *
2. Retrieve the secrets from key vault
Record the ClientID, Secret, Tenant and your Dataverse Orgname as Secrets in Azure Key Vault. We’re going to use these four variables throughout the process to get data from Dataverse’s ODATA endpoint.
For my_dv_entity, set any table endpoint name you would like from your environment. A complete list is here. Note: your custom tables may have a prefix like cr552_tablename.
# Retrieve secrets from our key vault.
my_dv_orgName = dbutils.secrets.get(scope="demo-kv-scope",key="eric-demo-dataverse-org")
my_dv_clientId = dbutils.secrets.get(scope="demo-kv-scope",key="eric-demo-dataverse-clientid")
my_dv_secret = dbutils.secrets.get(scope="demo-kv-scope",key="eric-demo-dataverse-secret")
my_ad_tenant = dbutils.secrets.get(scope="demo-kv-scope",key="bp-tenant-id")
my_dv_entity = 'contacts' #Type any endpoint entity name for export. TODO: Make dynamic with Databricks widget
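As a side note, one way to address the TODO on my_dv_entity is a notebook widget, so the entity can be chosen interactively (or passed in by a job) at run time. A minimal sketch, assuming a widget named dv_entity; this would replace the hard-coded assignment above:

# Sketch: drive the entity name from a notebook widget (widget name and default are illustrative)
dbutils.widgets.text("dv_entity", "contacts", "Dataverse entity")
my_dv_entity = dbutils.widgets.get("dv_entity")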
3. Create a Function to Obtain the Client’s API Access Token
To make REST API calls we need an access token for the Client ID we created.
This function acquires the token. It takes your tenant ID (your company’s Azure AD tenant), the client ID and secret, builds the OAuth2 scope from your Dataverse URL, posts them to Microsoft’s OAuth2 token endpoint, and returns an access token for use in this session.
def get_access_token(tenant_id, client_id, client_secret):
    # Set the token endpoint URL and the request payload
    token_endpoint_url = (
        f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    )
    payload = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": f"https://{my_dv_orgName}.crm.dynamics.com/.default",  # IMPORTANT: the token is only valid if this scope matches your Dataverse URL + /.default
    }
    # Make the POST request to the token endpoint
    response = requests.post(token_endpoint_url, data=payload)
    # Extract the access token from the response
    response_json = response.json()
    access_token = response_json["access_token"]
    return access_token

my_dv_accessToken = get_access_token(
    tenant_id=my_ad_tenant, client_id=my_dv_clientId, client_secret=my_dv_secret
)
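If you want a quick sanity check that the token works before pulling a full table, Dataverse’s Web API exposes a WhoAmI function. A minimal sketch (the v9.2 path matches the endpoint used later in this notebook):

# Optional sanity check: call the Dataverse WhoAmI function with the new token
whoami_url = f"https://{my_dv_orgName}.crm.dynamics.com/api/data/v9.2/WhoAmI"
whoami_resp = requests.get(
    whoami_url,
    headers={"Authorization": f"Bearer {my_dv_accessToken}", "Accept": "application/json"},
)
print(whoami_resp.status_code, whoami_resp.json().get("UserId"))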
4. Create a Function to call the Dataverse endpoint & retrieve all records.
This function simply takes as its input a URL to your Dataverse tables. It uses the access token my_dv_accessToken and grabs 5,000 records per loop, appending each loop’s output to a result. It will continue to loop until all the records for your query are retrieved. The @odata.nextLink field in the result set returned from Dataverse tells this function to grab the next 5,000 records.
def call_dataverse_endpoint(endpoint):
    # Empty list to accumulate every page of results
    data = []
    # Headers
    headers = {
        "Authorization": f"Bearer {my_dv_accessToken}",
        "Accept": "application/json",
        "Content-Type": "application/json; charset=utf-8"
    }
    # Initial request
    response = requests.get(endpoint, headers=headers)
    # Loop through the responses until @odata.nextLink is gone.
    while "@odata.nextLink" in response.json():
        # Append the data returned by the endpoint to the list
        data.extend(response.json()["value"])
        # Request the @odata.nextLink URL for the next page
        response = requests.get(response.json()["@odata.nextLink"], headers=headers)
    # Append the final page's data (the response with no nextLink)
    data.extend(response.json()["value"])
    # Return
    return data
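The 5,000-record page size is Dataverse’s default (and maximum). If you want smaller pages per request, the Web API honors a Prefer: odata.maxpagesize header; a sketch of what the headers would look like, with an illustrative value:

# Optional: ask Dataverse for smaller pages (must be <= 5000, the default)
headers_small_pages = {
    "Authorization": f"Bearer {my_dv_accessToken}",
    "Accept": "application/json",
    "Prefer": "odata.maxpagesize=1000",  # illustrative page size
}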
5. Let ‘er rip! We have everything we need to start.
Since the function returns a list, we’ll create an object list_data to hold all the records from step 4 by calling it with our Dataverse endpoint, Contacts in this example.
list_data = call_dataverse_endpoint(f'https://{my_dv_orgName}.crm.dynamics.com/api/data/v9.2/contacts')
In my demo environment, the extract took about 2.5 minutes and retrieved 111,023 contact records.
Note the endpoint does not include any filter or search query qualifiers. If you filtered on the modifiedon date, for example, your exports could be much smaller.
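For illustration, a filtered and trimmed pull might look something like this; the date literal and the $select column list are just examples to adapt to your own data:

# Hypothetical filtered pull: only contacts modified since a given date,
# returning a handful of columns
filtered_endpoint = (
    f"https://{my_dv_orgName}.crm.dynamics.com/api/data/v9.2/contacts"
    "?$select=contactid,firstname,lastname,modifiedon"
    "&$filter=modifiedon ge 2023-01-01T00:00:00Z"
)
recent_contacts = call_dataverse_endpoint(filtered_endpoint)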
6. Let’s convert our result list_data to a dataframe
At this point, we have an object in Spark that holds our data, list_data. It’s a JSON-ready list of records, but to convert it to a dataframe we have to provide a schema for the data.
While there are several ways to obtain or infer the schema, the best practice is to create the one you need for your data manually. You have full control of the columns you want to express in your table and full control of the data types for each attribute. Defining your own schema also helps you consider how and when to introduce schema changes to your table. While Databricks Delta (and Delta Live Tables) gives you an incredible number of ways to evolve a schema automatically, it’s important to consider the side effects of columns vanishing from your source data or data types changing. Here’s a nice write-up on schema creation.
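For illustration, a hand-built schema for a small slice of the contacts table might look like the sketch below; the columns shown are just a sample, and your entity will have many more attributes:

# What a hand-built schema might look like for a few contact columns (illustrative only)
contacts_schema = StructType([
    StructField("contactid", StringType(), True),
    StructField("firstname", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("emailaddress1", StringType(), True),
    StructField("modifiedon", StringType(), True),  # could be cast to a timestamp downstream
])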
But, in a data exploration scenario like this, who wants to hand-crank a schema each time you try a new table!? Not this guy!
I’ve found a nice utility that takes an object and returns a schema that we can use with the spark.createDataFrame command. It’s fully dynamic and great for use cases like this, particularly when you have data where inference mechanisms may not work. The gist is here and I’ve included it in my final notebook.
7. With the Schema Provided, Create the Delta Table for Analysis
# Auto create the schema as a StructType object type from our list_data object
data_schema = generate_schema(input_json=list_data[0])
# Create a dataframe from our Dataverse API calls using the schema we just created
df = spark.createDataFrame(data=list_data, schema=data_schema)
# Switch to the Unity catalog used for my demos
spark.sql('USE CATALOG demos;')
# Create a Dataverse database/schema
spark.sql('CREATE DATABASE IF NOT EXISTS Dataverse;')
# Create our delta table from our dataframe
df.write.mode('overwrite').format('delta').saveAsTable(f'Dataverse.{my_dv_entity}')
That’s it! You have a table ready to query. You can re-run this notebook for any entity you would like, and it will automatically create a new table or overwrite an existing one.
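A quick way to confirm the load is a simple count against the new Delta table (swap in whatever sanity check you prefer):

# Quick sanity check on the new Delta table
display(spark.sql(f"SELECT COUNT(*) AS row_count FROM Dataverse.{my_dv_entity}"))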
Future Enhancements
Here are some enhancements we can make to this data acquisition pattern.
- Delta Live Tables (DLT) with MERGE. DLT is perfectly suited for workstreams like this. We can easily envision a pipeline that acquires all new table data every ~N hours and applies a MERGE (or APPLY CHANGES INTO, in DLT speak) statement to your Delta table, creating new records or updating existing ones. This would be a significant advantage over the full load & overwrite sequence I’ve developed here (see the MERGE sketch after this list).
- Data Validation. Data from Dataverse with missing or bad values may not be useful for your analytical needs. With DLT, you can define expectations about your data and drop rows that do not meet your quality objectives.
- Fully Parameterized Pipeline. Databricks notebooks can be called from external orchestration tools like Airflow, Data Factory and others. A Dataverse endpoint can be passed programmatically to this notebook for processing.
- Error Control. You’ll notice that I did not include try/except logic in my functions. These are vital for production-ready workloads.
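To make the first bullet concrete, here is a sketch of the plain Delta MERGE variant (not DLT) that an incremental pipeline could run after pulling only recently modified rows. It assumes contactid as the key and an updates_df dataframe built the same way as df above; both are assumptions for illustration.

from delta.tables import DeltaTable

# Sketch: upsert an incremental batch (updates_df) into the existing table, keyed on contactid
target = DeltaTable.forName(spark, f"Dataverse.{my_dv_entity}")
(
    target.alias("t")
    .merge(updates_df.alias("s"), "t.contactid = s.contactid")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)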