AI-Driven Insights: Improving Cloud Composer Alerts with Gemini

Yu Ishikawa
4 min read · Jun 25, 2024


Introduction

In today’s data-driven world, monitoring and alerting systems are crucial for maintaining smooth operations. Cloud Composer, a fully managed workflow orchestration service built on Apache Airflow, provides a robust platform for managing complex workflows. However, understanding why tasks fail can be challenging, especially when dealing with large volumes of raw logs. This is where the power of AI comes into play. By leveraging Gemini, an advanced model on Vertex AI, we can enhance log analysis and alerting, making it easier to quickly identify and address issues.

Current Alerting System

Currently, we use Airflow’s callback functions to send alerts to Slack whenever a task fails. These callbacks provide brief information about the failed task, including the DAG ID, task ID, and execution date. While this approach is functional, it requires us to manually sift through the raw logs on the Apache Airflow UI to diagnose the issue.
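As a sketch, a brief failure callback of this kind might look like the following. The webhook URL and message layout are illustrative assumptions, not our exact implementation:

```python
import json
import urllib.request

# Hypothetical Slack incoming-webhook URL -- replace with your own.
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"


def build_failure_message(context):
    """Format the brief task-failure alert from the Airflow callback context."""
    ti = context["task_instance"]
    return (
        ":red_circle: Task failed\n"
        f"*DAG*: {ti.dag_id}\n"
        f"*Task*: {ti.task_id}\n"
        f"*Execution date*: {context['ts']}"
    )


def slack_failure_alert(context):
    """on_failure_callback: post the brief alert to Slack."""
    payload = {"text": build_failure_message(context)}
    req = urllib.request.Request(
        SLACK_WEBHOOK_URL,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(req)
```

Passing `slack_failure_alert` as `on_failure_callback` on a DAG or task is all that is needed to wire this up; the alert contains only the fields shown, which is exactly the limitation discussed next.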

Challenges

The primary challenge with this system is the lack of context in the alerts. The brief information provided is often insufficient for quickly understanding what went wrong. This is particularly problematic for complex workflows involving numerous tasks and dependencies. Additionally, identifying specific failures in dbt models and tests can be time-consuming and cumbersome.

Enhancing Alerting with Gemini

Introduction to Vertex AI and Gemini

Vertex AI is a comprehensive machine learning platform by Google Cloud that allows for the deployment and management of machine learning models at scale. Gemini is one of the advanced models available on Vertex AI, designed for powerful log analysis and natural language processing.

Benefits of Using Gemini

By integrating Gemini into our alerting system, we can automatically analyze the raw logs of failed tasks and generate detailed, context-rich error messages. This enhancement allows us to understand what went wrong more quickly and effectively, directly from our Slack notifications.

Implementation Details

Setting Up Airflow Callbacks

Airflow provides several callback mechanisms to execute custom code when tasks succeed, fail, or retry. For our use case, we’ll utilize the on_failure_callback to send detailed alerts to Slack.

Example Code

Here’s an example of setting up a DAG with failure callbacks:

import datetime

import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator


def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    dagrun_timeout=datetime.timedelta(minutes=60),
    catchup=False,
    on_success_callback=None,
    on_failure_callback=task_failure_alert,
    tags=["example"],
):
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
    task1 >> task2 >> task3

Fetching Raw Logs from Google Cloud Storage

Cloud Composer stores logs of Airflow tasks in Google Cloud Storage (GCS). To fetch these logs for analysis, we need to know the path pattern: logs/${dag_id}/${task_id}/${execution_date}/${try_number}.log. The GCS bucket name can be retrieved from the Cloud Composer environment variables.
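A minimal sketch of the path construction and fetch might look like this. The helper names are our own, and the GCS download requires the google-cloud-storage client library:

```python
def build_log_path(dag_id: str, task_id: str, execution_date: str, try_number: int) -> str:
    """Construct the GCS object path for a task's log file, following the
    logs/${dag_id}/${task_id}/${execution_date}/${try_number}.log pattern."""
    return f"logs/{dag_id}/{task_id}/{execution_date}/{try_number}.log"


def fetch_raw_log(bucket_name: str, object_path: str) -> str:
    """Download the log text from the Composer environment's GCS bucket."""
    from google.cloud import storage  # lazy import so the path helper stays dependency-free

    client = storage.Client()
    blob = client.bucket(bucket_name).blob(object_path)
    return blob.download_as_text()
```

For example, `build_log_path("example_callback", "task2", "2021-01-01T00:00:00+00:00", 1)` yields the object path for the first attempt of `task2`.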

Using Vertex AI for Log Analysis

To enhance our alerting system, we integrate Vertex AI to analyze the raw logs fetched from GCS. Here’s a step-by-step explanation of how we implement this feature:

  1. Set Up the Notification System: When a task fails, the on_failure_callback function is triggered. This function receives a context object containing details about the failed task.
  2. Extract Information from the Context: The context object provides essential information such as the DAG ID, task ID, execution date, and try number. This data is crucial for identifying and fetching the correct log file from GCS.
  3. DAG ID and Task ID: These are directly accessible from the context. For instance, context['task_instance'].dag_id gives the DAG ID, and context['task_instance'].task_id provides the task ID.
  4. Execution Date: This is available as context['ts'], which gives a timestamp of when the task was executed. This timestamp is used to locate the specific log file in GCS.
  5. Try Number: The try number indicates which attempt of the task failed. This can be accessed using context['task_instance'].try_number.
  6. Fetch Logs from GCS: Using the extracted information, construct the path to the log file stored in GCS. The path format typically follows: logs/${dag_id}/${task_id}/${execution_date}/${try_number}.log.
  7. Analyze Logs with Vertex AI: With the log file path, fetch the logs and send them to Vertex AI for analysis. Vertex AI’s Gemini model processes these logs and generates a detailed analysis, identifying the root cause of the failure.
  8. Send Detailed Alerts to Slack: Incorporate the analysis from Vertex AI into the alert message. This enriched message, containing both the basic task details and the AI analysis, is sent to Slack. This way, team members can quickly understand the issue without delving into raw logs.
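The steps above can be sketched as a single callback body. The prompt wording, the truncation limit, and the Gemini model name are assumptions for illustration; the GCS and Vertex AI calls require the google-cloud-storage and google-cloud-aiplatform libraries:

```python
def build_analysis_prompt(raw_log: str, max_chars: int = 20000) -> str:
    """Build the prompt sent to Gemini; the log is tail-truncated to stay
    within the model's context window (limit is an assumption)."""
    tail = raw_log[-max_chars:]
    return (
        "You are assisting with Airflow incident triage. "
        "Analyze the following task log, identify the root cause of the failure, "
        "and summarize it in a few sentences:\n\n" + tail
    )


def analyze_failed_task(context, bucket_name: str) -> str:
    """on_failure_callback body: fetch the failed task's log from GCS,
    have Gemini analyze it, and return the enriched alert text for Slack."""
    ti = context["task_instance"]
    object_path = f"logs/{ti.dag_id}/{ti.task_id}/{context['ts']}/{ti.try_number}.log"

    from google.cloud import storage

    raw_log = storage.Client().bucket(bucket_name).blob(object_path).download_as_text()

    from vertexai.generative_models import GenerativeModel

    model = GenerativeModel("gemini-1.5-flash")  # model name is an assumption
    analysis = model.generate_content(build_analysis_prompt(raw_log)).text

    return (
        f"Task {ti.dag_id}.{ti.task_id} (try {ti.try_number}) failed.\n"
        f"Gemini analysis:\n{analysis}"
    )
```

The returned string can then be posted to Slack with whatever notification mechanism the DAG already uses, so the enriched alert replaces the bare task metadata.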

Benefits and Use Cases

Advantages of AI-Powered Log Analysis

Using AI to analyze logs provides several key benefits:

  • Time Efficiency: Quickly understand the root cause of failures without manually sifting through logs.
  • Accuracy: Reduce human error in diagnosing issues by relying on AI to parse and interpret log data.
  • Actionable Insights: Receive detailed, actionable insights directly in Slack, enabling faster resolution times.

Specific Use Case: Analyzing dbt Command Executions

One of the most significant advantages of this system is its ability to analyze dbt command executions. With numerous dbt models and tests running daily, identifying which ones failed can be challenging. AI-powered analysis simplifies this process, providing clear insights directly in Slack.

Conclusion

By integrating Gemini into our Cloud Composer alerting system, we significantly enhance our ability to diagnose and address task failures. This implementation not only saves time but also improves the accuracy of our analyses, allowing for quicker and more effective problem resolution. Future improvements could include expanding the use of AI to other aspects of workflow management and further refining the analysis capabilities.


Yu Ishikawa

Data Engineering / Machine Learning / MLOps / Data Governance / Privacy Engineering