Skip to content

Construa Pipelines de Dados com Apache Airflow: 5 Passos Simples

    Empresas enfrentam desafios ao lidar com a gestão de dados devido à sua crescente quantidade. Automatizar a coleta e transformação de dados através de Pipelines de Dados tornou-se essencial para economizar recursos e tempo.

    O Apache Airflow é uma ferramenta de código aberto popular que ajuda nesse processo, automatizando fluxos de trabalho longos e também oferecendo recursos de Pipelines de Dados.

    Este artigo apresentará o Apache Airflow e Pipelines de Dados, destacando 5 passos simples para construí-los e seus benefícios.

    O que é o Apache Airflow?

    O Apache Airflow, uma plataforma de código aberto, é usado para automatizar fluxos de trabalho e agendar tarefas.

    Empresas o usam para organizar tarefas complexas, criar pipelines de dados e realizar processos ETL.

    Ele funciona com base em DAG (Grafo Acíclico Direcionado) para criar fluxos de trabalho eficientes, conectando nós através de conectores

    Principais Características do Apache Airflow

    O Apache Airflow é popular por suas características únicas:

    1. Integração com Python para criar facilmente fluxos de trabalho.
    2. Personalização de operadores e executores.
    3. Uso de modelos Jinja para construir pipelines simples.
    4. Escalabilidade ilimitada para gerenciar muitos fluxos de trabalho.

    Passos para Construir Pipelines de Dados com Apache Airflow

    Passo 1: Instale os Arquivos Docker e a Interface do Usuário para o Apache Airflow

    Para configurar Pipelines de Dados com o Apache Airflow, você primeiro precisa instalar seus Arquivos Docker e a Interface do Usuário. Você pode facilmente obter o arquivo Docker com todas as configurações do repositório Github do Puckel.

    Instale o cliente Docker e execute o seguinte comando para iniciar o servidor Airflow:

    docker-compose -f ./docker-compose-LocalExecutor.yml up -d

    Em seguida, configure a interface do usuário do Airflow baixando-a em http://localhost:8080.

    O portal da interface do usuário do Airflow pode acionar um DAG (Grafo Acíclico Direcionado) e fornecer o status das tarefas atuais.

    Passo 2: Crie um Arquivo DAG

    Agora que você instalou os Arquivos Docker e a Interface do Usuário, pode criar facilmente um arquivo DAG para Pipelines de Dados com o Apache Airflow. Primeiro, você deve definir argumentos padrão e, em seguida, instanciar sua classe DAG usando um nome de DAG, por exemplo, “monitorar_erros”, usando o seguinte código:

    default_args = {
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": datetime(2020, 8, 13),
        "email": ["airflow@airflow.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        "catchup": False,
    }
    
    dag = DAG("monitorar_erros", default_args=default_args, schedule_interval=timedelta(1))

    Para extrair os arquivos de log do servidor, o Apache Airflow permite criar tarefas concorrentes que baixam cada arquivo em paralelo, agilizando o processo. Use o comando sftp configurado com seu operador SFTP e ID de conexão SSH no portal Airflow para buscar os arquivos.

    log_list = ['securityApp.log', 'mainApp.log', 'extApp.log', 'timeApp.log', 'tokenApp.log',
                'bridgeApp.log', 'daemonApp.log', 'notificationApp.log', 'messageApp.log']
    
    dl_tasks = []
    for file in log_list:
        op = SFTPOperator(task_id=f"download_{file}",
                    ssh_conn_id="log_server",
                    local_filepath=f"{base_folder}/{file}",
                    remote_filepath=f"{remote_path}/{file}",
                    operation=SFTPOperation.GET,
                    create_intermediate_dirs=True,
                    dag=dag)
        dl_tasks.append(op)

    Depois disso, atualize a interface do Airflow para ver o novo DAG chamado ‘monitorar_erros’. Clique nele para ver o gráfico. Antes de executar o DAG, configure a conexão SSH no menu ‘Admin’ para uso pelo operador SFTP.

    /usr/local/airflow/.ssh/id_rsa

    Você deve deixar o campo de senha vazio e inserir os seguintes dados JSON no campo Extra:

    {
      "key_file": "/usr/local/airflow/.ssh/id_rsa",
      "timeout": "10",
      "compress": "false",
      "no_host_key_check": "false",
      "allow_host_key_change": "false"
    }

    Em seguida, inicie o DAG e acione-o. Isso transformará algumas tarefas em verde, indicando que estão em execução. As tarefas restantes ficarão cinzas, representando que ainda estão na fila. Dessa forma, você pode acompanhar seu fluxo de trabalho após a criação de Pipelines de Dados com o Apache Airflow.

    Passo 3: Extrair Linhas Contendo Exceções

    Neste passo de construção de Pipelines de Dados com o Apache Airflow, você deve adicionar todas as linhas que contêm a palavra “exceção” nos arquivos de log e gravá-las em um arquivo (errors.txt) que deve estar na mesma pasta.

    Além disso, o comando “grep” pode pesquisar texto específico em uma coleção de arquivos, desde que todos os arquivos estejam na mesma pasta. O comando a seguir realiza essa tarefa:

    bash_command = """
        grep -E 'Exception' --include=*.log -rnw '{{ params.base_folder }}' > {{ params.base_folder }}/errors.txt
        ls -l {{ params.base_folder }}/errors.txt && cat {{ params.base_folder }}/errors.txt
    """
    grep_exception = BashOperator(task_id="grep_exception",
                            bash_command=bash_command,
                            params={'base_folder': base_folder},
                            dag=dag)

    Atualize o DAG e acione-o novamente, a visualização do gráfico será atualizada.

    Passo 4: Extrair os Campos Necessários

    Agora você pode analisar o arquivo de log linha por linha para extrair os campos necessários. Um operador Python que trabalha com expressões regulares pode ajudá-lo nessa tarefa usando o código a seguir:

    def parse_log(logString):
        r = r".+/(?P<file>.+):(?P<line>d+):[[]] (?P<date>.+)/(?P<time>d{2}:d{2}:d{2},d{3}) ERROR ?(?:SessionId : )
    
    ?(?P<session>.+)? [(?P<app>w+)] .+ (?:Error :|service Exception) (?P<module>(?=[w.-]+ : )[w.-]+)?(?: : )?(?P<errMsg>.+)"
        group = re.match(r, logString)
        return group.groups()
    
    def parse_log_file(filepath, tablename):
        with open(filepath) as fp:
            records = []
            for line in fp:
                records.append(parse_log(line))
            save_to_database(tablename, records)
    
    parse_log = PythonOperator(task_id='parse_log',
                            python_callable=parse_log_file,
                            op_kwargs={'filepath': f'{base_folder}/errors.txt',
                                       'tablename': f'{table_name}'},
                            dag=dag)

    Os campos extraídos são salvos em um banco de dados (Postgres DB é usado neste exemplo) e você pode realizar consultas neles posteriormente.

    Agora, para usar o banco de dados Postgres, você precisa configurar a conexão do Airflow com o Postgres. Para fazer isso, modifique sua conexão padrão com o Postgres.

    Em seguida, acione o DAG novamente, verificando se todas as tarefas foram executadas com sucesso e se todos os dados dos logs foram analisados e enviados para o banco de dados.

    Agora você pode clicar em “Consulta Ad Hoc” no menu de Perfil de Dados e digitar a instrução SQL necessária.

    Agora que você sabe como criar Pipelines de Dados com o Apache Airflow, é hora de consultá-los e extrair detalhes de erro.

    Passo 5: Consulte a Tabela para Gerar Registros de Erros

    Os passos até agora foram sobre a construção de Pipelines de Dados com o Apache Airflow. Este último passo mostrará como usar o pipeline construído para detectar e monitorar erros. Você pode consultar a tabela salva para contar o número de erros e seus tipos. Além disso, você pode usar outro operador Python para consultar o banco de dados e obter 2 arquivos de relatório:

    1. Um conterá os registros de todos os erros no banco de dados.
    2. O segundo será uma tabela de estatísticas que representará todos os tipos de erros em ordem decrescente de ocorrência.

    Use o seguinte código para desenvolver os 2 relatórios mencionados:

    def gen_error_reports(statfile, logfile, tablename, **kwargs):
        # Conexão ao banco de dados
        db_hook = PostgresHook(postgres_conn_id='postgres_default', schema='airflow')
        db_conn = db_hook.get_conn()
        db_cursor = db_conn.cursor()
    
        sql = f"SELECT error, count(*) as occurrence FROM {tablename} group by error ORDER BY occurrence DESC"
        sql_output = "COPY ({0}) TO STDOUT WITH CSV HEADER".format(sql)
    
        # Configure uma variável para armazenar o caminho e nome do arquivo.
        with open(statfile, 'w') as f_output:
            db_cursor.copy_expert(sql_output, f_output)
    
    
    gen_reports = PythonOperator(task_id='gen_reports',
                            python_callable=gen_error_reports,
                            op_kwargs={'statfile': f'{base_folder}/error_stats.csv',
                                       'logfile': f'{base_folder}/error_logs.csv',
                                       'tablename': f'{table_name}'},
                            provide_context=True,
                            dag=dag)

    Em seguida, acione o DAG novamente e verifique os arquivos de relatório gerados.

    A pasta “error_logs.csv” conterá todos os registros de exceção presentes no banco de dados; “error_stats.csv” conterá os diferentes tipos de erros com suas ocorrências, conforme mostrado abaixo:

    Isso é tudo! Agora você está pronto para construir Pipelines de Dados com o Apache Airflow por conta própria.

    Benefícios de Pipelines de Dados com Apache Airflow

    Desenvolver Pipelines de Dados com o Apache Airflow traz vários benefícios, incluindo:

    1. A capacidade de construir pipelines complexos usando código Python e integrar com outras ferramentas.
    2. Agendamento flexível e processamento incremental para economizar tempo.
    3. Facilidade para reprocessear tarefas anteriores e adicionar novas.
    4. Suporte a logs para rastrear problemas durante a execução.

    Conclusão

    Agora você está pronto para começar a construir Pipelines de Dados com o Apache Airflow, uma ferramenta poderosa para automatizar fluxos de trabalho de dados. Isso ajudará sua organização a gerenciar grandes volumes de dados de forma eficaz e confiável.

    Leave a Reply

    Your email address will not be published. Required fields are marked *