by Mark Tse, Software Developer, Platform Development, D2L
originally published 24 March 2018
Adding a column is deemed low impact, provided it's appended to the end of a data set and therefore doesn't disrupt the order of the existing columns. Any system that ingests previous versions of the data set (which didn't contain these columns) should not throw errors or otherwise fail because of the appended columns.
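For example (the column names here are hypothetical, not taken from an actual Brightspace Data Set), an extract that previously shipped with the header

    UserId,UserName,OrgDefinedId

might, after an additive change, ship with

    UserId,UserName,OrgDefinedId,ExternalEmail

Consumers built against the three-column version should continue to work and simply ignore the fourth column.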
In this article, we will make changes to the code samples from the Brightspace Data Sets headless client sample and the Brightspace Data Sets - Differential Data Sets Client Example to support additive changes to the Brightspace Data Sets schema.
Changes Overview
To support extracts whose CSVs contain columns that the database table does not yet have, we will truncate those extra columns before updating the database table.
Ignoring New Columns
Recall that for each data extract, we upload the entire extract into a temporary table before doing an upsert (insert or update):
def update_db(db_conn_params, table, csv_data):
    '''
    In a single transaction, update the table by:
    - Loading the CSV data into a temporary table
    - Running an update or insert query to update the main table with the
      data in the temporary table
    - Deleting the temporary table
    '''
    with psycopg2.connect(**db_conn_params) as conn:
        with conn.cursor() as cur:
            tmp_table_id = sql.Identifier('tmp_' + table)

            # 1) Create the temporary table
            cur.execute(
                sql.SQL('''
                    CREATE TEMP TABLE {tmp_table} AS
                    SELECT * FROM {table} LIMIT 0;
                ''')
                .format(
                    tmp_table=tmp_table_id,
                    table=sql.Identifier(table)
                )
            )

            # 2) Bulk update the CSV into the temporary table
            cur.copy_expert(
                sql.SQL('''
                    COPY {tmp_table} FROM STDIN WITH (FORMAT CSV, HEADER);
                ''')
                .format(tmp_table=tmp_table_id),
                csv_data
            )

            # 3) Perform the upsert
            upsert_query_file = os.path.join(
                os.path.dirname(os.path.abspath(__file__)),
                'schema',
                'upserts',
                'upsert_{table}.sql'.format(table=table)
            )
            with open(upsert_query_file) as upsert_query:
                cur.execute(upsert_query.read())

            # 4) Remove the temporary table
            cur.execute(
                sql.SQL('DROP TABLE {tmp_table}')
                .format(tmp_table=tmp_table_id)
            )

        conn.commit()
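For context, here is a minimal sketch of how update_db() might be invoked. The connection parameters, table name, and file name are illustrative assumptions, not part of the original sample:

# Hypothetical connection parameters and table name, for illustration only
db_conn_params = {
    'host': 'localhost',
    'dbname': 'brightspace_data_sets',
    'user': 'bds_user',
    'password': 'bds_password'
}

# The extract is passed in as a file-like object opened in text mode
with open('users.csv') as csv_data:
    update_db(db_conn_params, 'users', csv_data)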
Step 2) uses the COPY command in PostgreSQL, which requires the input CSV file to have exactly the same number of columns as the table. If the database has an older schema defined and a data extract with a newer schema is used (one with more columns than the table), PostgreSQL will throw an exception.
To prevent this from occurring, we will process the CSV file to remove all extraneous columns. First, we will need to determine how many columns the table in the database has:
def get_number_of_columns(db_conn_params, table):
    with psycopg2.connect(**db_conn_params) as conn:
        with conn.cursor() as cur:
            cur.execute(
                sql.SQL('''
                    SELECT * FROM {table} LIMIT 0;
                ''')
                .format(table=sql.Identifier(table))
            )

            # cur.description is a list of named tuples
            # Each named tuple describes a column in the table
            # The length of that list is therefore the number of columns in
            # the table.
            return len(cur.description)
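As a quick sanity check, the helper can be called on its own (the table name and connection parameters are again hypothetical):

# Returns e.g. 4 if the hypothetical 'users' table currently has 4 columns
num_columns = get_number_of_columns(db_conn_params, 'users')
print(num_columns)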
Next, we will add a method that processes the data extract given the number of columns in the table:
def process_csv_stream(csv_input_stream, num_columns_in_table):
    '''
    Ignore excessive columns in the CSV due to additive changes / BDS minor
    changes by ignoring any columns in the CSV past the number of columns in
    the table
    '''
    csv_rows = []

    # Read each line in the CSV, and truncate all columns not yet defined
    # for the table we are loading the data into
    csv_reader = csv.reader(csv_input_stream, quoting=csv.QUOTE_MINIMAL)
    for line in csv_reader:
        csv_rows.append(line[:num_columns_in_table])

    # Re-write the processed rows into a new CSV file-like object (i.e. a
    # stream)
    csv_data = io.StringIO()
    csv_writer = csv.writer(csv_data, quoting=csv.QUOTE_MINIMAL)
    csv_writer.writerows(csv_rows)

    # Rewind the stream to the beginning before returning it so that the
    # downstream reader reads from the beginning
    csv_data.seek(io.SEEK_SET)
    return csv_data
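To see the truncation in action, here is a small, self-contained example. The column names and values are made up for illustration and do not come from a real Brightspace Data Set:

import csv
import io

# A CSV produced under a newer schema: four columns instead of three
newer_extract = io.StringIO(
    '123,Physics 101,Course Offering,false\n'
    '456,Chemistry 200,Course Offering,true\n'
)

# Pretend the local table only has the first three columns defined
truncated = process_csv_stream(newer_extract, num_columns_in_table=3)

# Each row comes back with the fourth column dropped
print(truncated.read())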
Memory Concerns
Notice that process_csv_stream() reads the entire CSV into memory, which becomes infeasible with large data extracts. To prevent out-of-memory issues, we will process the CSV in batches instead of all at once. To do so, we will add a new method:
def batch_update_db(db_conn_params, table, csv_file, batch_size=10000):
    # Remove the first row, which contains the headers
    csv_file.readline()

    num_columns = get_number_of_columns(db_conn_params, table)
    csv_input_stream = io.StringIO()

    def update_db_with_batch(input_stream):
        '''
        Helper method that forms a closure so we don't have to pass many of
        the values used in this method as arguments
        '''
        # Rewind the stream to the beginning before passing it on
        input_stream.seek(io.SEEK_SET)

        with process_csv_stream(input_stream, num_columns) as csv_data:
            # Reuse this already existing method
            update_db(db_conn_params, table, csv_data)

        input_stream.close()

    i = 0
    for line in csv_file:
        csv_input_stream.write(line.decode('utf-8'))
        i += 1

        # For every 10000 lines, we update the table before continuing.
        # Batch size is configurable, and the code sample can be extended to
        # pass the batch size in as an argument (out-of-scope of this
        # article)
        if i == batch_size:
            update_db_with_batch(csv_input_stream)

            # Assign a new file-like object so that Python knows the old one
            # can be cleaned up
            csv_input_stream = io.StringIO()
            i = 0

    # Update the database with the remaining lines
    update_db_with_batch(csv_input_stream)
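A note on usage: batch_update_db() decodes each line itself, so it expects a binary file-like object, which is what zipfile.ZipFile.open() returns. A sketch with a hypothetical file name, table, and batch size:

# Streaming a large extract from disk; 'rb' ensures each line is bytes,
# matching the line.decode('utf-8') call above
with open('large_users_extract.csv', 'rb') as csv_file:
    batch_update_db(db_conn_params, 'users', csv_file, batch_size=50000)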
Then we change our code to use batch_update_db() instead of calling update_db() directly:
def unzip_and_update_db(response_content, db_conn_params, table):
    with io.BytesIO(response_content) as response_stream:
        with zipfile.ZipFile(response_stream) as zipped_data_set:
            files = zipped_data_set.namelist()
            assert len(files) == 1
            csv_name = files[0]
            with zipped_data_set.open(csv_name) as csv_file:
                # update_db(db_conn_params, table, csv_data)
                batch_update_db(db_conn_params, table, csv_file)
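For completeness, here is a hypothetical way unzip_and_update_db() could be driven once a ZIP has been downloaded; the URL and the use of requests are assumptions for illustration, not the original sample's download code:

import requests

# Placeholder URL and table name, for illustration only
response = requests.get('https://example.com/data-sets/users.zip')
response.raise_for_status()

unzip_and_update_db(response.content, db_conn_params, 'users')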
Finally, since we now remove the header row in code, we need to tell PostgreSQL not to skip the first row of the CSV by removing the HEADER option in the update_db() method:
cur.copy_expert(
    sql.SQL('''
        COPY {tmp_table} FROM STDIN WITH (FORMAT CSV);
    ''')
    .format(tmp_table=tmp_table_id),
    csv_data
)
Alternative Options
These changes are required because the code sample uses PostgreSQL as the backing database, which does not have built-in SQL support for skipping columns in a CSV file. Alternative options are available for database engines that have this functionality built into the engine itself.
There is likely a performance improvement from using functionality built into the database engine. However, the changes described in this article are database-agnostic and can be adapted to any engine; it is up to the reader to decide which option to pursue.
Summary
In this article, we have shown how to pre-process a data extract in code to handle additive changes to the BDS schema. Please let us know if you have any feedback, questions, or comments!