In Vol 3, we ran four analyses against the aggregated table — trends, channel mix, funnel conversion rates, and week-over-week comparisons. The next step is to stop looking for problems manually and let the pipeline find them for you. This article covers how to write SQL that detects anomalies, and how to route those findings to Slack so the right person gets notified automatically.
A dashboard is a tool for exploration. An alert is a tool for catching problems. Both are useful, but they serve different jobs.
When everything is running normally, a dashboard is a distraction — you're scanning rows of green numbers to confirm that nothing is broken. The moment something is broken, you want to know immediately, not when you happen to open the dashboard later that morning. Automated alerts invert this: you only get notified when action is required.
The stack we're building on top of:

- The campaign_performance table from Vol 2
- Snowflake as the warehouse, with the daily ETL orchestrated by Airflow
- A Slack incoming webhook for notifications

Before writing any code, decide which conditions are actually worth an alert. A good alert has two properties: it's specific enough to point to a cause, and it's rare enough that people don't start ignoring it.
For a marketing reporting pipeline, these three conditions cover most real problems:
| Condition | What it signals |
|---|---|
| CAC increases >30% week-over-week on any channel | Audience saturation, creative fatigue, or a bidding issue |
| ROAS drops below a hard floor (e.g., 0.8) | You're spending more than you're recovering — requires immediate attention |
| Yesterday's signups are zero (or null) for an active channel | Tracking broke, API pull failed, or spend was paused accidentally |
Start with these three. You can always add more conditions later, but alert fatigue — too many notifications — is a real risk. If the team learns to dismiss alerts without reading them, you've lost the value of the system.
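Before translating the conditions into SQL, it can help to see the decision logic as plain Python. This is an illustrative sketch, not part of the pipeline; the thresholds (1.30 and 0.8) are the example values from the table above:

```python
def classify(cac_prior, cac_current, roas_current, signups_current):
    """Return the alert type for one channel, or None if it's healthy.

    Checks run in priority order: a missing-data problem (zero
    signups) masks the ratio checks, since CAC and ROAS are
    meaningless without signups.
    """
    if not signups_current:  # zero or NULL signups
        return "ZERO_SIGNUPS"
    if cac_prior and cac_current and cac_current > cac_prior * 1.30:
        return "CAC_SPIKE"
    if roas_current is not None and roas_current < 0.8:
        return "ROAS_FLOOR"
    return None
```

The SQL in the next step applies the same checks, in the same order, inside a CASE expression.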
The SQL below uses the same week-over-week pattern from Vol 3, but instead of returning all channels it filters to only the rows that cross a threshold. The result is a table of problems, not a table of data.
```sql
WITH weekly AS (
    SELECT
        utm_source,
        DATE_TRUNC('week', signup_date) AS week,
        SUM(spend) AS spend,
        SUM(user_signups) AS signups,
        SUM(revenue) AS revenue
    FROM campaign_performance
    WHERE signup_date >= CURRENT_DATE - INTERVAL '14 days'
    GROUP BY 1, 2
),
compared AS (
    SELECT
        utm_source,
        MAX(CASE WHEN week = DATE_TRUNC('week', CURRENT_DATE - INTERVAL '7 days')
                 THEN spend END) AS spend_prior,
        MAX(CASE WHEN week = DATE_TRUNC('week', CURRENT_DATE)
                 THEN spend END) AS spend_current,
        MAX(CASE WHEN week = DATE_TRUNC('week', CURRENT_DATE - INTERVAL '7 days')
                 THEN signups END) AS signups_prior,
        MAX(CASE WHEN week = DATE_TRUNC('week', CURRENT_DATE)
                 THEN signups END) AS signups_current,
        MAX(CASE WHEN week = DATE_TRUNC('week', CURRENT_DATE)
                 THEN revenue END) AS revenue_current
    FROM weekly
    GROUP BY 1
)
SELECT
    utm_source,
    ROUND(spend_prior / NULLIF(signups_prior, 0), 2) AS cac_prior,
    ROUND(spend_current / NULLIF(signups_current, 0), 2) AS cac_current,
    ROUND(revenue_current / NULLIF(spend_current, 0), 2) AS roas_current,
    COALESCE(signups_current, 0) AS signups_current,
    CASE
        WHEN signups_current = 0 OR signups_current IS NULL
            THEN 'ZERO_SIGNUPS'
        WHEN (spend_current / NULLIF(signups_current, 0))
             > (spend_prior / NULLIF(signups_prior, 0)) * 1.30
            THEN 'CAC_SPIKE'
        WHEN (revenue_current / NULLIF(spend_current, 0)) < 0.8
            THEN 'ROAS_FLOOR'
    END AS alert_type
FROM compared
WHERE
    signups_current = 0 OR signups_current IS NULL
    OR (spend_current / NULLIF(signups_current, 0))
       > (spend_prior / NULLIF(signups_prior, 0)) * 1.30
    OR (revenue_current / NULLIF(spend_current, 0)) < 0.8
ORDER BY alert_type;
```
If nothing is wrong, this query returns zero rows. That's the intended behavior — the Python script checks the row count and only fires an alert if there's something in the result.
The script below connects to Snowflake, runs the query, and sends a formatted Slack message if any alerts are found. If the result is empty, it exits silently.
```python
import json
import os

import requests
import snowflake.connector

SNOWFLAKE_CONFIG = {
    "user": os.environ["SNOWFLAKE_USER"],
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "account": os.environ["SNOWFLAKE_ACCOUNT"],
    "warehouse": "REPORTING_WH",
    "database": "MARKETING",
    "schema": "PUBLIC",
}

SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"]

ALERT_QUERY = """
-- (paste the full SQL from Step 2 here)
"""


def run_alert_check():
    """Run the alert query and return one dict per flagged channel."""
    conn = snowflake.connector.connect(**SNOWFLAKE_CONFIG)
    try:
        cursor = conn.cursor()
        cursor.execute(ALERT_QUERY)
        rows = cursor.fetchall()
        columns = [desc[0].lower() for desc in cursor.description]
        cursor.close()
    finally:
        conn.close()
    return [dict(zip(columns, row)) for row in rows]


def format_slack_message(alerts):
    """Build one mrkdwn message with a bullet per flagged channel."""
    lines = [":rotating_light: *Marketing Alert — Action Required*\n"]
    for a in alerts:
        source = a["utm_source"].upper()
        alert_type = a["alert_type"]
        if alert_type == "ZERO_SIGNUPS":
            lines.append(
                f"• *{source}* — No signups recorded this week. "
                "Check tracking and campaign status."
            )
        elif alert_type == "CAC_SPIKE":
            lines.append(
                f"• *{source}* — CAC jumped from "
                f"${a['cac_prior']:.2f} to ${a['cac_current']:.2f} "
                "(>30% increase week-over-week)."
            )
        elif alert_type == "ROAS_FLOOR":
            lines.append(
                f"• *{source}* — ROAS is {a['roas_current']:.2f}, "
                "below the 0.8 floor. Review spend allocation."
            )
    return "\n".join(lines)


def send_slack_alert(message):
    """POST the message to the Slack incoming webhook."""
    payload = {"text": message}
    response = requests.post(
        SLACK_WEBHOOK_URL,
        data=json.dumps(payload),
        headers={"Content-Type": "application/json"},
        timeout=10,  # requests has no default timeout; don't hang the job
    )
    response.raise_for_status()


if __name__ == "__main__":
    alerts = run_alert_check()
    if alerts:
        message = format_slack_message(alerts)
        send_slack_alert(message)
        print(f"Alert sent: {len(alerts)} issue(s) found.")
    else:
        print("No alerts. All metrics within thresholds.")
```
A few things worth noting about the script:
- `response.raise_for_status()` means Airflow will mark the task as failed if the Slack webhook call fails, so a silent delivery failure won't go unnoticed.

The alert check runs after the main ETL task each day. In Airflow, this means adding a new task to the DAG that depends on the transformation task completing successfully.
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from alert_check import run_alert_check, format_slack_message, send_slack_alert


def alert_task():
    alerts = run_alert_check()
    if alerts:
        send_slack_alert(format_slack_message(alerts))


with DAG(
    dag_id="marketing_reporting",
    schedule_interval="0 6 * * *",  # 6 AM daily
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    run_etl = PythonOperator(
        task_id="run_etl",
        python_callable=run_marketing_etl,  # your existing ETL function
    )
    check_alerts = PythonOperator(
        task_id="check_alerts",
        python_callable=alert_task,
    )

    run_etl >> check_alerts  # alerts only run after ETL succeeds
```
The >> operator sets the dependency: check_alerts only runs if run_etl succeeds. This prevents false "zero signups" alerts from firing when the ETL itself failed to load data — a common source of noise in simpler setups.
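If you ever run the check outside Airflow (say, from cron), you lose that dependency guard and have to rebuild it in code. A hypothetical sketch of the same idea, with `rows_loaded_today` standing in for whatever load-count check your setup provides: suppress the zero-signups alert whenever the load itself produced no rows.

```python
def should_alert_zero_signups(rows_loaded_today, signups_current):
    """Treat zero signups as an alert only when the ETL actually
    loaded rows for the period. If nothing was loaded, the pipeline
    itself failed, which is a different (upstream) problem.
    """
    if rows_loaded_today == 0:
        return False  # upstream load failed; don't blame tracking
    return signups_current in (0, None)
```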
When a threshold is crossed, the team sees a message like this in the designated Slack channel:
```
:rotating_light: Marketing Alert — Action Required
• META — CAC jumped from $8.40 to $12.70 (>30% increase week-over-week).
• TIKTOK — ROAS is 0.72, below the 0.8 floor. Review spend allocation.
```
Each line is specific enough to act on immediately — you know which channel, what the metric is, and what direction it moved. The team doesn't need to open a dashboard to understand the problem.
Alerts decay in value if you don't maintain them. Revisit the thresholds as channel economics change, and retire any alert the team has started dismissing without reading; an ignored alert costs more than no alert at all.
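One practice that makes threshold maintenance cheap is keeping the numbers in a single config instead of hard-coded inside the SQL string, so revisiting a threshold is a one-line change. A sketch, where the helper and the template snippet are illustrative, not part of the pipeline above:

```python
# Alert thresholds collected in one place, visible at a glance.
THRESHOLDS = {
    "cac_spike_ratio": 1.30,  # >30% week-over-week CAC increase
    "roas_floor": 0.8,
}


def render_thresholds(sql_template, thresholds):
    """Substitute named thresholds into a SQL template. The values
    come from our own config, not user input, so str.format is safe.
    """
    return sql_template.format(**thresholds)


snippet = "(revenue_current / NULLIF(spend_current, 0)) < {roas_floor}"
print(render_thresholds(snippet, THRESHOLDS))
# -> (revenue_current / NULLIF(spend_current, 0)) < 0.8
```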
The full pipeline is now end-to-end automated. Data is pulled from publisher APIs daily, transformed and loaded into Snowflake, aggregated into a single reporting table, and monitored for anomalies — with a Slack notification if anything crosses a threshold. From here, the reporting system runs itself. Your job shifts from building and checking reports to responding when something actually needs attention.