Databricks
Best Practices
Data Ingestion
File/Event-based sources:
- Use Auto Loader for easier checkpointing; it tracks which files have already been ingested, so an interrupted load resumes without reprocessing.
- Use the cleanSource option (`cloudFiles.cleanSource`) to keep the landing/input directory tidy by moving or deleting files after they are processed. Available from Databricks Runtime 16.4 LTS. See the sketch below.
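A minimal sketch combining both points. The storage paths, file format, and table name are hypothetical placeholders; the `cloudFiles.cleanSource` options are as documented for Auto Loader.

```python
# Minimal Auto Loader sketch; all paths and names are hypothetical placeholders.
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    # Persist the inferred schema between runs (hypothetical path)
    .option("cloudFiles.schemaLocation", "abfss://meta@storageacct.dfs.core.windows.net/schemas/orders")
    # Move processed files out of the landing directory (DBR 16.4 LTS+)
    .option("cloudFiles.cleanSource", "MOVE")
    .option("cloudFiles.cleanSource.moveDestination", "abfss://archive@storageacct.dfs.core.windows.net/orders")
    .load("abfss://landing@storageacct.dfs.core.windows.net/orders")
)

(
    df.writeStream
    # The checkpoint is what lets the stream resume where it left off (hypothetical path)
    .option("checkpointLocation", "abfss://meta@storageacct.dfs.core.windows.net/checkpoints/orders")
    .trigger(availableNow=True)
    .toTable("bronze.orders")
)
```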
Orchestration
Parallelism
- Avoid ThreadPoolExecutor in notebook code; use the Jobs For-Each task with concurrency set to your requirement instead. This gives higher visibility into each iteration's run and spreads the load across workers. See the sketch below.
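A sketch of a For-Each job definition using the Databricks Python SDK. The notebook path and table names are hypothetical; the field names follow the Jobs API `for_each_task` structure as exposed by the SDK.

```python
# Sketch: a For-Each task fanning out over tables with bounded concurrency.
# Notebook path and table names are hypothetical placeholders.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()

w.jobs.create(
    name="process-tables",
    tasks=[
        jobs.Task(
            task_key="fan_out",
            for_each_task=jobs.ForEachTask(
                inputs='["orders", "customers", "payments"]',  # JSON array of iteration values
                concurrency=3,  # at most 3 iterations run at once
                task=jobs.Task(
                    task_key="fan_out_iteration",
                    notebook_task=jobs.NotebookTask(
                        notebook_path="/Workspace/pipelines/process_table",
                        base_parameters={"table_name": "{{input}}"},  # current iteration value
                    ),
                ),
            ),
        )
    ],
)
```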
Security
Observability
Freshness
- Set up SQL alerts with notifications, e.g. on a freshness query like the one sketched below
- Set up dashboards for specific requirements
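A minimal sketch of a freshness check that could back such an alert; the table name and `_ingested_at` column are hypothetical.

```python
# Hypothetical table and column; a SQL alert on this query would fire when
# minutes_stale exceeds a threshold, with notifications attached to the alert.
minutes_stale = spark.sql("""
    SELECT timestampdiff(MINUTE, max(_ingested_at), current_timestamp()) AS minutes_stale
    FROM main.sales.orders
""").first()["minutes_stale"]

if minutes_stale > 60:
    print(f"orders is stale: last ingest {minutes_stale} minutes ago")
```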
FinOps
Databricks Performance Guide
Common Operations
```mermaid
graph LR
    ShallowClone --> TestChanges
    TestChanges --> VerifyImpactedRows
    VerifyImpactedRows --> Implement
```
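The flow above: shallow-clone the table, test the change on the clone, verify the impacted rows, then implement it on the real table. A sketch in SQL via spark.sql, with hypothetical table names:

```python
# Hypothetical table names; illustrates the clone -> test -> verify -> implement flow.
# 1. Shallow clone: a metadata-only copy, cheap to create
spark.sql("CREATE OR REPLACE TABLE dev.sandbox.orders_clone SHALLOW CLONE main.sales.orders")

# 2. Test the change against the clone
spark.sql("UPDATE dev.sandbox.orders_clone SET status = 'archived' WHERE order_date < '2020-01-01'")

# 3. Verify the impacted row count before touching the real table
spark.sql("SELECT count(*) AS impacted FROM dev.sandbox.orders_clone WHERE status = 'archived'").show()

# 4. Implement: apply the verified statement to the production table
```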
Unzipping a Zip file within Databricks
```python
from zipfile import ZipFile
from io import BytesIO


def unzip_file_within_databricks(source_file_path, target_dir_path):
    """
    Unzips a zip file within Databricks and stores the unzipped files in the target directory.

    Parameters
    ----------
    source_file_path : str
        The path to the zip file to be unzipped - Eg: "abfss://source-path@sadataengci.dfs.core.windows.net/file.zip"
    target_dir_path : str
        The path to the target directory where the unzipped files should be stored - Eg: "abfss://target-path@sadataengci.dfs.core.windows.net/target-folder/"

    Returns
    -------
    None
    """
    # Load the zip as a single binary record and pull its bytes onto the driver
    zip_df = spark.read.format("binaryFile").load(source_file_path)
    zip_content = zip_df.select("content").collect()[0][0]

    # Extract each CSV entry in memory and write it out as a text file
    with ZipFile(BytesIO(zip_content), "r") as zipped:
        for fname in zipped.namelist():
            if fname.endswith(".csv"):
                f_content = zipped.read(fname)
                target_filepath = f"{target_dir_path}{fname}"
                print(fname)
                dbutils.fs.put(target_filepath, f_content.decode("utf-8"), overwrite=True)
    print("Unzipped and saved")
```