Pular para conteúdo

sidra-sql

Infraestrutura avançada de ETL e Data Warehousing para dados SIDRA do IBGE.

Pegadinhas da fonte oficial

  • O IBGE revisa dados sem aviso. PIB Q3 2020 publicado em janeiro de 2021 pode estar diferente em 2026. O sidra-sql usa SCD Type II (ativo, modificacao) — nunca sobrescreve, sempre acrescenta. Queries com WHERE modificacao <= 'YYYY-MM-DD' AND ativo = TRUE reproduzem qualquer snapshot histórico.
  • Códigos de tabela são strings. "1620", não 1620. Inteiro causa 404 silencioso.
  • unnest_classifications = true é uma arma carregada. Tabelas com 6 classificações fazem cross-product explosivo. Comece com false e expanda só quando souber o volume.
  • INSERT linha-a-linha leva horas. O motor usa COPY FROM STDIN + tabela de staging — 400k linhas/s em hardware comum. Não tente "otimizar" via SQLAlchemy ORM.
  • Períodos têm formatos heterogêneos. Mensal 202301, trimestral 202301, anual 2023, plurianual 2020-2023. A dimensão periodo já normaliza com ano, mes, trimestre, data_inicio, data_fim — joine pela coluna que faz sentido.
  • config.ini lê do CWD. Se você roda de outro diretório, ele falha sem mensagem clara. os.chdir() ou caminho absoluto.

sidra-sql banner

O Que É

sidra-sql é uma infraestrutura completa de Data Warehousing e ETL (Extração, Transformação, Carregamento) projetada para ingerir, estruturar e versionarizare dados extraídos do SIDRA.

Enquanto sidra-fetcher resolve o problema de comunicação (obter dados da API), sidra-sql resolve o problema de governança, persistência e reprodutibilidade. Converte dados IBGE brutos e hierárquicos em um banco de dados relacional PostgreSQL otimizado para consultas analíticas pesadas.

Problema que Resolve

Trabalhar com dados IBGE para pesquisa rigorosa ou modelagem financeira envolve desafios estruturais além do simples download:

1. Ontologia Complexa do IBGE

SIDRA organiza dados em estruturas altamente aninhadas:

  • Agregados (tabelas) contêm variáveis em múltiplos níveis territoriais
  • Até 6 classificações diferentes podem se cruzar
  • Achatar isso em arquivos CSV destrói integridade referencial e causa duplicação massiva

Estrutura SIDRA (hierárquica):

graph TD
    T[Tabela 1620 - PIB] --> V[Variável 116 - PIB Real]
    V --> L1[Território: Brasil - Nível 1]
    V --> L3[Território: Estados - Nível 3]
    V --> L5[Território: Municípios - Nível 5]
    T --> C[Classificação: Por Atividade]
    C --> C1[Agricultura]
    C --> C2[Indústria]
    C --> C3[Serviços]

Problema: Como representar isso em um CSV flat sem duplicação?

2. Gargalos de I/O de Ingestão

Salvar dezenas de milhões de linhas usando métodos tradicionais (inserção ORM linha por linha) pode levar horas e esgotar RAM.

3. Revisões & Reprodutibilidade

O IBGE frequentemente revisa dados históricos. Simplesmente substituir dados antigos por novos destrói a reprodutibilidade de pesquisa acadêmica ou modelos de ML treinados em "snapshots históricos."

sidra-sql resolve todos os três com arquitetura sênior de Data Engineering:

Architecture & Recursos Principais

1. Ingestão Streaming (Performance "Gold Standard")

Para volumes massivos de dados sem exaustão de memória:

  • Estratégia Two-Pass: Primeiro pass resolve dimensões e chaves estrangeiras; segundo pass faz bulk load
  • PostgreSQL COPY FROM STDIN: Protocolo binário nativo transmite milhões de registros em segundos
  • Upsert Atômico: Usa ON CONFLICT DO NOTHING para deduplicação
  • Tabelas de Staging: Tabela temporária _staging_dados para operações atômicas

Performance: Inserir 10M linhas em ~30 segundos (vs horas com INSERT tradicional)

2. Star Schema Dimensional

Modelagem relacional estrita separa metadados de fatos:

Tabelas de Dimensão:

  • tabela_sidra: Metadados brutos em formato JSONB (preserva estrutura IBGE)
  • localidade: Malha territorial (Brasil, estados, municípios, regiões)
  • periodo: Dimensão de tempo com níveis apropriados de agregação
  • dimensao: Cruzamentos de classificações (categorias × variáveis)

Tabela de Fatos (dados):

  • Extremamente enxuta: Apenas chaves estrangeiras, data de modificação, flag de versão, valor
  • Constraint unique composto previne duplicatas
  • Otimizada para consultas analíticas (workloads OLAP)
erDiagram
    localidade ||--o{ dados : localidade_id
    dimensao ||--o{ dados : dimensao_id
    periodo ||--o{ dados : periodo_id
    tabela_sidra ||--o{ dados : tabela_sidra_id

    localidade {
        bigint id PK
        text nc
        text nn
        text d1c
        text d1n
    }
    dimensao {
        bigint id PK
        text mc
        text mn
        text d2c
        text d2n
        text d4c
    }
    dados {
        bigint id PK
        text tabela_sidra_id FK
        bigint localidade_id FK
        bigint dimensao_id FK
        integer periodo_id FK
        date modificacao
        boolean ativo
        text v
    }

3. Dimensões que Mudam Lentamente (SCD Type II)

Auditabilidade para ambientes de pesquisa e regulatória:

  • Nunca deletar: Quando IBGE revisa dados históricos, inserir nova versão + marcar antiga como ativo = FALSE
  • Histórico completo preservado: Saber exatamente como o banco de dados estava em qualquer data passada
  • Colunas: modificacao (data), ativo (flag booleano)

Caso de uso acadêmico: Reproduzir pesquisa de 2015 exatamente com dados de 2015, mesmo se 2026 tem correções

4. Pipelines Declarativos via TOML

Lógica de negócio isolada de código:

# fetch.toml - Definir pipelines sem codificar
[[tabelas]]
tabela_sidra = "1620"
variables = ["116"]                    # Variável PIB
territories = {6 = []}                 # Total Brasil

[[tabelas]]
tabela_sidra = "1737"
variables = ["63"]                     # Inflação IPCA
territories = {1 = [], 3 = []}        # Níveis 1 & 3

5. Explosão Automática de Classificações

A diretiva unnest_classifications = true dispara algoritmo recursivo:

  • Mapeia todos os cross-produtos variável × categoria automaticamente
  • Elimina descoberta manual de ID
  • Gera consultas dimensionais otimizadas

6. Arquitetura de Plugin

O motor é leve e genérico; definições de dados vivem em repositórios Git separados:

Plugin (seu repo Git)
├── manifest.toml       ← registro de pipeline
├── pipeline-a/
│   ├── fetch.toml      ← o que baixar do SIDRA
│   ├── transform.toml  ← config de tabela analytics
│   └── transform.sql   ← consulta de desnormalização

Instalar e executar:

sidra-sql plugin install https://github.com/Quantilica/sidra-pipelines.git --alias std
sidra-sql run std pib_municipal

Recursos Adicionais

  • ✅ Busca full-text em metadados SIDRA (JSONB português)
  • ✅ Cache de metadados para performance (arquivos JSON locais)
  • ✅ Integração PostgreSQL (transações ACID)
  • ✅ Operações idempotentes (re-execução segura)
  • ✅ Lógica de retry com backoff exponencial (até 5 tentativas)
  • ✅ Trilhas de auditoria via colunas ativo + modificacao

Instalação

pip install git+https://github.com/Quantilica/sidra-sql.git

Configuração

sidra-sql lê um arquivo config.ini no diretório de trabalho:

[storage]
data_dir = data

[database]
user = postgres
password = suasenha
host = localhost
port = 5432
dbname = dados
schema = ibge_sidra
tablespace = pg_default
readonly_role = readonly_role

Carregue a config em Python:

from sidra_sql.config import Config

config = Config()  # lê config.ini do diretório atual

Exemplo Rápido: Pipeline ETL

Opção 1: CLI (Recomendado)

Use pipelines padrão pré-construídos de sidra-pipelines:

# Instalar catálogo padrão
sidra-sql plugin install https://github.com/Quantilica/sidra-pipelines.git --alias std

# Executar pipeline (baixar + carregar + transformar)
sidra-sql run std pib_municipal

# Consultar resultados em PostgreSQL
psql -c "SELECT * FROM analytics.pib_municipal WHERE ano >= 2020"

Opção 2: TOML Declarativo (Recomendado para pipelines customizados)

Define a fetch pipeline:

# pipelines/economic/fetch.toml
[[tabelas]]
tabela_sidra = "1620"
variables = ["116"]
territories = {6 = []}
unnest_classifications = true

[[tabelas]]
tabela_sidra = "1737"
variables = ["63"]
territories = {1 = [], 3 = []}

Define a transform:

# pipelines/economic/transform.toml
[[table]]
name = "economic_indicators"
schema = "analytics"
strategy = "replace"
sql = "transform.sql"
-- pipelines/economic/transform.sql
SELECT
    l.d1n  AS territory,
    p.codigo AS period,
    d.d2n  AS variable,
    r.v    AS value
FROM ibge_sidra.dados r
JOIN ibge_sidra.localidade l ON r.localidade_id = l.id
JOIN ibge_sidra.periodo    p ON r.periodo_id    = p.id
JOIN ibge_sidra.dimensao   d ON r.dimensao_id   = d.id
WHERE r.ativo = TRUE

Execute o pipeline:

from pathlib import Path
from sidra_sql.config import Config
from sidra_sql.toml_runner import TomlScript
from sidra_sql.transform_runner import TransformRunner

config = Config()

# Extract + Load
TomlScript(config, Path("pipelines/economic/fetch.toml")).run()

# Transform (SQL → analytics schema)
TransformRunner(config, Path("pipelines/economic/transform.toml")).run()

Opção 3: ETL Programático

Controle total sobre cada fase:

from sidra_sql.config import Config
from sidra_sql.sidra import Fetcher
from sidra_sql.storage import Storage
from sidra_sql.database import get_engine, save_agregado, load_dados

config = Config()
engine = get_engine(config)
storage = Storage.default(config)

with Fetcher(config, storage=storage, max_workers=4) as fetcher:
    # 1. BUSCAR metadados (territórios, períodos, classificações)
    metadata = fetcher.fetch_metadata("1620")
    save_agregado(engine, metadata)

    # 2. BAIXAR arquivos de dados para armazenamento local
    data_files = fetcher.download_table(
        tabela_sidra="1620",
        territories={"6": []},  # Total Brasil
        variables=["116"],
    )

# 3. CARREGAR em PostgreSQL via COPY FROM STDIN
load_dados(engine, storage, data_files)

Governança de Dados: Tratamento de Revisões

IBGE frequentemente publica revisões de dados. O warehouse preserva o histórico em vez de sobrescrever, via SCD Type II nas colunas ativo + modificacao — o mecanismo de desativar-e-inserir está detalhado em Bulk Load & Versionamento de Revisões. Na prática, isso permite duas consultas:

Consultando Snapshots Históricos

from datetime import date
from sqlalchemy import select, and_
from sidra_sql.database import get_engine
from sidra_sql.config import Config
from sidra_sql.models import Dados

engine = get_engine(Config())

# Data as it existed on 2024-06-01
snapshot_date = date(2024, 6, 1)

with engine.connect() as conn:
    rows = conn.execute(
        select(Dados).where(
            and_(
                Dados.tabela_sidra_id == "1620",
                Dados.modificacao <= snapshot_date,
                Dados.ativo == True,
            )
        )
    ).fetchall()

Trilha de Auditoria

from sqlalchemy import select, and_
from sidra_sql.models import Dados

# Todas as versões de um ponto de dados específico
with engine.connect() as conn:
    history = conn.execute(
        select(Dados).where(
            and_(
                Dados.tabela_sidra_id == "1620",
                Dados.localidade_id == 6,
                Dados.periodo_id == 123,
            )
        ).order_by(Dados.modificacao)
    ).fetchall()

for row in history:
    status = "ATIVO" if row.ativo else "SUBSTITUÍDO"
    print(f"{row.modificacao}: {row.v}  [{status}]")

Ingestão Streaming

A carga usa COPY FROM STDIN em dois passes (resolve dimensões em memória, depois faz o streaming dos fatos para staging) — ~400k linhas/s, 10M linhas em segundos contra horas com INSERT tradicional. O mecanismo, os diagramas e o comparativo de performance estão em Bulk Load & Versionamento de Revisões.

Em código, toda a carga fica atrás de um único helper:

from sidra_sql.database import get_engine, load_dados
from sidra_sql.storage import Storage
from sidra_sql.config import Config

config = Config()
engine = get_engine(config)
storage = Storage.default(config)

# data_files é a lista retornada por Fetcher.download_table()
load_dados(engine, storage, data_files)  # COPY FROM STDIN, two-pass

Referência de API

CLI

Executar pipelines

sidra-sql run <alias> [pipeline_id] [--force-metadata]

Executa pipeline(s) de um plugin instalado. Omita pipeline_id para executar todos os pipelines do plugin. --force-metadata re-busca metadados da tabela SIDRA mesmo se já estiverem em cache.

sidra-sql run-path <path> [--force-metadata]

Executa um pipeline diretamente a partir de um diretório, sem plugin registrado.

sidra-sql transform <alias> <pipeline_id>

Executa apenas a etapa de transformação SQL de um pipeline (sem fetch nem load).

Gerenciar plugins

sidra-sql plugin install <url> [--alias ALIAS]
sidra-sql plugin update [alias]
sidra-sql plugin remove <alias>
sidra-sql plugin list
sidra-sql plugin validate [alias] [--plugin-dir PATH]
sidra-sql plugin scaffold <name>
    [-d/--description TEXT]
    [--version TEXT]          # padrão: 1.0.0
    [-o/--output-dir PATH]    # padrão: diretório atual
    [--git-init/--no-git-init]  # padrão: --git-init

Cria a estrutura de arquivos (manifest.toml, fetch.toml, transform.toml, .sql, README.md) para um novo plugin, com git init opcional.

sidra-sql plugin add-pipeline <pipeline_id>
    [-d/--description TEXT]
    [-p/--path PATH]          # caminho relativo ao plugin (padrão: pipeline_id)
    [--plugin-dir PATH]       # diretório raiz do plugin (padrão: diretório atual)

Adiciona um novo pipeline a um plugin existente, atualizando manifest.toml.

Configuração

sidra-sql config set <section.option> <value> [--global]
sidra-sql config get <section.option>
sidra-sql config list [--global] [--local]

Gerencia config.ini. Sem --global, lê/escreve o arquivo local. Com --global, usa ~/.config/sidra-sql/config.ini. O config list sem flags mostra a visão mesclada (local sobrescreve global).


API Python (resumo)

Os pontos de entrada do pacote, na ordem de um ETL. Assinaturas completas no código-fonte do repositório.

Símbolo Papel
Config() config.ini do diretório atual (storage + conexão PostgreSQL)
Fetcher(config, storage=, max_workers=) Extração: fetch_metadata(tabela), download_table(tabela, territories, variables)
Storage.default(config) Arquivos JSON locais: write_data, read_data, write_metadata
TomlScript(config, toml_path) ETL declarativo: baixa + carrega as tabelas do fetch.toml
TransformRunner(config, toml_path) Executa o transform.toml + .sql pareado
get_engine, save_agregado, load_dados Helpers de banco: engine, upsert de metadados, bulk load via COPY

Formato de Pipeline TOML

fetch.toml — Configuração de Extração

[[tabelas]]
tabela_sidra = "1620"          # código da tabela SIDRA (obrigatório)
variables = ["116"]             # códigos de variáveis; omitir para todas
territories = {6 = []}          # {nível: [códigos]}; lista vazia = todas
unnest_classifications = true   # expandir todos os cross-produtos de categorias

[[tabelas]]
tabela_sidra = "1737"
variables = ["63"]
territories = {1 = [], 3 = []}
split_variables = true          # uma requisição por variável (evita limites SIDRA)
classifications = {81 = ["allxt"]}

Campos de [[tabelas]]:

Campo Tipo Obrigatório Descrição
tabela_sidra str Código da tabela SIDRA
territories dict[str, list] Nível territorial → códigos de unidade
variables list[str] Códigos de variáveis (padrão: todas)
classifications dict[str, list] Classificação → códigos de categorias
unnest_classifications bool Expandir todas as combinações de categorias
split_variables bool Uma requisição por variável

transform.toml — Configuração de Transformação

[[table]]
name = "pib_municipal"          # nome da tabela/view alvo
schema = "analytics"            # schema alvo
strategy = "replace"            # "replace" (tabela) ou "view"
sql = "transform.sql"           # arquivo .sql pareado
description = "PIB Municipal"
primary_key = ["municipio_id", "ano"]
indexes = [
  { name = "idx_pib_ano", columns = ["ano"], unique = false }
]

Pareado com arquivo transform.sql (mesmo nome) contendo consulta SELECT.

manifest.toml — Registro de Plugin

name = "Pipelines Padrão"
description = "Pipelines SIDRA padrão Quantilica"
version = "1.0.0"

[[pipeline]]
id = "pib_municipal"
description = "PIB Municipal da tabela SIDRA 5938 IBGE"
path = "pib_municipal"

Schema Dimensional

O star schema (diagrama acima) vive no schema ibge_sidra: três dimensões — localidade (malha territorial), periodo (tempo normalizado com ano/mes/trimestre/data_inicio/data_fim) e dimensao (variável × até 6 classificações) — mais a tabela-fato enxuta dados (FKs + modificacao + ativo + v) e o registro tabela_sidra (metadados em JSONB). As colunas e constraints completas de cada tabela estão nos modelos SQLAlchemy do repositório.

Exemplo de Consulta Analítica

from sqlalchemy import select
from sidra_sql.models import Dados, Localidade, Periodo, Dimensao

# PIB por estado (apenas registros ativos)
query = (
    select(
        Localidade.d1n.label("estado"),
        Periodo.codigo.label("periodo"),
        Dimensao.d2n.label("variavel"),
        Dados.v.label("valor"),
    )
    .join(Localidade, Dados.localidade_id == Localidade.id)
    .join(Periodo,    Dados.periodo_id    == Periodo.id)
    .join(Dimensao,   Dados.dimensao_id   == Dimensao.id)
    .where(
        Dados.ativo == True,
        Localidade.nc == "N3",          # estados
        Dados.tabela_sidra_id == "1620",
    )
    .order_by(Periodo.codigo.desc())
)

with engine.connect() as conn:
    for row in conn.execute(query):
        print(f"{row.estado} | {row.periodo} | {row.valor}")

Performance

Cache Local

Dados baixados são armazenados como arquivos JSON sob data_dir. Re-executar um pipeline pula arquivos que já existem em disco:

Primeira execução:  baixa do IBGE (segundos a minutos)
Re-execução:        verifica cache local, pula arquivos existentes (overhead <1s)

Force re-download deletando diretório de dados ou usando --force-metadata.

Benchmarks de Ingestão Streaming

Performance no mundo real em hardware padrão (8-core, 16 GB RAM):

Dataset Linhas Tempo Throughput
IPCA mensal 3.2M 8s 400k linhas/sec
PIB trimestral 50k <1s

Melhores Práticas para Governança de Dados

1. Use Pipelines Declarativos (TOML) para Reprodutibilidade

# pipelines/annual_snapshot/fetch.toml
[[tabelas]]
tabela_sidra = "1620"
variables = ["116"]
territories = {6 = [], 3 = []}

[[tabelas]]
tabela_sidra = "1737"
variables = ["63"]
territories = {1 = []}

Vantagens:

  • ✅ Não-desenvolvedores podem manter pipelines
  • ✅ Controle de versão (TOML em git)
  • ✅ Reprodutível entre máquinas
  • ✅ Desacoplado de mudanças de código

2. Documente Datas de Snapshot para Reprodutibilidade Acadêmica

import json
from datetime import datetime

# Registrar quando você construiu o dataset
metadata = {
    "snapshot_date": datetime.now().isoformat(),
    "pipeline": "pipelines/analysis/fetch.toml",
    "sidra_sql_version": "1.2.0",
}
with open("data/metadata.json", "w") as f:
    json.dump(metadata, f, indent=2)

Então consulte com Dados.modificacao <= snapshot_date para reproduzir o dataset exato.

3. Use Star Schema para Ferramentas BI

Conecte PostgreSQL diretamente a:

  • Tableau (conexão ODBC para PostgreSQL)
  • Power BI (conector nativo PostgreSQL)
  • Looker (SQL runner)
  • Metabase (consultas SQL em warehouse)

Schema normalizado é otimizado para workloads OLAP de ferramentas BI.


Resolução de Problemas

Download Falha com Erro de Rede

Fetcher retenta automaticamente (até 5 vezes, backoff exponencial: 5s → 10s → 20s…). Se todas as tentativas falharem, verifique disponibilidade da API SIDRA ou reduza max_workers.

Tabela Não Encontrada

Códigos de tabela SIDRA devem ser strings, não inteiros. Use "1620", não 1620.

metadata = fetcher.fetch_metadata("1620")   # ✅
metadata = fetcher.fetch_metadata(1620)     # ❌

ID de Variável Desconhecido

Procure no catálogo SIDRA IBGE diretamente em sidra.ibge.gov.br para procurar códigos de variáveis e classificações para uma tabela dada.

Arquivo de Configuração Não Encontrado

Config()config.ini do diretório de trabalho atual. Execute Python da raiz do projeto, ou defina o caminho explicitamente:

import os
os.chdir("/path/to/project")
config = Config()

Schema Já Existe

TransformRunner com strategy = "replace" descarta e recria a tabela alvo. strategy = "view" usa CREATE OR REPLACE VIEW e é não-destrutivo.


Saiba Mais