Execute Code from Local Machine on Databricks Spark Cluster

In this guide, you'll learn how to execute code from your local machine on a Databricks Spark cluster step by step. We will cover the environment setup, necessary installations, troubleshooting commands, and the code itself.

Prerequisites

Before you start, you'll need Python 3 installed locally, access to a Databricks workspace with a cluster you can use, and an Azure Active Directory service principal (tenant ID, client ID, and client secret) that is permitted to access that workspace. The script at the end of this guide reads these values from environment variables or placeholders.

Environment Setup

  1. Setup Virtual Environment: Create and activate a virtual environment:
    python -m venv venv
    source venv/bin/activate # On macOS/Linux
    venv\Scripts\activate # On Windows
  2. Upgrade Pip: Make sure you have the latest version of pip:
    pip install --upgrade pip
    python -m ensurepip --upgrade
  3. Install Required Libraries: Install the necessary Python libraries:
    pip install azure-identity databricks-cli pyspark
  4. Upgrade Databricks Connect: Make sure you're using an up-to-date version of Databricks Connect (its version should match your cluster's Databricks Runtime version):
    pip install -U databricks-connect
  5. Configure Databricks Connect: Configure Databricks Connect using your Databricks workspace URL and personal access token:
    databricks-connect configure
  6. Verify Databricks CLI Installation: Ensure the Databricks CLI is correctly installed:
    databricks -v
  7. Install Additional Tools: Install useful tools for troubleshooting and managing dependencies:
    pip install pipenv safety check-manifest pipdeptree
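
With the steps above in place, it's worth a quick sanity check before moving on. The legacy Databricks Connect client ships a databricks-connect test command; alternatively, the short sketch below (which assumes the configuration from step 5 succeeded) runs a trivial job that should be computed on the remote cluster.

# Minimal sanity check (a sketch): with the legacy Databricks Connect client
# configured, getOrCreate() picks up the stored host, token, cluster ID, and org ID,
# so this small count is executed on the remote Databricks cluster.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
print(spark.range(10).count())  # expect 10
spark.stop()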

Databricks CLI Installation via Winget

  • Search for Databricks CLI: Use Winget to search for the Databricks CLI:
    winget search databricks
  • Install Databricks CLI: Install the Databricks CLI via Winget:
    winget install Databricks.DatabricksCLI
  • Manual Installation: Alternatively, download the CLI directly and install:
    https://github.com/databricks/cli/releases/download/v0.221.1/databricks_cli_0.221.1_windows_amd64-signed.zip
  • Install Databricks Connect for Python: Follow the instructions in the Databricks Connect documentation to install and configure it.
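
Before wiring everything together, you can also confirm that your workspace URL and personal access token work at all. The hedged sketch below assumes DATABRICKS_HOST (your full workspace URL, including https://) and DATABRICKS_TOKEN are set as environment variables, and calls the same Clusters REST API that the script later in this guide talks to.

# Quick connectivity check (a sketch, not part of the original guide): list the
# clusters in the workspace via the /api/2.0/clusters/list REST endpoint.
import os
import requests

host = os.environ["DATABRICKS_HOST"].rstrip("/")
token = os.environ["DATABRICKS_TOKEN"]

resp = requests.get(
    f"{host}/api/2.0/clusters/list",
    headers={"Authorization": f"Bearer {token}"},
)
resp.raise_for_status()
for cluster in resp.json().get("clusters", []):
    print(cluster["cluster_id"], cluster["state"], cluster["cluster_name"])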

Troubleshooting Commands

  • Upgrade Specific Libraries: Ensure specific libraries are up-to-date:
    pip install --upgrade cryptography
    pip install pyopenssl==22.1.0
    pip install urllib3 --upgrade
  • Force Reinstall Libraries: Reinstall problematic libraries to fix issues:
    pip install --force-reinstall cryptography
    pip install -vvv --upgrade --force-reinstall cffi
  • Cache Management: Clear the pip cache and check for dependency issues:
    pip cache purge
    pip check
  • Setuptools, Wheel, CFFI: Upgrade setuptools, wheel, and CFFI:
    pip install --upgrade setuptools wheel cffi
  • Rust Installation: Some Python libraries (such as newer versions of cryptography) are built from source with a Rust toolchain. Install Rust, then rebuild the affected packages:
    curl https://sh.rustup.rs -sSf | sh
    rustup install stable
    rustup default stable
    pip install --upgrade setuptools wheel cffi
    pip install --upgrade cryptography
  • Rust Version Check: Ensure Rust is correctly installed:
    rustup show
    rustc --version
    cargo --version
  • Generate Requirements File: Generate a `requirements.txt` file for your project:
    pip freeze > requirements.txt
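
If problems persist after the commands above, it often helps to see exactly which versions ended up in the virtual environment. The small sketch below prints the installed versions of the packages this guide leans on, so they can be compared with the pinned requirements.txt in the next section.

# Diagnostic sketch: print the installed versions of the key packages so they can
# be compared against the pinned requirements.txt below.
from importlib.metadata import PackageNotFoundError, version

for pkg in ("databricks-connect", "pyspark", "azure-identity",
            "cryptography", "pyOpenSSL", "cffi", "urllib3"):
    try:
        print(f"{pkg}=={version(pkg)}")
    except PackageNotFoundError:
        print(f"{pkg}: not installed")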

Requirements File

Save the following content in a file named requirements.txt; you can then install the exact pinned versions with pip install -r requirements.txt:


annotated-types==0.7.0
Authlib==1.3.1
azure-core==1.30.2
azure-identity==1.16.0
build==1.2.1
certifi==2024.6.2
cffi==1.16.0
charset-normalizer==3.3.2
check-manifest==0.49
click==8.1.7
colorama==0.4.6
cryptography==38.0.4
distlib==0.3.8
dparse==0.6.4b0
filelock==3.14.0
idna==3.7
importlib_metadata==7.1.0
Jinja2==3.1.4
markdown-it-py==3.0.0
MarkupSafe==2.1.5
marshmallow==3.21.3
mdurl==0.1.2
msal==1.28.0
msal-extensions==1.1.0
packaging==24.0
pipdeptree==2.22.0
pipenv==2024.0.0
platformdirs==4.2.2
portalocker==2.8.2
py4j==0.10.9.7
pycparser==2.22
pydantic==2.7.3
pydantic_core==2.18.4
Pygments==2.18.0
PyJWT==2.8.0
pyOpenSSL==22.1.0
pyproject_hooks==1.1.0
pyspark==3.5.1
pywin32==306
requests==2.32.3
rich==13.7.1
ruamel.yaml==0.18.6
ruamel.yaml.clib==0.2.8
safety==3.2.2
safety-schemas==0.0.2
shellingham==1.5.4
six==1.16.0
tomli==2.0.1
typer==0.12.3
typing_extensions==4.12.2
urllib3==2.2.1
virtualenv==20.26.2
zipp==3.19.2

Execute Code on Databricks Cluster

Here's the Python script to execute code on the Databricks Spark cluster:


import os
import time
import logging
import requests
from azure.identity import ClientSecretCredential
from pyspark.sql import SparkSession
import cProfile
import pstats


def main():
    # Azure AD and Databricks details
    TENANT_ID = os.getenv("TENANT_ID", "[[YOUR_TENANT_ID]]")
    CLIENT_ID = os.getenv("CLIENT_ID", "[[YOUR_CLIENT_ID]]")
    CLIENT_SECRET = os.getenv("CLIENT_SECRET", "[[YOUR_CLIENT_SECRET]]")
    DATABRICKS_HOST = "[[YOUR_DATABRICKS_HOST]]"
    CLUSTER_ID = "[[YOUR_CLUSTER_ID]]"
    ORG_ID = "[[YOUR_ORG_ID]]"

    # Define the temporary directory
    TEMP_DIR = r"[[YOUR_TEMP_DIRECTORY]]"
    os.makedirs(TEMP_DIR, exist_ok=True)

    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[logging.StreamHandler()],  # Log to standard output
    )
    logger = logging.getLogger(__name__)

    def get_databricks_token():
        credential = ClientSecretCredential(
            tenant_id=TENANT_ID, client_id=CLIENT_ID, client_secret=CLIENT_SECRET
        )
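        # Request a token for the Azure Databricks resource; the GUID below is the
        # well-known application ID of the Azure Databricks service in Azure AD.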
        scope = ["2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default"]
        token = credential.get_token(*scope)
        return token.token

    def is_cluster_running(cluster_id, access_token):
        headers = {"Authorization": f"Bearer {access_token}"}
        response = requests.get(
            f"{DATABRICKS_HOST}/api/2.0/clusters/get",
            headers=headers,
            json={"cluster_id": cluster_id},
        )

        if response.status_code == 200:
            cluster_info = response.json()
            cluster_state = cluster_info.get("state", "UNKNOWN")
            logger.info(f"Cluster state: {cluster_state}")
            return cluster_state == "RUNNING"
        else:
            logger.error(
                f"Failed to get cluster status: {response.status_code} - {response.text}"
            )
            return False

    def start_cluster_if_stopped(cluster_id, access_token):
        headers = {"Authorization": f"Bearer {access_token}"}
        response = requests.get(
            f"{DATABRICKS_HOST}/api/2.0/clusters/get",
            headers=headers,
            json={"cluster_id": cluster_id},
        )

        if response.status_code == 200:
            cluster_info = response.json()
            cluster_state = cluster_info.get("state", "UNKNOWN")
            logger.info(f"Initial cluster state: {cluster_state}")

            if cluster_state in ["TERMINATED", "TERMINATING", "UNKNOWN"]:
                logger.info("Cluster is not running. Starting the cluster...")
                start_response = requests.post(
                    f"{DATABRICKS_HOST}/api/2.0/clusters/start",
                    headers=headers,
                    json={"cluster_id": cluster_id},
                )

                if start_response.status_code == 200:
                    logger.info("Cluster is starting...")
                    cluster_state = "PENDING"
                else:
                    logger.error(
                        f"Failed to start cluster: {start_response.status_code} - {start_response.text}"
                    )
                    return False

            # Wait until the cluster is running
            while cluster_state not in ["RUNNING", "ERROR"]:
                time.sleep(30)
                response = requests.get(
                    f"{DATABRICKS_HOST}/api/2.0/clusters/get",
                    headers=headers,
                    json={"cluster_id": cluster_id},
                )

                if response.status_code == 200:
                    cluster_info = response.json()
                    cluster_state = cluster_info.get("state", "UNKNOWN")
                    logger.info(f"Current cluster state: {cluster_state}")
                else:
                    logger.error(
                        f"Failed to get cluster status: {response.status_code} - {response.text}"
                    )
                    return False

            return cluster_state == "RUNNING"
        else:
            logger.error(
                f"Failed to get cluster status: {response.status_code} - {response.text}"
            )
            return False

    access_token = get_databricks_token()

    # Set environment variables
    os.environ["DATABRICKS_HOST"] = DATABRICKS_HOST
    os.environ["DATABRICKS_CLUSTER_ID"] = CLUSTER_ID
    os.environ["DATABRICKS_ORG_ID"] = ORG_ID
    os.environ["DATABRICKS_TOKEN"] = access_token

    os.environ["SPARK_HOME"] = r"[[YOUR_SPARK_HOME]]"
    os.environ["PATH"] = f"{os.environ['SPARK_HOME']}/bin;" + os.environ["PATH"]
    os.environ["PYSPARK_PYTHON"] = "python"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "python"

    # Start the cluster if necessary and wait until it's running
    if start_cluster_if_stopped(CLUSTER_ID, access_token):
        # Initialize Spark session using Databricks Connect settings
        spark = (
            SparkSession.builder.appName("DatabricksConnectExample")
            .config("spark.master", "local")  # Set to local for Databricks Connect
            .config("spark.databricks.service.client.enabled", "true")
            .config("spark.databricks.service.address", DATABRICKS_HOST)
            .config("spark.databricks.service.token", access_token)
            .config("spark.databricks.service.clusterId", CLUSTER_ID)
            .config("spark.databricks.service.orgId", ORG_ID)
            .config(
                "spark.driver.extraJavaOptions",
                "-Dlog4j2.configurationFile=file:/databricks/spark/conf/log4j2.properties",
            )
            .config("spark.local.dir", TEMP_DIR)  # Set the custom temporary directory
            .getOrCreate()
        )

        # Set the logging level to reduce verbosity
        spark.sparkContext.setLogLevel("ERROR")

        logger.info("Spark session initialized successfully.")

        # Run a basic command (executed on the cluster)
        logger.info("Starting to generate a DataFrame with range 0-99.")
        df = spark.range(100)
        df.show()

        # Perform some transformations (all executed on the cluster)
        logger.info("Filtering the DataFrame to select rows with ID < 50")         
        df_filtered = df.filter(df.id < 50)
        df_filtered.show()

        # Aggregate data (also executed on the cluster)
        logger.info("Aggregating the filtered DataFrame to count the rows.")
        df_agg = df_filtered.groupBy().count()
        result = df_agg.collect()
        logger.info(f"Aggregated count from Databricks cluster: {result[0]['count']}")

        # Stop the Spark session
        logger.info("Stopping the Spark session.")
        spark.stop()
    else:
        logger.error(
            f"Cluster {CLUSTER_ID} is not running or couldn't be started. Please check the cluster status."
        )


# Profile the execution
if __name__ == "__main__":
    profiler = cProfile.Profile()
    profiler.enable()
    main()  # Run your main function
    profiler.disable()
    with open("profile_results.txt", "w") as f:
        stats = pstats.Stats(profiler, stream=f)
        stats.strip_dirs()
        stats.sort_stats("cumulative")
        stats.print_stats(6000)  # Print top functions
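
The profile is written as plain text, which is easy to skim but can't be re-sorted later. If you'd rather slice the results differently without re-running the Spark job, one hedged variation is to dump the raw profiling data and load it with pstats afterwards:

# Optional variation (a sketch, not part of the original script): keep the raw
# cProfile data so it can be re-sorted later without re-running the job.
import cProfile
import pstats

profiler = cProfile.Profile()
profiler.enable()
main()  # the main() defined in the script above
profiler.disable()
profiler.dump_stats("profile_results.prof")

# Reload and view, sorted by total time spent in each function.
stats = pstats.Stats("profile_results.prof")
stats.strip_dirs().sort_stats("tottime").print_stats(20)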

Summary

In this guide, we demonstrated how to execute code from a local machine on a Databricks Spark cluster. We covered:

  • Setting up the environment and installing necessary libraries.
  • Obtaining a Databricks token and checking the cluster status.
  • Executing code using the Databricks Connect configuration.

Make sure to replace the placeholders in the script with your own Azure Active Directory and Databricks credentials. Happy coding!
