将发现结果流式传输到 BigQuery 进行分析

本页面介绍如何使用适用于 BigQuery 的 Security Command Center 导出功能将新的和更新后的发现结果流式传输到 BigQuery 数据集。现有发现结果不会更新到 BigQuery,除非对其进行更新。

BigQuery 是 Google Cloud 的全代管式 PB 级经济实惠的分析数据仓库,可让您近乎实时地分析大量数据。您可以使用 BigQuery 对新发现和更新后的发现结果运行查询,过滤数据以查找所需内容,并生成自定义报告。如需详细了解 BigQuery,请参阅 BigQuery 文档

概览

启用此功能后,写入到 Security Command Center 的新发现结果将近乎实时地导出到 BigQuery 表。然后,您可以将数据集成到现有工作流中 并创建 自定义分析您可以在组织、文件夹、 和项目级别,以根据您的要求导出发现结果。

此功能是将 Security Command Center 发现结果导出到 BigQuery 的推荐方法,因为它是全代管式的,不需要执行手动操作或编写自定义代码。

数据集结构

此功能会将每个新发现结果及其后续更新添加为 findings 表中的新行,该表按 source_idfinding_idevent_time 聚簇。

发现结果更新后,此功能会创建具有相同 source_idfinding_id 值但具有不同 event_time 值的多个发现结果记录。通过此数据集结构,您可以查看每个发现结果的状态随时间的变化。

请注意,您的数据集中可能存在重复条目。如需对其进行解析,您可以使用 DISTINCT 子句,如第一个示例查询所示。

每个数据集包含一个 findings 表,其中包含以下字段:

字段 说明
source_id Security Command Center 分配给发现结果来源的唯一标识符。例如,来自 Cloud Anomaly Detection 来源的所有发现结果都具有相同的 source_id 值。

示例:1234567890
finding_id 表示发现结果的唯一标识符。该标识符在组织来源范围内是唯一的。它由字母数字构成,并且少于或等于 32 个字符。
event_time 事件的发生时间或发现结果的更新时间。例如,如果发现结果表明防火墙打开,则“event_time”会捕获检测器认为防火墙处于打开状态的时间。如果发现结果在之后得以解决,那么该时间会反映解决发现结果的时间。

示例:2019-09-26 12:48:00.985000 UTC
finding 被提取到 Security Command Center 以进行展示、通知、分析、政策测试和实施的评估数据(如安全、风险、运行状况或隐私)记录。例如,App Engine 应用中的跨站脚本攻击 (XSS) 漏洞是一个发现结果。

如需详细了解嵌套字段,请参阅 Finding 对象的 API 参考文档。
资源 与此发现结果关联的 Google Cloud 资源的相关信息。

如需详细了解嵌套字段,请参阅 Resource 对象的 API 参考文档。

费用

您需要支付与此功能相关的 BigQuery 费用。如需了解详情,请参阅 BigQuery 价格

准备工作

您必须先完成以下步骤,然后才能启用此功能。

设置权限

如需完成本指南,您必须拥有以下 Identity and Access Management (IAM) 角色:

创建 BigQuery 数据集

创建 BigQuery 数据集。如需了解详情,请参阅创建数据集

规划数据驻留

如果为数据驻留 启用 Security Command Center 时, 流式传输导出到 BigQuery(BigQueryExport 资源)是正文 并存储在 Security Command Center 位置 您选择的代码。

如需将 Security Command Center 位置中的发现结果导出到 您必须配置 BigQuery 与发现结果在同一 Security Command Center 位置。

由于 BigQuery 中使用的过滤条件 导出内容可包含受驻留控制措施约束的数据, 请务必先指定正确的位置,然后再创建它们。 Security Command Center 不会限制您创建的位置 导出的数据

BigQuery Export 仅存储在 由其创建,无法在其他位置查看或修改。

创建 BigQuery 导出作业后,您将无法更改 其位置。要更改位置,您需要删除 BigQuery Export,并在新位置重新创建它。

如需使用 API 调用检索 BigQuery Export, 您需要在 bigQueryExport。例如:

GET https://securitycenter.googleapis.com/v2/{name=organizations/123/locations/eu/bigQueryExports/my-export-01}

同样,如需检索 BigQuery Export 使用 gcloud CLI,您需要指定位置 添加到配置的完整资源名称中,或使用 --locations 标记。例如:

gcloud scc scc bqexports get myBigQueryExport organizations/123 \
    --location=locations/us

将发现结果从 Security Command Center 导出到 BigQuery

如需导出发现结果,请先启用 Security Command Center API。

启用 Security Command Center API

要启用 Security Command Center API,请执行以下操作:

  1. 转到 Google Cloud 控制台中的“API 库”页面。

    转到 API 库

  2. 选择要启用 Security Command Center API 的项目。

  3. 搜索框中,输入 Security Command Center,然后点击搜索结果中的 Security Command Center。

  4. 在随即显示的 API 页面上,点击启用

您的项目已启用 Security Command Center API。接下来,您将使用 gcloud CLI 创建到 BigQuery 的新导出配置。

在 VPC Service Controls 中授予边界访问权限

如果您使用 VPC Service Controls,并且 BigQuery 数据集属于服务边界内的项目,则必须授予对项目的访问权限,以便导出发现结果。

如需授予对项目的访问权限,请为导出发现结果的主账号和项目创建入站和出站规则。这些规则允许访问受保护的资源,并可让 BigQuery 验证用户是否具有对 BigQuery 数据集的 setIamPolicy 权限。

设置到 BigQuery 的新导出之前的准备工作

  1. 转到 Google Cloud 控制台中的 VPC Service Controls 页面。

    转到 VPC Service Controls

  2. 如有必要,请选择您的组织。

  3. 点击要更改的服务边界的名称。

    如需查找您需要修改的服务边界,您可以查看日志中是否存在显示 RESOURCES_NOT_IN_SAME_SERVICE_PERIMETER 违规行为的条目。在这些条目中,检查 servicePerimeterName 字段:accessPolicies/ACCESS_POLICY_ID/servicePerimeters/SERVICE_PERIMETER_NAME

  4. 点击修改边界

  5. 在导航菜单中,点击入站流量政策

  6. 如需为用户或服务账号配置入站流量规则,请使用以下参数:

    • API 客户端的 FROM 特性
      • 身份下拉菜单中,选择选定的身份
      • 来源下拉菜单中,选择所有来源
      • 点击选择,然后输入用于调用 Security Command Center API 的主账号。
    • Google Cloud 服务/资源的 TO 属性
      • 项目下拉菜单中,选择选定的项目
      • 点击选择,然后输入 BigQuery 数据集所属的项目。
      • 服务下拉菜单中,选择选定的服务,然后选择 BigQuery API
      • 方法下拉菜单中,选择所有操作
  7. 点击保存

  8. 在导航菜单中,点击出站流量政策

  9. 点击添加规则 (Add Rule)。

  10. 如需为用户或服务账号配置出站流量规则,请输入以下参数:

    • API 客户端的 FROM 特性
      • 身份下拉菜单中,选择选定的身份
      • 点击选择,然后输入用于调用 Security Command Center API 的主账号。
    • Google Cloud 服务/资源的 TO 属性
      • 项目下拉菜单中,选择所有项目
      • 服务下拉菜单中,选择选定的服务,然后选择 BigQuery API
      • 方法下拉菜单中,选择所有操作
  11. 点击保存

设置到 BigQuery 的新导出

在此步骤中,您将创建一个导出配置,以将发现结果导出到 BigQuery 实例。您可以在项目、文件夹或组织级层创建导出配置。例如,如果您要将发现结果从项目导出到 BigQuery 数据集,则可以在项目级层创建导出配置,以仅导出与该项目相关的发现结果。(可选)您可以指定过滤条件,以便仅导出特定发现结果。

请务必在适当的级层创建导出配置。例如,如果您在项目 B 中创建导出配置,但从项目 A 中导出发现结果,并且定义了 resource.project_display_name: project-a-id 等过滤条件,则该配置不会导出任何发现结果。

您最多可以为组织创建 500 项到 BigQuery 的导出配置。您可以将同一数据集用于多项导出配置。如果您使用同一数据集,则系统会对同一发现结果表执行所有更新。

创建第一个导出配置时,系统会自动为您创建服务账号。必须使用此服务账号才能在数据集中创建或更新发现结果表,以及将发现结果导出到表中。该账号格式为 service-org-ORGANIZATION_ID@gcp-sa-scc-notification.iam.gservicaccount.com,并且在 BigQuery 数据集级层被授予 BigQuery Data Editor (roles/bigquery.dataEditor) 角色。

gcloud

  1. 转到 Google Cloud 控制台。

    转到 Google Cloud 控制台

  2. 选择启用了 Security Command Center API 的项目。

  3. 点击激活 Cloud Shell

  4. 如需创建新的导出配置,请运行以下命令:

    gcloud scc bqexports create BIG_QUERY_EXPORT \
      --dataset=DATASET_NAME \
      --folder=FOLDER_ID | --organization=ORGANIZATION_ID | --project=PROJECT_ID \
      --location=LOCATION \
      [--description=DESCRIPTION] \
      [--filter=FILTER]
    

    替换以下内容:

    • BIG_QUERY_EXPORT 替换为此导出配置的名称。

    • DATASET_NAME 替换为 BigQuery 数据集的名称,例如 projects/<PROJECT_ID>/datasets/<DATASET_ID>

    • FOLDER_IDORGANIZATION_IDPROJECT_ID 替换为您的文件夹、组织或项目的名称。您必须设置其中一个选项。对于文件夹和组织,该名称是文件夹 ID 或组织 ID。对于项目,该名称是项目编号或项目 ID。

    • LOCATION:如果启用了数据驻留, 指定 Security Command Center 位置 将在其中创建 BigQuery Export通过 BigQuery Export 配置存储在 位置。只有来自此位置的发现结果才会包含在导出范围内。

      如果未启用数据驻留,请指定 --location 标志 使用 Cloud Build 创建 BigQuery Export Security Command Center API v2 是唯一 此标志的有效值为 global

    • 包含简单易懂的说明的 DESCRIPTION 导出配置的名称。此变量为可选项。

    • FILTER 与定义什么 要纳入到导出范围内的发现结果。例如,如果您希望 在 XSS_SCRIPTING 类别下输入 "category=\"XSS_SCRIPTING\"。 此变量为可选项。

Java

如需向 Security Command Center 进行身份验证,请设置应用默认值 凭据。如需了解详情,请参阅 为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请将 /locations/LOCATION 添加到 资源名称在 /PARENT/PARENT_ID 之后,其中 PARENTorganizationsfolders、 或 projects

对于发现结果,请将 /locations/LOCATION 添加到资源中 名称在 /sources/SOURCE_ID 之后,其中 SOURCE_IDSecurity Command Center 服务 发出该发现结果。


import com.google.cloud.securitycenter.v1.BigQueryExport;
import com.google.cloud.securitycenter.v1.CreateBigQueryExportRequest;
import com.google.cloud.securitycenter.v1.SecurityCenterClient;
import java.io.IOException;
import java.util.UUID;

public class CreateBigQueryExport {

  public static void main(String[] args) throws IOException {
    // TODO(Developer): Modify the following variable values.

    // parent: Use any one of the following resource paths:
    //              - organizations/{organization_id}
    //              - folders/{folder_id}
    //              - projects/{project_id}
    String parent = String.format("projects/%s", "your-google-cloud-project-id");

    // filter: Expression that defines the filter to apply across create/update events of findings.
    String filter =
        "severity=\"LOW\" OR severity=\"MEDIUM\" AND "
            + "category=\"Persistence: IAM Anomalous Grant\" AND "
            + "-resource.type:\"compute\"";

    // bigQueryDatasetId: The BigQuery dataset to write findings' updates to.
    String bigQueryDatasetId = "your-bigquery-dataset-id";

    // bigQueryExportId: Unique identifier provided by the client.
    // For more info, see:
    // https://cloud.google.com/security-command-center/docs/how-to-analyze-findings-in-big-query#export_findings_from_to
    String bigQueryExportId = "default-" + UUID.randomUUID().toString().split("-")[0];

    createBigQueryExport(parent, filter, bigQueryDatasetId, bigQueryExportId);
  }

  // Create export configuration to export findings from a project to a BigQuery dataset.
  // Optionally specify filter to export certain findings only.
  public static void createBigQueryExport(
      String parent, String filter, String bigQueryDatasetId, String bigQueryExportId)
      throws IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (SecurityCenterClient client = SecurityCenterClient.create()) {

      // Create the BigQuery export configuration.
      BigQueryExport bigQueryExport =
          BigQueryExport.newBuilder()
              .setDescription(
                  "Export low and medium findings if the compute resource "
                      + "has an IAM anomalous grant")
              .setFilter(filter)
              .setDataset(String.format("%s/datasets/%s", parent, bigQueryDatasetId))
              .build();

      CreateBigQueryExportRequest bigQueryExportRequest =
          CreateBigQueryExportRequest.newBuilder()
              .setParent(parent)
              .setBigQueryExport(bigQueryExport)
              .setBigQueryExportId(bigQueryExportId)
              .build();

      // Create the export request.
      BigQueryExport response = client.createBigQueryExport(bigQueryExportRequest);

      System.out.printf("BigQuery export request created successfully: %s\n", response.getName());
    }
  }
}

Python

如需向 Security Command Center 进行身份验证,请设置应用默认值 凭据。如需了解详情,请参阅 为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请将 /locations/LOCATION 添加到 资源名称在 /PARENT/PARENT_ID 之后,其中 PARENTorganizationsfolders、 或 projects

对于发现结果,请将 /locations/LOCATION 添加到资源中 名称在 /sources/SOURCE_ID 之后,其中 SOURCE_IDSecurity Command Center 服务 发出该发现结果。



def create_bigquery_export(
    parent: str, export_filter: str, bigquery_dataset_id: str, bigquery_export_id: str
):
    from google.cloud import securitycenter

    """
    Create export configuration to export findings from a project to a BigQuery dataset.
    Optionally specify filter to export certain findings only.

    Args:
        parent: Use any one of the following resource paths:
             - organizations/{organization_id}
             - folders/{folder_id}
             - projects/{project_id}
        export_filter: Expression that defines the filter to apply across create/update events of findings.
        bigquery_dataset_id: The BigQuery dataset to write findings' updates to.
        bigquery_export_id: Unique identifier provided by the client.
             - example id: f"default-{str(uuid.uuid4()).split('-')[0]}"
        For more info, see:
        https://cloud.google.com/security-command-center/docs/how-to-analyze-findings-in-big-query#export_findings_from_to
    """
    client = securitycenter.SecurityCenterClient()

    # Create the BigQuery export configuration.
    bigquery_export = securitycenter.BigQueryExport()
    bigquery_export.description = "Export low and medium findings if the compute resource has an IAM anomalous grant"
    bigquery_export.filter = export_filter
    bigquery_export.dataset = f"{parent}/datasets/{bigquery_dataset_id}"

    request = securitycenter.CreateBigQueryExportRequest()
    request.parent = parent
    request.big_query_export = bigquery_export
    request.big_query_export_id = bigquery_export_id

    # Create the export request.
    response = client.create_big_query_export(request)

    print(f"BigQuery export request created successfully: {response.name}\n")

创建导出配置后,您应该会在大约 15 分钟内在 BigQuery 数据集中看到发现结果。创建 BigQuery 表后,与过滤条件和范围匹配的所有新发现结果和更新后的发现结果都将近乎实时地显示在该表中。

如需查看您的发现结果,请参阅查看发现结果

为到 BigQuery 的新导出创建入站规则

如果您使用 VPC Service Controls,并且 BigQuery 数据集属于服务边界内的项目,则必须为到 BigQuery 的新导出创建入站规则。

  1. 重新打开设置到 BigQuery 的新导出中的服务边界。

    转到 VPC Service Controls

  2. 点击入站流量政策

  3. 点击添加规则 (Add Rule)。

  4. 如需为导出配置设置入站规则,请输入以下参数:

    • API 客户端的 FROM 特性
      • 来源下拉菜单中,选择所有来源
      • 身份下拉菜单中,选择选定的身份
      • 点击选择,然后输入 BigQuery Export 配置服务账号的名称:service-org-ORGANIZATION_ID@gcp-sa-scc-notification.iam.gserviceaccount.com
    • GCP 服务/资源的 TO 特性
      • 项目下拉菜单中,选择选定的项目
      • 点击选择,然后选择 BigQuery 数据集所属的项目。
      • 服务下拉菜单中,选择选定的服务,然后选择 BigQuery API
      • 方法下拉菜单中,选择所有操作
  5. 在导航菜单中,点击保存

选定的项目、用户和服务账号现在可以访问受保护的资源并导出发现结果。

如果您已按照本指南中的所有步骤操作,并且导出正常工作,您现在可以删除以下内容:

  • 主账号的入站规则
  • 主账号的出站规则

这些规则仅在配置导出配置时才需要。但是,为了让导出配置继续正常工作,您必须保留上面创建的入站规则,该规则允许 Security Command Center 将发现结果导出到服务边界后面的 BigQuery 数据集。

查看导出配置的详细信息

gcloud

  1. 转到 Google Cloud 控制台。

    转到 Google Cloud 控制台

  2. 选择启用了 Security Command Center API 的项目。

  3. 点击激活 Cloud Shell

  4. 如需验证导出配置的详细信息,请运行以下命令:

    gcloud scc bqexports get BIG_QUERY_EXPORT \
      --folder=FOLDER_ID | --organization=ORGANIZATION_ID | --project=PROJECT_ID \
      --location=LOCATION
    

    替换以下内容:

    • BIG_QUERY_EXPORT 替换为此导出配置的名称。

    • FOLDER_IDORGANIZATION_IDPROJECT_ID 替换为您的文件夹、组织或项目的名称。您必须设置其中一个选项。对于文件夹和组织,该名称是文件夹 ID 或组织 ID。对于项目,该名称是项目编号或项目 ID。

    • LOCATION:如果是数据驻留,则为必需 或 BigQueryExport 资源是使用 v2 API。

      如果已启用数据驻留,请指定 用来存储导出数据的 Security Command Center 位置

      如果未启用数据驻留,请添加 /locations/LOCATION,前提是 BigQueryExport资源是使用 Security Command Center API v2,在这种情况下,是唯一有效的位置 为global

    例如,如需从组织 ID 设置为 123 的组织中获取名为 my-bq-export 的导出配置,请运行以下命令:

    gcloud scc bqexports get my-bq-export --organization=123
    

Java

如需向 Security Command Center 进行身份验证,请设置应用默认值 凭据。如需了解详情,请参阅 为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请将 /locations/LOCATION 添加到 资源名称在 /PARENT/PARENT_ID 之后,其中 PARENTorganizationsfolders、 或 projects

对于发现结果,请将 /locations/LOCATION 添加到资源中 名称在 /sources/SOURCE_ID 之后,其中 SOURCE_IDSecurity Command Center 服务 发出该发现结果。


import com.google.cloud.securitycenter.v1.BigQueryExport;
import com.google.cloud.securitycenter.v1.GetBigQueryExportRequest;
import com.google.cloud.securitycenter.v1.SecurityCenterClient;
import java.io.IOException;

public class GetBigQueryExport {

  public static void main(String[] args) throws IOException {
    // TODO(Developer): Modify the following variable values.

    // parent: Use any one of the following resource paths:
    //              - organizations/{organization_id}
    //              - folders/{folder_id}
    //              - projects/{project_id}
    String parent = String.format("projects/%s", "your-google-cloud-project-id");

    // bigQueryExportId: Unique identifier that is used to identify the export.
    String bigQueryExportId = "export-id";

    getBigQueryExport(parent, bigQueryExportId);
  }

  // Retrieve an existing BigQuery export.
  public static void getBigQueryExport(String parent, String bigQueryExportId) throws IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (SecurityCenterClient client = SecurityCenterClient.create()) {

      GetBigQueryExportRequest bigQueryExportRequest =
          GetBigQueryExportRequest.newBuilder()
              .setName(String.format("%s/bigQueryExports/%s", parent, bigQueryExportId))
              .build();

      BigQueryExport response = client.getBigQueryExport(bigQueryExportRequest);
      System.out.printf("Retrieved the BigQuery export: %s", response.getName());
    }
  }
}

Python

如需向 Security Command Center 进行身份验证,请设置应用默认值 凭据。如需了解详情,请参阅 为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请将 /locations/LOCATION 添加到 资源名称在 /PARENT/PARENT_ID 之后,其中 PARENTorganizationsfolders、 或 projects

对于发现结果,请将 /locations/LOCATION 添加到资源中 名称在 /sources/SOURCE_ID 之后,其中 SOURCE_IDSecurity Command Center 服务 发出该发现结果。

def get_bigquery_export(parent: str, bigquery_export_id: str):
    from google.cloud import securitycenter

    """
    Retrieve an existing BigQuery export.
    Args:
        parent: Use any one of the following resource paths:
                 - organizations/{organization_id}
                 - folders/{folder_id}
                 - projects/{project_id}
        bigquery_export_id: Unique identifier that is used to identify the export.
    """

    client = securitycenter.SecurityCenterClient()

    request = securitycenter.GetBigQueryExportRequest()
    request.name = f"{parent}/bigQueryExports/{bigquery_export_id}"

    response = client.get_big_query_export(request)
    print(f"Retrieved the BigQuery export: {response.name}")

更新导出配置

如有必要,您可以修改现有导出配置的过滤条件、数据集和说明。您无法更改导出配置的名称。

gcloud

  1. 转到 Google Cloud 控制台。

    转到 Google Cloud 控制台

  2. 选择启用了 Security Command Center API 的项目。

  3. 点击激活 Cloud Shell

  4. 如需更新导出配置,请运行以下命令:

    gcloud scc bqexports update BIG_QUERY_EXPORT \
      --dataset=DATASET_NAME \
      --folder=FOLDER_ID | --organization=ORGANIZATION_ID | --project=PROJECT_ID \
      --location=LOCATION \
      [--description=DESCRIPTION] \
      [--filter=FILTER]
    

    替换以下内容:

    • BIG_QUERY_EXPORT 替换为您要更新的导出配置的名称。

    • DATASET_NAME 替换为 BigQuery 数据集的名称,例如 projects/<PROJECT_ID>/datasets/<DATASET_ID>

    • FOLDER_IDORGANIZATION_IDPROJECT_ID 替换为您的文件夹、组织或项目的名称。您必须设置其中一个选项。对于文件夹和组织,该名称是文件夹 ID 或组织 ID。对于项目,该名称是项目编号或项目 ID。

    • LOCATION:如果是数据驻留,则为必需 或 BigQueryExport 资源是使用 v2 API。

      如果已启用数据驻留,请指定 用来存储导出数据的 Security Command Center 位置

      如果未启用数据驻留,请添加 全名中包含 /locations/LOCATION,或者 仅当 BigQueryExport 时,才指定 --location 标志 是使用 Security Command Center API v2 创建的, 在这种情况下,唯一的有效位置是 global

    • 包含简单易懂的说明的 DESCRIPTION 导出配置的名称。此变量为可选项。

    • FILTER 与定义什么 要纳入到导出范围内的发现结果。例如,如果您希望 在 XSS_SCRIPTING 类别下输入 "category=\"XSS_SCRIPTING\"。 此变量为可选项。

Java

如需向 Security Command Center 进行身份验证,请设置应用默认值 凭据。如需了解详情,请参阅 为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请将 /locations/LOCATION 添加到 资源名称在 /PARENT/PARENT_ID 之后,其中 PARENTorganizationsfolders、 或 projects

对于发现结果,请将 /locations/LOCATION 添加到资源中 名称在 /sources/SOURCE_ID 之后,其中 SOURCE_IDSecurity Command Center 服务 发出该发现结果。


import com.google.cloud.securitycenter.v1.BigQueryExport;
import com.google.cloud.securitycenter.v1.SecurityCenterClient;
import com.google.cloud.securitycenter.v1.UpdateBigQueryExportRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;

public class UpdateBigQueryExport {

  public static void main(String[] args) throws IOException {
    // TODO(Developer): Modify the following variable values.

    // parent: Use any one of the following resource paths:
    //              - organizations/{organization_id}
    //              - folders/{folder_id}
    //              - projects/{project_id}
    String parent = String.format("projects/%s", "your-google-cloud-project-id");

    // filter: Expression that defines the filter to apply across create/update events of findings.
    String filter =
        "severity=\"LOW\" OR severity=\"MEDIUM\" AND "
            + "category=\"Persistence: IAM Anomalous Grant\" AND "
            + "-resource.type:\"compute\"";

    // bigQueryExportId: Unique identifier provided by the client.
    // For more info, see:
    // https://cloud.google.com/security-command-center/docs/how-to-analyze-findings-in-big-query#export_findings_from_to
    String bigQueryExportId = "big-query-export-id";

    updateBigQueryExport(parent, filter, bigQueryExportId);
  }

  // Updates an existing BigQuery export.
  public static void updateBigQueryExport(String parent, String filter, String bigQueryExportId)
      throws IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (SecurityCenterClient client = SecurityCenterClient.create()) {

      //  Set the new values for export configuration.
      BigQueryExport bigQueryExport =
          BigQueryExport.newBuilder()
              .setName(String.format("%s/bigQueryExports/%s", parent, bigQueryExportId))
              .setFilter(filter)
              .build();

      UpdateBigQueryExportRequest request =
          UpdateBigQueryExportRequest.newBuilder()
              .setBigQueryExport(bigQueryExport)
              // Set the update mask to specify which properties should be updated.
              // If empty, all mutable fields will be updated.
              // For more info on constructing field mask path, see the proto or:
              // https://cloud.google.com/java/docs/reference/protobuf/latest/com.google.protobuf.FieldMask
              .setUpdateMask(FieldMask.newBuilder().addPaths("filter").build())
              .build();

      BigQueryExport response = client.updateBigQueryExport(request);
      if (!response.getFilter().equalsIgnoreCase(filter)) {
        System.out.println("Failed to update BigQueryExport!");
        return;
      }
      System.out.println("BigQueryExport updated successfully!");
    }
  }
}

Python

如需向 Security Command Center 进行身份验证,请设置应用默认值 凭据。如需了解详情,请参阅 为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请将 /locations/LOCATION 添加到 资源名称在 /PARENT/PARENT_ID 之后,其中 PARENTorganizationsfolders、 或 projects

对于发现结果,请将 /locations/LOCATION 添加到资源中 名称在 /sources/SOURCE_ID 之后,其中 SOURCE_IDSecurity Command Center 服务 发出该发现结果。

def update_bigquery_export(parent: str, export_filter: str, bigquery_export_id: str):
    """
    Updates an existing BigQuery export.
    Args:
        parent: Use any one of the following resource paths:
                 - organizations/{organization_id}
                 - folders/{folder_id}
                 - projects/{project_id}
        export_filter: Expression that defines the filter to apply across create/update events of findings.
        bigquery_export_id: Unique identifier provided by the client.
        For more info, see:
        https://cloud.google.com/security-command-center/docs/how-to-analyze-findings-in-big-query#export_findings_from_to
    """
    from google.cloud import securitycenter
    from google.protobuf import field_mask_pb2

    client = securitycenter.SecurityCenterClient()

    # Set the new values for export configuration.
    bigquery_export = securitycenter.BigQueryExport()
    bigquery_export.name = f"{parent}/bigQueryExports/{bigquery_export_id}"
    bigquery_export.filter = export_filter

    # Field mask to only update the export filter.
    # Set the update mask to specify which properties should be updated.
    # If empty, all mutable fields will be updated.
    # For more info on constructing field mask path, see the proto or:
    # https://googleapis.dev/python/protobuf/latest/google/protobuf/field_mask_pb2.html
    field_mask = field_mask_pb2.FieldMask(paths=["filter"])

    request = securitycenter.UpdateBigQueryExportRequest()
    request.big_query_export = bigquery_export
    request.update_mask = field_mask

    response = client.update_big_query_export(request)

    if response.filter != export_filter:
        print("Failed to update BigQueryExport!")
        return
    print("BigQueryExport updated successfully!")

查看所有导出配置

您可以查看组织、文件夹或项目中的所有导出配置。

gcloud

  1. 转到 Google Cloud 控制台。

    转到 Google Cloud 控制台

  2. 选择启用了 Security Command Center API 的项目。

  3. 点击激活 Cloud Shell

  4. 如需列出导出配置,请运行以下命令:

    gcloud scc bqexports list \
      --folder=FOLDER_ID | --organization=ORGANIZATION_ID | --project=PROJECT_ID \
      --location=LOCATION \
      [--limit=LIMIT] \
      [--page-size=PAGE_SIZE]
    

    替换以下内容:

    • FOLDER_IDORGANIZATION_IDPROJECT_ID 替换为您的文件夹、组织或项目的名称。您必须设置其中一个选项。对于文件夹和组织,该名称是文件夹 ID 或组织 ID。对于项目,该名称是项目编号或项目 ID。

      如果您指定了组织 ID,该列表会包含该组织中定义的所有导出配置,包括在文件夹和项目级层定义的配置。如果您指定了文件夹 ID,该列表会包含在文件夹级层以及在该文件夹内的项目中定义的所有导出配置。如果您指定了项目编号或项目 ID,该列表仅包含该项目的所有导出配置。

    • LOCATION:如果是数据驻留,则为必需 或者BigQueryExport资源是使用 v2 API。

      如果已启用数据驻留,请指定 用来存储导出的 Security Command Center 位置

      如果未启用数据驻留,包括 --location 标志仅列出 BigQueryExport 资源 使用 Security Command Center API v2 创建的 唯一的有效位置是 global

    • LIMIT 替换为要查看的导出配置数量。 此变量为可选项。

    • PAGE_SIZE 替换为页面大小值。此变量为可选项。

Java

如需向 Security Command Center 进行身份验证,请设置应用默认值 凭据。如需了解详情,请参阅 为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请将 /locations/LOCATION 添加到 资源名称在 /PARENT/PARENT_ID 之后,其中 PARENTorganizationsfolders、 或 projects

对于发现结果,请将 /locations/LOCATION 添加到资源中 名称在 /sources/SOURCE_ID 之后,其中 SOURCE_IDSecurity Command Center 服务 发出该发现结果。


import com.google.cloud.securitycenter.v1.BigQueryExport;
import com.google.cloud.securitycenter.v1.ListBigQueryExportsRequest;
import com.google.cloud.securitycenter.v1.SecurityCenterClient;
import com.google.cloud.securitycenter.v1.SecurityCenterClient.ListBigQueryExportsPagedResponse;
import java.io.IOException;

public class ListBigQueryExports {

  public static void main(String[] args) throws IOException {
    // TODO(Developer): Modify the following variable values.

    // parent: The parent, which owns the collection of BigQuery exports.
    //         Use any one of the following resource paths:
    //              - organizations/{organization_id}
    //              - folders/{folder_id}
    //              - projects/{project_id}
    String parent = String.format("projects/%s", "your-google-cloud-project-id");

    listBigQueryExports(parent);
  }

  // List BigQuery exports in the given parent.
  public static void listBigQueryExports(String parent) throws IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (SecurityCenterClient client = SecurityCenterClient.create()) {

      ListBigQueryExportsRequest request =
          ListBigQueryExportsRequest.newBuilder().setParent(parent).build();

      ListBigQueryExportsPagedResponse response = client.listBigQueryExports(request);

      System.out.println("Listing BigQuery exports:");
      for (BigQueryExport bigQueryExport : response.iterateAll()) {
        System.out.println(bigQueryExport.getName());
      }
    }
  }
}

Python

如需向 Security Command Center 进行身份验证,请设置应用默认值 凭据。如需了解详情,请参阅 为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请将 /locations/LOCATION 添加到 资源名称在 /PARENT/PARENT_ID 之后,其中 PARENTorganizationsfolders、 或 projects

对于发现结果,请将 /locations/LOCATION 添加到资源中 名称在 /sources/SOURCE_ID 之后,其中 SOURCE_IDSecurity Command Center 服务 发出该发现结果。

def list_bigquery_exports(parent: str):
    from google.cloud import securitycenter

    """
    List BigQuery exports in the given parent.
    Args:
         parent: The parent which owns the collection of BigQuery exports.
             Use any one of the following resource paths:
                 - organizations/{organization_id}
                 - folders/{folder_id}
                 - projects/{project_id}
    """

    client = securitycenter.SecurityCenterClient()

    request = securitycenter.ListBigQueryExportsRequest()
    request.parent = parent

    response = client.list_big_query_exports(request)

    print("Listing BigQuery exports:")
    for bigquery_export in response:
        print(bigquery_export.name)

删除导出配置

如果您不再需要导出配置,可以将其删除。

gcloud

  1. 转到 Google Cloud 控制台。

    转到 Google Cloud 控制台

  2. 选择启用了 Security Command Center API 的项目。

  3. 点击激活 Cloud Shell

  4. 如需删除导出配置,请运行以下命令:

    gcloud scc bqexports delete BIG_QUERY_EXPORT \
      --folder=FOLDER_ID | --organization=ORGANIZATION_ID | --project=PROJECT_ID \
      --location=LOCATION
    

    替换以下内容:

    • BIG_QUERY_EXPORT 替换为您要删除的导出配置的名称。

    • FOLDER_IDORGANIZATION_IDPROJECT_ID 替换为您的文件夹、组织或项目的名称。您必须设置其中一个选项。对于文件夹和组织,该名称是文件夹 ID 或组织 ID。对于项目,该名称是项目编号或项目 ID。

    • LOCATION:如果是数据驻留,则为必需 或者 BigQueryExport 资源是使用 v2 API。

      如果已启用数据驻留,请指定 用来存储导出数据的 Security Command Center 位置

      如果未启用数据驻留,请添加 /locations/LOCATION,前提是 BigQueryExport资源是使用 Security Command Center API v2,在这种情况下,是唯一有效的位置 为global

    例如,如需从组织 ID 设置为 123 的组织中删除名为 my-bq-export 的导出配置,请运行以下命令:

    gcloud scc bqexports delete my-bq-export --organization=123
    

Java

如需向 Security Command Center 进行身份验证,请设置应用默认值 凭据。如需了解详情,请参阅 为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请将 /locations/LOCATION 添加到 资源名称在 /PARENT/PARENT_ID 之后,其中 PARENTorganizationsfolders、 或 projects

对于发现结果,请将 /locations/LOCATION 添加到资源中 名称在 /sources/SOURCE_ID 之后,其中 SOURCE_IDSecurity Command Center 服务 发出该发现结果。


import com.google.cloud.securitycenter.v1.DeleteBigQueryExportRequest;
import com.google.cloud.securitycenter.v1.SecurityCenterClient;
import java.io.IOException;

public class DeleteBigQueryExport {

  public static void main(String[] args) throws IOException {
    // TODO(Developer): Modify the following variable values.

    // parent: Use any one of the following resource paths:
    //              - organizations/{organization_id}
    //              - folders/{folder_id}
    //              - projects/{project_id}
    String parent = String.format("projects/%s", "your-google-cloud-project-id");

    // bigQueryExportId: Unique identifier that is used to identify the export.
    String bigQueryExportId = "export-id";

    deleteBigQueryExport(parent, bigQueryExportId);
  }

  // Delete an existing BigQuery export.
  public static void deleteBigQueryExport(String parent, String bigQueryExportId)
      throws IOException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (SecurityCenterClient client = SecurityCenterClient.create()) {

      DeleteBigQueryExportRequest bigQueryExportRequest =
          DeleteBigQueryExportRequest.newBuilder()
              .setName(String.format("%s/bigQueryExports/%s", parent, bigQueryExportId))
              .build();

      client.deleteBigQueryExport(bigQueryExportRequest);
      System.out.printf("BigQuery export request deleted successfully: %s", bigQueryExportId);
    }
  }
}

Python

如需向 Security Command Center 进行身份验证,请设置应用默认值 凭据。如需了解详情,请参阅 为本地开发环境设置身份验证

以下示例使用 v1 API。修改 v2 的示例,将 v1 替换为 v2,并添加 将 /locations/LOCATION 设置为资源名称。

对于大多数资源,请将 /locations/LOCATION 添加到 资源名称在 /PARENT/PARENT_ID 之后,其中 PARENTorganizationsfolders、 或 projects

对于发现结果,请将 /locations/LOCATION 添加到资源中 名称在 /sources/SOURCE_ID 之后,其中 SOURCE_IDSecurity Command Center 服务 发出该发现结果。

def delete_bigquery_export(parent: str, bigquery_export_id: str):
    """
    Delete an existing BigQuery export.
    Args:
        parent: Use any one of the following resource paths:
                 - organizations/{organization_id}
                 - folders/{folder_id}
                 - projects/{project_id}
        bigquery_export_id: Unique identifier that is used to identify the export.
    """
    from google.cloud import securitycenter

    client = securitycenter.SecurityCenterClient()

    request = securitycenter.DeleteBigQueryExportRequest()
    request.name = f"{parent}/bigQueryExports/{bigquery_export_id}"

    client.delete_big_query_export(request)
    print(f"BigQuery export request deleted successfully: {bigquery_export_id}")

删除导出配置后,您可以从 Looker 数据洞察中移除数据。如需了解详情,请参阅移除、删除和恢复数据源

在 BigQuery 中查看发现结果

创建导出配置后,系统会将新的发现结果导出到您指定的项目中的 BigQuery 数据集。

如需在 BigQuery 中查看发现结果,请执行以下操作:

  1. 转到 BigQuery 中的项目。

    转至 BigQuery

  2. 如果您不在正确的项目中,请执行以下步骤:

    1. 在工具栏上,点击 项目选择器。
    2. “请选择:”旁边,选择您的组织。
    3. 在项目列表中,选择您的项目。
  3. 探索器窗格中,展开项目的节点。

  4. 展开您的数据集。

  5. 点击发现结果表。

  6. 在打开的标签页上,点击预览。此时会显示一组数据示例。

实用的查询

本部分提供了用于分析发现结果数据的示例查询。在以下示例中,将 DATASET 替换为分配给数据集的名称,将 PROJECT_ID 替换为数据集的项目名称。

如需排查遇到的任何错误,请参阅错误消息

每天创建和更新的新发现结果的数量

SELECT
    FORMAT_DATETIME("%Y-%m-%d", event_time) AS date,
    count(DISTINCT finding_id)
FROM `PROJECT_ID.DATASET.findings`
GROUP BY date
ORDER BY date DESC

每个发现结果的最新发现结果记录

SELECT
    * EXCEPT(row)
FROM (
    SELECT *, ROW_NUMBER() OVER(
        PARTITION BY finding_id
        ORDER BY event_time DESC, finding.mute_update_time DESC
    ) AS row
    FROM `PROJECT_ID.DATASET.findings`
)
WHERE row = 1

按时间排序的当前活跃发现结果

WITH latestFindings AS (
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT finding_id, event_time, finding
FROM latestFindings
WHERE finding.state = "ACTIVE"
ORDER BY event_time DESC

项目中的当前发现结果

WITH latestFindings AS (
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT finding_id, event_time, finding, resource
FROM latestFindings
WHERE resource.project_display_name = 'PROJECT'

PROJECT 替换为项目名称。

文件夹中的当前发现结果

WITH latestFindings AS(
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT finding_id, event_time, finding, resource
FROM latestFindings
CROSS JOIN UNNEST(resource.folders) AS folder
WHERE folder.resource_folder_display_name = 'FOLDER'

FOLDER 替换为文件夹名称。

扫描程序 Logging Scanner 的当前发现结果

WITH latestFindings AS (
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT finding_id, event_time, finding
FROM latestFindings
CROSS JOIN UNNEST(finding.source_properties) AS source_property
WHERE source_property.key = "ScannerName"
  AND source_property.value = "LOGGING_SCANNER"

类型为 Persistence: IAM Anomalous Grant 的当前活跃发现结果

WITH latestFindings AS(
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT finding_id, event_time, finding
FROM latestFindings
WHERE finding.state = "ACTIVE"
  AND finding.category = "Persistence: IAM Anomalous Grant"

将给定类型的活跃发现结果与 Cloud Audit Logs 关联

此示例查询会显示授权者在异常 IAM 授权操作之前和之后的时段内所执行的管理员活动操作序列,以便使用 Cloud Audit Logs 中的 Event Threat Detection 功能来调查异常 IAM 授权的发现结果。以下查询将发现结果时间戳之前 1 小时到之后 1 小时的管理员活动日志进行关联。

WITH latestFindings AS(
    SELECT * EXCEPT(row)
    FROM (
        SELECT *, ROW_NUMBER() OVER(
            PARTITION BY finding_id
            ORDER BY event_time DESC, finding.mute_update_time DESC
        ) AS row
        FROM `PROJECT_ID.DATASET.findings`
    ) WHERE row = 1
)
SELECT
  finding_id,
  ANY_VALUE(event_time) as event_time,
  ANY_VALUE(finding.access.principal_email) as grantor,
  JSON_VALUE_ARRAY(ANY_VALUE(finding.source_properties_json), '$.properties.sensitiveRoleGrant.members') as grantees,
  ARRAY_AGG(
    STRUCT(
      timestamp,
      IF(timestamp < event_time, 'before', 'after') as timeline,
      protopayload_auditlog.methodName,
      protopayload_auditlog.resourceName,
      protopayload_auditlog.serviceName
    )
    ORDER BY timestamp ASC
  ) AS recent_activity
FROM (
  SELECT
    f.*,
    a.*,
  FROM latestFindings AS f
  LEFT JOIN `PROJECT_ID.DATASET.cloudaudit_googleapis_com_activity` AS a
  ON a.protopayload_auditlog.authenticationInfo.principalEmail = f.finding.access.principal_email
  WHERE f.finding.state = "ACTIVE"
    AND f.finding.category = "Persistence: IAM Anomalous Grant"
    AND a.timestamp >= TIMESTAMP_SUB(f.event_time, INTERVAL 1 HOUR)
    AND a.timestamp <= TIMESTAMP_ADD(f.event_time, INTERVAL 1 HOUR)
  )
GROUP BY
  finding_id
ORDER BY
  event_time DESC

输出内容类似如下:

查询结果的屏幕截图,其中显示了发现结果及其关联的审核日志

在 Looker 数据洞察中创建图表

利用 Looker 数据洞察,您可以创建交互式报告和信息中心。

一般来说,通过 Looker 数据洞察访问 BigQuery 时会产生 BigQuery 使用费。如需了解详情,请参阅使用 Looker 数据洞察直观呈现 BigQuery 数据

要创建按严重性和类别直观呈现发现结果数据的图表,请执行以下操作:

  1. 打开 Looker 数据洞察并登录。
  2. 如果出现提示,请提供其他信息并设置其他偏好设置。阅读服务条款,如果满意,则继续。
  3. 点击空白报告
  4. 连接到数据标签页上,点击 BigQuery 卡。
  5. 如果出现提示,请授权 Looker 数据洞察访问 BigQuery 项目。
  6. 连接到您的发现结果数据:
    1. 项目部分,为数据集选择该项目。或者,在我的项目标签页中,输入您的项目 ID 以进行搜索。
    2. 对于数据集,点击数据集的名称。
    3. 对于表格,点击发现结果
    4. 点击添加
    5. 在对话框中,点击添加到报告
  7. 添加报告后,点击添加图表
  8. 点击堆叠柱形图,然后点击要放置的区域。

    图表选择界面的屏幕截图

  9. 图表 > 条形图窗格的数据标签页上,设置以下字段:

    1. 维度字段中,选择 finding.severity
    2. 细分维度字段中,选择 finding.category
    按严重性分类并按类别进一步分类的发现结果图表的屏幕截图

报告已更新,以显示按严重程度和类别拆分的结果的多个列。

后续步骤

了解如何在 BigQuery 中运行查询