No dia 07 de março de 2023 a Databricks lançou para todo o publico sua nova feature de consumo de dados do Delta Lake via API chamado “Databricks SQL Statement Execution API”.
Segundo a descrição da própria Databricks (veja mais aqui) esta é uma forma de simplificar o acesso aos dados e facilitar a construção de aplicações de dados personalizadas para as suas necessidades.
O acesso aos dados é feito por meio de uma API REST assíncrona, o que simplifica a conexão, eliminando a necessidade de gerenciamento de conexões, como ocorre ao utilizar JDBC ou ODBC. Além disso, não é necessário instalar nenhum driver para acessar os dados. Você pode usar esta feature para conectar suas aplicações baseadas em cloud, serviços e dispositivos no Databricks SQL.
Neste post vou mostrar como utilizar esta nova feature que além de ser muito útil é bem simples de ser implementada.
A API
Endpoints
A API possui três endpoints:
POST /sql/statements – É o responsável por enviar nosso script ao banco. Caso demore para dar um retorno o endpoint devolve um “statment id” que será usado em outro endpoint para trazer o resultado assim que a consulta for finalizada. Se o retorno da consulta for rápido então os dados são retornados em formato JSON.
GET /sql/statements/{statement_id} – Busca o retorno de um script enviado passando como parâmetro o statment id da mesma.
POST /sql/statements/{statement_id}/cancel – Cancela a execução de um script em andamento.
Configuração
Precisamos de 3 informações para utilizar a API.
1 - Nome da instância do workspace Databricks
Esta informação está disponível na página inicial do Databricks no ambiente Azure no campo URL.
Exemplo: https://adb-1234567890123456.7.azuredatabricks.net
2 - Token de acesso pessoal
Precisamos gerar este token no Databricks. Ele será utilizado como forma de autenticação quando formos consumir a API. (“Authorization”:”Bearer {TOKEN}”)
No workspace do Databricks vá no menu superior do lado direito, clique na seta para baixo e escolha a opção User Settings
Na tela que será aberta entre na aba “Access tokens” e clique no botão “Generate new Token”
No campo “Comment” de um nome para seu token e no campo “Lifetime” escolha por quanto tempo este token será valido. Caso queira deixar ele valido para sempre deixe o campo vazio. Ao finalizar clique no botão “Generate”.
Uma nova tela será aberta com o código token gerado. **Copie-o e guarde em um lugar seguro pois depois que fechar esta tela você não conseguirá mais resgatar este valor.
Podemos ver abaixo que nosso token pessoal foi criado.
3 - ID do SQL Warehouse
Para o uso desta feature é necessário termos configurado um SQL Warehouse no ambiente de SQL do Databricks. Quando a chamada da API for efetuada ela será endereçada para este ambiente computacional que vai ser iniciado (caso esteja desligado) e será o responsável por processar os scripts SQL enviados nas requisições HTTP.
Acessamos o ambiente de SQL do Databricks no menu lateral superior esquerdo clicando em SQL.
Dentro do ambiente de SQL voltamos ao menu lateral e clicamos em “SQL Warehouses”
Por default o Databricks cria um SQL Warehouse padrão do qual podemos utilizar para o consumo da API.
Caso no seu ambiente não exista você pode criar-lo clicando no botão “Create SQL warehouse”
Clique no nome do SQL Warehouse que vai ser utilizado
Vá na aba “Connection Details” e no campo “HTTP path” copie o código que está depois da última barra. Este código é o id do SQL Warehouse.
Com os três itens acima e, mãos já podemos fazer o acesso aos dados via API.
Para fins de exemplificação do consumo da API irei usar o Postman, uma ferramenta própria para testes de consumo de APIs. Você pode usar a ferramenta que preferir.
Configuração da chamada da API
Configuramos a chamada da seguinte forma:
Verbo HTTP
Post
URL
URL da instância do workspace Databricks + api/2.0/ + endponint.
Exemplo: (http://adb-1234567890123456.7.azuredatabricks.net/api/2.0/statements/)
Header
Authorization: Bearer {Token de acesso pessoal}
Exemplo: Bearer dapi9aaa74b82cf1746dd9143b34g01ea5f9-9
Body
Enviamos aqui um JSON simples contendo os seguintes campos:
warehouse_id - O Id do SQL Warehouse que pegamos acima no item 2.
catalog - Nome do catalogo de dados.
schema – Nome do database no delta lake onde será feita a consulta.
statement – Script SQL que será enviado.
Com tudo configurado então fazemos a chamada da API que nos dá o seguinte retorno:
Podemos ver que ela devolveu um status de consulta pendente e também um “statement_id” para que possamos consultar em seguida o retorno do nosso select.
Enquanto isso se olharmos no SQL Warehouse podemos ver que automaticamente ele foi iniciado.
Esse início pode demorar alguns minutos. Depois que tiver iniciado podemos ver o “State” como Running.
Como o primeiro endpoint nos retornou um “statement_id” temos que consumir agora o segundo endpoint passando este valor como parâmetro para obter o resultado.
Para isso vamos fazer um novo consumo da API muito parecido com o primeiro que fizemos. As únicas diferenças serão:
Verbo
GET
URL
Acrescentaremos no final o “statement_id”
Body
Não enviamos body
Abaixo temos o resultado
Segue o JSON completo do resultado
{
"statement_id": "01edc420-728f-1a8e-8697-785deaca0b47",
"status": {
"state": "SUCCEEDED"
},
"manifest": {
"format": "JSON_ARRAY",
"schema": {
"column_count": 3,
"columns": [
{
"name": "name",
"type_text": "STRING",
"type_name": "STRING",
"position": 0
},
{
"name": "genre",
"type_text": "STRING",
"type_name": "STRING",
"position": 1
},
{
"name": "age",
"type_text": "INT",
"type_name": "INT",
"position": 2
}
]
},
"total_chunk_count": 1,
"chunks": [
{
"chunk_index": 0,
"row_offset": 0,
"row_count": 6
}
],
"total_row_count": 6
},
"result": {
"chunk_index": 0,
"row_offset": 0,
"row_count": 6,
"data_array": [
[
"João da Silva",
"M",
"40"
],
[
"Maria de Fatima Bueno",
"F",
"72"
],
[
"Marcia Farias",
"F",
"45"
],
[
"Otelo Ribeiro",
"M",
"18"
],
[
"Douglas Poso",
"M",
"37"
],
[
"Helena Poso",
"F",
"22"
]
]
}
}
Vamos fazer uma nova consulta, só que agora adicionando um pouco mais de complexidade. Faremos um group by por gênero conforme coloco abaixo.
Desta vez como o SQL Warehouse já estava no ar e a consulta retornou dados rapidamente,ele nos devolveu o resultado do select ao invés do “statement_id”
JSON completo
{
"statement_id": "01edc422-4337-1211-a0b2-7c479fd69685",
"status": {
"state": "SUCCEEDED"
},
"manifest": {
"format": "JSON_ARRAY",
"schema": {
"column_count": 2,
"columns": [
{
"name": "genero",
"type_text": "STRING",
"type_name": "STRING",
"position": 0
},
{
"name": "quantidade",
"type_text": "BIGINT",
"type_name": "LONG",
"position": 1
}
]
},
"total_chunk_count": 1,
"chunks": [
{
"chunk_index": 0,
"row_offset": 0,
"row_count": 2
}
],
"total_row_count": 2
},
"result": {
"chunk_index": 0,
"row_offset": 0,
"row_count": 2,
"data_array": [
[
"F",
"3"
],
[
"M",
"3"
]
]
}
}
Vamos testar agora o ultimo endpoint, aquele que cancela uma consulta enviada.
Seguindo o mesmo padrão do primeiro consumo:
Verbo
POST
URL
URL da instância do workspace Databricks + api/2.0/ + endponint + statment_id+/cancel.
Exemplo (http://adb-1234567890123456.7.azuredatabricks.net+ api/2.0/statements/ /01edc423-2721-1e12-8acc-d4f456a7171d/cancel)
Header
Authorization Bearer {Token de acesso pessoal}
Ele não devolve um JSON, porém podemos ver o Status HTTP 200 que indica que a consulta foi cancelada com sucesso.
Mas não pense que este serviço executa apenas selects no Delta Lake... também é possível utilizar os comandos DML. Vamos ver mais alguns exemplos.
Inserção de Dados
Enviamos um comando de inserção simples para a mesma tabela cliente.
O retorno é um JSON informando que a operação foi bem-sucedida (tag “state”), trazendo a quantidade de linhas afetadas (“num_affected_rows”) e inseridas (“num_inserted_rows”).
"statement_id": "01edc71a-1d71-1297-93ee-faa1b78988c9",
"status": {
"state": "SUCCEEDED"
},
"manifest": {
"format": "JSON_ARRAY",
"schema": {
"column_count": 2,
"columns": [
{
"name": "num_affected_rows",
"type_text": "BIGINT",
"type_name": "LONG",
"position": 0
},
{
"name": "num_inserted_rows",
"type_text": "BIGINT",
"type_name": "LONG",
"position": 1
}
]
},
"total_chunk_count": 1,
"chunks": [
{
"chunk_index": 0,
"row_offset": 0,
"row_count": 1
}
],
"total_row_count": 1
},
"result": {
"chunk_index": 0,
"row_offset": 0,
"row_count": 1,
"data_array": [
[
"1",
"1"
]
]
}
}
Vamos conferir se o registro foi mesmo inserido. Faremos agora um select buscando por registros cujo a idade seja igual a 80 anos.
Vemos na imagem acima o retorno do nosso cliente João da Souza.
Update
Vamos aproveitar o mesmo registro e faremos agora um update. Vamos alterar o campo “age” para 90.
Algumas vezes ele não consegue retornar diretamente o resultado. Vimos isso lá no começo desta postagem. Mas ele devolve um “statment_id” que usamos para consultar o resultado.
Vamos agora fazer um select deste registro porem filtrando agora pela idade igual a 90 anos.
Para finalizar, faremos agora a exclusão do registro com um comando delete.
Fazendo o select pela idade igual a 90 ele não retorna mais registros.
Vemos que o resultado para a seleção retornou vazio, conforme o esperado.
O Databricks vem facilitando o acesso aos dados cada dia mais. Esse tipo de conexão deixa o processo muito mais simples e nos abre um grande leque de oportunidades.
Espero que tenham gostado e que se aventurem também nessa nova forma de acesso aos dados do Delta Lake!