Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 164 additions & 2 deletions examples/basic/functional_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import sys
import time
from enum import Enum
from typing import Optional, Dict, Any
from datetime import datetime

Expand Down Expand Up @@ -909,7 +910,12 @@ def test_batch_all_operations(client: DataverseClient, table_info: Dict[str, Any
pass


def cleanup_test_data(client: DataverseClient, table_info: Dict[str, Any], record_id: str) -> None:
def cleanup_test_data(
client: DataverseClient,
table_info: Dict[str, Any],
record_id: str,
picklist_table_schema_name: Optional[str] = None,
) -> None:
"""Clean up test data."""
print("\n-> Cleanup")
print("=" * 50)
Expand Down Expand Up @@ -979,6 +985,46 @@ def cleanup_test_data(client: DataverseClient, table_info: Dict[str, Any], recor
else:
print("Test table kept for future testing")

# --- Picklist test table cleanup ---
if picklist_table_schema_name:
picklist_cleanup = (
input(f"Do you want to delete the picklist test table '{picklist_table_schema_name}'? (y/N): ")
.strip()
.lower()
)
if picklist_cleanup in ["y", "yes"]:
for attempt in range(1, retries + 1):
try:
client.tables.delete(picklist_table_schema_name)
print(f"[OK] Picklist test table '{picklist_table_schema_name}' deleted successfully")
break
except HttpError as err:
status = getattr(err, "status_code", None)
if status == 404:
if _table_still_exists(client, picklist_table_schema_name):
if attempt < retries:
print(
f" Picklist table delete retry {attempt}/{retries} after metadata 404 ({err}). Waiting {delay_seconds}s..."
)
time.sleep(delay_seconds)
continue
print(f"[WARN] Failed to delete picklist test table due to metadata delay: {err}")
break
print("[OK] Picklist test table deleted successfully (404 reported).")
break
if attempt < retries:
print(
f" Picklist table delete retry {attempt}/{retries} after error ({err}). Waiting {delay_seconds}s..."
)
time.sleep(delay_seconds)
continue
print(f"[WARN] Failed to delete picklist test table: {err}")
except Exception as e:
print(f"[WARN] Failed to delete picklist test table: {e}")
break
else:
print("Picklist test table kept for future testing")


def backoff(op, *, delays=(0, 2, 5, 10, 20, 20)):
"""Retry helper with exponential backoff for metadata propagation delays."""
Expand Down Expand Up @@ -1260,6 +1306,118 @@ def _get_or_create(schema, columns, label):
print(f" [WARN] Could not delete {tbl}: {e}")


def test_picklist_table(client: DataverseClient) -> str:
"""Create a table with a local picklist column and write/read records.

Demonstrates:
- Defining a local OptionSet via an ``Enum`` subclass passed as the column ``dtype``.
- Optional multi-language labels via the ``__labels__`` class attribute.
- Writing records using either the enum member's integer value OR its label.
- Reading the integer value back, and the formatted label via
``include_annotations="OData.Community.Display.V1.FormattedValue"``.

Returns the schema name of the table so the caller can clean it up later.
"""
print("\n-> Picklist Column Test")
print("=" * 50)

table_schema_name = "test_PicklistAttribute"

# Define a local option set as an Enum. Optional __labels__ provides
# display labels per language code (1033 = English, 1036 = French).
class TaskStatus(Enum):
NotStarted = 1
InProgress = 2
Completed = 3
Cancelled = 4

__labels__ = {
1033: {
"NotStarted": "Not Started",
"InProgress": "In Progress",
"Completed": "Completed",
"Cancelled": "Cancelled",
},
1036: {
"NotStarted": "Non commencé",
"InProgress": "En cours",
"Completed": "Terminé",
"Cancelled": "Annulé",
},
}

record_id: Optional[str] = None
try:
# Drop any leftover table from a prior failed run so this example is idempotent.
try:
existing = client.tables.get(table_schema_name)
if existing:
print(f" Removing leftover '{table_schema_name}' from a previous run...")
client.tables.delete(table_schema_name)
except Exception:
pass

print(f"Creating table '{table_schema_name}' with a picklist column 'test_status'...")

client.tables.create(
table_schema_name,
primary_column="test_name",
columns={
"test_status": TaskStatus, # Enum subclass => local picklist
"test_notes": "string",
},
)

table_info = wait_for_table_metadata(client, table_schema_name)
print(f"[OK] Picklist table ready: entity_set='{table_info.get('entity_set_name')}'")

# --- Insert one record using the enum's integer value ---
rec_by_int = {
"test_name": f"Picklist Int {datetime.now().strftime('%H:%M:%S')}",
"test_status": TaskStatus.InProgress.value, # integer 2
"test_notes": "Created using TaskStatus.InProgress.value",
}
record_id = client.records.create(table_schema_name, rec_by_int)
print(f"[OK] Created record by int value: {record_id} (status={TaskStatus.InProgress.value})")

# --- Insert another record using the picklist label (SDK resolves label -> int) ---
rec_by_label = {
"test_name": f"Picklist Label {datetime.now().strftime('%H:%M:%S')}",
"test_status": "Completed", # resolved via label cache to int 3
"test_notes": "Created using label string 'Completed'",
}
record_id_2 = client.records.create(table_schema_name, rec_by_label)
print(f"[OK] Created record by label: {record_id_2} (status='Completed' -> 3)")

# --- Read back including the FormattedValue annotation ---
annotation = "OData.Community.Display.V1.FormattedValue"
retrieved = client.records.retrieve(
table_schema_name,
record_id,
select=["test_name", "test_status"],
include_annotations=annotation,
)
status_int = retrieved.get("test_status")
status_label = retrieved.get(f"test_status@{annotation}")
print(f" Retrieved: test_status={status_int}, formatted='{status_label}'")
assert status_int == TaskStatus.InProgress.value, f"expected {TaskStatus.InProgress.value}, got {status_int}"

# --- List records, filtering by the picklist column ---
completed = client.records.list(
table_schema_name,
select=["test_name", "test_status"],
filter=f"test_status eq {TaskStatus.Completed.value}",
include_annotations=annotation,
)
print(f"[OK] Query by picklist value found {len(completed)} 'Completed' record(s).")

except HttpError as e:
print(f"[ERR] HTTP error during picklist test: {e}")
raise

return table_schema_name


def _table_still_exists(client: DataverseClient, table_schema_name: Optional[str]) -> bool:
if not table_schema_name:
return False
Expand Down Expand Up @@ -1304,6 +1462,9 @@ def main():
# Test querying
test_query_records(client, table_info)

# Test picklist (local OptionSet) column creation, write, read
picklist_table_info = test_picklist_table(client)

# Test relationships
test_relationships(client)

Expand All @@ -1321,13 +1482,14 @@ def main():
print("[OK] Record Creation: Success")
print("[OK] Record Reading: Success")
print("[OK] Record Querying: Success")
print("[OK] Picklist Column: Success")
print("[OK] Relationship Operations: Success")
print("[OK] SQL Encoding: Success")
print("[OK] Batch Operations: Success")
print("\nYour PowerPlatform Dataverse Client SDK is fully functional!")

# Cleanup
cleanup_test_data(client, table_info, record_id)
cleanup_test_data(client, table_info, record_id, picklist_table_info)

except KeyboardInterrupt:
print("\n\n[WARN] Test interrupted by user")
Expand Down
30 changes: 17 additions & 13 deletions src/PowerPlatform/Dataverse/data/_odata.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,18 +816,22 @@ def _create_entity(
attributes: List[Dict[str, Any]],
solution_unique_name: Optional[str] = None,
) -> Dict[str, Any]:
url = f"{self.api}/EntityDefinitions"
url = f"{self.api}/CreateEntities"
Comment thread
vrathee-msft marked this conversation as resolved.
Comment thread
vrathee-msft marked this conversation as resolved.
payload = {
"@odata.type": "Microsoft.Dynamics.CRM.EntityMetadata",
"SchemaName": table_schema_name,
"DisplayName": self._label(display_name),
"DisplayCollectionName": self._label(display_name + "s"),
"Description": self._label(f"Custom entity for {display_name}"),
"OwnershipType": "UserOwned",
"HasActivities": False,
"HasNotes": True,
"IsActivity": False,
"Attributes": attributes,
"Entities": [
{
"@odata.type": "Microsoft.Dynamics.CRM.ComplexEntityMetadata",
"SchemaName": table_schema_name,
"DisplayName": self._label(display_name),
"DisplayCollectionName": self._label(display_name + "s"),
"Description": self._label(f"Custom entity for {display_name}"),
"OwnershipType": "UserOwned",
"HasActivities": False,
"HasNotes": True,
"IsActivity": False,
"Attributes": attributes,
Comment thread
vrathee-msft marked this conversation as resolved.
}
]
}
params = None
if solution_unique_name:
Comment thread
vrathee-msft marked this conversation as resolved.
Expand Down Expand Up @@ -1298,9 +1302,9 @@ def _create_table(
)

attributes: List[Dict[str, Any]] = []
attributes.append(self._attribute_payload(primary_attr_schema, "string", is_primary_name=True))
attributes.append(self._attribute_payload(primary_attr_schema, "string", is_primary_name=True, complex=True))
for col_name, dtype in schema.items():
payload = self._attribute_payload(col_name, dtype)
payload = self._attribute_payload(col_name, dtype, complex=True)
if not payload:
raise ValueError(f"Unsupported column type '{dtype}' for '{col_name}'.")
attributes.append(payload)
Expand Down
Loading
Loading