代码示例Foundry APIs本地环境

注意:以下翻译的准确性尚未经过验证。这是使用 AIP ↗ 从原始英文文本进行的机器翻译。

本地环境

Python

将资源添加到项目

如何使用API调用将资源添加到项目?

此代码使用requests库向Compass API发送HTTP POST请求,将工件的引用导入到指定项目中。它包括出错处理、日志记录和非必填代理配置。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 from requests.adapters import HTTPAdapter import requests from urllib3 import Retry import logging import json ''' 将工件的引用导入为指定项目的引用 ''' # 请求头 headers = { 'Authorization': 'Bearer xxx', # 使用你的Bearer令牌替换 'xxx' 'Content-Type': 'application/json', } # 主机地址 host = 'subdomain.domain.extension:port' # 代理 proxyDict = { 'https': 'protocol://subdomain.domain.extension:port' } # 重试策略 retry = Retry(connect=1, backoff_factor=0.5) adapter = HTTPAdapter(max_retries=retry) http = requests.Session() http.mount('https://', adapter) # 设置日志记录级别 logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.FileHandler("debug.log"), # 将日志记录到文件 logging.StreamHandler() # 在控制台输出日志 ] ) ############### ## 变量定义 ## ############### RESSOURCE_TO_ADD = "ri......" # 要添加的资源 PROJECT_TO_ADD_TO = "ri.compass.main.folder.xxxx-xxx-xxx-xxx-xxxx" # 要添加到的项目 # 如果请求失败,则抛出错误并提供相关信息 try: print(f'Beginning script for ...') # 数据 source_data = { "requests": [ {"resourceRid": f"{RESSOURCE_TO_ADD}"} ] } # 序列化JSON data = json.dumps(source_data) response = http.post(f'https://{host}/compass/api/projects/imports/{PROJECT_TO_ADD_TO}/import', data=data, headers=headers, # 如果需要代理,取消下面行的注释 # proxies=proxyDict ) print('Completed request') print(f'The result of the script is ...') raw_response = response.text print(raw_response) print(response.status_code) except requests.exceptions.RequestException as e: raise Exception( f"请求过程中发生错误。\n失败原因:{response.status_code} - {response.text}\n异常:{e}")

此代码片段用于向指定项目导入资源引用。它配置了请求头、主机地址、代理和重试策略,并记录请求操作的日志。代码中包含了错误处理机制,以便在请求失败时提供详细的错误信息。

  • 提交日期: 2024-03-26
  • 标签: API, python, compass

数据集行数

如何批量计算多个数据集的行数?

此代码使用 Foundry API 触发对一系列数据集 RID 的行数计算。它发送一个 POST 请求到 Foundry Stats API,以数据集 RID 和分支作为参数。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 from shutil import ExecError from wsgiref import headers import requests from urllib3 import Retry import json import pprint ''' 脚本将触发提供的数据集RID集合的行计数计算 ''' # 基础变量 base_url = "https://STACK_NAME.palantircloud.com" branch = "master" DATASETS_RIDS = [ "ri.foundry.main.dataset.6d2cd3de-0052-xxxxx-c7ae2c4ab1d8" ] headers = { 'Authorization': 'Bearer eyg_PUT_YOUR_TOKEN_HERE_xxxx', # 在这里放置您的认证令牌 'Content-Type': 'application/json' # 内容类型为JSON } # 代理 proxyDict = { # "https": "https://proxyIfNeeded:port" # 如果需要代理,在这里配置 } # 重试机制 retry = Retry(connect=1, backoff_factor=0.5) # 连接失败时重试一次,退避因子为0.5 adapter = requests.adapters.HTTPAdapter(max_retries=retry) # 设置请求适配器 http = requests.Session() # 创建一个会话 http.mount("https://", adapter) # 为HTTPS请求安装适配器 def trigger_row_count(dataset_rid, branch): # 发送POST请求,触发数据集行计数 response = http.post(f'{base_url}/foundry-stats/api/stats/datasets/{dataset_rid}/branches/{branch}', headers=headers, proxies=proxyDict) raw_response = response.text # 获取响应文本 curr_response = json.loads(raw_response) # 将响应文本解析为JSON格式 pprint.pprint(curr_response) # 打印美化后的JSON响应 return curr_response for curr_dataset_rid in DATASETS_RIDS: trigger_row_count(curr_dataset_rid, branch) # 对每个数据集RID触发行计数
  • 提交日期: 2024-03-26
  • 标签: 以{filetype}格式导出, python, 指标, 元数据, 本地

获取跨数据集的列超集

如何获取多个数据集的所有列的集合?

此代码使用requests库获取目标数据集列表中每个数据集的架构,然后遍历架构中的字段以创建一个字典,其中包含列超集中每个列的频率。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 import time from requests.packages.urllib3.util.retry import Retry from requests.adapters import HTTPAdapter import requests import json import pprint import logging import datetime import collections ''' 从一组数据集中生成列的超集及其频率的脚本 ''' headers = { 'Authorization': 'Bearer eyg_PUT_YOUR_TOKEN_HERE_xxxx', # 在这里放置你的Token 'Content-Type': 'application/json', } ## STACK_NAME base_url = "STACK_NAME.palantircloud.com" branch = "master" target_datasets = ["ri.foundry.main.dataset.4c2ac089-xxxx-4df863eaf823"] # 代理 proxyDict = { #"https": "https://proxyIfNeeded:port" # 如果需要代理,在这里设置 } # 重试机制 retry = Retry(connect=1, backoff_factor=0.5) adapter = HTTPAdapter(max_retries=retry) http = requests.Session() http.mount("https://", adapter) global_list_fields = {} for curr_dataset in target_datasets : # 获取数据集的schema print(f"Step 1. 获取数据集的Schema") response = http.get(f'{base_url}/foundry-metadata/api/schemas/datasets/{curr_dataset}/branches/{branch}', headers=headers, proxies=proxyDict) print(f"Step 1. 获取数据集Schema的响应") raw_response = response.text print(raw_response) curr_schema = json.loads(raw_response) list_fields = curr_schema["schema"]["fieldSchemaList"] for field in list_fields: curr_key = f"{field['name']} - {field['type']}" # 计数器递增 global_list_fields[curr_key] = global_list_fields.get(curr_key, 0) + 1 print("未排序的字典") pprint.pprint(global_list_fields) # 排序 sorted_dict = {k: v for k, v in sorted(global_list_fields.items(), key=lambda item: item[1])} print("已排序的字典") pprint.pprint(sorted_dict)
  • 提交日期: 2024-03-26
  • 标签: pythonAPI元数据代码库代码创作本地

OSS直接调用

如何使用Object Set Service(OSS)对对象集执行聚合?

此代码演示了如何直接调用Object Set Service(OSS)以对对象集执行聚合。这对于调试或理解OSS返回的数据很有用,OSS被OSDK和前端等其他服务使用。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 from requests.adapters import HTTPAdapter import requests from urllib3 import Retry import logging import json import pprint ''' 直接调用 object-set-service (OSS) 进行聚合等操作。 这不是在生产环境中用于“实际使用”的,但对于调试或更深入了解 OSS 实际返回的内容可能非常有用。 OSS 在其他服务(如 OSDK 和前端)中被底层使用。 ''' # 请求头 headers = { 'Authorization': 'Bearer xxx', # 用你的 Bearer 令牌替换 'xxx' 'Content-Type': 'application/json', } # 主机 host = 'subdomain.domain.extension:port' # 代理 proxyDict = { 'https': 'protocol://subdomain.domain.extension:port' } # 重试配置 retry = Retry(connect=1, backoff_factor=0.5) adapter = HTTPAdapter(max_retries=retry) http = requests.Session() http.mount('https://', adapter) # 设置要显示的日志级别 logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.FileHandler("debug.log"), logging.StreamHandler() ] ) ############### ## 变量定义 ## ############### # 如果请求失败,则抛出错误,并提供相关信息 try: print(f'Beginning script for ...') # 数据 - 示例 OSS 的有效负载 data = { "executionMode":"PREFER_ACCURACY", # OSS 默认为 "PREFER_SPEED",这可能会提供不准确的结果 "objectSet": { "base": { "objectTypeId": "af-20m-instances-obv2" # 对象实例 ID }, "type": "base" }, "aggregation": { "metrics": {}, "subAggregations": { "test": { "type": "metrics", "metrics": { "dimension": { "type": "propertyValue", "propertyValue": { "propertyId": "example_bucket", # 要聚合的对象属性 "bucketing": { "type": "exactValue", "exactValue": { "maxBuckets": 10 # 响应中应包含的桶数量 } } } }, "ordering": [ { "type": "valueOrdering", "valueOrdering": { "direction": "DESCENDING", "metricName": "countM" } } ], "metrics": { "countM": { "type": "count", "count": {} } } } } } } } # 序列化 JSON 数据 data = json.dumps(source_data) response = http.put(f'https://{host}/object-set-service/api/aggregate', data=data, headers=headers, # 如果需要代理,请取消注释 # proxies=proxyDict ) print('Completed request') print(f'The result of the script is ...') pprint.pprint(response.json()) except requests.exceptions.RequestException as e: raise Exception(f"请求过程中发生错误。\n失败原因: {response.status_code} - {response.text}\n异常: {e}")

注释:

  1. 确保替换 'Bearer xxx' 中的 xxx 为你的实际 Bearer 令牌。
  2. 如果需要使用代理,请确保取消注释 proxies=proxyDict 行。
  3. objectTypeIdpropertyId 等字段需要根据具体的使用场景进行调整。
  4. hostproxyDict 中的 URL 和端口需要根据实际部署环境进行配置。
  • 提交日期: 2024-03-26
  • 标签: ontology, 聚合, objects, python, API, local

Ping Foundry: 无需词元

如何在没有认证词元的情况下ping我的Foundry实例或账户?

此代码演示了如何使用Python requests库发送ping请求到Palantir Cloud Stack,指定代理和重试设置。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from requests.packages.urllib3.util.retry import Retry from requests.adapters import HTTPAdapter import requests headers = { # 原则上:不需要令牌!'Authorization': 'Bearer eyg_PUT_YOUR_TOKEN_HERE_xxxx', 'Content-Type': 'application/json', } ## STACK 基础 URL base_url = "https://STACK_NAME.palantircloud.com" # 代理设置 proxyDict = { "https": "http://proxy.host.com:3333" } # 重试策略 retry = Retry(connect=1, backoff_factor=0.5) adapter = HTTPAdapter(max_retries=retry) http = requests.Session() http.mount("https://", adapter) # 发送请求并输出响应 print("Pinging ... ") response = http.get(f"{base_url}/compass/api/ping", headers=headers, proxies=proxyDict) print("Ping response :") raw_response = response.text print(raw_response)

在这段代码中:

  • headers 字典用于设置HTTP请求的头信息,其中目前没有添加授权令牌。
  • base_url 是API请求的基础URL,您需要将 STACK_NAME 替换为实际的栈名称。
  • proxyDict 用于配置HTTP请求的代理。
  • Retry 对象用于配置请求的重试策略,这里设置了连接重试一次,且每次重试之间的退避因子为0.5。
  • HTTPAdapter 通过 max_retries 参数使用之前定义的重试策略。
  • requests.Session() 用于创建一个会话对象,以便重用连接。
  • 代码最后发送GET请求到指定的URL,并打印返回的响应文本。
  • 提交日期: 2024-03-26
  • 标签: API, python, compass, local

获取给定资源RID的路径

我如何从资源的RID中找到路径?

此代码使用requests库向指定主机发送HTTP GET请求,并检索资源的路径。它还处理重试和代理。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 from requests.adapters import HTTPAdapter import requests from urllib3 import Retry ''' 脚本用于返回给定资源标识符(RID)的路径。 ''' # 请求头 headers = { 'Authorization': 'Bearer xxx', # 用你的Bearer令牌替换'xxx' 'Content-Type': 'application/json', } # 主机 host = 'host.com:443' # 代理 proxyDict = { 'https': 'http://proxy.domain.com:3333' } # 重试设置 retry = Retry(connect=1, backoff_factor=0.5) # 设置重试策略,连接失败时重试1次,重试间隔0.5秒 adapter = HTTPAdapter(max_retries=retry) http = requests.Session() http.mount('https://', adapter) # 输入你想要获取路径的资源RID RESOURCE_RID = '' # 如果创建引用失败,则抛出错误 try: print(f'Fetching path for rid {RESOURCE_RID} ...') response = http.get(f'https://{host}/compass/api/resources/{RESOURCE_RID}/path-json', headers=headers, proxies=proxyDict) print('Completed request') print(f'The path is: {response.text}') except requests.exceptions.RequestException as e: # 如果请求发生异常则抛出错误,返回失败的信息 raise Exception(f"An error occurred in the request.\nReturning the path for the repository: {RESOURCE_RID} failed due to: {response.status_code} - {response.text}\nException: {e}")
  • 提交日期: 2024-03-26
  • 标签: api, python, 元数据, 本地

使用API触发操作

如何手动触发Object上的操作?

此代码使用requests库向操作API发送HTTP请求,遍历ID列表并为每个ID触发带有自定义参数的操作。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 from requests.packages.urllib3.util.retry import Retry from requests.adapters import HTTPAdapter import pprint import uuid import requests import json import time ''' Script that will trigger an action. ''' headers = { 'Authorization': 'Bearer eyg_PUT_YOUR_TOKEN_HERE_xxxx', 'Content-Type': 'application/json' } # Name of the stack STACK = "STACK_NAME.palantircloud.com" # List of ids, could be any list of parameters you want to iterate on list_ids = ["123", "456"] # Proxies proxyDict = { "https": "https://proxyIfNeeded:port" } # Retries retry = Retry(connect=1, backoff_factor=0.5) adapter = HTTPAdapter(max_retries=retry) http = requests.Session() http.mount("https://", adapter) # Iterate over the list of ids and trigger one action per ID. for curr_id in list_ids: curr_uuid = "GENERATED-OBJECT-" + str(uuid.uuid4()) # To generate a uuid 生成一个唯一标识符 curr_title = "GENERATED-" + str(time.time()) # To generate a timestamp 生成一个时间戳 user_rid = "xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxx" # if a user rid is needed 如果需要用户rid try: # Generate the payload of the action. Look at a network tab from slate/Workshop to obtain it - or build it from scratch. # 生成操作的负载。可以从slate/Workshop中的网络选项卡获取,也可以从头开始构建。 payload = r'{"actionTypeRid":"ri.actions.main.action-type.xxxxx-xxxx-xxxx-xxxxxxxx",' \ r'"parameters":{"ri.actions.main.parameter.xxxxx-xxxx-xxxx-xxxxxxxx":{"timestamp":"2021-09-30T23:59:59+02:00","type":"timestamp"},' \ r'"ri.actions.main.parameter.xxxxx-xxxx-xxxx-xxxxxxxx":{"stringList":{"strings":["mystring1","mystring2"]},"type":"stringList"},' \ r'"ri.actions.main.parameter.xxxxx-xxxx-xxxx-xxxxxxxx":{"string":"my_string","type":"string"},' \ r'"ri.actions.main.parameter.xxxxx-xxxx-xxxx-xxxxxxxx":{"timestamp":"2022-01-01:23:26+00:00","type":"timestamp"}}' response = http.post(f'https://{STACK}/actions/api/actions', headers=headers, proxies=proxyDict, data=payload) print(f"Raw response of action call with : {curr_id}\r\n") raw_response = response.json() pprint.pprint(raw_response, indent=4) except Exception as e: print(e)

在此代码中,我们使用了requests库来进行HTTP请求,并通过循环遍历list_ids列表,对每个ID执行一个特定的操作。在每次操作中,我们生成一个唯一标识符和时间戳,并构建请求的负载数据。通过HTTPAdapterRetry设置了请求重试策略,以应对网络不稳定的问题。

  • 提交日期: 2024-03-26
  • 标签: 操作, objects, Ontology, 对象上的操作, python, api, local

上传本地文件到数据集

我可以使用哪个API在Foundry中上传本地文件到数据集?

此代码使用Foundry API将文件上传到指定的数据集。它为请求设置了headers, host, proxies和retries,然后读取文件并将其作为POST请求发送到数据集的files端点。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 from requests.adapters import HTTPAdapter import requests from urllib3 import Retry import logging import json # 请求头 headers = { 'Authorization': 'Bearer xxx', # 用你的 Bearer token 替换 'xxx' 'Content-type': 'application/octet-stream', ### 重要! } # 主机地址 host = 'subdomain.domain.extension:port' # 代理配置 proxyDict = { 'https': 'protocol://subdomain.domain.extension:port' } # 重试配置 retry = Retry(connect=1, backoff_factor=0.5) # 重试1次,退避因子为0.5 adapter = HTTPAdapter(max_retries=retry) http = requests.Session() http.mount('https://', adapter) # 配置日志记录级别 logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.FileHandler("debug.log"), # 将日志写入到文件 logging.StreamHandler() # 同时在控制台输出日志 ] ) ############### ## 变量定义 ## ############### # 要上传的Dataset RID datasetRid = "rid.123..." params = { 'filePath': 'folder_name/my-file.csv', # 文件路径参数 } # 如果请求失败,则抛出错误并附带信息 try: print(f'开始执行一次上传的脚本') # 数据读取 with open('./data_example_file.csv') as f: data = f.read().replace('\n', '').replace('\r', '').encode() # 读取文件并去除换行符 response = http.post(f'https://{host}/api/v1/datasets/{datasetRid}/files:upload', params=params, data=data, headers=headers, # 如果需要代理,取消下面行的注释 # proxies=proxyDict ) print('请求已完成') print(f'脚本执行结果为 {response.status_code} - {response.text}') except requests.exceptions.RequestException as e: raise Exception( f"请求过程中发生了错误。\n失败原因: {response.status_code} - {response.text}\n异常信息: {e}")

代码功能说明:

  1. 请求头和主机配置:设置了请求头信息,其中包括授权信息和内容类型。主机地址和代理配置也在代码中定义。

  2. 重试机制:使用RetryHTTPAdapter配置了请求的重试机制,以处理潜在的连接失败。

  3. 日志记录:配置日志记录,输出到控制台和文件。

  4. 请求上传文件:打开并读取本地CSV文件,发送POST请求上传文件到指定的远程服务器。

  5. 错误处理:使用try-except块捕获请求异常并打印相关错误信息。

  • 提交日期: 2024-04-04
  • 标签: API, 文件上传, 数据集, python, csv, 本地