iMasters.com - Knowledge for Developers.
Follow Us
Home Backend [Databricks API as an internal service] dbutils — notebook.run, widgets.getArgument, widgets.text and notebook_params
Backend

[Databricks API as an internal service] dbutils — notebook.run, widgets.getArgument, widgets.text and notebook_params

With an increasing demand for greater autonomy in data pipelines, the Databricks data analytics platform has become a popular solution for companies looking to build scalable and efficient pipelines. During a freelance project, I was asked for greater autonomy over the core of the company, which is defined by a data pipeline that imports files from several different SFTPs, ingests to S3, a series of data treatments to follow a mandatory layout, and, finally, the sending to a queue system where the other systems could take advantage of the refined data. I thought about how I could meet the following requirements with little or no development beyond what already exists in Databricks. Therefore, in this article, we will talk about how you can use the Databricks API as an internal service and give autonomy to:

-> Reprocess a file from its source (SFTP).

-> Generate only the JSON that is sent to SQS.

-> Process a file in an approval environment without sending it to SQS. These are some required autonomies, but what will be displayed here is valid for any business rule.

dbutils — notebook.run and widgets.get

Before some Databricks evolutions regarding “steps” within a job, I always used a very interesting function, which is dbutils.notebook.run. With it, I was able to assemble a kind of notebook orchestration that I would like to run both in sequence and in parallel, as well as assign retry and time (parameters that you can pass in this function). In this way, I set up a single notebook like the one below, which carried out my extraction, transformation, and sending orchestration of data from hundreds of files:

Orchestration notebook example:

class PropriedadesNoteBook:
def __init__(self, path, timeout, retry=1):
self.path = path
self.timeout = timeout
self.retry = retrynbmovefiles = PropriedadesNoteBook(“0-move-ftp-to-s3”, 500,3)
nblimpadados = PropriedadesNoteBook(“1-notebook_limpa_dados”, 1000,2)
nbimport = PropriedadesNoteBook(“2-notebook_importa_arquivos”, 1000,2)

In the example above, I do my entire “ETL” step sequentially, as in my case, the files were served via SFTP 1-3 times a day (sometimes much more, which is where I want to go with this article). Nice. You have your job, your orchestration, and your notebooks that execute tasks, and your job runs in a defined window according to the definition with the business team. But, after being developed, tested, and validated, this process grew throughout the business, mainly in 3 aspects:

Many new customer setups, which the product team needed to validate the JSON processed by Databricks at the approval or production level.

Recurrence of customers that, due to an error on their side, the file was available outside their job’s schedule.

Need to send the complete stream but to a different messaging system.

Well, we arrived where it will be shown how to use and the effectiveness of the functions below:

notebook_params (parametro da API do databricks)
dbutils.widgets.getArgument
dbutils.widgets.text

Passing and receiving data through widgets between notebooks.

As I mentioned earlier, I felt the need to bring more freedom to users and systems and take some of the data engineer’s worry out of modifying, executing, and monitoring the flow of importing, processing, and sending data to AWS SQS (message queue system). In this way, I started to elaborate a notebook responsible for the reprocessing of the needs above, remembering:

Many new customer setups that the product team needed to validate the JSON processed by Databricks at the homologation or production level.

Recurrence of customers that, due to an error on their side, the file was available outside their job’s schedule.

Need to send the complete stream but to a different messaging system.

Continuing, in addition to the notebook I developed, which I called reprocessing, I created a second notebook called orchestrador_reprocessamento and, finally, I created a bucket dedicated to this type of reprocessing need, getting only the penultimate process that the JSON, send, or not to the SQS and etc. Great, now, how the user or application will call this notebook and specify exactly what he wants in view of these three new features? For this, I used the Databricks API, which is composed of your cluster ID + .cloud.databricks.com/api/2.1/jobs/run-now. I Got something like: https://meuidcluster.cloud.databricks.com/api/2.1/jobs/run-now. Also, as I mentioned earlier, I created a bucket focused on data reprocessing, in which bucket I create the following folders:

– hmle/json

– hmle/sqs

– json_gerados

– prod/json

– prod/sqs

Basically, as most of the needs (except the complete processing involving SFTP) involve the need to place the SETUP file for processing, I created the folders above where my reprocessing notebook has the following intelligence:

– hmle/json — If there are files inside that directory, I start my reprocessing notebook by passing via widgets to my regular flow notebook (this way, I keep the daily flow rule without creating something separate with this need for reprocessing) as follows:

HMLE_PROCESS = "T"
PROD_PROCESS = "F"
JSON_PROCESS = "T"
dbutils.widgets.text('HMLE_PROCESS', HMLE_PROCESS)
dbutils.widgets.text('PROD_PROCESS', PROD_PROCESS)
dbutils.widgets.text('JSON_PROCESS', JSON_PROCESS)
if dbutils.notebook.run(nbprocessjson.path, nbprocessjson.timeout, {"HMLE_PROCESS": HMLE_PROCESS, "PROD_PROCESS": PROD_PROCESS, "JSON_PROCESS": JSON_PROCESS}) != "error":
  print("Notebook "+nbprocessjson.path+ " executado com sucesso!")
else:
  print("Ocorreu um erro na execução do notebook: "+nbprocessjson.path)
  send_slack_message_reprocessing("1-importa arquivos lake", f"Ocorreu uma falha na execução do notebook: {nbprocessjson.path}")

The code snippet above runs in my reprocessing notebook. Notice that I pass everything in the widgets according to how the file was available by the user to the bucket, i.e., I want to process the file and obtain the JSON in an approval environment. In the notebook that will receive the parameters, it looks like this:

ry:
    JSON_PROCESS = dbutils.widgets.getArgument("JSON_PROCESS")
    PROD_PROCESS = dbutils.widgets.getArgument("PROD_PROCESS")
    HMLE_PROCESS= dbutils.widgets.getArgument("HMLE_PROCESS")
    
    print(f"JSON - {JSON_PROCESS} PROD - {PROD_PROCESS} - File json name - {HMLE_PROCESS}") 
except Exception as e:
    print(f"Falha nos widgets - {str(e)}")
    print("Processamento diário!")
    pass

In this way, I can call my daily processing notebook, adapting future business rules without worrying about whether this will be reflected in the processing, as the reprocessing already points to the main notebooks.

-> hmle/sqs — I perform the same logic as the code above, however when it arrives in my notebook for sending to SQS, I validate my widgets and see if it is filled in or not as HMLE or Prod and point to the correct SQS.

-> json_gerados — When I’m asked to generate the file processing only up to the JSON step (and not send it to SQS), I write the file with ano_mes_dia_minute_second.json in this bucket.

-> prod/json — Same logic for hml/json.

-> prod/sqs — Same logic as the others. I send the file available for processing in this folder to the Prod SQS.

Using the Databricks API for full reprocessing

You might be wondering why you introduced a user logic to make the file available in the bucket in the specific folders as per your need, but the full reprocessing (SFTP -> S3 ingestion -> data cleaning -> layout processing -> SQS upload ) does not follow this pattern but via the Databricks API? This complete reprocessing case involves a step where the user or internal API sends the file already obtained by the user to one of the buckets, which is the SFTP access of each client (+40 clients). In addition to being sensitive information, I need to ensure that, when I handle the client’s SFTP, the file will make it to the final step, which is AWS SQS. So this is where notebook_params comes in. Very simply, I pass an attribute within this parameter in the request body, and it is received by my notebook linked to my reprocessing job as a widget.

Request example:

{
“job_id”: 961722246913259,
“pipeline_params”: {
“full_refresh”: true
},
“notebook_params”: {
“ftp”: true
}
}

Therefore, in orchestration notebook, I validate if: dbutils.widgets.get(“ftp”) exists, and if it is filled in as true or false, and, if true, I call my notebook through dbutils.notebook.run communication and import with the clients’ SFTP.

Conclusion

In this article, we explore the advantages of using the Databricks API and its capabilities in a project that involves importing, processing, and sending data to a queue system. We demonstrate how the Databricks platform allows the creation of scalable and efficient data pipelines, providing greater autonomy to users and systems. Using the dbutils.notebook.run function, it was possible to orchestrate notebooks and execute ETL steps sequentially and in parallel, in addition to assigning retry and time. With the implementation of widgets and the Databricks API, we were able to adapt the workflow according to the specific needs of users, allowing the reprocessing of files, generation of JSONs, and processing in an approval or production environment. Using the Databricks API, we can ensure that even in cases of complete reprocessing, the file will be sent to AWS SQS after going through all pipeline steps. This will show how flexible and efficient the Databricks platform can be when dealing with different business rules and data processing requirements. And the most amazing thing is that I created a reprocessing flow that follows my pipeline daily standard flow without having to make it super complex.

*The content of this article is the author’s responsibility and does not necessarily reflect the opinion of iMasters.

Written by
Airton Lira Junior

Airton Graduated in systems development analysis from FIAP - Faculdade de Tecnologia Paulista, a postgraduate in database from IESP - Higher Education Institute of Paraíba, a specialist in Protheus - TOTVS, developer in Go, Python, Java languages, ​​and enthusiast in the architectures of software in DevOps.

Leave a comment

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

Related Articles

Backend

How to Create a Skill for Amazon’s Virtual Assistant Alexa

If you didn’t know, it is not necessary for an Amazon Echo...

Backend

The APIs role in a 5G world

5G is about to revolutionize how we connect and use technology daily....

Backend

EF Core using AsNoTracking with Identity Resolution

Today we will see the consequences of using AsNoTracking with the Identity...

Backend

Understand key features added in ASP.NET Core 7.0

Version 7.0 of the .NET platform brought many new features in this...