by Mark Tse, Software Developer, Platform Development D2L
originally published 24 March 2018
In this article, we will be looking at how to keep a database updated with the daily Brightspace Data Sets (BDS) extracts.
Introduction
The release of BDS gives administrators access to raw data that can be aggregated and filtered to their needs. These data sets are refreshed on a daily basis, so administrators have to return to Brightspace daily to get the latest data. This article will help you get started on automating this process, so you can spend more time working with the data and generating the reports you want, and less time keeping a database up to date.
The source code in this article is available on GitHub.
Workflow Overview
This code example uses PostgreSQL and Python 3. However, it is possible to do this workflow with any database and programming language.
Prerequisites
- BDS enabled on Brightspace and scheduled to run daily
- Registration of an OAuth 2.0 application that allows for refresh tokens
- An initial refresh token
- Python 3.6, with dependent libraries installed (pip install -r requirements.txt)
- PostgreSQL instance (sample tested against 9.6.2), and the schema set up by running the schema files in schema/tables
- A config.json file in the same folder as the script, with the following:
{ "bspace_url": "https://<domain>/",
"client_id": "<client_id>",
"client_secret": "<client_secret>",
"refresh_token": "<refresh_token>",
"dbhost": "<dbhost>",
"dbname": "<dbname>",
"dbuser": "<dbuser>",
"dbpassword": "<dbpassword>"}
Note: this configuration file's permissions should be set so that it is only readable by the user running the script, as it contains sensitive information.
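For example, on a Unix-like system you could enforce this from Python. This is a minimal sketch and not part of the sample script:

import os
import stat

# Restrict config.json to owner read/write only (equivalent to chmod 600)
os.chmod('config.json', stat.S_IRUSR | stat.S_IWUSR)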
Getting an OAuth 2.0 Access Token
First, we use the client ID and secret from the OAuth 2.0 application registration, along with the initial refresh token from the prerequisites, to obtain an access token.
CONFIG_LOCATION = 'config.json'
AUTH_SERVICE = 'https://auth.brightspace.com/'
...
def get_config():
    with open(CONFIG_LOCATION, 'r') as f:
        return json.load(f)

def trade_in_refresh_token(config):
    # https://tools.ietf.org/html/rfc6749#section-6
    response = requests.post(
        '{}/core/connect/token'.format(AUTH_SERVICE),
        # Content-Type 'application/x-www-form-urlencoded'
        data={
            'grant_type': 'refresh_token',
            'refresh_token': config['refresh_token'],
            'scope': 'core:*:*'
        },
        auth=HTTPBasicAuth(config['client_id'], config['client_secret'])
    )

    if response.status_code != 200:
        logger.error('Status code: %s; content: %s', response.status_code, response.text)
        response.raise_for_status()

    return response.json()

def put_config(config):
    with open(CONFIG_LOCATION, 'w') as f:
        json.dump(config, f, sort_keys=True)

config = get_config()
token_response = trade_in_refresh_token(config)

# Store the new refresh token for getting a new access token next run
config['refresh_token'] = token_response['refresh_token']
put_config(config)
Refresh tokens are one-time use; however, the token response includes a new refresh token alongside the access token, so we store the new refresh token in config.json for the next run.
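For reference, a successful token response is a JSON document as described in RFC 6749. The fields this script relies on are access_token and refresh_token; the values and expiry below are placeholders, and the exact set of fields returned may vary:

{
    "access_token": "<access_token>",
    "token_type": "Bearer",
    "expires_in": 3600,
    "refresh_token": "<new_refresh_token>"
}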
Downloading the Data Sets
The data sets we are interested in are defined at the top of the script:
DATA_SET_METADATA = [
    DataSetMetadata(
        plugin='07a9e561-e22f-4e82-8dd6-7bfb14c91776',
        table='org_units'
    ),
    DataSetMetadata(
        plugin='793668a8-2c58-4e5e-b263-412d28d5703f',
        table='grade_objects'
    ),
    DataSetMetadata(
        plugin='1d6d722e-b572-456f-97c1-d526570daa6b',
        table='users'
    ),
    DataSetMetadata(
        plugin='9d8a96b4-8145-416d-bd18-11402bc58f8d',
        table='grade_results'
    )
]
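DataSetMetadata is simply a container pairing each BDS plugin ID with the database table it feeds. A minimal sketch of such a definition is shown below; the sample script's actual definition may differ:

import collections

# Pairs a BDS plugin GUID with the destination table name
DataSetMetadata = collections.namedtuple('DataSetMetadata', ['plugin', 'table'])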
For each data set, we call Brightspace to get the zipped data set:
API_VERSION = '1.15'
...
endpoint = '{bspace_url}/d2l/api/lp/{lp_version}/dataExport/bds/download/{plugin_id}'.format(
    bspace_url=config['bspace_url'],
    lp_version=API_VERSION,
    plugin_id=plugin
)
headers = {'Authorization': 'Bearer {}'.format(token_response['access_token'])}
response = requests.get(endpoint, headers=headers)
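The snippet above downloads a single data set; in practice it runs once per entry in DATA_SET_METADATA. A minimal sketch of that loop follows; the raise_for_status call is an addition here, not part of the original snippet:

for data_set in DATA_SET_METADATA:
    endpoint = '{bspace_url}/d2l/api/lp/{lp_version}/dataExport/bds/download/{plugin_id}'.format(
        bspace_url=config['bspace_url'],
        lp_version=API_VERSION,
        plugin_id=data_set.plugin
    )
    headers = {'Authorization': 'Bearer {}'.format(token_response['access_token'])}
    response = requests.get(endpoint, headers=headers)
    # Fail fast if the download request was rejected (e.g. expired token)
    response.raise_for_status()
    # response.content now holds the zipped extract for data_set.table;
    # the next sections show how it is unzipped and loaded into the database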
Extracting the CSV File
To extract the data set, we load the zip archive into memory. Streams are used to avoid loading the entire unzipped CSV into memory at once.
There should only be one file in the archive.
with io.BytesIO(response.content) as response_stream:
    with zipfile.ZipFile(response_stream) as zipped_data_set:
        files = zipped_data_set.namelist()
        assert len(files) == 1
        csv_name = files[0]
        with zipped_data_set.open(csv_name) as csv_data:
            update_db(db_conn_params, table, csv_data)
Updating the Database
To update the database, we first load the entire data set into a temporary table:
cur.execute(
    '''
    CREATE TEMP TABLE tmp_{table} AS
    SELECT *
    FROM {table}
    LIMIT 0;
    '''
    .format(table=table)
)

cur.copy_expert(
    '''
    COPY tmp_{table}
    FROM STDIN
    WITH (FORMAT CSV, HEADER);
    '''
    .format(table=table),
    csv_data
)
Note: the syntax here is for PostgreSQL, and may differ depending on which SQL engine you are using.
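The cur in these fragments is a database cursor inside update_db. The copy_expert call implies psycopg2, so the surrounding connection handling might look like the sketch below, assuming db_conn_params is a dict of connection keyword arguments; the real script may structure this differently:

import psycopg2

def update_db(db_conn_params, table, csv_data):
    # Sketch only: open a connection, run the steps in this section, commit
    conn = psycopg2.connect(**db_conn_params)
    try:
        with conn.cursor() as cur:
            # 1. CREATE TEMP TABLE tmp_{table} (above)
            # 2. COPY the CSV stream into it with cur.copy_expert (above)
            # 3. execute the upsert query from schema/upserts (below)
            # 4. DROP the temp table (below)
            pass
        conn.commit()
    finally:
        conn.close()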
We perform an upsert (insert or update) into the main table by loading the query inside schema/upserts and executing it. This upsert query reads all the data in the TEMP table and updates or inserts into the main table based on the PRIMARY KEY.
upsert_query_file = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    'schema',
    'upserts',
    'upsert_{table}.sql'.format(table=table)
)

with open(upsert_query_file) as upsert_query:
    cur.execute(upsert_query.read())
Here is a sample upsert query for PostgreSQL:
INSERT INTO users
SELECT
    user_id,
    user_name,
    org_defined_id,
    first_name,
    middle_name,
    last_name,
    is_active,
    organization,
    internal_email,
    external_email,
    signup_date
FROM tmp_users
ON CONFLICT ON CONSTRAINT users_pkey
DO UPDATE SET
    user_name = EXCLUDED.user_name,
    org_defined_id = EXCLUDED.org_defined_id,
    first_name = EXCLUDED.first_name,
    middle_name = EXCLUDED.middle_name,
    last_name = EXCLUDED.last_name,
    is_active = EXCLUDED.is_active,
    organization = EXCLUDED.organization,
    internal_email = EXCLUDED.internal_email,
    external_email = EXCLUDED.external_email,
    signup_date = EXCLUDED.signup_date
Other SQL engines may differ slightly here, offering MERGE instead of ON CONFLICT.
Finally, we clean up by dropping the TEMP table and committing the transaction:
cur.execute('DROP TABLE tmp_{table}'.format(table=table))
...
conn.commit()
Scheduled Job
This script is re-runnable, and can be triggered daily. Alongside the BDS scheduled task, a scheduled execution of this script will keep the database up-to-date with the latest data.
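For example, on a Linux host the script could be scheduled with cron; the schedule and paths below are placeholders and should be adjusted so the script runs after the daily BDS extracts are generated:

# Placeholder crontab entry: run the sync script daily at 06:00
0 6 * * * /usr/bin/python3 /path/to/script.py >> /path/to/sync.log 2>&1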
Conclusion
We have shown how to programmatically update a database with a given set of daily BDS extracts. These steps remove the need to manually update the database as extracts are updated, providing more time for you to focus on analyzing the data.