Execute Code from Local Machine on Databricks Spark Cluster
In this guide, you'll learn how to execute code from your local machine on a Databricks Spark cluster step by step. We will cover the environment setup, necessary installations, troubleshooting commands, and the code itself.
Prerequisites
- Install Python: Make sure you have Python installed on your local machine. You can download Python 3.9.0, or a later release, from the official Python website (a quick version check is sketched after this list).
- Install Databricks Connect: Follow the instructions in the Databricks Connect documentation to install and configure it.
- Databricks Cluster: Ensure you have a Databricks cluster set up and running. Refer to the Databricks documentation for guidance.
- Install Visual C++ Build Tools: Download and install the Microsoft Visual C++ Build Tools.
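Before moving on, it can be worth confirming that the interpreter on your PATH is the one you expect. A minimal sketch, assuming the Python 3.9 baseline named above:

```python
# Confirm the interpreter meets the 3.9 baseline used in this guide
import sys

assert sys.version_info >= (3, 9), f"Python 3.9+ expected, found {sys.version}"
print(sys.version)
```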
Environment Setup
- Set Up a Virtual Environment: Create and activate a virtual environment:
  python -m venv venv
  source venv/bin/activate   # On macOS/Linux
  venv\Scripts\activate      # On Windows
- Upgrade Pip: Make sure you have the latest version of pip:
  pip install --upgrade pip
  python -m ensurepip --upgrade
- Install Required Libraries: Install the necessary Python libraries:
  pip install azure-identity databricks-cli pyspark
- Upgrade Databricks Connect: Make sure you're using the latest version of Databricks Connect:
  pip install -U databricks-connect
- Configure Databricks Connect: Configure Databricks Connect using your Databricks workspace URL and personal access token (see the connectivity sketch after this list):
  databricks-connect configure
- Verify Databricks CLI Installation: Ensure the Databricks CLI is correctly installed:
  databricks -v
- Install Additional Tools: Install useful tools for troubleshooting and managing dependencies:
  pip install pipenv safety check-manifest pipdeptree
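Once Databricks Connect is configured, a quick way to confirm that the local-to-cluster link works is to create a Spark session and pull a few rows back. This is a minimal sketch; it assumes `databricks-connect configure` has already been run, so the session builder picks up the stored workspace URL, token, and cluster ID (legacy Databricks Connect also ships a built-in `databricks-connect test` command for the same purpose):

```python
# Minimal connectivity check: assumes databricks-connect has been configured,
# so the builder picks up the stored connection settings.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
print(spark.range(5).collect())  # these rows are computed on the remote cluster
spark.stop()
```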
Databricks CLI Installation via Winget
- Search for Databricks CLI: Use Winget to search for the Databricks CLI:
  winget search databricks
- Install Databricks CLI: Install the Databricks CLI via Winget:
  winget install Databricks.DatabricksCLI
- Manual Installation: Alternatively, download the signed Windows build directly and install it from:
  https://github.com/databricks/cli/releases/download/v0.221.1/databricks_cli_0.221.1_windows_amd64-signed.zip
Troubleshooting Commands
- Upgrade Specific Libraries: Ensure specific libraries are up-to-date (the version-check sketch after this list shows how to confirm what is actually installed):
  pip install --upgrade cryptography
  pip install pyopenssl==22.1.0
  pip install urllib3 --upgrade
- Force Reinstall Libraries: Reinstall problematic libraries to fix issues:
  pip install --force-reinstall cryptography
  pip -vvv install --upgrade --force-reinstall cffi
- Cache Management: Clear the pip cache and check for dependency conflicts:
  pip cache purge
  pip check
- Setuptools, Wheel, CFFI: Upgrade setuptools, wheel, and CFFI:
  pip install --upgrade setuptools wheel cffi
- Rust Installation: Install Rust, which some Python libraries (such as cryptography) need in order to build from source:
  curl https://sh.rustup.rs -sSf | sh
  rustup install stable
  rustup default stable
- Rust Version Check: Ensure Rust is correctly installed:
  rustup show
  rustc --version
  cargo --version
- Generate Requirements File: Generate a `requirements.txt` file for your project:
  pip freeze > requirements.txt
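After the upgrade and reinstall commands above, it helps to confirm which versions actually landed in the environment. A small sketch using the standard-library `importlib.metadata` (the package names mirror the pins used later in this guide):

```python
# Report the installed versions of the libraries the steps above touch
from importlib.metadata import PackageNotFoundError, version

for pkg in ("cryptography", "pyOpenSSL", "urllib3", "cffi", "setuptools", "wheel"):
    try:
        print(f"{pkg}: {version(pkg)}")
    except PackageNotFoundError:
        print(f"{pkg}: not installed")
```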
Requirements File
Save the following content in a file named `requirements.txt`:
annotated-types==0.7.0
Authlib==1.3.1
azure-core==1.30.2
azure-identity==1.16.0
build==1.2.1
certifi==2024.6.2
cffi==1.16.0
charset-normalizer==3.3.2
check-manifest==0.49
click==8.1.7
colorama==0.4.6
cryptography==38.0.4
distlib==0.3.8
dparse==0.6.4b0
filelock==3.14.0
idna==3.7
importlib_metadata==7.1.0
Jinja2==3.1.4
markdown-it-py==3.0.0
MarkupSafe==2.1.5
marshmallow==3.21.3
mdurl==0.1.2
msal==1.28.0
msal-extensions==1.1.0
packaging==24.0
pipdeptree==2.22.0
pipenv==2024.0.0
platformdirs==4.2.2
portalocker==2.8.2
py4j==0.10.9.7
pycparser==2.22
pydantic==2.7.3
pydantic_core==2.18.4
Pygments==2.18.0
PyJWT==2.8.0
pyOpenSSL==22.1.0
pyproject_hooks==1.1.0
pyspark==3.5.1
pywin32==306
requests==2.32.3
rich==13.7.1
ruamel.yaml==0.18.6
ruamel.yaml.clib==0.2.8
safety==3.2.2
safety-schemas==0.0.2
shellingham==1.5.4
six==1.16.0
tomli==2.0.1
typer==0.12.3
typing_extensions==4.12.2
urllib3==2.2.1
virtualenv==20.26.2
zipp==3.19.2
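With this file in place, the same environment can be reproduced elsewhere with `pip install -r requirements.txt`.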
Execute Code on Databricks Cluster
Here's the Python script to execute code on the Databricks Spark cluster:
import os
import time
import logging
import requests
from azure.identity import ClientSecretCredential
from pyspark.sql import SparkSession
import cProfile
import pstats
def main():
# Azure AD and Databricks details
TENANT_ID = os.getenv("TENANT_ID", "[[YOUR_TENANT_ID]]")
CLIENT_ID = os.getenv("CLIENT_ID", "[[YOUR_CLIENT_ID]]")
CLIENT_SECRET = os.getenv("CLIENT_SECRET", "[[YOUR_CLIENT_SECRET]]")
DATABRICKS_HOST = "[[YOUR_DATABRICKS_HOST]]"
CLUSTER_ID = "[[YOUR_CLUSTER_ID]]"
ORG_ID = "[[YOUR_ORG_ID]]"
# Define the temporary directory
TEMP_DIR = r"[[YOUR_TEMP_DIRECTORY]]"
os.makedirs(TEMP_DIR, exist_ok=True)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler()], # Log to standard output
)
logger = logging.getLogger(__name__)
def get_databricks_token():
credential = ClientSecretCredential(
tenant_id=TENANT_ID, client_id=CLIENT_ID, client_secret=CLIENT_SECRET
)
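        # 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d is the fixed Azure AD application
        # ID representing Azure Databricks; "/.default" requests a token scoped
        # to that resource.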
scope = ["2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default"]
token = credential.get_token(*scope)
return token.token
def is_cluster_running(cluster_id, access_token):
headers = {"Authorization": f"Bearer {access_token}"}
        response = requests.get(
            f"{DATABRICKS_HOST}/api/2.0/clusters/get",
            headers=headers,
            params={"cluster_id": cluster_id},
        )
if response.status_code == 200:
cluster_info = response.json()
cluster_state = cluster_info.get("state", "UNKNOWN")
logger.info(f"Cluster state: {cluster_state}")
return cluster_state == "RUNNING"
        else:
logger.error(
f"Failed to get cluster status: {response.status_code} - {response.text}"
)
return False
def start_cluster_if_stopped(cluster_id, access_token):
headers = {"Authorization": f"Bearer {access_token}"}
        response = requests.get(
            f"{DATABRICKS_HOST}/api/2.0/clusters/get",
            headers=headers,
            params={"cluster_id": cluster_id},
        )
if response.status_code == 200:
cluster_info = response.json()
cluster_state = cluster_info.get("state", "UNKNOWN")
logger.info(f"Initial cluster state: {cluster_state}")
if cluster_state in ["TERMINATED", "TERMINATING", "UNKNOWN"]:
logger.info("Cluster is not running. Starting the cluster...")
start_response = requests.post(
f"{DATABRICKS_HOST}/api/2.0/clusters/start",
headers=headers,
json={"cluster_id": cluster_id},
)
if start_response.status_code == 200:
logger.info("Cluster is starting...")
cluster_state = "PENDING"
else:
logger.error(
f"Failed to start cluster: {start_response.status_code} - {start_response.text}"
)
return False
# Wait until the cluster is running
while cluster_state not in ["RUNNING", "ERROR"]:
time.sleep(30)
                response = requests.get(
                    f"{DATABRICKS_HOST}/api/2.0/clusters/get",
                    headers=headers,
                    params={"cluster_id": cluster_id},
                )
if response.status_code == 200:
cluster_info = response.json()
cluster_state = cluster_info.get("state", "UNKNOWN")
logger.info(f"Current cluster state: {cluster_state}")
else:
logger.error(
f"Failed to get cluster status: {response.status_code} - {response.text}"
)
return False
return cluster_state == "RUNNING"
else:
logger.error(
f"Failed to get cluster status: {response.status_code} - {response.text}"
)
return False
access_token = get_databricks_token()
# Set environment variables
os.environ["DATABRICKS_HOST"] = DATABRICKS_HOST
os.environ["DATABRICKS_CLUSTER_ID"] = CLUSTER_ID
os.environ["DATABRICKS_ORG_ID"] = ORG_ID
os.environ["DATABRICKS_TOKEN"] = access_token
os.environ["SPARK_HOME"] = r"[[YOUR_SPARK_HOME]]"
os.environ["PATH"] = f"{os.environ['SPARK_HOME']}/bin;" + os.environ["PATH"]
os.environ["PYSPARK_PYTHON"] = "python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python"
# Start the cluster if necessary and wait until it's running
if start_cluster_if_stopped(CLUSTER_ID, access_token):
# Initialize Spark session using Databricks Connect settings
spark = (
SparkSession.builder.appName("DatabricksConnectExample")
.config("spark.master", "local") # Set to local for Databricks Connect
.config("spark.databricks.service.client.enabled", "true")
.config("spark.databricks.service.address", DATABRICKS_HOST)
.config("spark.databricks.service.token", access_token)
.config("spark.databricks.service.clusterId", CLUSTER_ID)
.config("spark.databricks.service.orgId", ORG_ID)
.config(
"spark.driver.extraJavaOptions",
"-Dlog4j2.configurationFile=file:/databricks/spark/conf/log4j2.properties",
)
.config("spark.local.dir", TEMP_DIR) # Set the custom temporary directory
.getOrCreate()
)
# Set the logging level to reduce verbosity
spark.sparkContext.setLogLevel("ERROR")
logger.info("Spark session initialized successfully.")
# Run a basic command (executed on the cluster)
logger.info("Starting to generate a DataFrame with range 0-99.")
df = spark.range(100)
df.show()
# Perform some transformations (all executed on the cluster)
logger.info("Filtering the DataFrame to select rows with ID < 50")
df_filtered = df.filter(df.id < 50)
df_filtered.show()
# Aggregate data (also executed on the cluster)
logger.info("Aggregating the filtered DataFrame to count the rows.")
df_agg = df_filtered.groupBy().count()
result = df_agg.collect()
logger.info(f"Aggregated count from Databricks cluster: {result[0]['count']}")
# Stop the Spark session
logger.info("Stopping the Spark session.")
spark.stop()
else:
logger.error(
f"Cluster {CLUSTER_ID} is not running or couldn't be started. Please check the cluster status."
)
# Profile the execution
if __name__ == "__main__":
profiler = cProfile.Profile()
    profiler.enable()
main() # Run your main function
profiler.disable()
with open("profile_results.txt", "w") as f:
stats = pstats.Stats(profiler, stream=f)
stats.strip_dirs()
stats.sort_stats("cumulative")
        stats.print_stats(6000)  # Print up to the top 6000 entries
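The script above saves the profile as plain text. If you would rather keep a binary profile that can be reloaded and re-sorted later, a variant using `dump_stats` looks like this (a sketch; the `profile_results.prof` filename is just an example):

```python
import cProfile
import pstats

profiler = cProfile.Profile()
profiler.enable()
main()
profiler.disable()
profiler.dump_stats("profile_results.prof")  # binary dump, reloadable later

# Reload the dump and print the 20 most expensive calls by cumulative time
stats = pstats.Stats("profile_results.prof")
stats.strip_dirs().sort_stats("cumulative").print_stats(20)
```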
Summary
In this guide, we demonstrated how to execute code from a local machine on a Databricks Spark cluster. We covered:
- Setting up the environment and installing necessary libraries.
- Obtaining a Databricks token and checking the cluster status.
- Executing code using the Databricks Connect configuration.
Make sure to replace the placeholders in the script with your own Azure Active Directory and Databricks credentials. Happy coding!