Loading Data to BigQuery using Google Workflows

By Emily Cogsdill and Jesus Toquinto Zubiate

Overview

In this post we will look at how CTA uses Google Workflows to transfer data into BigQuery from S3 and Google Drive.

Google Workflows

Google Workflows is Google Cloud's serverless orchestration service: you define a series of steps in YAML (or JSON), and Workflows runs them in order, managing authentication and state between steps. It is particularly useful when you are performing tasks that use Google APIs and Google Cloud services. Rather than needing to write and deploy custom code to handle different APIs, Google Workflows makes it simple to configure tasks and set up dependencies between them.
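To give a feel for the syntax before diving in, here is a minimal, self-contained workflow (illustrative only — the step names and message are made up): one step assigns a variable, and the next logs it, with Workflows managing execution order and state in between.

```yaml
# Minimal illustrative workflow: assign a value, then log it.
main:
  steps:
    - set_message:
        assign:
          - message: "Hello from Workflows"   # hypothetical value
    - log_message:
        call: sys.log                         # built-in logging function
        args:
          text: ${message}
          severity: "INFO"
```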

AWS S3 to BigQuery

At a high level, the pipeline has three parts:

  1. Storage Transfer Service (STS) synchronizes the S3 bucket with a Cloud Storage (GCS) bucket,
  2. the BigQuery API loads data from GCS into BigQuery, and
  3. Google Workflows orchestrates the entire process.
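The three steps above map onto a very small Workflows program — one step per service, run in sequence. As a rough sketch (the real argument bodies are filled in over the rest of this post):

```yaml
# Rough shape of the pipeline: run the STS job, then load from GCS into BigQuery.
main:
  steps:
    - run_s3_to_gcs_transfer:
        call: googleapis.storagetransfer.v1.transferJobs.run
        args: {}   # jobName and projectId go here
    - load_data_into_bq_table:
        call: googleapis.bigquery.v2.jobs.insert
        args: {}   # BigQuery load job configuration goes here
```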

Prerequisites

Pro Tip: If you are an organization working with Community Tech Alliance, you already have all of this set up for you! Learn more here.

Create an STS transfer

Head over to the STS page in the Google Cloud Console and click Create Transfer Job. This will take you to the Create page, where you can set the source to S3 and the destination to GCS.

Fill in your source information and add your S3 credentials in Step 2, then choose your destination GCS bucket in Step 3. For the initial setup, we chose to Run the job once starting now.

Finally, we configure some additional transfer settings:
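The settings you need will depend on your use case. As an illustration, a transfer job body might look something like this (the field names come from the STS API; the bucket names and option values below are placeholders, not our actual configuration):

```yaml
# Illustrative STS transferSpec (placeholder values)
transferSpec:
  awsS3DataSource:
    bucketName: my-source-bucket        # hypothetical S3 bucket
  gcsDataSink:
    bucketName: my-destination-bucket   # hypothetical GCS bucket
  transferOptions:
    overwriteObjectsAlreadyExistingInSink: false
    deleteObjectsUniqueInSink: false
```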

Create the Workflow

The workflow takes a few arguments:

  • The destination BigQuery table we want to load data into (identified by dest_project_id, dest_dataset_id, and dest_table_id),
  • the GCS source URI to load, and
  • the name of the STS transfer job we created earlier.
# Arguments
# - dest_project_id (String)
#     Project ID of Destination Table
# - dest_dataset_id (String)
#     Dataset ID of Destination Table
# - dest_table_id (String)
#     Table Name of Destination Table
# - source_uri (String)
#     Full Cloud Storage Path of Source File(s)
#     (ex. gs://path/to/file.txt.gz or ["gs://path/to/file.txt.gz", ...])
# - transfer_job_name (String)
#     Full Name of transfer job that syncs S3 to GCS bucket (ex. transferJobs/123456789)

main:
  params: [args]
  steps:
    - init:
        assign:
          - dest_project_id: ${args.dest_project_id}
          - dest_dataset_id: ${args.dest_dataset_id}
          - dest_table_id: ${args.dest_table_id}
          - source_uri: ${args.source_uri}
          - transfer_job_name: ${args.transfer_job_name}

Our first step is to run the STS transfer job, which will load any new data that has not yet been synced from S3. Google Workflows provides a connector for the Storage Transfer Service that we can use to start a job run.

    - run_s3_to_gcs_transfer:
        call: googleapis.storagetransfer.v1.transferJobs.run
        args:
          jobName: ${transfer_job_name}
          body:
            projectId: ${dest_project_id}
        result: transfer_result

Next, we use the BigQuery Connector to create a Load job to load data from a GCS bucket to BigQuery.

    - load_data_into_bq_table:
        call: googleapis.bigquery.v2.jobs.insert
        args:
          projectId: ${dest_project_id}
          body:
            configuration:
              load:
                createDisposition: CREATE_IF_NEEDED
                writeDisposition: WRITE_TRUNCATE
                sourceUris: ${source_uri}
                destinationTable:
                  projectId: ${dest_project_id}
                  datasetId: ${dest_dataset_id}
                  tableId: ${dest_table_id}
                skipLeadingRows: 1
        result: load_result

And you’re done!

For our use case, we wanted to take an additional step to set up automated alerts when any step in the workflow fails. To enable this, we added some logs to our workflow that we can use to trigger those alerts. This is set up by wrapping each workflow call in a try/except block. Within the except block you can add any number of steps; here's an example that creates a detailed log of the failure.

    - run_s3_to_gcs_transfer:
        try:
          call: googleapis.storagetransfer.v1.transferJobs.run
          args:
            jobName: ${transfer_job_name}
            body:
              projectId: ${dest_project_id}
          result: transfer_result
        except:
          as: error
          steps:
            - log_transfer_job_error:
                call: sys.log
                args:
                  severity: "ERROR"
                  json:
                    workflow_execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                    source_uri: ${source_uri}
                    transfer_job_name: ${transfer_job_name}
                    target_destination:
                      projectId: ${dest_project_id}
                      datasetId: ${dest_dataset_id}
                      tableId: ${dest_table_id}
                    sync_status: "FAILED"
                    error_message: ${error}
            - raise_error_to_halt_execution:
                raise: ${error}

Make sure to add a step in your except block that re-raises the error and halts execution of the workflow:

          steps:
            - log_transfer_job_error: ...
            - raise_error_to_halt_execution:
                raise: ${error}

That’s it! You now have a workflow that loads data from S3 into BigQuery, and with just a little extra effort, your workflow will emit logs that you can use to set up automated alerting with Google Cloud Monitoring. You can run the workflow on whatever schedule you like, or run it ad hoc, depending on your use case. And here is a link to the full workflow code.
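Whether scheduled or run ad hoc, an execution just needs the arguments the workflow expects. For example, the runtime arguments might look like this (all values below are placeholders):

```yaml
# Example runtime arguments for the workflow (placeholder values)
dest_project_id: my-project
dest_dataset_id: my_dataset
dest_table_id: my_table
source_uri: gs://my-bucket/exports/data.csv.gz
transfer_job_name: transferJobs/123456789
```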

Google Drive to BigQuery

Prerequisites

Create the Workflow

# This workflow loads data from Google Drive to BQ.
#
# Arguments:
# - base_drive_folder_id (String)
#     The root Google Drive Folder ID
# - dest_project_id (String)
#     Project ID of where data should be loaded
# - dest_dataset_id (String)
#     Dataset where data should be loaded
main:
  params: [args]
  steps:
    - init:
        assign:
          - base_drive_folder_id: ${args.base_drive_folder_id}
          - dest_project_id: ${args.dest_project_id}
          - dest_dataset_id: ${args.dest_dataset_id}

Next, we call the Drive API to get a list of all the CSV files in the base Drive folder. This call is a bit trickier because Workflows does not provide a dedicated connector for Drive, so we need to use the http.get function instead. That means we have to provide the full API URL, the authentication type we want to use, and the scopes needed. The query we send to the API simply requests all files that have the text/csv MIME type and the input Drive folder ID as their parent. For more information on structuring Drive API queries, you can refer to the documentation here.

    - get_file_ids_from_drive:
        call: http.get
        args:
          url: https://content.googleapis.com/drive/v3/files
          auth:
            type: OAuth2
            scopes:
              - https://www.googleapis.com/auth/drive
          query:
            includeItemsFromAllDrives: true
            supportsAllDrives: true
            q: ${"'" + base_drive_folder_id + "' in parents and mimeType = 'text/csv'"}
        result: csv_files

Now that we have a list of CSV files in the Drive folder, we can iterate through each file and load it into its own table in the destination BigQuery Dataset. We have to do a bit of string manipulation to format the source URI that the BigQuery Connector will accept:

    - iterate_files:
        for:
          value: file
          in: ${csv_files.body.files}
          steps:
            - assign_variables_for_bq_load:
                assign:
                  - source_uri: ${"https://drive.google.com/open?id=" + file.id}
                  - file_name_no_ext: ${text.split(file.name, ".")[0]}
                  - table_name: ${file_name_no_ext}

Now that we have the source URI formatted correctly, we can load the file into BigQuery. To do this, we load the CSV file into a temporary external table that we then query to create a native BigQuery table. (Note: we have to provide the OAuth scopes the connector needs to get data from Drive.)

            - load_file_to_bq:
                call: googleapis.bigquery.v2.jobs.insert
                args:
                  projectId: ${dest_project_id}
                  connector_params:
                    scopes:
                      - https://www.googleapis.com/auth/drive
                      - https://www.googleapis.com/auth/bigquery
                  body:
                    configuration:
                      query:
                        createDisposition: CREATE_IF_NEEDED
                        writeDisposition: WRITE_TRUNCATE
                        destinationTable:
                          projectId: ${dest_project_id}
                          datasetId: ${dest_dataset_id}
                          tableId: ${table_name}
                        tableDefinitions:
                          ${"temp_" + table_name}:
                            sourceUris:
                              - ${source_uri}
                            sourceFormat: CSV
                            csvOptions:
                              skipLeadingRows: 1
                            autodetect: true
                        query: ${"SELECT * FROM temp_" + table_name}
                result: load_result

And that’s it! We now have a workflow that gets all CSV files from a Google Drive folder, reads them into temporary external tables, and creates a BigQuery table from each file’s data.

Here is a link to the full code for this workflow.
