Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies = [
"mermaid-builder==0.0.3",
"graphtty==0.1.8",
"applicationinsights>=0.11.10",
"sqlparse>=0.4.4",
]
classifiers = [
"Intended Audience :: Developers",
Expand Down
7 changes: 7 additions & 0 deletions src/uipath/agent/models/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class AgentContextRetrievalMode(str, Enum):
STRUCTURED = "Structured"
DEEP_RAG = "DeepRAG"
BATCH_TRANSFORM = "BatchTransform"
DATA_FABRIC = "DataFabric"
UNKNOWN = "Unknown" # fallback branch discriminator


Expand Down Expand Up @@ -342,6 +343,7 @@ class AgentContextSettings(BaseCfg):
AgentContextRetrievalMode.STRUCTURED,
AgentContextRetrievalMode.DEEP_RAG,
AgentContextRetrievalMode.BATCH_TRANSFORM,
AgentContextRetrievalMode.DATA_FABRIC,
AgentContextRetrievalMode.UNKNOWN,
] = Field(alias="retrievalMode")
threshold: float = Field(default=0)
Expand All @@ -361,6 +363,10 @@ class AgentContextSettings(BaseCfg):
output_columns: Optional[List[AgentContextOutputColumn]] = Field(
None, alias="outputColumns"
)
# Data Fabric specific settings
entity_identifiers: Optional[List[str]] = Field(
None, alias="entityIdentifiers"
)


class AgentContextResourceConfig(BaseAgentResourceConfig):
Expand Down Expand Up @@ -1198,6 +1204,7 @@ def _normalize_resources(v: Dict[str, Any]) -> None:
"structured": "Structured",
"deeprag": "DeepRAG",
"batchtransform": "BatchTransform",
"datafabric": "DataFabric",
"unknown": "Unknown",
}

Expand Down
261 changes: 256 additions & 5 deletions src/uipath/platform/entities/_entities_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
from typing import Any, List, Optional, Type
from typing import Any, Dict, List, Optional, Type

import sqlparse
from httpx import Response
from sqlparse.sql import (
IdentifierList,
Parenthesis,
Statement,
Token,
TokenList,
Where,
)
from sqlparse.tokens import DML, Comment, Keyword, Punctuation, Wildcard

from ..._utils import Endpoint, RequestSpec
from ...tracing import traced
Expand All @@ -11,15 +21,39 @@
EntityRecordsBatchResponse,
)

# Statement-level keywords that would mutate data or schema. Any query whose
# flattened token stream contains one of these is rejected by
# _validate_sql_query before it is sent to the Data Fabric endpoint.
_FORBIDDEN_SQL_KEYWORDS = {
    "INSERT",
    "UPDATE",
    "DELETE",
    "MERGE",
    "DROP",
    "ALTER",
    "CREATE",
    "TRUNCATE",
    "REPLACE",
}
# Read-only SQL constructs that are nevertheless disallowed for entity
# queries (CTEs, set operations, window/grouping extensions).
# NOTE(review): the multi-word entries ("GROUPING SETS", "PARTITION BY") only
# match if sqlparse tokenizes them as a single keyword token — TODO confirm.
_DISALLOWED_SQL_OPERATORS = {
    "WITH",
    "UNION",
    "INTERSECT",
    "EXCEPT",
    "OVER",
    "ROLLUP",
    "CUBE",
    "GROUPING SETS",
    "PARTITION BY",
}


class EntitiesService(BaseService):
"""Service for managing UiPath Data Service entities.

Entities are database tables in UiPath Data Service that can store
structured data for automation processes.
Entities represent business objects that provide structured data storage and access via the Data Service.
This service allows you to retrieve entity metadata, list entities, and query records using SQL.

See Also:
https://docs.uipath.com/data-service/automation-cloud/latest/user-guide/introduction
!!! warning "Preview Feature"
This service is currently experimental.
Behavior, parameters, and request and response formats are subject to change in future versions.
"""

def __init__(
Expand Down Expand Up @@ -389,6 +423,102 @@ class CustomerRecord:
EntityRecord.from_data(data=record, model=schema) for record in records_data
]

@traced(name="query_multiple_entities", run_type="uipath")
def query_multiple_entities(
    self,
    sql_query: str,
    schema: Optional[Type[Any]] = None,
) -> List[Dict[str, Any]]:
    """Query entity records using a SQL query.

    This method allows executing SQL queries directly against entity data
    via the Data Fabric query endpoint. The query is validated first
    (single SELECT statement only; no mutating keywords or subqueries).

    Args:
        sql_query (str): The full SQL query to execute. Should be a valid
            SELECT statement targeting the entity.
        schema (Optional[Type[Any]]): Optional schema class for validation.
            NOTE(review): currently accepted but not applied — results are
            returned as raw dictionaries. TODO: validate results against it
            (see EntityRecord.from_data used by the other query methods).

    Returns:
        List[Dict[str, Any]]: A list of record dictionaries matching the query.

    Raises:
        ValueError: If the SQL query fails validation.
        httpx.HTTPStatusError: If the server responds with an error status.

    Examples:
        Basic query::

            records = entities_service.query_multiple_entities(
                "SELECT * FROM Customers WHERE Status = 'Active' LIMIT 100"
            )

        Query with specific fields::

            records = entities_service.query_multiple_entities(
                "SELECT OrderId, CustomerName, Amount FROM Orders WHERE Amount > 1000"
            )
    """
    self._validate_sql_query(sql_query)
    spec = self._query_multiple_entities_spec(sql_query)
    headers = {
        "X-UiPath-Internal-TenantName": self._url.tenant_name,
        "X-UiPath-Internal-AccountName": self._url.org_name,
    }
    # Use absolute URL to bypass scoping since org/tenant are embedded in the path
    full_url = f"{self._url.base_url}{spec.endpoint}"
    response = self.request(spec.method, full_url, json=spec.json, headers=headers)

    # Raise on any error status first. The previous code only treated 200 as
    # success, so any other 2xx fell into the else-branch where
    # raise_for_status() was a no-op and the method returned None despite its
    # declared List[...] return type.
    response.raise_for_status()
    return response.json().get("results", [])
Comment on lines +468 to +472

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Handle non-200 successful responses consistently

This branch treats only 200 as success even though BaseService.request already raises for error statuses; for any other successful 2xx response (for example 204), execution falls into else, raise_for_status() is a no-op, and the method returns None despite its declared List[Dict[str, Any]] return type. That creates a latent runtime type break if the endpoint ever returns a non-200 success code.

Useful? React with 👍 / 👎.



@traced(name="query_multiple_entities_async", run_type="uipath")
async def query_multiple_entities_async(
    self,
    sql_query: str,
    schema: Optional[Type[Any]] = None,
) -> List[Dict[str, Any]]:
    """Asynchronously query entity records using a SQL query.

    This method allows executing SQL queries directly against entity data
    via the Data Fabric query endpoint. The query is validated first
    (single SELECT statement only; no mutating keywords or subqueries).

    Args:
        sql_query (str): The full SQL query to execute. Should be a valid
            SELECT statement targeting the entity.
        schema (Optional[Type[Any]]): Optional schema class for validation.
            NOTE(review): currently accepted but not applied — results are
            returned as raw dictionaries. TODO: validate results against it
            (see EntityRecord.from_data used by the other query methods).

    Returns:
        List[Dict[str, Any]]: A list of record dictionaries matching the query.

    Raises:
        ValueError: If the SQL query fails validation.
        httpx.HTTPStatusError: If the server responds with an error status.

    Examples:
        Basic query::

            records = await entities_service.query_multiple_entities_async(
                "SELECT * FROM Customers WHERE Status = 'Active' LIMIT 100"
            )

        Query with specific fields::

            records = await entities_service.query_multiple_entities_async(
                "SELECT OrderId, CustomerName, Amount FROM Orders WHERE Amount > 1000"
            )
    """
    self._validate_sql_query(sql_query)
    spec = self._query_multiple_entities_spec(sql_query)
    headers = {
        "X-UiPath-Internal-TenantName": self._url.tenant_name,
        "X-UiPath-Internal-AccountName": self._url.org_name,
    }
    # Use absolute URL to bypass scoping since org/tenant are embedded in the path
    full_url = f"{self._url.base_url}{spec.endpoint}"
    response = await self.request_async(
        spec.method, full_url, json=spec.json, headers=headers
    )

    # Raise on any error status first; previously only 200 was treated as
    # success and any other 2xx returned None (see sync variant).
    response.raise_for_status()
    return response.json().get("results", [])

@traced(name="entity_record_insert_batch", run_type="uipath")
def insert_records(
self,
Expand Down Expand Up @@ -872,6 +1002,17 @@ def _list_records_spec(
params=({"start": start, "limit": limit}),
)

def _query_multiple_entities_spec(
    self,
    sql_query: str,
) -> RequestSpec:
    """Build the POST request spec for the Data Fabric query-execute endpoint.

    The org and tenant names are embedded directly in the path, which is why
    callers issue the request against an absolute URL.
    """
    path = (
        f"/dataservice_/{self._url.org_name}/{self._url.tenant_name}"
        "/datafabric_/api/v1/query/execute"
    )
    return RequestSpec(
        method="POST",
        endpoint=Endpoint(path),
        json={"query": sql_query},
    )

def _insert_batch_spec(self, entity_key: str, records: List[Any]) -> RequestSpec:
return RequestSpec(
method="POST",
Expand Down Expand Up @@ -900,3 +1041,113 @@ def _delete_batch_spec(self, entity_key: str, record_ids: List[str]) -> RequestS
),
json=record_ids,
)

def _validate_sql_query(self, sql_query: str) -> None:
    """Validate that *sql_query* is a single, safe, read-only SELECT.

    Enforced rules (raises ValueError on the first violation):
      * non-empty, exactly one statement, statement type SELECT;
      * no mutating keywords (_FORBIDDEN_SQL_KEYWORDS) and no disallowed
        constructs (_DISALLOWED_SQL_OPERATORS) anywhere in the token stream;
      * no subqueries;
      * a query without WHERE must carry a LIMIT clause;
      * SELECT * requires a WHERE clause;
      * without WHERE, at most 4 projected columns are allowed.

    Args:
        sql_query (str): Raw SQL text as supplied by the caller.

    Raises:
        ValueError: If any of the rules above is violated.
    """
    query = sql_query.strip()
    if not query:
        raise ValueError("SQL query cannot be empty.")

    # sqlparse may emit empty statements for stray whitespace/semicolons;
    # only token-bearing statements count.
    statements = [stmt for stmt in sqlparse.parse(query) if stmt.tokens]
    if len(statements) != 1:
        raise ValueError("Only a single SELECT statement is allowed.")

    statement = statements[0]
    if statement.get_type() != "SELECT":
        raise ValueError("Only SELECT statements are allowed.")

    # Collect every keyword/DML token (normalized = upper-cased canonical
    # form) so the forbidden/disallowed checks see nested tokens too.
    normalized_keywords = {
        token.normalized
        for token in statement.flatten()
        if token.ttype in Keyword or token.ttype is DML
    }

    for keyword in _FORBIDDEN_SQL_KEYWORDS:
        if keyword in normalized_keywords:
            raise ValueError(f"SQL keyword '{keyword}' is not allowed.")

    # NOTE(review): multi-word entries only match if sqlparse produced them
    # as a single keyword token — TODO confirm for "GROUPING SETS"/"PARTITION BY".
    for operator in _DISALLOWED_SQL_OPERATORS:
        if operator in normalized_keywords:
            raise ValueError(
                f"SQL construct '{operator}' is not allowed in entity queries."
            )

    if self._contains_subquery(statement):
        raise ValueError("Subqueries are not allowed.")

    # WHERE is a dedicated sqlparse group; LIMIT is a plain keyword token.
    has_where = any(isinstance(token, Where) for token in statement.tokens)
    has_limit = any(
        token.ttype in Keyword and token.normalized == "LIMIT"
        for token in statement.flatten()
    )
    if not has_where and not has_limit:
        raise ValueError("Queries without WHERE must include a LIMIT clause.")

    # Inspect the projection (SELECT ... FROM) for wildcard / width limits.
    projection_tokens = self._projection_tokens(statement)
    has_wildcard_projection = any(
        token.ttype is Wildcard
        for projection_token in projection_tokens
        for token in projection_token.flatten()
    )
    if has_wildcard_projection and not has_where:
        raise ValueError("SELECT * without filtering is not allowed.")
    if not has_where and self._projection_column_count(projection_tokens) > 4:
        raise ValueError(
            "Selecting more than 4 columns without filtering is not allowed."
        )

def _contains_subquery(self, token_list: TokenList) -> bool:
    """Return True if any parenthesized group in *token_list* holds a SELECT.

    Walks the token tree recursively; a SELECT DML token anywhere inside a
    Parenthesis group marks the query as containing a subquery.
    """
    for child in token_list.tokens:
        if isinstance(child, Parenthesis) and any(
            inner.ttype is DML and inner.normalized == "SELECT"
            for inner in child.flatten()
        ):
            return True
        # Recurse into every grouped token (Parenthesis included) so deeply
        # nested groups are covered as well.
        if isinstance(child, TokenList) and self._contains_subquery(child):
            return True
    return False

def _projection_tokens(self, statement: Statement) -> List[Token]:
    """Return the top-level tokens between SELECT and FROM (the projection).

    Whitespace and comment tokens are skipped; everything before the SELECT
    keyword (and SELECT itself) is ignored, and collection stops at FROM.
    """
    collected: List[Token] = []
    seen_select = False

    for tok in statement.tokens:
        if tok.is_whitespace or tok.ttype in Comment:
            continue
        if not seen_select:
            # Skip tokens until (and including) the SELECT keyword itself.
            seen_select = tok.ttype is DML and tok.normalized == "SELECT"
            continue
        if tok.ttype in Keyword and tok.normalized == "FROM":
            break
        collected.append(tok)
    return collected

def _projection_column_count(self, projection_tokens: List[Token]) -> int:
    """Count the number of projected columns/expressions in a SELECT list.

    sqlparse usually groups "a, b, c" into a single IdentifierList, in which
    case its identifiers are counted directly. Otherwise comma-separated
    expressions are counted manually.
    """
    # Fast path: the first IdentifierList (if any) carries the whole list.
    for tok in projection_tokens:
        if isinstance(tok, IdentifierList):
            return len(list(tok.get_identifiers()))

    # Fallback: count expressions separated by top-level commas.
    total = 0
    pending = False

    for tok in projection_tokens:
        if tok.is_whitespace or tok.ttype in Comment:
            continue
        if tok.ttype is Punctuation and tok.value == ",":
            if pending:
                total += 1
            pending = False
            continue
        pending = True

    # Account for the trailing expression after the last comma.
    return total + (1 if pending else 0)
50 changes: 50 additions & 0 deletions tests/agent/models/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2761,6 +2761,56 @@ def test_is_conversational_false_by_default(self):
assert config.is_conversational is False


class TestDataFabricContextConfig:
    """Tests for Data Fabric context resource configuration."""

    def test_datafabric_retrieval_mode_exists(self):
        """The DATA_FABRIC member is defined with the expected wire value."""
        assert AgentContextRetrievalMode.DATA_FABRIC == "DataFabric"

    def test_datafabric_context_config_parses(self):
        """A Data Fabric context config validates, including entityIdentifiers."""
        raw = {
            "$resourceType": "context",
            "name": "Customer Data",
            "description": "Query customer and order data",
            "isEnabled": True,
            "folderPath": "Shared",
            "indexName": "",
            "settings": {
                "retrievalMode": "DataFabric",
                "resultCount": 100,
                "entityIdentifiers": ["customers-key", "orders-key"],
            },
        }

        result = AgentContextResourceConfig.model_validate(raw)

        assert result.name == "Customer Data"
        assert result.settings.retrieval_mode == AgentContextRetrievalMode.DATA_FABRIC
        assert result.settings.entity_identifiers == ["customers-key", "orders-key"]

    def test_datafabric_context_config_without_entity_identifiers(self):
        """Omitting entityIdentifiers is valid and yields None."""
        raw = {
            "$resourceType": "context",
            "name": "Test",
            "description": "Test",
            "isEnabled": True,
            "folderPath": "Shared",
            "indexName": "",
            "settings": {
                "retrievalMode": "DataFabric",
                "resultCount": 10,
            },
        }

        result = AgentContextResourceConfig.model_validate(raw)

        assert result.settings.retrieval_mode == AgentContextRetrievalMode.DATA_FABRIC
        assert result.settings.entity_identifiers is None


class TestAgentBuilderConfigResources:
"""Tests for AgentDefinition resource configuration parsing."""

Expand Down
Loading
Loading