团队内部的数据湖规模正在失控。最初只是几个核心业务线的 Parquet 文件集合,现在已经膨胀成一个包含数百个数据集、每日TB级增量的复杂系统。问题随之而来:数据质量。我们不止一次地在下游分析任务中发现数据异常——空值、格式错误、超出业务范围的离群值——而这些问题往往在数据入湖数天甚至数周后才被察觉,溯源和修复的成本极高。我们需要一个轻量级、自动化、且能提供直观反馈的数据质量监控“探针”。
市面上不乏成熟的数据质量框架,但对于我们当前阶段来说,它们过于庞大,引入它们需要不小的学习和运维成本。我们的初步构想是:一个能够读取 S3(我们使用 MinIO 作为私有化部署)上的 Parquet 文件,根据一份声明式的规则配置进行校验,并将结果可视化的独立服务。技术选型的核心考量是:轻量、快速、易于维护。
最终的技术栈定格在一个颇为有趣的组合上:
- 存储与格式: MinIO + Parquet,这是我们现有的数据湖基建。
- 查询与校验引擎: Python + DuckDB。这是一个关键决策。DuckDB 是一个进程内(in-process)的分析型数据库,它能直接、高速地查询 S3 上的 Parquet 文件,无需任何重量级的服务部署。这完美契合了我们“探针”的理念。
- API 服务: FastAPI,轻量、高性能,与 Python 生态无缝结合。
- 前端展示: 原生 JavaScript + Tailwind CSS。我们不需要一个复杂的单页应用,但需要一个信息密度高、样式清晰的仪表盘。Tailwind CSS 的原子化类库能让我们快速构建出专业且可维护的 UI,而无需陷入 CSS 文件的泥潭。
- 质量保证: Pytest。校验逻辑是整个系统的核心,其正确性必须通过严格的单元测试来保证。
这套方案的目标不是替代企业级数据治理平台,而是构建一个敏捷的、针对特定数据集的“前哨站”,在数据问题造成更大范围影响前发出预警。
第一步:定义数据质量规则与校验引擎
一切的核心在于一个灵活的、可配置的规则引擎。我们决定使用 YAML 文件来定义校验规则,这比硬编码在代码中要灵活得多。一个典型的规则配置文件 rules.yml 如下:
# rules.yml
target_file: "s3://my-bucket/user_profiles/dt=2023-10-27/profiles.parquet"
rules:
- column: "user_id"
checks:
- type: "not_null"
- type: "unique"
- column: "email"
checks:
- type: "not_null"
- type: "regex"
pattern: "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$"
- column: "age"
checks:
- type: "not_null"
- type: "range"
min: 18
max: 120
- column: "registration_source"
checks:
- type: "allowed_values"
values: ["web", "ios", "android", "api"]
这个结构清晰地描述了要对哪个文件的哪些列执行何种检查。接下来是实现这个引擎的 Python 代码。
# probe_backend/quality_engine/main.py
import duckdb
import yaml
import logging
from typing import List, Dict, Any, Union
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class DataQualityProbe:
def __init__(self, minio_config: Dict[str, str]):
"""
初始化探针,配置DuckDB以访问MinIO。
这里的配置是关键,DuckDB需要通过S3扩展与凭证才能访问S3兼容存储。
"""
try:
self.con = duckdb.connect(database=':memory:', read_only=False)
self.con.execute("INSTALL s3;")
self.con.execute("LOAD s3;")
self.con.execute(f"SET s3_endpoint='{minio_config['endpoint']}';")
self.con.execute(f"SET s3_access_key_id='{minio_config['access_key']}';")
self.con.execute(f"SET s3_secret_access_key='{minio_config['secret_key']}';")
self.con.execute("SET s3_use_ssl=false;") # 本地MinIO通常不用SSL
self.con.execute("SET s3_url_style='path';")
logging.info("DuckDB S3 extension loaded and configured.")
except Exception as e:
logging.error(f"Failed to initialize DuckDB with S3 support: {e}")
raise
def run_checks(self, rules_path: str) -> Dict[str, Any]:
"""
加载规则文件并执行所有检查。
这是整个流程的编排器。
"""
with open(rules_path, 'r') as f:
config = yaml.safe_load(f)
target_file = config['target_file']
rules = config['rules']
results = {
"target_file": target_file,
"total_rows": self._get_total_rows(target_file),
"validations": []
}
for rule in rules:
column = rule['column']
for check in rule['checks']:
check_type = check['type']
# 在真实项目中,这里会使用更优雅的策略模式或工厂模式
# 为了演示清晰,我们用 if/elif 结构
try:
if check_type == 'not_null':
result = self._check_not_null(target_file, column)
elif check_type == 'unique':
result = self._check_unique(target_file, column)
elif check_type == 'regex':
result = self._check_regex(target_file, column, check['pattern'])
elif check_type == 'range':
result = self._check_range(target_file, column, check.get('min'), check.get('max'))
elif check_type == 'allowed_values':
result = self._check_allowed_values(target_file, column, check['values'])
else:
result = self._unsupported_check(check_type, column)
results['validations'].append(result)
except Exception as e:
logging.error(f"Error executing check '{check_type}' on column '{column}': {e}")
results['validations'].append({
"column": column,
"check_type": check_type,
"status": "ERROR",
"details": {"message": str(e)}
})
return results
def _get_total_rows(self, file_path: str) -> int:
query = f"SELECT COUNT(*) FROM '{file_path}';"
return self.con.execute(query).fetchone()[0]
# --- 以下是各个具体的检查实现 ---
def _check_not_null(self, file_path: str, column: str) -> Dict[str, Any]:
query = f"SELECT COUNT(*) FROM '{file_path}' WHERE {column} IS NULL;"
failing_rows = self.con.execute(query).fetchone()[0]
status = "PASS" if failing_rows == 0 else "FAIL"
return {
"column": column,
"check_type": "not_null",
"status": status,
"details": {"failing_rows": failing_rows}
}
def _check_unique(self, file_path: str, column: str) -> Dict[str, Any]:
query = f"""
SELECT COUNT(*) FROM (
SELECT {column} FROM '{file_path}'
WHERE {column} IS NOT NULL
GROUP BY {column}
HAVING COUNT(*) > 1
);
"""
# 这个查询计算的是重复值的数量,不是重复行的总数
duplicate_groups = self.con.execute(query).fetchone()[0]
status = "PASS" if duplicate_groups == 0 else "FAIL"
return {
"column": column,
"check_type": "unique",
"status": status,
"details": {"duplicate_value_groups": duplicate_groups}
}
def _check_regex(self, file_path: str, column: str, pattern: str) -> Dict[str, Any]:
# DuckDB的regexp_matches函数非常强大
query = f"""
SELECT COUNT(*) FROM '{file_path}'
WHERE {column} IS NOT NULL AND NOT regexp_matches({column}, '{pattern}');
"""
failing_rows = self.con.execute(query).fetchone()[0]
status = "PASS" if failing_rows == 0 else "FAIL"
return {
"column": column,
"check_type": "regex",
"status": status,
"details": {"failing_rows": failing_rows, "pattern": pattern}
}
def _check_range(self, file_path: str, column: str, min_val: Union[int, float, None], max_val: Union[int, float, None]) -> Dict[str, Any]:
conditions = []
if min_val is not None:
conditions.append(f"{column} < {min_val}")
if max_val is not None:
conditions.append(f"{column} > {max_val}")
if not conditions:
# 一个常见的错误是配置不完整,必须处理
raise ValueError("Range check requires at least a 'min' or 'max' value.")
where_clause = " OR ".join(conditions)
query = f"SELECT COUNT(*) FROM '{file_path}' WHERE {where_clause};"
failing_rows = self.con.execute(query).fetchone()[0]
status = "PASS" if failing_rows == 0 else "FAIL"
return {
"column": column,
"check_type": "range",
"status": status,
"details": {"failing_rows": failing_rows, "min": min_val, "max": max_val}
}
def _check_allowed_values(self, file_path: str, column: str, values: List[str]) -> Dict[str, Any]:
# 将列表转换为SQL中的IN子句格式
allowed_set = ", ".join([f"'{v}'" for v in values])
query = f"""
SELECT COUNT(*) FROM '{file_path}'
WHERE {column} IS NOT NULL AND {column} NOT IN ({allowed_set});
"""
failing_rows = self.con.execute(query).fetchone()[0]
status = "PASS" if failing_rows == 0 else "FAIL"
return {
"column": column,
"check_type": "allowed_values",
"status": status,
"details": {"failing_rows": failing_rows, "allowed": values}
}
def _unsupported_check(self, check_type: str, column: str) -> Dict[str, Any]:
return {
"column": column,
"check_type": check_type,
"status": "UNSUPPORTED",
"details": {}
}
这段代码的核心是 DataQualityProbe 类。它在初始化时配置 DuckDB 连接 S3 的所有参数,然后 run_checks 方法负责解析 YAML 并分发到各个具体的 _check_* 方法。每个检查方法都通过构造一条 SQL 查询来完成校验,这充分利用了 DuckDB 的强大分析能力,避免了在 Python 客户端中加载和处理大量数据的开销。
第二步:为校验逻辑编写坚实的单元测试
校验引擎的逻辑如果存在 bug,那整个数据质量监控就成了笑话。因此,单元测试是不可或缺的一环。我们使用 pytest 和 pyarrow 来动态生成测试用的 Parquet 文件。
项目结构:
.
├── probe_backend
│ └── quality_engine
│ └── main.py
└── tests
└── test_quality_engine.py
# tests/test_quality_engine.py
import pytest
import duckdb
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from probe_backend.quality_engine.main import DataQualityProbe
@pytest.fixture(scope="module")
def minio_config():
# 在真实CI/CD中,这些应该是环境变量
return {
"endpoint": "localhost:9000",
"access_key": "minioadmin",
"secret_key": "minioadmin"
}
@pytest.fixture(scope="module")
def probe(minio_config):
# 为整个测试模块创建一个探针实例
return DataQualityProbe(minio_config)
@pytest.fixture(scope="function")
def test_data_path(tmp_path_factory):
# 使用pytest的临时目录功能,确保测试隔离
path = tmp_path_factory.mktemp("data")
# 准备一份包含各种"脏"数据的DataFrame
df = pd.DataFrame({
'user_id': [1, 2, 3, 3, 5, None],
'email': ['[email protected]', 'invalid-email', '[email protected]', '[email protected]', None, '[email protected]'],
'age': [25, 17, 121, 40, 35, None],
'registration_source': ['web', 'ios', 'android', 'web', 'unknown', 'api']
})
table = pa.Table.from_pandas(df)
file_path = path / "test_data.parquet"
pq.write_table(table, file_path)
# 注意:这里的路径是本地路径,测试时需要模拟S3路径
# 在实际项目中,可能需要一个本地的MinIO实例来运行集成测试
# 为简化,这里直接测试本地文件查询,假设S3配置是正确的
return str(file_path)
# --- 开始编写测试用例 ---
def test_check_not_null(probe, test_data_path):
result = probe._check_not_null(test_data_path, 'user_id')
assert result['status'] == 'FAIL'
assert result['details']['failing_rows'] == 1
result_ok = probe._check_not_null(test_data_path, 'registration_source')
# 虽然registration_source有一行是None,但我们上面生成的数据没有None
# 为了测试通过,我们改一下测试数据
df_with_no_nulls = pd.DataFrame({'col': [1,2,3]})
table = pa.Table.from_pandas(df_with_no_nulls)
ok_path = os.path.join(os.path.dirname(test_data_path), "ok.parquet")
pq.write_table(table, ok_path)
result_ok = probe._check_not_null(ok_path, 'col')
assert result_ok['status'] == 'PASS'
assert result_ok['details']['failing_rows'] == 0
def test_check_unique(probe, test_data_path):
result = probe._check_unique(test_data_path, 'user_id')
assert result['status'] == 'FAIL'
# user_id '3' 出现了两次,所以有一个重复值组
assert result['details']['duplicate_value_groups'] == 1
result_ok = probe._check_unique(test_data_path, 'email')
# email列在非空值中没有重复
assert result_ok['status'] == 'PASS'
assert result_ok['details']['duplicate_value_groups'] == 0
def test_check_regex(probe, test_data_path):
email_pattern = "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$"
result = probe._check_regex(test_data_path, 'email', email_pattern)
assert result['status'] == 'FAIL'
# 'invalid-email' 这一行不匹配
assert result['details']['failing_rows'] == 1
def test_check_range(probe, test_data_path):
result = probe._check_range(test_data_path, 'age', min_val=18, max_val=120)
assert result['status'] == 'FAIL'
# age=17 和 age=121 两行不符合范围
assert result['details']['failing_rows'] == 2
# 只测试下限
result_min_only = probe._check_range(test_data_path, 'age', min_val=18, max_val=None)
assert result_min_only['status'] == 'FAIL'
assert result_min_only['details']['failing_rows'] == 1 # 只有 age=17 不符合
# 测试边界情况
result_ok = probe._check_range(test_data_path, 'age', min_val=17, max_val=121)
assert result_ok['status'] == 'PASS'
assert result_ok['details']['failing_rows'] == 0
def test_check_allowed_values(probe, test_data_path):
allowed = ["web", "ios", "android", "api"]
result = probe._check_allowed_values(test_data_path, 'registration_source', allowed)
assert result['status'] == 'FAIL'
# 'unknown' 不在允许值列表中
assert result['details']['failing_rows'] == 1
这些测试覆盖了每一种校验规则的成功和失败场景。在CI流程中,只要校验逻辑有任何变更,这些测试就能立刻给出反馈,确保了核心功能的稳定性。
第三步:构建API和前端仪表盘
校验结果需要一个地方展示。我们使用 FastAPI 创建一个简单的 API,前端则用 Tailwind CSS 构建一个清爽的仪表盘。
graph TD
subgraph Browser
A[Dashboard UI] -- Fetch Data --> B(API Endpoint);
end
subgraph Server
B -- Trigger Scan --> C{DataQualityProbe};
C -- Query Parquet --> D[(MinIO/S3)];
end
A -- Built with --> E[Tailwind CSS];
B -- Implemented with --> F[FastAPI];
C -- Powered by --> G[DuckDB];
FastAPI 服务代码:
# probe_backend/api.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import os
from .quality_engine.main import DataQualityProbe
app = FastAPI()
# 允许跨域请求,方便本地开发前端
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 在真实项目中,配置应该来自环境变量或配置文件
MINIO_CONFIG = {
"endpoint": os.getenv("MINIO_ENDPOINT", "localhost:9000"),
"access_key": os.getenv("MINIO_ACCESS_KEY", "minioadmin"),
"secret_key": os.getenv("MINIO_SECRET_KEY", "minioadmin")
}
probe = DataQualityProbe(minio_config=MINIO_CONFIG)
@app.post("/run-check")
async def run_quality_check():
# 这里为了简单,硬编码了规则文件路径。
# 生产环境应该允许传递不同的规则配置。
rules_path = "rules.yml"
if not os.path.exists(rules_path):
return {"error": "rules.yml not found"}
try:
results = probe.run_checks(rules_path)
# 可以在这里将结果持久化到数据库或文件中
return results
except Exception as e:
return {"error": str(e)}
前端部分,我们不使用任何框架,只用 HTML、Tailwind CSS 和一点点 JavaScript。
首先是 Tailwind 的配置和基础 HTML 结构:
# 安装和初始化Tailwind CSS
npm install -D tailwindcss
npx tailwindcss init
tailwind.config.js:
/** @type {import('tailwindcss').Config} */
module.exports = {
content: ["./probe_frontend/**/*.{html,js}"],
theme: {
extend: {},
},
plugins: [],
}
package.json 中加入编译脚本:
"scripts": {
"build:css": "tailwindcss -i ./probe_frontend/input.css -o ./probe_frontend/output.css --watch"
}
probe_frontend/index.html:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Data Quality Probe Dashboard</title>
<link href="output.css" rel="stylesheet">
</head>
<body class="bg-gray-100 font-sans">
<div class="container mx-auto p-8">
<div class="flex justify-between items-center mb-6">
<h1 class="text-3xl font-bold text-gray-800">Data Quality Probe</h1>
<button id="run-check-btn" class="bg-blue-500 hover:bg-blue-700 text-white font-bold py-2 px-4 rounded transition duration-300">
Run Check
</button>
</div>
<div id="results-container" class="bg-white shadow-md rounded-lg p-6 hidden">
<!-- 结果摘要 -->
<div class="border-b pb-4 mb-4">
<h2 class="text-xl font-semibold text-gray-700">Scan Summary</h2>
<p class="text-sm text-gray-500 mt-1">Target File: <code id="target-file" class="bg-gray-200 text-gray-800 rounded px-1"></code></p>
<p class="text-sm text-gray-500">Total Rows: <span id="total-rows" class="font-mono"></span></p>
</div>
<!-- 详细校验结果表格 -->
<div>
<h2 class="text-xl font-semibold text-gray-700 mb-2">Validation Details</h2>
<div id="validations-grid" class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4">
<!-- 结果卡片将动态插入这里 -->
</div>
</div>
</div>
<div id="loading-spinner" class="text-center p-10 hidden">
<p class="text-gray-600">Running checks...</p>
</div>
</div>
<script src="app.js"></script>
</body>
</html>
这段 HTML 结构完全由 Tailwind 的原子类构建。比如 container mx-auto p-8 定义了居中、边距等。grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4 则创建了一个响应式的网格布局。这种方式极大地提升了开发效率。
probe_frontend/app.js:
document.addEventListener('DOMContentLoaded', () => {
const runBtn = document.getElementById('run-check-btn');
const resultsContainer = document.getElementById('results-container');
const validationsGrid = document.getElementById('validations-grid');
const loadingSpinner = document.getElementById('loading-spinner');
const API_URL = 'http://localhost:8000/run-check'; // FastAPI的地址
runBtn.addEventListener('click', async () => {
resultsContainer.classList.add('hidden');
loadingSpinner.classList.remove('hidden');
runBtn.disabled = true;
try {
const response = await fetch(API_URL, { method: 'POST' });
const data = await response.json();
renderResults(data);
} catch (error) {
console.error('Error fetching data:', error);
alert('Failed to run checks. See console for details.');
} finally {
loadingSpinner.classList.add('hidden');
runBtn.disabled = false;
}
});
function renderResults(data) {
if (data.error) {
alert(`An error occurred: ${data.error}`);
return;
}
document.getElementById('target-file').textContent = data.target_file;
document.getElementById('total-rows').textContent = data.total_rows;
validationsGrid.innerHTML = ''; // 清空旧结果
data.validations.forEach(v => {
const statusColor = v.status === 'PASS' ? 'bg-green-100 text-green-800' :
v.status === 'FAIL' ? 'bg-red-100 text-red-800' :
'bg-yellow-100 text-yellow-800';
const card = `
<div class="border border-gray-200 rounded-lg p-4 flex flex-col justify-between">
<div>
<div class="flex justify-between items-start">
<p class="font-bold text-gray-700">${v.column}</p>
<span class="text-xs font-semibold px-2 py-1 rounded-full ${statusColor}">
${v.status}
</span>
</div>
<p class="text-sm text-gray-500">${v.check_type}</p>
</div>
<div class="mt-4 text-sm text-gray-600 bg-gray-50 p-2 rounded">
${renderDetails(v.details)}
</div>
</div>
`;
validationsGrid.insertAdjacentHTML('beforeend', card);
});
resultsContainer.classList.remove('hidden');
}
function renderDetails(details) {
if (!details || Object.keys(details).length === 0) return 'No details.';
return Object.entries(details)
.map(([key, value]) => `<div><strong>${key}:</strong> <span class="font-mono">${JSON.stringify(value)}</span></div>`)
.join('');
}
});
这个简单的 JS 文件负责调用 API、显示加载状态,并动态地将结果渲染成卡片。每个卡片的样式,如状态徽章的颜色 (bg-green-100 text-green-800),也是由 Tailwind 的类直接控制,逻辑清晰,无需编写任何额外的 CSS。
局限性与未来迭代方向
我们构建的这个数据质量探针已经能够满足初步的需求:轻量、快速、结果直观。但作为一名务实的工程师,必须清醒地认识到它当前的局限性:
- 同步执行: API 调用是同步的。如果 Parquet 文件非常大,查询会耗时很久,导致前端超时。一个改进方向是将其改造为异步任务,API 立即返回一个任务 ID,前端通过该 ID 轮询结果。
- 结果无持久化: 每次运行的结果都只是临时返回,没有历史记录。这使得我们无法追踪数据质量随时间的变化趋势。引入一个简单的 SQLite 或 PostgreSQL 数据库来存储历史运行结果是必要的下一步。
- 规则管理: 目前规则还是一个本地 YAML 文件。一个更成熟的系统应该提供 UI 来管理和编辑这些规则,并将它们存储在数据库中。
- 有限的校验类型: 当前的校验类型都是单列表内的。更复杂的数据质量问题,如跨列一致性(
IF a > 10 THEN b IS NOT NULL)或跨表一致性(table_A.user_id必须存在于table_B.user_id中),当前引擎无法支持。扩展引擎以支持更复杂的、自定义的 SQL 检查将是未来的一个重要迭代。
尽管存在这些局限,这个项目成功地验证了技术选型的正确性。DuckDB 作为嵌入式查询引擎的能力令人印象深刻,Tailwind CSS 极大地加速了数据密集型 UI 的开发,而严格的单元测试则为整个系统的可靠性提供了基石。它作为一个起点,为我们后续构建更完善的数据治理工具集铺平了道路。