Great Expectations (GE) 对笔记本环境中数据质量保证方法 (Databricks) 的总结

概述

在本文中,我将分享 Great Expectations (GE) 如何在笔记本环境 (Databricks) 中进行数据质量保证。 GE 提供了该工具的概述,然后是代码及其执行结果。

什么是远大的期望?

Great Expectations (GE) 是一个 OSS Python 库,它通过验证、记录和分析数据来帮助确保和提高数据质量。基于数据质量保证条件(GE 称之为期望),您可以针对数据源进行验证并记录验证结果。

报价来源:

定义期望如下:在下图中,我们确认passenger_count 列的最小值为1,最大值为6。定义多个此类期望并执行数据质量验证。

报价来源:

生成的文档是显示以下项目的 HTML 文件,可以作为静态站点托管。每次验证都会生成那个时间点的文件,也可以及时回溯。

报价来源:

使用 GE 的基本流程如下。文档的结构也是基于过程的。

设置 数据连接 创造期望 验证数据

下图显示了文档的结构。

报价来源:

可以验证的数据源包括:本文下面的步骤使用 Spark Dataframe 和 Pandas Dataframe。

数据库(通过 SQLAlchemy) 熊猫数据框 Spark 数据框

可以通过 SQLAlchemy 连接的数据库如下所示:

(以前的 Presto SQL)

可用于每个数据源的期望不同,以下链接很有帮助。

GE有两种使用方式:CLI方式和笔记本环境方式。本文后面的过程实现了后一种方法,以下文档很有帮助。

数据验证后,除了创建文档之外,还有类似可以设置。

有关 GE 的基本信息可以在以下链接中找到。

# 关联 概述 1 概述 2 教程 3 词汇表 4 使用注意事项 5 期望清单 6 社区相关 7 案例分析 在笔记本环境中运行远大期望 (Databricks) 1. 提前准备使用Greate Exceptions 安装远大期望
%pip install great_expectations -q

准备数据(数据框)
schema = '''
`VendorID` INT,
`tpep_pickup_datetime` TIMESTAMP,
`tpep_dropoff_datetime` TIMESTAMP,
`passenger_count` INT,
`trip_distance` DOUBLE,
`RatecodeID` INT,
`store_and_fwd_flag` STRING,
`PULocationID` INT,
`DOLocationID` INT,
`payment_type` INT,
`fare_amount` DOUBLE,
`extra` DOUBLE,
`mta_tax` DOUBLE,
`tip_amount` DOUBLE,
`tolls_amount` DOUBLE,
`improvement_surcharge` DOUBLE,
`total_amount` DOUBLE,
`congestion_surcharge` DOUBLE
'''
 
src_files = [
    "/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-01.csv.gz",
#     "/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-02.csv.gz",
]
 
tgt_df = (
    spark
    .read
    .format("csv")
    .schema(schema)
    .option("header", "true")
    .option("inferSchema", "false")
    .load(src_files)
)

2. 进行基础数据质量验证

本章参考以下文章:

树立远大的期望
import datetime
 
from ruamel import yaml
 
import great_expectations as ge
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    FilesystemStoreBackendDefaults,
)
# root_directory の初期化
dbutils.fs.rm(root_directory_in_spark_api, True)
 
try:
    # ディレクトリを確認
    display(dbutils.fs.ls(root_directory_in_spark_api))
except:
    print('Directory is empty.')
# Great expectaions 利用時のエントリーポイントである Data Context を定義
# https://docs.greatexpectations.io/docs/terms/data_context/
 
# great_expectations.yml を参照せずに定義を実施
data_context_config = DataContextConfig(
    store_backend_defaults=FilesystemStoreBackendDefaults(
        root_directory=root_directory
    ),
)
context = BaseDataContext(project_config=data_context_config)
 
# 利用状況の情報共有を提供を停止
# https://docs.greatexpectations.io/docs/reference/anonymous_usage_statistics/
context.anonymous_usage_statistics.enabled = False
# ディレクトリを確認
display(dbutils.fs.ls(root_directory_in_spark_api))

连接到数据
datasource_name = "taxi_datasource"
dataconnector_name = "databricks_df"
data_asset_name = "nyctaxi_tripdata_yellow_yellow_tripdata"
tgt_deploy_env = "prod"

datasource_config = {
    # データソースを定義
    # https://docs.greatexpectations.io/docs/terms/datasource
    "name": datasource_name,
    "class_name": "Datasource",
 
    # execution_engine を定義
    # https://docs.greatexpectations.io/docs/terms/execution_engine/
    "execution_engine": {
        "module_name": "great_expectations.execution_engine",
        "class_name": "SparkDFExecutionEngine",
    },
 
    # データコネクターを定義
    # https://docs.greatexpectations.io/docs/terms/data_connector/
    "data_connectors": {
        dataconnector_name: {
            "module_name": "great_expectations.datasource.data_connector",
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": [
                "some_key_maybe_pipeline_stage",
                "some_other_key_maybe_run_id",
            ],
        }
    },
}
context.add_datasource(**datasource_config)
batch_request = RuntimeBatchRequest(
    datasource_name=datasource_name,
    data_connector_name=dataconnector_name,
    data_asset_name = data_asset_name,
    batch_identifiers={
        "some_key_maybe_pipeline_stage": tgt_deploy_env,
        "some_other_key_maybe_run_id": f"my_run_name_{datetime.date.today().strftime('%Y%m%d')}",
    },
    runtime_parameters={"batch_data": tgt_df},
)

创造期望
checkpoint_config_name = "nyctaxi_tripdata_yellow_yellow_tripdata__checkpoint"
# チェックポイントを定義
checkpoint_config = {
    "name":checkpoint_config_name,
    "config_version": 1,
    "class_name": "SimpleCheckpoint",
    "expectation_suite_name": expectation_suite_name,
    "run_name_template": "%Y%m%d-%H%M%S-yctaxi_tripdata_yellow_yellow_tripdata",
}
context.add_checkpoint(**checkpoint_config)

# checkpoints にファイルが作成されたことを確認
checkpoints_file_path = f'{root_directory_in_spark_api}/checkpoints/{checkpoint_config_name}.yml'
print(dbutils.fs.head(checkpoints_file_path))
checkpoint_result = context.run_checkpoint(
    checkpoint_name=checkpoint_config_name,
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name,
        }
    ],
)

检查验证结果
# 品質チェック結果を表示
checkpoint_result["success"]
# 品質チェック結果の HTML ファイルを表示
first_validation_result_identifier = (
    checkpoint_result.list_validation_result_identifiers()[0]
)
first_run_result = checkpoint_result.run_results[first_validation_result_identifier]
 
docs_path = first_run_result['actions_results']['update_data_docs']['local_site']
 
html = dbutils.fs.head(docs_path,)
 
displayHTML(html)

3. 出现质量错误时的操作验证
# エラーとなる expectation を追加
validator.expect_column_values_to_not_be_null(
    column="congestion_surcharge",
)
 
validator.save_expectation_suite(discard_failed_expectations=False)
checkpoint_result = context.run_checkpoint(
    checkpoint_name=checkpoint_config_name,
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name,
        }
    ],
)
# 品質チェック結果のを表示
checkpoint_result["success"]
# 品質チェック結果の HTML ファイルを表示
first_validation_result_identifier = (
    checkpoint_result.list_validation_result_identifiers()[0]
)
first_run_result = checkpoint_result.run_results[first_validation_result_identifier]
 
docs_path = first_run_result['actions_results']['update_data_docs']['local_site']
 
html = dbutils.fs.head(docs_path,)
 
displayHTML(html)

4. 数据剖析

本章参考以下文章:

from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfiler
from great_expectations.dataset.sparkdf_dataset import  SparkDFDataset
from great_expectations.render.renderer import *
from great_expectations.render.view import DefaultJinjaPageView
basic_dataset_profiler = BasicDatasetProfiler()
from great_expectations.dataset.pandas_dataset import PandasDataset
gdf = PandasDataset(
    tgt_df
    .limit(1000)
    .toPandas()
) 

用下面的代码,能不能用spark数据框执行,还是有性能问题?

from great_expectations.dataset.sparkdf_dataset import SparkDFDataset
gdf = SparkDFDataset(
    tgt_df
    .limit(1000)
) 

print(gdf.spark_df.count())
gdf.spark_df.display()

# データを確認
print(gdf.count())
gdf.head()

from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfiler
 
# データをプロファイリング
expectation_suite, validation_result = gdf.profile(BasicDatasetProfiler)
from great_expectations.render.renderer import (
    ProfilingResultsPageRenderer,
    ExpectationSuitePageRenderer,
)
from great_expectations.render.view import DefaultJinjaPageView
 
profiling_result_document_content = ProfilingResultsPageRenderer().render(validation_result)
expectation_based_on_profiling_document_content = ExpectationSuitePageRenderer().render(expectation_suite)

# HTML を生成
profiling_result_HTML = DefaultJinjaPageView().render(profiling_result_document_content) # type string or str
expectation_based_on_profiling_HTML = DefaultJinjaPageView().render(expectation_based_on_profiling_document_content)

displayHTML(profiling_result_HTML)

5. 资源清理
dbutils.fs.rm(root_directory_in_spark_api, True)

原创声明:本文系作者授权爱码网发表,未经许可,不得转载;

原文地址:https://www.likecs.com/show-308623462.html

31人参与, 0条评论 登录后显示评论回复

你需要登录后才能评论 登录/ 注册