Learning Databricks – Part 2 – Getting Data

There are a bunch of ways to make data available but, if you’re just starting out or building a pet project, you probably want to keep it simple. If you don’t have your own data to start with, check out Kaggle, get some datasets, and start cooking.

Small data sample

Databricks is powerful – it’s made to be used on large and complex data but that doesn’t mean it’s no good for tiny data. For illustration, my samples throughout this series will use the following tiny CSVs.

id,name,description
1,Widget,"Made from cheap plastic"
2,Gadget,

id,widget,amount,currency
1,1,0.25,USD
2,1,0.20,EUR
3,1,27,JPY
4,2,1.75,USD
5,2,1.40,EUR
6,2,189,JPY

Uploading the Data

You can save and upload those CSVs, for example, as Items.csv and Pricing.csv.
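If you’d rather not copy and paste, here’s a minimal sketch that writes both samples to disk with Python’s standard csv module. The file names and rows come from this article; the write_samples helper and the temporary directory are my own assumptions for illustration.

```python
import csv
import os
import tempfile

ITEMS = [
    ["id", "name", "description"],
    [1, "Widget", "Made from cheap plastic"],
    [2, "Gadget", ""],  # Gadget has no description
]

# Amounts are kept as strings so they are written exactly as shown above.
PRICING = [
    ["id", "widget", "amount", "currency"],
    [1, 1, "0.25", "USD"],
    [2, 1, "0.20", "EUR"],
    [3, 1, "27", "JPY"],
    [4, 2, "1.75", "USD"],
    [5, 2, "1.40", "EUR"],
    [6, 2, "189", "JPY"],
]


def write_samples(directory):
    """Write Items.csv and Pricing.csv into directory and return their paths."""
    paths = {}
    for file_name, rows in (("Items.csv", ITEMS), ("Pricing.csv", PRICING)):
        path = os.path.join(directory, file_name)
        with open(path, "w", newline="") as handle:
            csv.writer(handle).writerows(rows)
        paths[file_name] = path
    return paths


# A throwaway directory; point this wherever you like before uploading.
paths = write_samples(tempfile.mkdtemp())
print(paths)
```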

The first stop is the Add Data button and Upload File interface. It’s a standard upload experience so I won’t cover it.

Once you’ve uploaded the file, it’ll give you the chance to create a table in the UI or a notebook. The notebook option spits out some boilerplate code which can create a persistent table from the file but it’s very basic. It’s a good idea to see the boilerplate but, even for a toy project, you’ll probably want to do a little more.

The important thing to remember is your files will be available at /FileStore/tables/Items.csv and /FileStore/tables/Pricing.csv.

Serializing the data

The files are now available via Databricks’ built-in storage but they still need to be read into language-level objects to be useful. If you checked the boilerplate, you saw one of the options for the reader was inferSchema (the boilerplate sets it through a variable named infer_schema).

This can be useful if you want to read in the file with very few assumptions and start tweaking the structure on the fly. I do this when I read in different datasets for every job and hand off the results to some other subsystem.
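For comparison, here’s a rough sketch of what the inferred-schema route looks like. The read_with_inferred_schema helper is a name I made up; header and inferSchema are regular options of the CSV reader. Keep in mind that inference costs Spark an extra pass over the data.

```python
def read_with_inferred_schema(spark, path):
    """Read a CSV that has a header row and let Spark guess each column's type."""
    return spark.read.csv(path, header=True, inferSchema=True)


# In a Databricks notebook, where `spark` already exists:
# guess_df = read_with_inferred_schema(spark, '/FileStore/tables/Items.csv')
# guess_df.printSchema()
```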

In this case, however, I’m going to use the same data across multiple notebooks using different languages.

Schema specification

These schemas specify column names, types, and nullability. The pyspark.sql.types module is necessary for type specification. Read up on it to see what else it offers.

from pyspark.sql.types import (
  StructType,
  StructField,
  StringType,
  IntegerType,
  DoubleType
)

items_schema = StructType([
  StructField('ItemId', IntegerType(), nullable = False),
  StructField('Name', StringType(), nullable = False),
  StructField('Description', StringType(), nullable = True)
])
pricing_schema = StructType([
  StructField('PriceId', IntegerType(), nullable = False),
  StructField('ItemPriceId', IntegerType(), nullable = False),
  StructField('Amount', DoubleType(), nullable = False),
  StructField('Currency', StringType(), nullable = False)
])

Column specifications are positional. You could switch ‘PriceId’ and ‘ItemPriceId’ and you’d end up with misnamed columns. Even worse, Spark might try to cram a value into an ineligible type.
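To make the positional point concrete, here’s a plain-Python analogy (not Spark itself, and the label helper is made up). Names are paired with values purely by position, so swapping two names silently mislabels the data.

```python
# Third data row of Pricing.csv, already parsed into values.
row = [3, 1, 27.0, "JPY"]

correct_names = ["PriceId", "ItemPriceId", "Amount", "Currency"]
swapped_names = ["ItemPriceId", "PriceId", "Amount", "Currency"]


def label(values, names):
    """Pair each value with the name in the same position, ignoring the file's header."""
    return dict(zip(names, values))


correct = label(row, correct_names)
swapped = label(row, swapped_names)

print(correct["ItemPriceId"])  # 1, the widget this price belongs to
print(swapped["ItemPriceId"])  # 3, which is really the price's own id
```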

A schema object’s structure can be arbitrarily complex. In this case, both schemas are just a few columns of simple types, but it’s possible to embed structures within structures, which can model semi-structured data like JSON.

Creating DataFrames

The CSV files are next read into DataFrames. This step applies the schema to the dataset and creates a language-level object we can interact with.

item_df = spark.read.csv('/FileStore/tables/Items.csv',
  schema = items_schema,
  header = True)

price_df = spark.read.csv('/FileStore/tables/Pricing.csv',
  schema = pricing_schema,
  header = True)

There isn’t much magic at play. DataFrameReader can read a bunch of stuff in addition to CSVs. The first parameter is the location of the data to read, the second is the schema I defined, and the third indicates that the first row of the file is a header.

Creating Tables

Finally, the DataFrames know how to create tables out of their data.

item_df.write.format('parquet') \
  .mode('overwrite') \
  .saveAsTable('items')

price_df.write.format('parquet') \
  .mode('overwrite') \
  .saveAsTable('pricing')

That’s all self-describing but, for reference, modes are documented. Other available formats include JSON, text, CSV, and ORC.

Recap

I essentially took the raw data and moved it as-is into tables named items and pricing. From here I can perform transformations, create visualizations, and store the data in new tables.

Check out my series on ADLS Gen2 and Azure Databricks to see one example of using Databricks as one component in a larger system.

I published a complete notebook using large(ish) datasets with wide schemas. Databricks kills those links after six months so it’ll go down eventually. Rest assured the samples in this article cover all the same stuff and are much more readable.

Learning Databricks – Part 1 – A Brief Introduction

Databricks is one of my favorite tools. It’s a managed Apache Spark platform used for data engineering and data science. Before I dive in to using it, I’ll highlight some of the key features.

Jupyter is similar to Databricks in a lot of respects so, if you’re familiar with the former, I doubt this article will be informative. Keep an eye out for further installments in this series.

A full-fledged IDE

The Databricks web GUI is a first-class IDE. Files, folders, and dependencies are all navigable. Data connections, jobs, and clusters can be managed and inspected.

IDE navigation

There are separate Users and Shared areas in the workspace. When I’m working on a team project, I like to do my prototyping, debugging, and scratch pad stuff in my Users area and, once code is ready to go to a live environment, I put it in Shared.

This is an awesome feature for collaborative development and, conveniently, means you don’t need an IDE on your local machine. It also drastically reduces the chances of a “works on my machine” situation.

Notebooks

Notebooks are special files which the Databricks interface parses into command windows. The actual files are a combination of the code and markup you write in the GUI and metadata which, among other things, indicate how the commands are divided. In the notebook analogy, command windows are like pages.

A basic notebook example

Each notebook has a default programming language: one of Python, Scala, SQL, or R. A command window’s language can be specified using a magic command like %sql on the first line.

You can’t switch languages in the middle of a command window.

Languages run on separate kernels. If you create a DataFrame (a handy data abstraction) in Python, it won’t be available to commands in other languages as-is.
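One common workaround, sketched here under the assumption that all the commands run in the same notebook session, is to register the DataFrame as a temporary view and query it by name from the other language. The publish_view helper and the view name are placeholders of mine.

```python
def publish_view(df, view_name):
    """Register df as a temporary view that other languages in the notebook can query."""
    df.createOrReplaceTempView(view_name)
    return view_name


# In a Python command window:
# publish_view(my_df, 'my_view')
#
# Then, in a command window whose first line is %sql:
# SELECT * FROM my_view
```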

Version control

As a bonus, notebooks have built-in version control and can be connected to Git repositories. This comes complete with diff views between any revision and its immediate predecessor.

I added line 2 to demonstrate the in-line diff view

The built-in version control capabilities are somewhat limited when compared to Git. I definitely recommend integrating with a Git repository for a mature and flexible process.

Try it out

If you’ve been thinking of trying it out, there’s a free community edition. It’s limited to a low-end cluster and doesn’t provide certain features like Git integration, but it has all the basic stuff needed to see if Databricks is right for you.

Next up

I’ll cover practical topics like acquiring and serializing/deserializing data, transformation, and analysis. I hope to get into machine learning and visualizations too. If you like a “come learn with me” experience, look out for those.

Scalability in Cosmos DB

Even when just sitting at idle, Cosmos DB will rack up charges. You set the throughput as Request Units (RUs) and that power is constantly provisioned so Cosmos won’t experience “warm up time”. Naturally, you get billed by the hour.

I’ve spent more money than I had to by manually provisioning the throughput or, more often, failing to scale down during non-peak hours. For my ETL to run effectively without racking up charges, I decided to set up some programmatic scaling.

Use case: scaling before large writes

My Cosmos consumers don’t need a lot of power so I only need to ramp up the RUs for roughly a 90-minute window in the early morning when I throw a lot of writes at it. Altogether, I must have the following so the throughput is only increased while it’s needed.

  • My load subsystem must be able to increase the throughput just before it starts loading.
  • It must be able to decrease the throughput as soon as it’s done loading.
  • Since Azure Data Factory orchestrates my ETL, it must be able to invoke the solution.
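The first two requirements boil down to “scale up, load, always scale back down”. Here’s a small Python sketch of that ordering. It isn’t how my pipeline is wired (Data Factory does the orchestrating there), and the scaled_throughput name and the RU numbers are invented for illustration.

```python
from contextlib import contextmanager


@contextmanager
def scaled_throughput(set_throughput, peak_rus, idle_rus):
    """Raise throughput for the duration of a block, then always lower it again."""
    set_throughput(peak_rus)
    try:
        yield
    finally:
        # Runs even if the load fails, so the expensive setting never lingers.
        set_throughput(idle_rus)


# Stand-in for whatever actually talks to Cosmos; it only records the calls.
calls = []

with scaled_throughput(calls.append, peak_rus=10000, idle_rus=400):
    calls.append("load")

print(calls)  # [10000, 'load', 400]
```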

Cosmos APIs

I had a lot of choices for programmatically interacting with Cosmos including a RESTful API, the Azure CLI, PowerShell, and .NET. Look for other options in the Quickstarts in the official documentation.

Some of these, like PowerShell, can’t modify the throughput. Make sure to RTFM.

Azure Functions as a proxy

ADF has a first-class Azure Functions Activity. Functions are a lightweight compute product, which makes them perfect for my on-demand requirement. Because they’re a managed product, I don’t have to worry about configuration beyond choosing my language.

It was easiest for me to implement this in C#, so I chose the .NET library and created a .NET Core Function with an HTTP trigger.

Implementation

My implementation is stateless and is not coupled to any of my ETL subsystems. Microsoft.Azure.DocumentDB.Core is the only dependency not already bundled with the Functions App template. Ultimately, the app is plug-and-play.

Here are a couple of edited excerpts from the Function to show the most important parts. The repo is linked in the last section for your free use.

Communicating with Cosmos

Cosmos references are wired up in two stages. First, the DocumentClient (i.e. a reference to the Cosmos DB) is spun up using the Cosmos instance’s URL and one of its keys.

using (
  var client = new DocumentClient(new Uri(dbUrl),
  key,
  new ConnectionPolicy { UserAgentSuffix = " samples-net/3" }))
{
  client.CreateDatabaseIfNotExistsAsync(new Database { Id = dbName }).Wait();
  var coll = CreateCollection(client).Result;
  ChangeThroughput(client, coll).Wait();
}

CreateCollection is a helper method which gets a reference to a collection within the DB.

async Task<DocumentCollection> CreateCollection(DocumentClient client) {
  return await client.CreateDocumentCollectionIfNotExistsAsync(
    UriFactory.CreateDatabaseUri(dbName),
    new DocumentCollection {
      Id = collectionName
    },
    new RequestOptions {
      OfferThroughput = throughput
    });
}

CreateDocumentCollectionIfNotExistsAsync is part of the Cosmos .NET API. As the name implies, it conditionally creates the collection. This is necessary even if you know the collection already exists – it’s how the Function gets a reference to the collection.

Modifying the offer

The throughput is changed first by getting the existing offer for the Cosmos DB, then replacing that offer with the throughput you want.

OfferV2.OfferType is always “Invalid”. It doesn’t mean the Function failed to fetch or replace the offer.

async Task ChangeThroughput(DocumentClient client,
  DocumentCollection simpleCollection)
{
  var offer = client.CreateOfferQuery().Where(o =>
    o.ResourceLink == simpleCollection.SelfLink)
    .AsEnumerable().Single();
  var newOffer = await client.ReplaceOfferAsync(
    new OfferV2(offer, throughput));
}

There’s some more stuff to glue it together and some error handling I’ve added. The repo layers all of that in.

Using the Function

You’ll need to send along five values in the request headers.

  • The name of the Cosmos DB
  • The name of the collection
  • The Cosmos’ URL
  • One of the read-write keys
  • The number of RUs you want to set.
    • This must be a multiple of 100.

The read-write key is transmitted in plain text in my current implementation. This is an obvious security flaw which I haven’t gotten around to fixing.
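As a rough illustration of a caller, here’s a Python sketch that assembles the five headers and rejects a bad RU value before anything is sent. The header names are hypothetical placeholders, not necessarily the ones the Function reads, so check the repo for the real ones.

```python
def build_scale_headers(db_name, collection_name, cosmos_url, key, rus):
    """Return the five request headers, refusing RUs that aren't a positive multiple of 100."""
    if rus <= 0 or rus % 100 != 0:
        raise ValueError("RUs must be a positive multiple of 100, got %r" % (rus,))
    return {
        # Hypothetical header names, chosen only for this example.
        "X-Cosmos-Db-Name": db_name,
        "X-Cosmos-Collection-Name": collection_name,
        "X-Cosmos-Url": cosmos_url,
        "X-Cosmos-Key": key,  # still plain text, same caveat as above
        "X-Cosmos-Throughput": str(rus),
    }


headers = build_scale_headers(
    "my-db", "my-collection", "https://example.invalid:443/", "<read-write key>", 1000
)
print(headers["X-Cosmos-Throughput"])  # 1000
```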

Source code

The source is on GitHub. Feel free to use it. If you have ideas for improvements, I’d love to hear them.

ADLS Gen2 and Azure Databricks – Part 4 – Mounting to DBFS

Now you know why I use Gen2 with Databricks, my struggle with service principals, and how I configure the connection between the two. I’m finally going to mount the storage account to the Databricks file system (DBFS) and show a couple of things I do once the mount is available.

Databricks Utilities

Save a few keystrokes and access the file system utilities directly using dbfs. The snippets here assume dbfs is a short alias for dbutils.fs (for example, dbfs = dbutils.fs). The method names will be familiar to those who work in the command line but fear not; it’s all fairly self-describing.

Mounting Gen2 storage

For my own sanity, I always unmount from a location before trying to mount to it.

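def mount(source_url, configs, mount_point):
  try:
    # Clear out whatever is already mounted at this location.
    dbfs.unmount(mount_point)
  except Exception:
    # unmount raises if nothing is mounted there, which is fine.
    print("The mount point didn't have anything mounted to it.")
  finally:
    # Mount regardless of how the unmount attempt went.
    dbfs.mount(source=source_url,
               mount_point=mount_point,
               extra_configs=configs)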

I went over creating the source URL and the configs map in the previous installment. The mount point is a *nix style path specification; the root is conventionally /mnt. Tweak this method to manage your own exception handling, then call it for however many file systems you want to mount.

Navigating mounted storage

The path names will not exactly match the paths in your file system, which is a side effect of the name of the mount point in DBFS. Say you’ve got a storage account called transactionvendors001, a file system named awesome-vendor, and a mount point called /mnt/awesome.

When navigating Gen2 in something like Azure Storage Explorer, your paths will be transactionvendors001/awesome-vendor/…/… but the equivalent in Databricks will be /mnt/awesome/…/…
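If it helps, the translation can be sketched as a small string function. The to_dbfs_path name and the clients/a.json file are made up for the example; the account, file system, and mount point are the ones above.

```python
def to_dbfs_path(storage_path, storage_account, file_system, mount_point):
    """Translate '<account>/<file system>/<rest>' into '<mount point>/<rest>'."""
    prefix = f"{storage_account}/{file_system}"
    if storage_path != prefix and not storage_path.startswith(prefix + "/"):
        raise ValueError(f"{storage_path!r} is not inside {prefix!r}")
    rest = storage_path[len(prefix):]  # keeps the leading slash, if any
    return mount_point.rstrip("/") + rest


print(to_dbfs_path(
    "transactionvendors001/awesome-vendor/clients/a.json",
    "transactionvendors001",
    "awesome-vendor",
    "/mnt/awesome",
))
# /mnt/awesome/clients/a.json
```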

A really nice feature is wildcard syntax in path specifications, which is basically a requirement for directories containing some arbitrary number of files. You might read raw JSON in using something like clients_df = spark.read.json('/mnt/awesome/clients/*.json', multiLine = True)
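To get a feel for what a pattern like that picks up, here’s a local approximation using Python’s fnmatch module. It’s only an approximation: fnmatch’s * also crosses directory separators, which Spark’s path globbing does not, and the file names are invented.

```python
from fnmatch import fnmatchcase


def preview_matches(pattern, paths):
    """Return the paths a simple wildcard pattern would select (case-sensitive)."""
    return [path for path in paths if fnmatchcase(path, pattern)]


candidates = [
    "/mnt/awesome/clients/a.json",
    "/mnt/awesome/clients/b.json",
    "/mnt/awesome/clients/readme.txt",
]

matches = preview_matches("/mnt/awesome/clients/*.json", candidates)
print(matches)
```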

Series recap

I sang the praises of ADLS Gen2 storage combined with Azure Databricks, illustrated how I suck at service principals (and how to keep you from sucking at them), set up a boilerplate configuration for the Databricks handshake with storage accounts, and now actually mounted storage to DBFS.

This is all basically tax we’ve got to pay to get on the highway. It’s not fun and it doesn’t show off any of the awesome stuff Databricks can actually do per se. Stay tuned and I’ll shovel up some more exciting stuff from a novice perspective.

ADLS Gen2 and Azure Databricks – Part 3 – Spark Configuration

I went over why I use ADLS Gen2 with Databricks and how to set up a service principal to mediate permissions between them. Now we’ll configure the connection between Databricks and the storage account.

Config

This part is simple and mostly rinse-and-repeat. You need to set up a map of config values which identify the Databricks workspace to the storage account.

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.createRemoteFileSystemDuringInitialization": "true",
    "fs.azure.account.oauth2.client.id": "<SP application ID>",
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope = "gen2", key = "databricks workspace"),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<AAD tenant ID>/oauth2/token"
}

There’s nothing too magical going on here. I’ve never had a reason to change lines 2-4 though, honestly, I’ve never needed Databricks to tell the storage account to create the file system so I could probably do away with that line.

Lines 5-7 might need some tweaks depending on how many service principals, tenants, and secrets you use. I work in the same tenant for everything and I use the same service principal for all of the assorted ETL stuff I work in. Lastly, I have a one-to-one relationship between secrets and Databricks workspaces. With that arrangement, I can use a single config map for all of my storage accounts.
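If your arrangement is less uniform than mine, one option is to wrap the map in a function so the three variable parts become arguments. This is just a sketch: build_configs is a name I made up, and the keys and fixed values are copied from the map above.

```python
def build_configs(application_id, tenant_id, client_secret):
    """Return the Spark config map for one service principal and tenant."""
    return {
        "fs.azure.account.auth.type": "OAuth",
        "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.createRemoteFileSystemDuringInitialization": "true",
        "fs.azure.account.oauth2.client.id": application_id,
        "fs.azure.account.oauth2.client.secret": client_secret,
        "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


# In a notebook, the secret would come from a Databricks secret scope:
# configs = build_configs("<SP application ID>", "<AAD tenant ID>",
#                         dbutils.secrets.get(scope="gen2", key="databricks workspace"))
```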

Take the above with a grain of salt. I don’t know if this could be more secure. Make sure you understand your own requirements.

The fill-in-the-blanks stuff threw me for a loop the first time. I always have trouble figuring out which settings or text boxes want which IDs.

Client ID

"fs.azure.account.oauth2.client.id" is the application ID of your service principal.

  • Go to the Azure portal
  • Go to Azure Active Directory
  • Select “App registrations” in the left blade
  • Select your service principal from the list of registrations
  • Copy the “Application (client) ID”

You’ll need the “Application (client) ID” GUID

Client endpoint

"fs.azure.account.oauth2.client.endpoint" is the same basic URL every time. You just plug in your Azure Active Directory tenant ID where the code snippet specifies.

  • Go to the Azure portal
  • Go to Azure Active Directory
  • Select “Properties” in the left blade
  • Copy the “Directory ID”

You’ll end up with something like https://login.microsoftonline.com/c559656c-8c6a-43b2-8ac4-c7f169fca936/oauth2/token when you insert your Directory ID.

Client secret

"fs.azure.account.oauth2.client.secret" is the secret you made in the Azure portal for the service principal. You can type this in clear text but even I know enough about security to tell you not to do that. Databricks secrets are really easy and might be the only turnkey way to do this.

Managing secrets is a whole process on its own. I’ll dive in to that soon.

Storage location

This was another “duh” moment for me once I sorted it out. My problem was figuring out the protocol for the URL.

"abfss://file-system@storageaccount.dfs.core.windows.net/"

  • You can use abfs instead of abfss if you want to forgo SSL
  • file-system is the name you gave a file system within a storage account
  • storageaccount is the name of the storage account file-system belongs to
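Those pieces can be assembled with a tiny helper. This is a sketch that assumes the standard dfs.core.windows.net endpoint suffix for Gen2 accounts; storage_url is a made-up name and the example account and file system are placeholders.

```python
def storage_url(file_system, storage_account, secure=True):
    """Build the ABFS URL for a file system; secure=False drops the trailing 's'."""
    scheme = "abfss" if secure else "abfs"
    return f"{scheme}://{file_system}@{storage_account}.dfs.core.windows.net/"


print(storage_url("awesome-vendor", "transactionvendors001"))
# abfss://awesome-vendor@transactionvendors001.dfs.core.windows.net/
```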

Recap

This time, we created the Spark configuration. We didn’t persist or use the config yet: look for an upcoming installment on mounting storage in Databricks.