diff --git a/acceptance/bin/assert_exists.py b/acceptance/bin/assert_exists.py
new file mode 100755
index 00000000000..0d33b46d2aa
--- /dev/null
+++ b/acceptance/bin/assert_exists.py
@@ -0,0 +1,12 @@
+#!/usr/bin/env python3
+import os, sys
+
+errors = 0
+
+for filename in sys.argv[1:]:
+    if not os.path.exists(filename):
+        sys.stderr.write(f"Unexpected: {filename} does not exist.\n")
+        errors += 1
+
+if errors:
+    sys.exit(1)
diff --git a/acceptance/bin/assert_not_exists.py b/acceptance/bin/assert_not_exists.py
new file mode 100755
index 00000000000..76d467e4515
--- /dev/null
+++ b/acceptance/bin/assert_not_exists.py
@@ -0,0 +1,12 @@
+#!/usr/bin/env python3
+import os, sys
+
+errors = 0
+
+for filename in sys.argv[1:]:
+    if os.path.exists(filename):
+        sys.stderr.write(f"Unexpected: {filename} exists.\n")
+        errors += 1
+
+if errors:
+    sys.exit(1)
diff --git a/acceptance/bin/kill_after.py b/acceptance/bin/kill_after.py
new file mode 100755
index 00000000000..029123a13f5
--- /dev/null
+++ b/acceptance/bin/kill_after.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python3
+"""Set up a kill rule on the testserver for the current test token.
+
+Usage: kill_after.py PATTERN OFFSET TIMES
+
+  PATTERN  HTTP method and path, e.g. "POST /api/2.2/jobs/create"
+  OFFSET   number of requests to let through before killing starts
+  TIMES    number of times to kill the caller
+
+The rule is scoped to the current DATABRICKS_TOKEN so it only affects
+the test that registers it, even when tests share a server.
+"""
+
+import json
+import os
+import sys
+import urllib.request
+
+host = os.environ.get("DATABRICKS_HOST", "")
+token = os.environ.get("DATABRICKS_TOKEN", "")
+
+if not host:
+    print("DATABRICKS_HOST not set", file=sys.stderr)
+    sys.exit(1)
+
+if len(sys.argv) != 4:
+    print(f"usage: {sys.argv[0]} PATTERN OFFSET TIMES", file=sys.stderr)
+    sys.exit(1)
+
+pattern, offset, times = sys.argv[1], int(sys.argv[2]), int(sys.argv[3])
+
+data = json.dumps({"pattern": pattern, "offset": offset, "times": times}).encode()
+req = urllib.request.Request(
+    f"{host}/__testserver/kill",
+    data=data,
+    headers={"Content-Type": "application/json", "Authorization": f"Bearer {token}"},
+    method="POST",
+)
+urllib.request.urlopen(req)
diff --git a/acceptance/bundle/deploy/wal/chain-3-jobs/databricks.yml b/acceptance/bundle/deploy/wal/chain-3-jobs/databricks.yml
new file mode 100644
index 00000000000..342a4516235
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/chain-3-jobs/databricks.yml
@@ -0,0 +1,37 @@
+bundle:
+  name: wal-chain-test
+
+resources:
+  jobs:
+    # Linear chain: job_01 -> job_02 -> job_03
+    # Execution order: job_01 first, job_03 last
+    job_01:
+      name: "job-01"
+      description: "first in chain"
+      tasks:
+        - task_key: "task"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+    job_02:
+      name: "job-02"
+      description: "depends on ${resources.jobs.job_01.id}"
+      tasks:
+        - task_key: "task"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+    job_03:
+      name: "job-03"
+      description: "depends on ${resources.jobs.job_02.id}"
+      tasks:
+        - task_key: "task"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
diff --git a/acceptance/bundle/deploy/wal/chain-3-jobs/out.test.toml b/acceptance/bundle/deploy/wal/chain-3-jobs/out.test.toml
new file mode 100644
index 00000000000..e90b6d5d1ba
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/chain-3-jobs/out.test.toml
@@ -0,0 +1,3 @@
+Local = true
+Cloud = false
+EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/chain-3-jobs/output.txt b/acceptance/bundle/deploy/wal/chain-3-jobs/output.txt
new file mode 100644
index 00000000000..7e04ba4dae3
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/chain-3-jobs/output.txt
@@ -0,0 +1,110 @@
+=== First deploy (crashes on job_03) ===
+
+>>> errcode [CLI] bundle deploy
+Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/files...
+Deploying resources...
+[PROCESS_KILLED]
+
+Exit code: [KILLED]
+
+=== WAL content after crash ===
+{
+  "cli_version": "[DEV_VERSION]",
+  "lineage": "[UUID]",
+  "serial": 1,
+  "state_version": 2
+}
+{
+  "k": "resources.jobs.job_01",
+  "v": {
+    "__id__": "[JOB_01_ID]",
+    "state": {
+      "deployment": {
+        "kind": "BUNDLE",
+        "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/state/metadata.json"
+      },
+      "description": "first in chain",
+      "edit_mode": "UI_LOCKED",
+      "format": "MULTI_TASK",
+      "max_concurrent_runs": 1,
+      "name": "job-01",
+      "queue": {
+        "enabled": true
+      },
+      "tasks": [
+        {
+          "new_cluster": {
+            "node_type_id": "[NODE_TYPE_ID]",
+            "spark_version": "15.4.x-scala2.12"
+          },
+          "spark_python_task": {
+            "python_file": "/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/files/test.py"
+          },
+          "task_key": "task"
+        }
+      ]
+    }
+  }
+}
+{
+  "k": "resources.jobs.job_02",
+  "v": {
+    "__id__": "[JOB_02_ID]",
+    "depends_on": [
+      {
+        "label": "${resources.jobs.job_01.id}",
+        "node": "resources.jobs.job_01"
+      }
+    ],
+    "state": {
+      "deployment": {
+        "kind": "BUNDLE",
+        "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/state/metadata.json"
+      },
+      "description": "depends on [JOB_01_ID]",
+      "edit_mode": "UI_LOCKED",
+      "format": "MULTI_TASK",
+      "max_concurrent_runs": 1,
+      "name": "job-02",
+      "queue": {
+        "enabled": true
+      },
+      "tasks": [
+        {
+          "new_cluster": {
+            "node_type_id": "[NODE_TYPE_ID]",
+            "spark_version": "15.4.x-scala2.12"
+          },
+          "spark_python_task": {
+            "python_file": "/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/files/test.py"
+          },
+          "task_key": "task"
+        }
+      ]
+    }
+  }
+}
+
+=== Number of jobs saved in WAL ===
+2
+
+=== Bundle summary (reads from WAL) ===
+Name: wal-chain-test
+Target: default
+Workspace:
+  User: [USERNAME]
+  Path: /Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default
+Resources:
+  Jobs:
+    job_01:
+      Name: job-01
+      URL: [DATABRICKS_URL]/jobs/[JOB_01_ID]?o=[NUMID]
+    job_02:
+      Name: job-02
+      URL: [DATABRICKS_URL]/jobs/[JOB_02_ID]?o=[NUMID]
+    job_03:
+      Name: job-03
+      URL: (not deployed)
+
+=== WAL after successful deploy ===
+WAL deleted (expected)
diff --git a/acceptance/bundle/deploy/wal/chain-3-jobs/script b/acceptance/bundle/deploy/wal/chain-3-jobs/script
new file mode 100644
index 00000000000..a5afc6f51d5
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/chain-3-jobs/script
@@ -0,0 +1,24 @@
+# Linear chain: job_01 -> job_02 -> job_03
+# Let first 2 jobs/create succeed, then kill on the 3rd
+kill_after.py "POST /api/2.2/jobs/create" 2 1
+
+echo "=== First deploy (crashes on job_03) ==="
+trace errcode $CLI bundle deploy
+
+echo ""
+echo "=== WAL content after crash ==="
+jq -S . .databricks/bundle/default/resources.json.wal 2>/dev/null || echo "No WAL file"
+
+echo ""
+echo "=== Number of jobs saved in WAL ==="
+grep -c '"k":"resources.jobs' .databricks/bundle/default/resources.json.wal 2>/dev/null || echo "0"
+
+echo ""
+echo "=== Bundle summary (reads from WAL) ==="
+$CLI bundle summary
+
+echo ""
+echo "=== WAL after successful deploy ==="
+cat .databricks/bundle/default/resources.json.wal 2>/dev/null || echo "WAL deleted (expected)"
+
+replace_ids.py
diff --git a/acceptance/bundle/deploy/wal/chain-3-jobs/test.py b/acceptance/bundle/deploy/wal/chain-3-jobs/test.py
new file mode 100644
index 00000000000..1ff8e07c707
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/chain-3-jobs/test.py
@@ -0,0 +1 @@
+print("test")
diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/databricks.yml b/acceptance/bundle/deploy/wal/corrupted-wal-entry/databricks.yml
new file mode 100644
index 00000000000..a7a5cc2dfe0
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/databricks.yml
@@ -0,0 +1,23 @@
+bundle:
+  name: wal-corrupted-test
+
+resources:
+  jobs:
+    valid_job:
+      name: "valid-job"
+      tasks:
+        - task_key: "task-a"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+    another_valid:
+      name: "another-valid"
+      tasks:
+        - task_key: "task-b"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/out.test.toml b/acceptance/bundle/deploy/wal/corrupted-wal-entry/out.test.toml
new file mode 100644
index 00000000000..e90b6d5d1ba
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/out.test.toml
@@ -0,0 +1,3 @@
+Local = true
+Cloud = false
+EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/output.txt b/acceptance/bundle/deploy/wal/corrupted-wal-entry/output.txt
new file mode 100644
index 00000000000..1aee4fe481d
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/output.txt
@@ -0,0 +1,34 @@
+
+>>> cat .databricks/bundle/default/resources.json.wal
+{"lineage":"test-lineage-123","serial":6}
+{"k":"resources.jobs.valid_job","v":{"__id__":"","state":{"name":"valid-job"}}}
+{"k":"resources.jobs.another_valid","v":{"__id__":"","state":{"name":"another-valid"}}}
+{"k":"resources.jobs.partial_write","v":{"__id__":"33","state":{"name":"partial-
+
+>>> [CLI] bundle deploy
+Warn: Skipping corrupted WAL entry at [TEST_TMP_DIR]/.databricks/bundle/default/resources.json.wal:4: unexpected end of JSON input
+Warn: Saved 1 corrupted WAL entries to [TEST_TMP_DIR]/.databricks/bundle/default/resources.json.wal.corrupted
+Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-corrupted-test/default/files...
+Deploying resources...
+Updating deployment state...
+Deployment complete!
+
+>>> [CLI] bundle summary
+Name: wal-corrupted-test
+Target: default
+Workspace:
+  User: [USERNAME]
+  Path: /Workspace/Users/[USERNAME]/.bundle/wal-corrupted-test/default
+Resources:
+  Jobs:
+    another_valid:
+      Name: another-valid
+      URL: [DATABRICKS_URL]/jobs/[NUMID]?o=[NUMID]
+    valid_job:
+      Name: valid-job
+      URL: [DATABRICKS_URL]/jobs/[NUMID]?o=[NUMID]
+
+>>> cat .databricks/bundle/default/resources.json.wal.corrupted
+{"k":"resources.jobs.partial_write","v":{"__id__":"33","state":{"name":"partial-
+=== WAL after successful deploy ===
+WAL deleted (expected)
diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/resources.json b/acceptance/bundle/deploy/wal/corrupted-wal-entry/resources.json
new file mode 100644
index 00000000000..f9f4e54d1ed
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/resources.json
@@ -0,0 +1,7 @@
+{
+  "state_version": 1,
+  "cli_version": "0.0.0",
+  "lineage": "test-lineage-123",
+  "serial": 5,
+  "state": {}
+}
diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/resources.json.wal.tmpl b/acceptance/bundle/deploy/wal/corrupted-wal-entry/resources.json.wal.tmpl
new file mode 100644
index 00000000000..7ef5773a4ea
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/resources.json.wal.tmpl
@@ -0,0 +1,4 @@
+{"lineage":"test-lineage-123","serial":6}
+{"k":"resources.jobs.valid_job","v":{"__id__":"$JOB1","state":{"name":"valid-job"}}}
+{"k":"resources.jobs.another_valid","v":{"__id__":"$JOB2","state":{"name":"another-valid"}}}
+{"k":"resources.jobs.partial_write","v":{"__id__":"33","state":{"name":"partial-
diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/script b/acceptance/bundle/deploy/wal/corrupted-wal-entry/script
new file mode 100644
index 00000000000..d6f151a29c6
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/script
@@ -0,0 +1,22 @@
+# Create pre-existing jobs in the testserver so WAL recovery triggers DoUpdate (reset) instead of DoCreate
+JOB1=$($CLI jobs create --json '{"name":"valid-job"}' | jq -r '.job_id')
+JOB2=$($CLI jobs create --json '{"name":"another-valid"}' | jq -r '.job_id')
+echo "$JOB1:JOB1_ID" >> ACC_REPLS
+echo "$JOB2:JOB2_ID" >> ACC_REPLS
+
+mkdir -p .databricks/bundle/default
+cp resources.json .databricks/bundle/default/
+
+envsubst < resources.json.wal.tmpl > .databricks/bundle/default/resources.json.wal
+
+trace cat .databricks/bundle/default/resources.json.wal
+trace $CLI bundle deploy
+trace $CLI bundle summary
+trace cat .databricks/bundle/default/resources.json.wal.corrupted
+
+printf "\n=== WAL after successful deploy ===\n"
+if [ -f ".databricks/bundle/default/resources.json.wal" ]; then
+  echo "WAL exists (unexpected)"
+else
+  echo "WAL deleted (expected)"
+fi
diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/test.py b/acceptance/bundle/deploy/wal/corrupted-wal-entry/test.py
new file mode 100644
index 00000000000..1ff8e07c707
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/test.py
@@ -0,0 +1 @@
+print("test")
diff --git a/acceptance/bundle/deploy/wal/crash-after-create/databricks.yml b/acceptance/bundle/deploy/wal/crash-after-create/databricks.yml
new file mode 100644
index 00000000000..25b2efe2f8c
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/crash-after-create/databricks.yml
@@ -0,0 +1,25 @@
+bundle:
+  name: wal-crash-test
+
+resources:
+  jobs:
+    job_a:
+      name: "test-job-a"
+      description: "first job"
+      tasks:
+        - task_key: "task-a"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+    job_b:
+      name: "test-job-b"
+      description: "depends on ${resources.jobs.job_a.id}"
+      tasks:
+        - task_key: "task-b"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
diff --git a/acceptance/bundle/deploy/wal/crash-after-create/out.test.toml b/acceptance/bundle/deploy/wal/crash-after-create/out.test.toml
new file mode 100644
index 00000000000..1d895a16c96
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/crash-after-create/out.test.toml
@@ -0,0 +1,4 @@
+Local = true
+Cloud = false
+EnvMatrix.COMMAND = ["plan", "deploy --force-lock", "summary"]
+EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/crash-after-create/output.txt b/acceptance/bundle/deploy/wal/crash-after-create/output.txt
new file mode 100644
index 00000000000..0a50333e729
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/crash-after-create/output.txt
@@ -0,0 +1,57 @@
+=== First deploy (crashes after job_a create, before job_b) ===
+
+>>> errcode [CLI] bundle deploy
+Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-crash-test/default/files...
+Deploying resources...
+[PROCESS_KILLED]
+
+Exit code: [KILLED]
+
+>>> assert_exists.py .databricks/bundle/default/resources.json.wal
+
+>>> assert_not_exists.py .databricks/bundle/default/resources.json
+
+>>> cat .databricks/bundle/default/resources.json.wal
+{
+  "lineage": "[UUID]",
+  "serial": 1,
+  "state_version": 2,
+  "cli_version": "[DEV_VERSION]"
+}
+{
+  "k": "resources.jobs.job_a",
+  "v": {
+    "__id__": "[NUMID]",
+    "state": {
+      "deployment": {
+        "kind": "BUNDLE",
+        "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/wal-crash-test/default/state/metadata.json"
+      },
+      "description": "first job",
+      "edit_mode": "UI_LOCKED",
+      "format": "MULTI_TASK",
+      "max_concurrent_runs": 1,
+      "name": "test-job-a",
+      "queue": {
+        "enabled": true
+      },
+      "tasks": [
+        {
+          "new_cluster": {
+            "node_type_id": "[NODE_TYPE_ID]",
+            "spark_version": "15.4.x-scala2.12"
+          },
+          "spark_python_task": {
+            "python_file": "/Workspace/Users/[USERNAME]/.bundle/wal-crash-test/default/files/test.py"
+          },
+          "task_key": "task-a"
+        }
+      ]
+    }
+  }
+}
+
+=== Any other command recovers state
+>>> assert_exists.py .databricks/bundle/default/resources.json
+
+>>> assert_not_exists.py .databricks/bundle/default/resources.json.wal
diff --git a/acceptance/bundle/deploy/wal/crash-after-create/script b/acceptance/bundle/deploy/wal/crash-after-create/script
new file mode 100644
index 00000000000..264d84648d3
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/crash-after-create/script
@@ -0,0 +1,17 @@
+# WAL recovery after real crash. First deploy creates job_a then crashes.
+# Second deploy recovers from WAL and completes successfully.
+# job_b depends on job_a, so jobs/get is called after job_a's SaveState.
+kill_after.py "POST /api/2.2/jobs/create" 1 1 + +echo "=== First deploy (crashes after job_a create, before job_b) ===" +trace errcode $CLI bundle deploy + +trace assert_exists.py .databricks/bundle/default/resources.json.wal +trace assert_not_exists.py .databricks/bundle/default/resources.json +trace cat .databricks/bundle/default/resources.json.wal | jq + +title "Any other command recovers state" +$CLI bundle $COMMAND &> LOG.COMMAND.txt + +trace assert_exists.py .databricks/bundle/default/resources.json +trace assert_not_exists.py .databricks/bundle/default/resources.json.wal diff --git a/acceptance/bundle/deploy/wal/crash-after-create/test.py b/acceptance/bundle/deploy/wal/crash-after-create/test.py new file mode 100644 index 00000000000..1ff8e07c707 --- /dev/null +++ b/acceptance/bundle/deploy/wal/crash-after-create/test.py @@ -0,0 +1 @@ +print("test") diff --git a/acceptance/bundle/deploy/wal/crash-after-create/test.toml b/acceptance/bundle/deploy/wal/crash-after-create/test.toml new file mode 100644 index 00000000000..ecd87c31a8b --- /dev/null +++ b/acceptance/bundle/deploy/wal/crash-after-create/test.toml @@ -0,0 +1,2 @@ +EnvMatrix.COMMAND = ["plan", "deploy --force-lock", "summary"] +EnvRepl.COMMAND = false diff --git a/acceptance/bundle/deploy/wal/empty-wal/databricks.yml b/acceptance/bundle/deploy/wal/empty-wal/databricks.yml new file mode 100644 index 00000000000..8da92255ff1 --- /dev/null +++ b/acceptance/bundle/deploy/wal/empty-wal/databricks.yml @@ -0,0 +1,14 @@ +bundle: + name: wal-empty-test + +resources: + jobs: + test_job: + name: "test-job" + tasks: + - task_key: "test-task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge diff --git a/acceptance/bundle/deploy/wal/empty-wal/out.test.toml b/acceptance/bundle/deploy/wal/empty-wal/out.test.toml new file mode 100644 index 00000000000..e90b6d5d1ba --- /dev/null +++ b/acceptance/bundle/deploy/wal/empty-wal/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/wal/empty-wal/output.txt b/acceptance/bundle/deploy/wal/empty-wal/output.txt new file mode 100644 index 00000000000..bba6d249fce --- /dev/null +++ b/acceptance/bundle/deploy/wal/empty-wal/output.txt @@ -0,0 +1,6 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-empty-test/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! 
diff --git a/acceptance/bundle/deploy/wal/empty-wal/script b/acceptance/bundle/deploy/wal/empty-wal/script
new file mode 100644
index 00000000000..ac104951c58
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/empty-wal/script
@@ -0,0 +1,4 @@
+mkdir -p .databricks/bundle/default
+touch .databricks/bundle/default/resources.json.wal
+trace $CLI bundle deploy
+assert_not_exists.py .databricks/bundle/default/resources.json.wal*
diff --git a/acceptance/bundle/deploy/wal/empty-wal/test.py b/acceptance/bundle/deploy/wal/empty-wal/test.py
new file mode 100644
index 00000000000..11b15b1a458
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/empty-wal/test.py
@@ -0,0 +1 @@
+print("hello")
diff --git a/acceptance/bundle/deploy/wal/empty-wal/test.toml b/acceptance/bundle/deploy/wal/empty-wal/test.toml
new file mode 100644
index 00000000000..ad64cd6e746
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/empty-wal/test.toml
@@ -0,0 +1,13 @@
+# Empty WAL file should be moved to .wal.corrupted and deploy should proceed normally.
+
+[[Server]]
+Pattern = "POST /api/2.2/jobs/create"
+Response.Body = '{"job_id": 1001}'
+
+[[Server]]
+Pattern = "GET /api/2.2/jobs/get"
+Response.Body = '{"job_id": 1001, "settings": {"name": "test-job"}}'
+
+[[Repls]]
+Old = '-rw[^\s]+\s+\d+\s+[^\s]+\s+[^\s]+\s+\d+\s+[A-Z][a-z]+\s+\d+\s+\d+:\d+'
+New = '[FILE_INFO]'
diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/databricks.yml b/acceptance/bundle/deploy/wal/future-serial-wal/databricks.yml
new file mode 100644
index 00000000000..56fa1313376
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/future-serial-wal/databricks.yml
@@ -0,0 +1,14 @@
+bundle:
+  name: wal-future-serial-test
+
+resources:
+  jobs:
+    test_job:
+      name: "test-job"
+      tasks:
+        - task_key: "test-task"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/out.test.toml b/acceptance/bundle/deploy/wal/future-serial-wal/out.test.toml
new file mode 100644
index 00000000000..e90b6d5d1ba
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/future-serial-wal/out.test.toml
@@ -0,0 +1,3 @@
+Local = true
+Cloud = false
+EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/output.txt b/acceptance/bundle/deploy/wal/future-serial-wal/output.txt
new file mode 100644
index 00000000000..48c23ddf84f
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/future-serial-wal/output.txt
@@ -0,0 +1,10 @@
+=== WAL content ===
+{"lineage":"test-lineage-123","serial":5}
+{"k":"resources.jobs.test_job","v":{"__id__":"1001","state":{"name":"test-job"}}}
+=== Deploy (should fail with corruption error) ===
+
+>>> errcode [CLI] bundle deploy
+Error: reading state from [TEST_TMP_DIR]/.databricks/bundle/default/resources.json: WAL recovery failed: WAL serial (5) is ahead of expected (3), state may be corrupted
+
+
+Exit code: 1
diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/resources.json b/acceptance/bundle/deploy/wal/future-serial-wal/resources.json
new file mode 100644
index 00000000000..f2f06b34bf4
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/future-serial-wal/resources.json
@@ -0,0 +1,12 @@
+{
+  "state_version": 1,
+  "cli_version": "0.0.0",
+  "lineage": "test-lineage-123",
+  "serial": 2,
+  "state": {
+    "resources.jobs.test_job": {
+      "__id__": "1001",
+      "state": {"name": "test-job"}
+    }
+  }
+}
diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/resources.json.wal b/acceptance/bundle/deploy/wal/future-serial-wal/resources.json.wal
new file mode 100644
index 00000000000..98a8e48802b
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/future-serial-wal/resources.json.wal
@@ -0,0 +1,2 @@
+{"lineage":"test-lineage-123","serial":5}
+{"k":"resources.jobs.test_job","v":{"__id__":"1001","state":{"name":"test-job"}}}
diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/script b/acceptance/bundle/deploy/wal/future-serial-wal/script
new file mode 100644
index 00000000000..f7a57192255
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/future-serial-wal/script
@@ -0,0 +1,9 @@
+mkdir -p .databricks/bundle/default
+cp resources.json .databricks/bundle/default/
+cp resources.json.wal .databricks/bundle/default/
+
+echo "=== WAL content ==="
+cat .databricks/bundle/default/resources.json.wal
+
+echo "=== Deploy (should fail with corruption error) ==="
+trace errcode $CLI bundle deploy
diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/test.py b/acceptance/bundle/deploy/wal/future-serial-wal/test.py
new file mode 100644
index 00000000000..1ff8e07c707
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/future-serial-wal/test.py
@@ -0,0 +1 @@
+print("test")
diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/databricks.yml b/acceptance/bundle/deploy/wal/lineage-mismatch/databricks.yml
new file mode 100644
index 00000000000..32461d14676
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/lineage-mismatch/databricks.yml
@@ -0,0 +1,14 @@
+bundle:
+  name: wal-lineage-mismatch-test
+
+resources:
+  jobs:
+    test_job:
+      name: "test-job"
+      tasks:
+        - task_key: "test-task"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/out.test.toml b/acceptance/bundle/deploy/wal/lineage-mismatch/out.test.toml
new file mode 100644
index 00000000000..9448f875df7
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/lineage-mismatch/out.test.toml
@@ -0,0 +1,4 @@
+Local = true
+Cloud = false
+EnvMatrix.COMMAND = ["deploy", "plan", "summary"]
+EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/output.txt b/acceptance/bundle/deploy/wal/lineage-mismatch/output.txt
new file mode 100644
index 00000000000..cae1ffac083
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/lineage-mismatch/output.txt
@@ -0,0 +1,7 @@
+Any command should fail with lineage mismatch error
+Error: reading state from [TEST_TMP_DIR]/.databricks/bundle/default/resources.json: WAL recovery failed: WAL lineage (wal-lineage-bbb) does not match state lineage (state-lineage-aaa)
+
+
+>>> musterr [CLI] bundle destroy --auto-approve
+Error: reading state from [TEST_TMP_DIR]/.databricks/bundle/default/resources.json: WAL recovery failed: WAL lineage (wal-lineage-bbb) does not match state lineage (state-lineage-aaa)
+
diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/resources.json b/acceptance/bundle/deploy/wal/lineage-mismatch/resources.json
new file mode 100644
index 00000000000..444a9ea888d
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/lineage-mismatch/resources.json
@@ -0,0 +1,12 @@
+{
+  "state_version": 1,
+  "cli_version": "0.0.0",
+  "lineage": "state-lineage-aaa",
+  "serial": 1,
+  "state": {
+    "resources.jobs.test_job": {
+      "__id__": "1001",
+      "state": {"name": "test-job"}
+    }
+  }
+}
diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/resources.json.wal b/acceptance/bundle/deploy/wal/lineage-mismatch/resources.json.wal
new file mode 100644
index 00000000000..d14fb4a9713
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/lineage-mismatch/resources.json.wal
@@ -0,0 +1,2 @@
+{"lineage":"wal-lineage-bbb","serial":2}
+{"k":"resources.jobs.test_job","v":{"__id__":"1001","state":{"name":"test-job"}}}
diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/script b/acceptance/bundle/deploy/wal/lineage-mismatch/script
new file mode 100644
index 00000000000..0629a37c0f9
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/lineage-mismatch/script
@@ -0,0 +1,8 @@
+mkdir -p .databricks/bundle/default
+cp resources.json .databricks/bundle/default/
+cp resources.json.wal .databricks/bundle/default/
+
+echo "Any command should fail with lineage mismatch error"
+musterr $CLI bundle $COMMAND
+
+trace musterr $CLI bundle destroy --auto-approve
diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/test.py b/acceptance/bundle/deploy/wal/lineage-mismatch/test.py
new file mode 100644
index 00000000000..1ff8e07c707
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/lineage-mismatch/test.py
@@ -0,0 +1 @@
+print("test")
diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/test.toml b/acceptance/bundle/deploy/wal/lineage-mismatch/test.toml
new file mode 100644
index 00000000000..0b3a9e0b7cc
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/lineage-mismatch/test.toml
@@ -0,0 +1 @@
+EnvMatrix.COMMAND = ["deploy", "plan", "summary"]
diff --git a/acceptance/bundle/deploy/wal/stale-wal/databricks.yml b/acceptance/bundle/deploy/wal/stale-wal/databricks.yml
new file mode 100644
index 00000000000..443283607e6
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/stale-wal/databricks.yml
@@ -0,0 +1,14 @@
+bundle:
+  name: wal-stale-test
+
+resources:
+  jobs:
+    test_job:
+      name: "test-job"
+      tasks:
+        - task_key: "test-task"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
diff --git a/acceptance/bundle/deploy/wal/stale-wal/out.test.toml b/acceptance/bundle/deploy/wal/stale-wal/out.test.toml
new file mode 100644
index 00000000000..e90b6d5d1ba
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/stale-wal/out.test.toml
@@ -0,0 +1,3 @@
+Local = true
+Cloud = false
+EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/stale-wal/output.txt b/acceptance/bundle/deploy/wal/stale-wal/output.txt
new file mode 100644
index 00000000000..91a7a07643d
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/stale-wal/output.txt
@@ -0,0 +1,19 @@
+=== WAL content before deploy ===
+{"lineage":"stale-test-lineage","serial":1}
+{"k":"resources.jobs.stale_job","v":{"__id__":"9999","state":{"name":"stale-job"}}}
+=== Deploy (should ignore stale WAL) ===
+
+>>> [CLI] bundle deploy
+Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-stale-test/default/files...
+Deploying resources...
+Updating deployment state...
+Deployment complete!
+=== Checking WAL file after deploy ===
+Stale WAL deleted (expected)
+=== State file should NOT contain stale_job ===
+{
+  "serial": 3,
+  "state_keys": [
+    "resources.jobs.test_job"
+  ]
+}
diff --git a/acceptance/bundle/deploy/wal/stale-wal/resources.json b/acceptance/bundle/deploy/wal/stale-wal/resources.json
new file mode 100644
index 00000000000..6fd38b67ae8
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/stale-wal/resources.json
@@ -0,0 +1,12 @@
+{
+  "state_version": 1,
+  "cli_version": "0.0.0",
+  "lineage": "stale-test-lineage",
+  "serial": 2,
+  "state": {
+    "resources.jobs.test_job": {
+      "__id__": "1001",
+      "state": {"name": "test-job"}
+    }
+  }
+}
diff --git a/acceptance/bundle/deploy/wal/stale-wal/resources.json.wal b/acceptance/bundle/deploy/wal/stale-wal/resources.json.wal
new file mode 100644
index 00000000000..ef5f380ed84
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/stale-wal/resources.json.wal
@@ -0,0 +1,2 @@
+{"lineage":"stale-test-lineage","serial":1}
+{"k":"resources.jobs.stale_job","v":{"__id__":"9999","state":{"name":"stale-job"}}}
diff --git a/acceptance/bundle/deploy/wal/stale-wal/script b/acceptance/bundle/deploy/wal/stale-wal/script
new file mode 100644
index 00000000000..4de1bc1e921
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/stale-wal/script
@@ -0,0 +1,19 @@
+mkdir -p .databricks/bundle/default
+cp resources.json .databricks/bundle/default/
+cp resources.json.wal .databricks/bundle/default/
+
+echo "=== WAL content before deploy ==="
+cat .databricks/bundle/default/resources.json.wal
+
+echo "=== Deploy (should ignore stale WAL) ==="
+trace $CLI bundle deploy
+
+echo "=== Checking WAL file after deploy ==="
+if [ -f ".databricks/bundle/default/resources.json.wal" ]; then
+  echo "WAL file exists (unexpected)"
+else
+  echo "Stale WAL deleted (expected)"
+fi
+
+echo "=== State file should NOT contain stale_job ==="
+cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys)}'
diff --git a/acceptance/bundle/deploy/wal/stale-wal/test.py b/acceptance/bundle/deploy/wal/stale-wal/test.py
new file mode 100644
index 00000000000..1ff8e07c707
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/stale-wal/test.py
@@ -0,0 +1 @@
+print("test")
diff --git a/acceptance/bundle/deploy/wal/stale-wal/test.toml b/acceptance/bundle/deploy/wal/stale-wal/test.toml
new file mode 100644
index 00000000000..934683ba6d8
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/stale-wal/test.toml
@@ -0,0 +1,9 @@
+# Deploy with a stale WAL (old serial) - WAL should be deleted and ignored.
+
+[[Server]]
+Pattern = "POST /api/2.2/jobs/reset"
+Response.Body = '{}'
+
+[[Server]]
+Pattern = "GET /api/2.2/jobs/get"
+Response.Body = '{"job_id": 1001, "settings": {"name": "test-job"}}'
diff --git a/acceptance/bundle/deploy/wal/test.toml b/acceptance/bundle/deploy/wal/test.toml
new file mode 100644
index 00000000000..e60e6992455
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/test.toml
@@ -0,0 +1,34 @@
+# WAL (Write-Ahead Log) tests verify crash recovery during bundle deployment.
+# These tests simulate process crashes using KillCaller and verify state recovery.
+# Only runs with direct engine since WAL is a direct-engine feature.
+
+Local = true
+Env.DATABRICKS_CLI_TEST_PID = "1"
+
+[EnvMatrix]
+DATABRICKS_BUNDLE_ENGINE = ["direct"]
+
+[[Repls]]
+Old = 'script: line \d+:\s+\d+ Killed(: 9)?\s+"\$@"'
+New = '[PROCESS_KILLED]'
+
+[[Repls]]
+Old = '(\n>>> errcode [^\n]+\n)\nExit code:'
+New = "${1}[PROCESS_KILLED]\n\nExit code:"
+
+[[Repls]]
+Old = 'Exit code: 137'
+New = 'Exit code: [KILLED]'
+
+# On Linux, a KillCaller kill may surface as exit code 1 rather than 137.
+# Only normalise exit code 1 when it directly follows [PROCESS_KILLED] to
+# avoid masking genuine error exits (lineage-mismatch, future-serial-wal).
+[[Repls]]
+Old = '(\[PROCESS_KILLED\]\n\nExit code: )1'
+New = '${1}[KILLED]'
+
+# On Windows, no bash "Killed" message appears when CLI has produced output before termination.
+# Match the raw exit code 1 (Windows never gets 137 or [PROCESS_KILLED] marker first).
+[[Repls]]
+Old = '(Deploying resources\.\.\.)\n\nExit code: 1'
+New = "${1}\n[PROCESS_KILLED]\n\nExit code: [KILLED]"
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/databricks.yml b/acceptance/bundle/deploy/wal/wal-with-delete/databricks.yml
new file mode 100644
index 00000000000..128bbe37f56
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/databricks.yml
@@ -0,0 +1,4 @@
+bundle:
+  name: wal-delete-test
+
+resources: {}
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/out.test.toml b/acceptance/bundle/deploy/wal/wal-with-delete/out.test.toml
new file mode 100644
index 00000000000..e90b6d5d1ba
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/out.test.toml
@@ -0,0 +1,3 @@
+Local = true
+Cloud = false
+EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/output.txt b/acceptance/bundle/deploy/wal/wal-with-delete/output.txt
new file mode 100644
index 00000000000..4eb0fb5724a
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/output.txt
@@ -0,0 +1,19 @@
+=== WAL content ===
+{"lineage":"delete-test-lineage","serial":2}
+{"k":"resources.jobs.test_job","v":null}
+=== Deploy (should recover delete from WAL) ===
+
+>>> [CLI] bundle deploy
+Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-delete-test/default/files...
+Deploying resources...
+Updating deployment state...
+Deployment complete!
+=== Final state (should have no jobs) ===
+
+>>> [CLI] bundle summary
+Name: wal-delete-test
+Target: default
+Workspace:
+  User: [USERNAME]
+  Path: /Workspace/Users/[USERNAME]/.bundle/wal-delete-test/default
+Resources:
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/resources.json b/acceptance/bundle/deploy/wal/wal-with-delete/resources.json
new file mode 100644
index 00000000000..04263ec36f9
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/resources.json
@@ -0,0 +1,12 @@
+{
+  "state_version": 1,
+  "cli_version": "0.0.0",
+  "lineage": "delete-test-lineage",
+  "serial": 1,
+  "state": {
+    "resources.jobs.test_job": {
+      "__id__": "1001",
+      "state": {"name": "test-job"}
+    }
+  }
+}
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/resources.json.wal b/acceptance/bundle/deploy/wal/wal-with-delete/resources.json.wal
new file mode 100644
index 00000000000..9b5c6169e3f
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/resources.json.wal
@@ -0,0 +1,2 @@
+{"lineage":"delete-test-lineage","serial":2}
+{"k":"resources.jobs.test_job","v":null}
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/script b/acceptance/bundle/deploy/wal/wal-with-delete/script
new file mode 100644
index 00000000000..1b6708bc0ff
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/script
@@ -0,0 +1,12 @@
+mkdir -p .databricks/bundle/default
+cp resources.json .databricks/bundle/default/
+cp resources.json.wal .databricks/bundle/default/
+
+echo "=== WAL content ==="
+cat .databricks/bundle/default/resources.json.wal
+
+echo "=== Deploy (should recover delete from WAL) ==="
+trace $CLI bundle deploy
+
+echo "=== Final state (should have no jobs) ==="
+trace $CLI bundle summary
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/test.py b/acceptance/bundle/deploy/wal/wal-with-delete/test.py
new file mode 100644
index 00000000000..1ff8e07c707
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/test.py
@@ -0,0 +1 @@
+print("test")
diff --git a/acceptance/bundle/migrate/basic/out.plan_update.json b/acceptance/bundle/migrate/basic/out.plan_update.json
index 44ba986a2f6..99e22ec08b7 100644
--- a/acceptance/bundle/migrate/basic/out.plan_update.json
+++ b/acceptance/bundle/migrate/basic/out.plan_update.json
@@ -2,7 +2,7 @@
   "plan_version": 2,
   "cli_version": "[DEV_VERSION]",
   "lineage": "[UUID]",
-  "serial": 8,
+  "serial": 6,
   "plan": {
     "resources.jobs.test_job": {
       "action": "update",
diff --git a/acceptance/bundle/migrate/basic/output.txt b/acceptance/bundle/migrate/basic/output.txt
index 0d31bbd682f..dafa3a4086e 100644
--- a/acceptance/bundle/migrate/basic/output.txt
+++ b/acceptance/bundle/migrate/basic/output.txt
@@ -39,7 +39,7 @@ Deployment complete!
 === Should show that it's already migrated
 >>> musterr [CLI] bundle deployment migrate
 Error: already using direct engine
-Details: [TEST_TMP_DIR]/.databricks/bundle/dev/resources.json: local direct state serial=7 lineage="[UUID]"
+Details: [TEST_TMP_DIR]/.databricks/bundle/dev/resources.json: local direct state serial=6 lineage="[UUID]"
 
 >>> DATABRICKS_BUNDLE_ENGINE=direct [CLI] bundle plan
 Plan: 0 to add, 0 to change, 0 to delete, 3 unchanged
@@ -86,14 +86,14 @@ Deployment complete!
 === Should show that it's already migrated
 >>> musterr [CLI] bundle deployment migrate
 Error: already using direct engine
-Details: [TEST_TMP_DIR]/.databricks/bundle/dev/resources.json: local direct state serial=8 lineage="[UUID]"
+Details: [TEST_TMP_DIR]/.databricks/bundle/dev/resources.json: local direct state serial=6 lineage="[UUID]"
 
 >>> DATABRICKS_BUNDLE_ENGINE= [CLI] bundle debug states
-[TEST_TMP_DIR]/.databricks/bundle/dev/resources.json: local direct state serial=8 lineage="[UUID]"
+[TEST_TMP_DIR]/.databricks/bundle/dev/resources.json: local direct state serial=6 lineage="[UUID]"
 
 >>> DATABRICKS_BUNDLE_ENGINE= [CLI] bundle debug states --force-pull
-resources.json: remote direct state serial=8 lineage="[UUID]"
-[TEST_TMP_DIR]/.databricks/bundle/dev/resources.json: local direct state serial=8 lineage="[UUID]"
+resources.json: remote direct state serial=6 lineage="[UUID]"
+[TEST_TMP_DIR]/.databricks/bundle/dev/resources.json: local direct state serial=6 lineage="[UUID]"
 
 === Extra plan: should have no drift
 >>> DATABRICKS_BUNDLE_ENGINE= [CLI] bundle plan
diff --git a/acceptance/bundle/migrate/dashboards/out.plan_after_migrate.json b/acceptance/bundle/migrate/dashboards/out.plan_after_migrate.json
index 0f73ce72be1..6b55f64bd8d 100644
--- a/acceptance/bundle/migrate/dashboards/out.plan_after_migrate.json
+++ b/acceptance/bundle/migrate/dashboards/out.plan_after_migrate.json
@@ -2,7 +2,7 @@
   "plan_version": 2,
   "cli_version": "[DEV_VERSION]",
   "lineage": "[UUID]",
-  "serial": 4,
+  "serial": 3,
   "plan": {
     "resources.dashboards.dashboard1": {
       "action": "skip",
diff --git a/acceptance/bundle/migrate/dashboards/output.txt b/acceptance/bundle/migrate/dashboards/output.txt
index 7cbd91a2f68..19a4f1c7bb5 100644
--- a/acceptance/bundle/migrate/dashboards/output.txt
+++ b/acceptance/bundle/migrate/dashboards/output.txt
@@ -47,11 +47,11 @@ Deployment complete!
 === Should show that it's already migrated
 >>> musterr [CLI] bundle deployment migrate
 Error: already using direct engine
-Details: [TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=5 lineage="[UUID]"
+Details: [TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=3 lineage="[UUID]"
 
 >>> DATABRICKS_BUNDLE_ENGINE= [CLI] bundle debug states
-[TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=5 lineage="[UUID]"
+[TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=3 lineage="[UUID]"
 
 >>> DATABRICKS_BUNDLE_ENGINE= [CLI] bundle debug states --force-pull
-resources.json: remote direct state serial=5 lineage="[UUID]"
-[TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=5 lineage="[UUID]"
+resources.json: remote direct state serial=3 lineage="[UUID]"
+[TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=3 lineage="[UUID]"
diff --git a/acceptance/bundle/migrate/grants/output.txt b/acceptance/bundle/migrate/grants/output.txt
index 44ec67fb48a..146787d549a 100644
--- a/acceptance/bundle/migrate/grants/output.txt
+++ b/acceptance/bundle/migrate/grants/output.txt
@@ -45,11 +45,11 @@ Deployment complete!
 === Should show that it's already migrated
 >>> musterr [CLI] bundle deployment migrate
 Error: already using direct engine
-Details: [TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=11 lineage="[UUID]"
+Details: [TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=9 lineage="[UUID]"
 
 >>> DATABRICKS_BUNDLE_ENGINE= [CLI] bundle debug states
-[TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=11 lineage="[UUID]"
+[TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=9 lineage="[UUID]"
 
 >>> DATABRICKS_BUNDLE_ENGINE= [CLI] bundle debug states --force-pull
-resources.json: remote direct state serial=11 lineage="[UUID]"
-[TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=11 lineage="[UUID]"
+resources.json: remote direct state serial=9 lineage="[UUID]"
+[TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=9 lineage="[UUID]"
diff --git a/acceptance/bundle/migrate/permissions/output.txt b/acceptance/bundle/migrate/permissions/output.txt
index 953a4bae979..f85c8d7bdbf 100644
--- a/acceptance/bundle/migrate/permissions/output.txt
+++ b/acceptance/bundle/migrate/permissions/output.txt
@@ -62,11 +62,11 @@ Deployment complete!
 === Should show that it's already migrated
 >>> musterr [CLI] bundle deployment migrate
 Error: already using direct engine
-Details: [TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=8 lineage="[UUID]"
+Details: [TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=7 lineage="[UUID]"
 
 >>> [CLI] bundle debug states
-[TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=8 lineage="[UUID]"
+[TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=7 lineage="[UUID]"
 
 >>> [CLI] bundle debug states --force-pull
-resources.json: remote direct state serial=8 lineage="[UUID]"
-[TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=8 lineage="[UUID]"
+resources.json: remote direct state serial=7 lineage="[UUID]"
+[TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=7 lineage="[UUID]"
diff --git a/acceptance/bundle/resources/apps/create_already_exists/output.txt b/acceptance/bundle/resources/apps/create_already_exists/output.txt
index e4438d47b04..82deb4ab43a 100644
--- a/acceptance/bundle/resources/apps/create_already_exists/output.txt
+++ b/acceptance/bundle/resources/apps/create_already_exists/output.txt
@@ -37,7 +37,6 @@
 HTTP Status: 409 Conflict
 API error_code: RESOURCE_ALREADY_EXISTS
 API message: An app with the same name already exists: test-app-already-exists
-Updating deployment state...
 
 >>> [CLI] apps delete test-app-already-exists
 {
diff --git a/acceptance/bundle/resources/dashboards/publish-failure-cleans-up-dashboard/out.deploy.direct.txt b/acceptance/bundle/resources/dashboards/publish-failure-cleans-up-dashboard/out.deploy.direct.txt
index 84918b848bf..705bd09cb32 100644
--- a/acceptance/bundle/resources/dashboards/publish-failure-cleans-up-dashboard/out.deploy.direct.txt
+++ b/acceptance/bundle/resources/dashboards/publish-failure-cleans-up-dashboard/out.deploy.direct.txt
@@ -9,6 +9,5 @@
 HTTP Status: 400 Bad Request
 API error_code: RESOURCE_DOES_NOT_EXIST
 API message: Warehouse doesnotexist does not exist
-Updating deployment state...
 
 Exit code: 1
diff --git a/acceptance/bundle/resources/jobs/create-error/output.txt b/acceptance/bundle/resources/jobs/create-error/output.txt
index 0fcd944efd2..4211f239d91 100644
--- a/acceptance/bundle/resources/jobs/create-error/output.txt
+++ b/acceptance/bundle/resources/jobs/create-error/output.txt
@@ -9,4 +9,3 @@
 HTTP Status: 400 Bad Request
 API error_code: INVALID_PARAMETER_VALUE
 API message: Shared job cluster feature is only supported in multi-task jobs.
-Updating deployment state...
diff --git a/acceptance/bundle/resources/jobs/update/out.plan_update.direct.json b/acceptance/bundle/resources/jobs/update/out.plan_update.direct.json
index bdb8e9f5e99..7bf628435bb 100644
--- a/acceptance/bundle/resources/jobs/update/out.plan_update.direct.json
+++ b/acceptance/bundle/resources/jobs/update/out.plan_update.direct.json
@@ -2,7 +2,7 @@
   "plan_version": 2,
   "cli_version": "[DEV_VERSION]",
   "lineage": "[UUID]",
-  "serial": 2,
+  "serial": 1,
   "plan": {
     "resources.jobs.foo": {
       "action": "update",
diff --git a/acceptance/bundle/resources/postgres_projects/without_project_id/out.deploy.direct.txt b/acceptance/bundle/resources/postgres_projects/without_project_id/out.deploy.direct.txt
index 79d1f7200e1..8103b944c46 100644
--- a/acceptance/bundle/resources/postgres_projects/without_project_id/out.deploy.direct.txt
+++ b/acceptance/bundle/resources/postgres_projects/without_project_id/out.deploy.direct.txt
@@ -11,4 +11,3 @@
 HTTP Status: 400 Bad Request
 API error_code: INVALID_PARAMETER_VALUE
 API message: Field 'project_id' is required, expected non-default value (not "")!
-Updating deployment state...
diff --git a/acceptance/bundle/state/state_present/output.txt b/acceptance/bundle/state/state_present/output.txt
index 706b54a67a0..cccf089828c 100644
--- a/acceptance/bundle/state/state_present/output.txt
+++ b/acceptance/bundle/state/state_present/output.txt
@@ -91,14 +91,14 @@
 
 >>> print_state.py
 3
-15
+13
 contains error: '12' not found in the output.
 
 >>> DATABRICKS_BUNDLE_ENGINE= [CLI] bundle debug states
 [TEST_TMP_DIR]/.databricks/bundle/default/terraform/terraform.tfstate: local terraform state serial=3 lineage="test-lineage"
-[TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=15 lineage="test-lineage"
+[TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=13 lineage="test-lineage"
 
 >>> DATABRICKS_BUNDLE_ENGINE= [CLI] bundle debug states --force-pull
 [TEST_TMP_DIR]/.databricks/bundle/default/terraform/terraform.tfstate: local terraform state serial=3 lineage="test-lineage"
-resources.json: remote direct state serial=15 lineage="test-lineage"
-[TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=15 lineage="test-lineage"
+resources.json: remote direct state serial=13 lineage="test-lineage"
+[TEST_TMP_DIR]/.databricks/bundle/default/resources.json: local direct state serial=13 lineage="test-lineage"
diff --git a/acceptance/internal/config.go b/acceptance/internal/config.go
index 06ac61c39b8..559e11d0ca6 100644
--- a/acceptance/internal/config.go
+++ b/acceptance/internal/config.go
@@ -153,12 +153,6 @@ type ServerStub struct {
 	// Configure as "1ms", "2s", "3m", etc.
 	// See [time.ParseDuration] for details.
 	Delay time.Duration
-
-	// Number of times to kill the caller process before returning normal responses.
-	// 0 = never kill (default), 1 = kill once then allow, 2 = kill twice then allow, etc.
-	// Useful for testing crash recovery scenarios where first deploy crashes but retry succeeds.
-	// Requires DATABRICKS_CLI_TEST_PID=1 to be set in the test environment.
-	KillCaller int
 }
 
 // FindConfigs finds all the config relevant for this test,
diff --git a/acceptance/internal/prepare_server.go b/acceptance/internal/prepare_server.go
index 8f18d1c61bc..299d48f03ee 100644
--- a/acceptance/internal/prepare_server.go
+++ b/acceptance/internal/prepare_server.go
@@ -183,10 +183,6 @@ func startLocalServer(t *testing.T,
 		s.ResponseCallback = logResponseCallback(t)
 	}
 
-	// Track remaining kill counts per pattern (for KillCaller > 0)
-	killCounters := make(map[string]int)
-	killCountersMu := &sync.Mutex{}
-
 	for ind := range stubs {
 		// Later stubs take precedence over earlier ones (leaf configs override parent configs).
 		// The first handler registered for a given pattern wins, so we reverse the order.
@@ -195,11 +191,6 @@
 		items := strings.Split(stub.Pattern, " ")
 		require.Len(t, items, 2)
 
-		// Initialize kill counter for this pattern
-		if stub.KillCaller > 0 {
-			killCounters[stub.Pattern] = stub.KillCaller
-		}
-
 		s.Handle(items[0], items[1], func(req testserver.Request) any {
 			if stub.Delay > 0 {
 				ctx := req.Context
@@ -218,10 +209,6 @@
 				}
 			}
 
-			if shouldKillCaller(stub, killCounters, killCountersMu) {
-				killCaller(t, stub.Pattern, req.Headers)
-			}
-
 			return stub.Response
 		})
 	}
@@ -232,45 +219,6 @@
 	return s.URL
 }
 
-func shouldKillCaller(stub ServerStub, killCounters map[string]int, mu *sync.Mutex) bool {
-	if stub.KillCaller <= 0 {
-		return false
-	}
-	mu.Lock()
-	defer mu.Unlock()
-	if killCounters[stub.Pattern] <= 0 {
-		return false
-	}
-	killCounters[stub.Pattern]--
-	return true
-}
-
-func killCaller(t *testing.T, pattern string, headers http.Header) {
-	pid := testserver.ExtractPidFromHeaders(headers)
-	if pid == 0 {
-		t.Errorf("KillCaller configured but test-pid not found in User-Agent")
-		return
-	}
-
-	process, err := os.FindProcess(pid)
-	if err != nil {
-		t.Errorf("Failed to find process %d: %s", pid, err)
-		return
-	}
-
-	// Use process.Kill() for cross-platform compatibility.
-	// On Unix, this sends SIGKILL. On Windows, this calls TerminateProcess.
-	if err := process.Kill(); err != nil {
-		t.Errorf("Failed to kill process %d: %s", pid, err)
-		return
-	}
-
-	if !waitForProcessExit(pid, 2*time.Second) {
-		t.Logf("KillCaller: timed out waiting for PID %d to exit (pattern: %s)", pid, pattern)
-	}
-	t.Logf("KillCaller: killed PID %d (pattern: %s)", pid, pattern)
-}
-
 func startProxyServer(t *testing.T,
 	recordRequests bool,
 	logRequests bool,
diff --git a/acceptance/selftest/kill_caller/currentuser/script b/acceptance/selftest/kill_caller/currentuser/script
index 821c42d8cf7..dbd96b12a94 100644
--- a/acceptance/selftest/kill_caller/currentuser/script
+++ b/acceptance/selftest/kill_caller/currentuser/script
@@ -1,2 +1,4 @@
+# Kill the CLI when it calls /Me endpoint (once, then allow)
+kill_after.py "GET /api/2.0/preview/scim/v2/Me" 0 1
 trace errcode $CLI current-user me
 echo "Script continued after kill"
diff --git a/acceptance/selftest/kill_caller/currentuser/test.toml b/acceptance/selftest/kill_caller/currentuser/test.toml
deleted file mode 100644
index b76fe401fcb..00000000000
--- a/acceptance/selftest/kill_caller/currentuser/test.toml
+++ /dev/null
@@ -1,4 +0,0 @@
-# Kill the CLI when it calls /Me endpoint (once, then allow)
-[[Server]]
-Pattern = "GET /api/2.0/preview/scim/v2/Me"
-KillCaller = 1
diff --git a/acceptance/selftest/kill_caller/multi_pattern/output.txt b/acceptance/selftest/kill_caller/multi_pattern/output.txt
index 9b41f23ec4d..b3528428352 100644
--- a/acceptance/selftest/kill_caller/multi_pattern/output.txt
+++ b/acceptance/selftest/kill_caller/multi_pattern/output.txt
@@ -13,8 +13,8 @@ Me attempt 2 done
 
 >>> [CLI] current-user me
 {
-  "id": "123",
-  "userName": "test@example.com"
+  "id": "[USERID]",
+  "userName": "[USERNAME]"
 }
 Me attempt 3 done - success!
 
diff --git a/acceptance/selftest/kill_caller/multi_pattern/script b/acceptance/selftest/kill_caller/multi_pattern/script
index ba9447a29a7..e0b5523c45c 100644
--- a/acceptance/selftest/kill_caller/multi_pattern/script
+++ b/acceptance/selftest/kill_caller/multi_pattern/script
@@ -1,3 +1,6 @@
+kill_after.py "GET /api/2.0/preview/scim/v2/Me" 0 2
+kill_after.py "GET /api/2.0/workspace/list" 0 1
+
 # Test pattern 1: /Me endpoint (kills first 2, then allows)
 trace errcode $CLI current-user me
 echo "Me attempt 1 done"
diff --git a/acceptance/selftest/kill_caller/multi_pattern/test.toml b/acceptance/selftest/kill_caller/multi_pattern/test.toml
index 08bdc17085d..4565475423d 100644
--- a/acceptance/selftest/kill_caller/multi_pattern/test.toml
+++ b/acceptance/selftest/kill_caller/multi_pattern/test.toml
@@ -1,17 +1,5 @@
-# Test that multiple patterns can have independent KillCaller counts
-# Pattern 1: Kill first 2 requests to /Me endpoint
-# Pattern 2: Kill first 1 request to /workspace/list endpoint
-
-[[Server]]
-Pattern = "GET /api/2.0/preview/scim/v2/Me"
-KillCaller = 2
-Response.Body = '''
-{
-  "id": "123",
-  "userName": "test@example.com"
-}
-'''
+# Test that multiple patterns can have independent kill counts
 
 [[Server]]
 Pattern = "GET /api/2.0/workspace/list"
-KillCaller = 1
+Response.Body = '{"objects": []}'
diff --git a/acceptance/selftest/kill_caller/multiple/output.txt b/acceptance/selftest/kill_caller/multiple/output.txt
index 27b034cfcb1..3b6aea849fd 100644
--- a/acceptance/selftest/kill_caller/multiple/output.txt
+++ b/acceptance/selftest/kill_caller/multiple/output.txt
@@ -19,7 +19,7 @@ Attempt 3 done
 
 >>> [CLI] current-user me
 {
-  "id": "123",
-  "userName": "test@example.com"
+  "id": "[USERID]",
+  "userName": "[USERNAME]"
 }
 Attempt 4 done - success!
diff --git a/acceptance/selftest/kill_caller/multiple/script b/acceptance/selftest/kill_caller/multiple/script index 03628e203ed..1e089f3cc0f 100644 --- a/acceptance/selftest/kill_caller/multiple/script +++ b/acceptance/selftest/kill_caller/multiple/script @@ -1,3 +1,6 @@ +# Kill the CLI 3 times, then allow the 4th request to succeed +kill_after.py "GET /api/2.0/preview/scim/v2/Me" 0 3 + # First 3 attempts should be killed trace errcode $CLI current-user me echo "Attempt 1 done" diff --git a/acceptance/selftest/kill_caller/multiple/test.toml b/acceptance/selftest/kill_caller/multiple/test.toml deleted file mode 100644 index 5485fc6a6bb..00000000000 --- a/acceptance/selftest/kill_caller/multiple/test.toml +++ /dev/null @@ -1,10 +0,0 @@ -# Kill the CLI 3 times, then allow the 4th request to succeed -[[Server]] -Pattern = "GET /api/2.0/preview/scim/v2/Me" -KillCaller = 3 -Response.Body = ''' -{ - "id": "123", - "userName": "test@example.com" -} -''' diff --git a/acceptance/selftest/kill_caller/offset/out.test.toml b/acceptance/selftest/kill_caller/offset/out.test.toml new file mode 100644 index 00000000000..f784a183258 --- /dev/null +++ b/acceptance/selftest/kill_caller/offset/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/selftest/kill_caller/offset/output.txt b/acceptance/selftest/kill_caller/offset/output.txt new file mode 100644 index 00000000000..b6959aec5e2 --- /dev/null +++ b/acceptance/selftest/kill_caller/offset/output.txt @@ -0,0 +1,33 @@ + +>>> [CLI] current-user me +{ + "id": "[USERID]", + "userName": "[USERNAME]" +} +Attempt 1 done - success (offset) + +>>> [CLI] current-user me +{ + "id": "[USERID]", + "userName": "[USERNAME]" +} +Attempt 2 done - success (offset) + +>>> errcode [CLI] current-user me +[PROCESS_KILLED] + +Exit code: [KILLED] +Attempt 3 done - killed + +>>> errcode [CLI] current-user me +[PROCESS_KILLED] + +Exit code: [KILLED] +Attempt 4 done - killed + +>>> [CLI] current-user me +{ + "id": "[USERID]", + "userName": "[USERNAME]" +} +Attempt 5 done - success (past kill window) diff --git a/acceptance/selftest/kill_caller/offset/script b/acceptance/selftest/kill_caller/offset/script new file mode 100644 index 00000000000..6abee0dcac7 --- /dev/null +++ b/acceptance/selftest/kill_caller/offset/script @@ -0,0 +1,20 @@ +# Let first 2 requests pass, kill next 2, then allow rest +kill_after.py "GET /api/2.0/preview/scim/v2/Me" 2 2 + +# First 2 attempts should succeed (offset period) +trace $CLI current-user me +echo "Attempt 1 done - success (offset)" + +trace $CLI current-user me +echo "Attempt 2 done - success (offset)" + +# Attempts 3-4 should be killed +trace errcode $CLI current-user me +echo "Attempt 3 done - killed" + +trace errcode $CLI current-user me +echo "Attempt 4 done - killed" + +# Attempt 5 should succeed again +trace $CLI current-user me +echo "Attempt 5 done - success (past kill window)" diff --git a/acceptance/selftest/kill_caller/workspace/script b/acceptance/selftest/kill_caller/workspace/script index 076972136c9..5a21881ab3f 100644 --- a/acceptance/selftest/kill_caller/workspace/script +++ b/acceptance/selftest/kill_caller/workspace/script @@ -1,2 +1,4 @@ +# Kill the CLI when it calls workspace list endpoint (once, then allow) +kill_after.py "GET /api/2.0/workspace/list" 0 1 trace errcode $CLI workspace list / echo "Script continued after kill" diff --git a/acceptance/selftest/kill_caller/workspace/test.toml 
b/acceptance/selftest/kill_caller/workspace/test.toml deleted file mode 100644 index eac10a6329b..00000000000 --- a/acceptance/selftest/kill_caller/workspace/test.toml +++ /dev/null @@ -1,4 +0,0 @@ -# Kill the CLI when it calls workspace list endpoint (once, then allow) -[[Server]] -Pattern = "GET /api/2.0/workspace/list" -KillCaller = 1 diff --git a/bundle/configsync/diff.go b/bundle/configsync/diff.go index dee7fa48116..b02cd345e1f 100644 --- a/bundle/configsync/diff.go +++ b/bundle/configsync/diff.go @@ -14,6 +14,7 @@ import ( "github.com/databricks/cli/bundle/deploy" "github.com/databricks/cli/bundle/deployplan" "github.com/databricks/cli/bundle/direct" + "github.com/databricks/cli/bundle/direct/dstate" "github.com/databricks/cli/libs/dyn" "github.com/databricks/cli/libs/dyn/convert" "github.com/databricks/cli/libs/log" @@ -134,7 +135,7 @@ func DetectChanges(ctx context.Context, b *bundle.Bundle, engine engine.EngineTy } else { deployBundle = &direct.DeploymentBundle{} _, statePath := b.StateFilenameConfigSnapshot(ctx) - if err := deployBundle.StateDB.Open(statePath); err != nil { + if err := deployBundle.StateDB.Open(ctx, statePath, dstate.WithRecovery(true), dstate.WithWrite(false)); err != nil { return nil, fmt.Errorf("failed to open state: %w", err) } } diff --git a/bundle/configsync/variables.go b/bundle/configsync/variables.go index e7bdff3696d..0745bfba43b 100644 --- a/bundle/configsync/variables.go +++ b/bundle/configsync/variables.go @@ -144,7 +144,7 @@ func resourceIDLookup(ctx context.Context, b *bundle.Bundle) func(string) string } _, statePath := b.StateFilenameConfigSnapshot(ctx) db := &dstate.DeploymentState{} - if err := db.Open(statePath); err != nil { + if err := db.Open(ctx, statePath, dstate.WithRecovery(false), dstate.WithWrite(false)); err != nil { log.Debugf(ctx, "variable restoration: failed to open state DB at %s: %v", statePath, err) return nil } diff --git a/bundle/direct/bind.go b/bundle/direct/bind.go index ed5cbbc07bc..c16f763afcc 100644 --- a/bundle/direct/bind.go +++ b/bundle/direct/bind.go @@ -62,8 +62,12 @@ type BindResult struct { func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.WorkspaceClient, configRoot *config.Root, statePath, resourceKey, resourceID string) (*BindResult, error) { // Check if the resource is already managed (bound to a different ID) var checkStateDB dstate.DeploymentState - if err := checkStateDB.Open(statePath); err == nil { - if existingID := checkStateDB.GetResourceID(resourceKey); existingID != "" { + if err := checkStateDB.Open(ctx, statePath, dstate.WithRecovery(true), dstate.WithWrite(false)); err == nil { + existingID := checkStateDB.GetResourceID(resourceKey) + if _, err := checkStateDB.Finalize(ctx); err != nil { + log.Warnf(ctx, "failed to finalize state: %v", err) + } + if existingID != "" { return nil, ErrResourceAlreadyBound{ ResourceKey: resourceKey, ExistingID: existingID, @@ -82,7 +86,7 @@ func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.Workspac } // Open temp state - err := b.StateDB.Open(tmpStatePath) + err := b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(true), dstate.WithWrite(true)) if err != nil { os.Remove(tmpStatePath) return nil, err @@ -96,7 +100,7 @@ func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.Workspac } // Finalize to persist temp state to disk - err = b.StateDB.Finalize() + _, err = b.StateDB.Finalize(ctx) if err != nil { os.Remove(tmpStatePath) return nil, err @@ -105,11 +109,19 @@ func (b *DeploymentBundle) Bind(ctx 
context.Context, client *databricks.Workspac log.Infof(ctx, "Bound %s to id=%s (in temp state)", resourceKey, resourceID) // First plan + update: populate state with resolved config + err = b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(true), dstate.WithWrite(false)) + if err != nil { + os.Remove(tmpStatePath) + return nil, err + } plan, err := b.CalculatePlan(ctx, client, configRoot) if err != nil { os.Remove(tmpStatePath) return nil, err } + if _, err := b.StateDB.Finalize(ctx); err != nil { + log.Warnf(ctx, "failed to finalize state: %v", err) + } // Populate the state with the resolved config entry := plan.Plan[resourceKey] @@ -132,13 +144,19 @@ func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.Workspac } } + err = b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(true), dstate.WithWrite(true)) + if err != nil { + os.Remove(tmpStatePath) + return nil, err + } + err = b.StateDB.SaveState(resourceKey, resourceID, sv.Value, dependsOn) if err != nil { os.Remove(tmpStatePath) return nil, err } - err = b.StateDB.Finalize() + _, err = b.StateDB.Finalize(ctx) if err != nil { os.Remove(tmpStatePath) return nil, err @@ -146,7 +164,15 @@ func (b *DeploymentBundle) Bind(ctx context.Context, client *databricks.Workspac } // Second plan: this is the plan to present to the user (change between remote resource and config) + err = b.StateDB.Open(ctx, tmpStatePath, dstate.WithRecovery(true), dstate.WithWrite(false)) + if err != nil { + os.Remove(tmpStatePath) + return nil, err + } plan, err = b.CalculatePlan(ctx, client, configRoot) + if _, ferr := b.StateDB.Finalize(ctx); ferr != nil { + log.Warnf(ctx, "failed to finalize state: %v", ferr) + } if err != nil { os.Remove(tmpStatePath) return nil, err @@ -188,7 +214,7 @@ func (result *BindResult) Cancel() { // Unbind removes a resource from direct engine state without deleting // the workspace resource. Also removes associated permissions/grants entries. func (b *DeploymentBundle) Unbind(ctx context.Context, statePath, resourceKey string) error { - err := b.StateDB.Open(statePath) + err := b.StateDB.Open(ctx, statePath, dstate.WithRecovery(true), dstate.WithWrite(true)) if err != nil { return err } @@ -216,5 +242,6 @@ func (b *DeploymentBundle) Unbind(ctx context.Context, statePath, resourceKey st } } - return b.StateDB.Finalize() + _, err = b.StateDB.Finalize(ctx) + return err } diff --git a/bundle/direct/bundle_apply.go b/bundle/direct/bundle_apply.go index a7f3ee65fc2..6bad8091469 100644 --- a/bundle/direct/bundle_apply.go +++ b/bundle/direct/bundle_apply.go @@ -25,7 +25,7 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa return } - b.StateDB.AssertOpened() + b.StateDB.AssertOpenedForWrite() b.RemoteStateCache.Clear() g, err := makeGraph(plan) diff --git a/bundle/direct/bundle_plan.go b/bundle/direct/bundle_plan.go index f6bcea316cd..eb80f49b687 100644 --- a/bundle/direct/bundle_plan.go +++ b/bundle/direct/bundle_plan.go @@ -37,24 +37,19 @@ func (b *DeploymentBundle) init(client *databricks.WorkspaceClient) error { return err } -// ValidatePlanAgainstState validates that a plan's lineage and serial match the current state. -// This should be called early in the deployment process, before any file operations. +// ValidatePlanAgainstState validates that a plan's lineage and serial match the given state. // If the plan has no lineage (first deployment), validation is skipped. 
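+// For example, a plan captured at serial N applies only while the state is still at serial N; +// a concurrent deploy bumps the serial and the serial check below rejects the stale plan.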
func ValidatePlanAgainstState(stateDB *dstate.DeploymentState, plan *deployplan.Plan) error { - // If plan has no lineage, this is a first deployment before any state exists - // No validation needed if plan.Lineage == "" { return nil } - stateDB.AssertOpened() + stateDB.AssertOpenedForReadOrWrite() - // Validate that the plan's lineage matches the current state's lineage if plan.Lineage != stateDB.Data.Lineage { return fmt.Errorf("plan lineage %q does not match state lineage %q; the state may have been modified by another process", plan.Lineage, stateDB.Data.Lineage) } - // Validate that the plan's serial matches the current state's serial if plan.Serial != stateDB.Data.Serial { return fmt.Errorf("plan serial %d does not match state serial %d; the state has been modified since the plan was created. Please run 'bundle plan' again", plan.Serial, stateDB.Data.Serial) } @@ -63,9 +58,9 @@ func ValidatePlanAgainstState(stateDB *dstate.DeploymentState, plan *deployplan. } // InitForApply initializes the DeploymentBundle for applying a pre-computed plan. -// This is used when --plan is specified to skip the planning phase. +// StateDB must already be open for write before calling this function. func (b *DeploymentBundle) InitForApply(ctx context.Context, client *databricks.WorkspaceClient, plan *deployplan.Plan) error { - b.StateDB.AssertOpened() + b.StateDB.AssertOpenedForWrite() err := b.init(client) if err != nil { @@ -97,8 +92,10 @@ func (b *DeploymentBundle) InitForApply(ctx context.Context, client *databricks. return nil } +// CalculatePlan computes the deployment plan by comparing local config against remote state. +// StateDB must already be open for read before calling this function. func (b *DeploymentBundle) CalculatePlan(ctx context.Context, client *databricks.WorkspaceClient, configRoot *config.Root) (*deployplan.Plan, error) { - b.StateDB.AssertOpened() + b.StateDB.AssertOpenedForRead() err := b.init(client) if err != nil { diff --git a/bundle/direct/dstate/state.go b/bundle/direct/dstate/state.go index 3f6bcce2fc5..7dc9b97f981 100644 --- a/bundle/direct/dstate/state.go +++ b/bundle/direct/dstate/state.go @@ -1,6 +1,8 @@ package dstate import ( + "bufio" + "bytes" "context" "encoding/json" "errors" @@ -14,23 +16,41 @@ import ( "github.com/databricks/cli/bundle/deployplan" "github.com/databricks/cli/bundle/statemgmt/resourcestate" "github.com/databricks/cli/internal/build" + "github.com/databricks/cli/libs/log" "github.com/google/uuid" ) -const currentStateVersion = 2 +const ( + currentStateVersion = 2 + initialBufferSize = 64 * 1024 + maxWalEntrySize = 1024 * 1024 + walSuffix = ".wal" +) + +// errStaleWAL is returned when the WAL serial is behind the expected serial. +// The caller should delete the stale WAL and proceed normally. +var errStaleWAL = errors.New("stale WAL") type DeploymentState struct { - Path string - Data Database - mu sync.Mutex + Path string + Data Database + mu sync.Mutex + walFile *os.File + + // Maps resource key to ID. Unlike Data.State, this is kept up to date during writes (deploys). + stateIDs map[string]string } type Database struct { - StateVersion int `json:"state_version"` - CLIVersion string `json:"cli_version"` - Lineage string `json:"lineage"` - Serial int `json:"serial"` - State map[string]ResourceEntry `json:"state"` + StateVersion int `json:"state_version"` + CLIVersion string `json:"cli_version"` + Lineage string `json:"lineage"` + Serial int `json:"serial"` + + // Maps resource key to ResourceEntry which includes ID + full serialized state.
+ // This is not updated during writes/deploys; those writes go to the WAL instead. + // The State is then reconstructed from the WAL. + State map[string]ResourceEntry `json:"state"` } type ResourceEntry struct { @@ -39,6 +59,18 @@ type ResourceEntry struct { DependsOn []deployplan.DependsOnEntry `json:"depends_on,omitempty"` } +type WALHeader struct { + Lineage string `json:"lineage"` + Serial int `json:"serial"` + StateVersion int `json:"state_version"` + CLIVersion string `json:"cli_version"` +} + +type WALEntry struct { + Key string `json:"k"` + Value *ResourceEntry `json:"v,omitempty"` // nil means delete +} + func NewDatabase(lineage string, serial int) Database { return Database{ StateVersion: currentStateVersion, @@ -50,7 +82,7 @@ func NewDatabase(lineage string, serial int) Database { } func (db *DeploymentState) SaveState(key, newID string, state any, dependsOn []deployplan.DependsOnEntry) error { - db.AssertOpened() + db.AssertOpenedForWrite() db.mu.Lock() defer db.mu.Unlock() @@ -58,22 +90,27 @@ func (db *DeploymentState) SaveState(key, newID string, state any, dependsOn []d db.Data.State = make(map[string]ResourceEntry) } - jsonMessage, err := json.MarshalIndent(state, " ", " ") + // don't indent so that every WAL entry remains on a single line + jsonMessage, err := json.Marshal(state) if err != nil { return err } - db.Data.State[key] = ResourceEntry{ + entry := ResourceEntry{ ID: newID, State: json.RawMessage(jsonMessage), DependsOn: dependsOn, } - return nil + err = appendJSONLine(db.walFile, WALEntry{Key: key, Value: &entry}) + if err == nil { + db.stateIDs[key] = newID + } + return err } func (db *DeploymentState) DeleteState(key string) error { - db.AssertOpened() + db.AssertOpenedForWrite() db.mu.Lock() defer db.mu.Unlock() @@ -81,13 +118,15 @@ func (db *DeploymentState) DeleteState(key string) error { return nil } - delete(db.Data.State, key) - - return nil + err := appendJSONLine(db.walFile, WALEntry{Key: key}) + if err == nil { + delete(db.stateIDs, key) + } + return err } -func (db *DeploymentState) getResourceEntry(key string) (ResourceEntry, bool) { - db.AssertOpened() +func (db *DeploymentState) GetResourceEntry(key string) (ResourceEntry, bool) { + db.AssertOpenedForRead() db.mu.Lock() defer db.mu.Unlock() @@ -99,18 +138,21 @@ func (db *DeploymentState) getResourceEntry(key string) (ResourceEntry, bool) { return result, ok } -// GetResourceEntry returns the full resource entry for the given key. -func (db *DeploymentState) GetResourceEntry(key string) (ResourceEntry, bool) { - return db.getResourceEntry(key) -} - // GetResourceID returns the ID of the resource for the given key, or an empty string if not found.
func (db *DeploymentState) GetResourceID(key string) string { - entry, _ := db.getResourceEntry(key) - return entry.ID + db.AssertOpenedForReadOrWrite() + db.mu.Lock() + defer db.mu.Unlock() + + return db.stateIDs[key] } -func (db *DeploymentState) Open(path string) error { +type ( + WithRecovery bool + WithWrite bool +) + +func (db *DeploymentState) Open(ctx context.Context, path string, withRecovery WithRecovery, withWrite WithWrite) error { db.mu.Lock() defer db.mu.Unlock() @@ -118,53 +160,286 @@ func (db *DeploymentState) Open(path string) error { panic(fmt.Sprintf("state already opened: %v, cannot open %v", db.Path, path)) } - data, err := os.ReadFile(path) + db.Path = path + if err := db.Reload(ctx); err != nil { + return err + } + + walPath := db.Path + walSuffix + _, walError := os.Stat(walPath) + if walError == nil { + if withRecovery { + if err := db.replayWAL(ctx); err != nil { + return fmt.Errorf("reading state from %s: %w", path, err) + } + } else { + return fmt.Errorf("unexpected WAL file found at %s", walPath) + } + } + + if err := migrateState(&db.Data); err != nil { + return fmt.Errorf("migrating state %s: %w", path, err) + } + + if withWrite { + if err := os.MkdirAll(filepath.Dir(walPath), 0o755); err != nil { + return fmt.Errorf("failed to create state directory: %w", err) + } + walFile, err := os.OpenFile(walPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600) + if err != nil { + return fmt.Errorf("failed to open WAL file %s: %w", walPath, err) + } + db.walFile = walFile + lineage := db.Data.Lineage + if lineage == "" { + // state file is new, does not have lineage yet; store lineage in the WAL only + lineage = uuid.New().String() + } + walHead := WALHeader{ + Lineage: lineage, + Serial: db.Data.Serial + 1, + StateVersion: currentStateVersion, + CLIVersion: build.GetInfo().Version, + } + return appendJSONLine(db.walFile, walHead) + } + + return nil +} + +// OpenWithData initializes the state from an in-memory database without reading from disk. +// The state is opened in read mode; call UpgradeToWrite to transition to write mode. 
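+// Typical flow at the migration call sites below (upload_state_for_yaml_sync.go, migrate.go): +// OpenWithData, then UpgradeToWrite before Apply, then Finalize to flush the WAL to disk.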
+func (db *DeploymentState) OpenWithData(path string, data Database) { + db.mu.Lock() + defer db.mu.Unlock() + + if db.Path != "" { + panic(fmt.Sprintf("state already opened: %v, cannot open %v", db.Path, path)) + } + + db.Path = path + db.Data = data + db.stateIDs = make(map[string]string) + for key, entry := range data.State { + db.stateIDs[key] = entry.ID + } +} + +func (db *DeploymentState) Reload(ctx context.Context) error { + db.stateIDs = make(map[string]string) + data, err := os.ReadFile(db.Path) if err != nil { if errors.Is(err, fs.ErrNotExist) { - // Create new database with serial=0, will be incremented to 1 in Finalize() + // Not initializing lineage yet, we might have that saved in WAL db.Data = NewDatabase("", 0) - db.Path = path - return nil + } else { + return err } - return err + } else { + if err := json.Unmarshal(data, &db.Data); err != nil { + return err + } + } + for key, entry := range db.Data.State { + db.stateIDs[key] = entry.ID } + return nil +} - err = json.Unmarshal(data, &db.Data) +func (db *DeploymentState) replayWAL(ctx context.Context) error { + walPath := db.Path + walSuffix + hasEntries, err := db.mergeWalIntoState(ctx) if err != nil { - return err + if errors.Is(err, errStaleWAL) { + log.Debugf(ctx, "Deleting stale WAL file %s", walPath) + _ = os.Remove(walPath) + return nil + } + return fmt.Errorf("WAL recovery failed: %w", err) } + if hasEntries { + if err := db.unlockedSave(); err != nil { + return err + } + } + if err := os.Remove(walPath); err != nil { + return fmt.Errorf("failed to remove WAL file %s: %w", walPath, err) + } + return nil +} - if err := migrateState(&db.Data); err != nil { - return fmt.Errorf("migrating state %s: %w", path, err) +func (db *DeploymentState) validateWALHeader(header *WALHeader) error { + if header.Lineage != db.Data.Lineage && db.Data.Lineage != "" { + return fmt.Errorf("WAL lineage (%s) does not match state lineage (%s)", header.Lineage, db.Data.Lineage) + } + + expected := db.Data.Serial + 1 + if header.Serial < expected { + return errStaleWAL + } + if header.Serial > expected { + return fmt.Errorf("WAL serial (%d) is ahead of expected (%d), state may be corrupted", header.Serial, expected) } - db.Path = path return nil } -func (db *DeploymentState) Finalize() error { +func (db *DeploymentState) mergeWalIntoState(ctx context.Context) (bool, error) { + if db.walFile != nil { + panic("internal error: walFile must be closed") + } + + walPath := db.Path + walSuffix + walFile, err := os.Open(walPath) + if err != nil { + return false, fmt.Errorf("failed to open WAL file %s: %w", walPath, err) + } + defer walFile.Close() + + scanner := bufio.NewScanner(walFile) + scanner.Buffer(make([]byte, 0, initialBufferSize), maxWalEntrySize) + lineNumber := 0 + var corruptedLines [][]byte + + for scanner.Scan() { + lineNumber++ + line := scanner.Bytes() + if lineNumber == 1 { + var header WALHeader + if err := json.Unmarshal(line, &header); err != nil { + return false, fmt.Errorf("failed to parse WAL header: %w", err) + } + if err := db.validateWALHeader(&header); err != nil { + return false, err + } + // Apply header metadata to state (lineage may be new for first deploy) + db.Data.Lineage = header.Lineage + db.Data.Serial = header.Serial + } else { + var entry WALEntry + if err := json.Unmarshal(line, &entry); err != nil { + log.Warnf(ctx, "Skipping corrupted WAL entry at %s:%d: %v", walPath, lineNumber, err) + corruptedLines = append(corruptedLines, append([]byte(nil), line...)) + continue + } + if db.Data.State == nil { + 
db.Data.State = make(map[string]ResourceEntry) + } + if entry.Value == nil { + delete(db.Data.State, entry.Key) + delete(db.stateIDs, entry.Key) + } else { + db.Data.State[entry.Key] = *entry.Value + db.stateIDs[entry.Key] = entry.Value.ID + } + } + } + + if err := scanner.Err(); err != nil { + return false, err + } + + if len(corruptedLines) > 0 { + corruptedPath := walPath + ".corrupted" + corruptedData := bytes.Join(corruptedLines, []byte("\n")) + if writeErr := os.WriteFile(corruptedPath, corruptedData, 0o600); writeErr != nil { + log.Warnf(ctx, "Failed to save corrupted WAL entries to %s: %v", corruptedPath, writeErr) + } else { + log.Warnf(ctx, "Saved %d corrupted WAL entries to %s", len(corruptedLines), corruptedPath) + } + } + + return lineNumber > 1, nil +} + +// Finalize replays the WAL (if open for write), captures the resulting state, and resets. +// Safe to call multiple times or on an already-finalized state. +// Returns the exported state as of the end of this operation. +func (db *DeploymentState) Finalize(ctx context.Context) (resourcestate.ExportedResourcesMap, error) { + db.mu.Lock() + defer db.mu.Unlock() + + if db.Path == "" { + return nil, nil + } + + var err error + + if db.walFile != nil { + db.walFile.Close() + db.walFile = nil + err = db.replayWAL(ctx) + } + + state := ExportStateFromData(db.Data) + + db.Path = "" + db.Data = Database{} + db.stateIDs = nil + + return state, err +} + +// UpgradeToWrite transitions from read mode to write mode without re-reading state. +// State must already be open for read. This initializes the WAL for writing. +func (db *DeploymentState) UpgradeToWrite() error { db.mu.Lock() defer db.mu.Unlock() - // Generate lineage on first save - if db.Data.Lineage == "" { - db.Data.Lineage = uuid.New().String() + if db.Path == "" { + return errors.New("internal error: DeploymentState must be opened first") + } + if db.walFile != nil { + return errors.New("internal error: DeploymentState is already open for write") } - db.Data.Serial++ + walPath := db.Path + walSuffix + if err := os.MkdirAll(filepath.Dir(walPath), 0o755); err != nil { + return fmt.Errorf("failed to create state directory: %w", err) + } + walFile, err := os.OpenFile(walPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600) + if err != nil { + return fmt.Errorf("failed to open WAL file %s: %w", walPath, err) + } + db.walFile = walFile - return db.unlockedSave() + lineage := db.Data.Lineage + if lineage == "" { + lineage = uuid.New().String() + } + walHead := WALHeader{ + Lineage: lineage, + Serial: db.Data.Serial + 1, + StateVersion: currentStateVersion, + CLIVersion: build.GetInfo().Version, + } + return appendJSONLine(db.walFile, walHead) } -func (db *DeploymentState) AssertOpened() { +func (db *DeploymentState) AssertOpenedForReadOrWrite() { if db.Path == "" { panic("internal error: DeploymentState must be opened first") } } -func (db *DeploymentState) ExportState(ctx context.Context) resourcestate.ExportedResourcesMap { +func (db *DeploymentState) AssertOpenedForRead() { + db.AssertOpenedForReadOrWrite() + if db.walFile != nil { + panic("internal error: DeploymentState must be opened in read mode") + } +} + +func (db *DeploymentState) AssertOpenedForWrite() { + db.AssertOpenedForReadOrWrite() + if db.walFile == nil { + panic("internal error: DeploymentState must be opened in write mode") + } +} + +// ExportStateFromData extracts resource IDs and ETags from a database snapshot. 
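+// It backs both ExportState (for an opened state) and the snapshot that Finalize returns +// after the WAL has been merged.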
+func ExportStateFromData(data Database) resourcestate.ExportedResourcesMap { result := make(resourcestate.ExportedResourcesMap) - for key, entry := range db.Data.State { + for key, entry := range data.State { var etag string // Extract etag for dashboards. // covered by test case: bundle/deploy/dashboard/detect-change @@ -185,14 +460,16 @@ func (db *DeploymentState) ExportState(ctx context.Context) resourcestate.Export return result } +func (db *DeploymentState) ExportState(ctx context.Context) resourcestate.ExportedResourcesMap { + return ExportStateFromData(db.Data) +} + func (db *DeploymentState) unlockedSave() error { - db.AssertOpened() data, err := json.MarshalIndent(db.Data, "", " ") if err != nil { return err } - // Create parent directories if they don't exist dir := filepath.Dir(db.Path) if err := os.MkdirAll(dir, 0o755); err != nil { return fmt.Errorf("failed to create directory %#v: %w", dir, err) @@ -205,3 +482,15 @@ func (db *DeploymentState) unlockedSave() error { return nil } + +func appendJSONLine(file *os.File, obj any) error { + data, err := json.Marshal(obj) + if err != nil { + return err + } + data = append(data, '\n') + + _, err = file.Write(data) + // no fsync here, not needed + return err +} diff --git a/bundle/direct/dstate/state_test.go b/bundle/direct/dstate/state_test.go index acd2a9e5336..afe8634790a 100644 --- a/bundle/direct/dstate/state_test.go +++ b/bundle/direct/dstate/state_test.go @@ -1,6 +1,7 @@ package dstate import ( + "os" "path/filepath" "testing" @@ -8,46 +9,68 @@ import ( "github.com/stretchr/testify/require" ) +func mustFinalize(t *testing.T, db *DeploymentState) { + t.Helper() + _, err := db.Finalize(t.Context()) + require.NoError(t, err) +} + func TestOpenSaveFinalizeRoundTrip(t *testing.T) { path := filepath.Join(t.TempDir(), "state.json") var db DeploymentState - require.NoError(t, db.Open(path)) + require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(true))) require.NoError(t, db.SaveState("jobs.my_job", "123", map[string]string{"key": "val"}, nil)) - require.NoError(t, db.Finalize()) + mustFinalize(t, &db) // Re-open and verify persisted data. 
var db2 DeploymentState - require.NoError(t, db2.Open(path)) + require.NoError(t, db2.Open(t.Context(), path, WithRecovery(false), WithWrite(false))) assert.Equal(t, 1, db2.Data.Serial) assert.Equal(t, "123", db2.GetResourceID("jobs.my_job")) + mustFinalize(t, &db2) +} + +func TestFinalizeWithNoEntriesDoesNotWriteStateFile(t *testing.T) { + path := filepath.Join(t.TempDir(), "state.json") + + var db DeploymentState + require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(true))) + mustFinalize(t, &db) + + _, err := os.Stat(path) + assert.ErrorIs(t, err, os.ErrNotExist) } func TestPanicOnDoubleOpen(t *testing.T) { path := filepath.Join(t.TempDir(), "state.json") var db DeploymentState - require.NoError(t, db.Open(path)) + require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(true))) assert.Panics(t, func() { - _ = db.Open(path) + _ = db.Open(t.Context(), path, WithRecovery(true), WithWrite(true)) }) + mustFinalize(t, &db) } func TestDeleteState(t *testing.T) { path := filepath.Join(t.TempDir(), "state.json") var db DeploymentState - require.NoError(t, db.Open(path)) + require.NoError(t, db.Open(t.Context(), path, WithRecovery(true), WithWrite(true))) require.NoError(t, db.SaveState("jobs.my_job", "123", map[string]string{}, nil)) - require.NoError(t, db.Finalize()) - - require.NoError(t, db.DeleteState("jobs.my_job")) - require.NoError(t, db.Finalize()) + mustFinalize(t, &db) var db2 DeploymentState - require.NoError(t, db2.Open(path)) - assert.Equal(t, 2, db2.Data.Serial) - assert.Equal(t, "", db2.GetResourceID("jobs.my_job")) + require.NoError(t, db2.Open(t.Context(), path, WithRecovery(true), WithWrite(true))) + require.NoError(t, db2.DeleteState("jobs.my_job")) + mustFinalize(t, &db2) + + var db3 DeploymentState + require.NoError(t, db3.Open(t.Context(), path, WithRecovery(false), WithWrite(false))) + assert.Equal(t, 2, db3.Data.Serial) + assert.Equal(t, "", db3.GetResourceID("jobs.my_job")) + mustFinalize(t, &db3) } diff --git a/bundle/direct/pkg.go b/bundle/direct/pkg.go index 58b9bc6b4b1..48a9c5a2ff7 100644 --- a/bundle/direct/pkg.go +++ b/bundle/direct/pkg.go @@ -64,7 +64,9 @@ func (d *DeploymentUnit) SetRemoteState(remoteState any) error { return nil } +// ExportState exports the current deployment state as a resource map. +// StateDB must already be open for read before calling this function. func (b *DeploymentBundle) ExportState(ctx context.Context) resourcestate.ExportedResourcesMap { - b.StateDB.AssertOpened() + b.StateDB.AssertOpenedForRead() return b.StateDB.ExportState(ctx) } diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index b4d70ede5ad..6c03ac8870d 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -70,17 +70,28 @@ func deployCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, ta if targetEngine.IsDirect() { b.DeploymentBundle.Apply(ctx, b.WorkspaceClient(ctx), plan, direct.MigrateMode(false)) - // Finalize state: write to disk even if deploy failed, so partial progress is saved. - // Skip for empty plans to avoid creating a state file when nothing was deployed. - if len(plan.Plan) > 0 { - if err := b.DeploymentBundle.StateDB.Finalize(); err != nil { - logdiag.LogError(ctx, err) - } - } } else { bundle.ApplyContext(ctx, b, terraform.Apply()) } + // Capture post-apply state for Load below. + // For direct: flush WAL to disk (Finalize) and capture the result. + // For terraform: parse the state file written by terraform.Apply. 
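+ // Either way, the captured map is what statemgmt.Load merges back into the bundle configuration below.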
+ var state statemgmt.ExportedResourcesMap + if targetEngine.IsDirect() { + var err error + state, err = b.DeploymentBundle.StateDB.Finalize(ctx) + if err != nil { + logdiag.LogError(ctx, err) + } + } else { + var err error + state, err = terraform.ParseResourcesState(ctx, b) + if err != nil { + logdiag.LogError(ctx, err) + } + } + // Even if deployment failed, there might be updates in state that we need to upload statemgmt.PushResourcesState(ctx, b, targetEngine) if logdiag.HasError(ctx) { @@ -88,7 +99,7 @@ func deployCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, ta } bundle.ApplySeqContext(ctx, b, - statemgmt.Load(targetEngine), + statemgmt.Load(state), metadata.Compute(), metadata.Upload(), statemgmt.UploadStateForYamlSync(targetEngine), @@ -149,15 +160,27 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand return } - if plan != nil { + planFromFile := plan != nil + if plan == nil { + // State is already open for read by process.go (for direct engine) + plan = RunPlan(ctx, b, engine) + } + + if engine.IsDirect() { + // Upgrade from read (opened by process.go) to write mode + if err := b.DeploymentBundle.StateDB.UpgradeToWrite(); err != nil { + logdiag.LogError(ctx, err) + return + } + } + + if planFromFile { // Initialize DeploymentBundle for applying the loaded plan err := b.DeploymentBundle.InitForApply(ctx, b.WorkspaceClient(ctx), plan) if err != nil { logdiag.LogError(ctx, err) return } - } else { - plan = RunPlan(ctx, b, engine) } if logdiag.HasError(ctx) { diff --git a/bundle/phases/destroy.go b/bundle/phases/destroy.go index 91640ac6cad..98e6f7fee2a 100644 --- a/bundle/phases/destroy.go +++ b/bundle/phases/destroy.go @@ -14,6 +14,7 @@ import ( "github.com/databricks/cli/bundle/deployplan" "github.com/databricks/cli/bundle/direct" "github.com/databricks/cli/libs/cmdio" + "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/logdiag" "github.com/databricks/databricks-sdk-go/apierr" @@ -76,17 +77,23 @@ func approvalForDestroy(ctx context.Context, b *bundle.Bundle, plan *deployplan. func destroyCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, engine engine.EngineType) { if engine.IsDirect() { b.DeploymentBundle.Apply(ctx, b.WorkspaceClient(ctx), plan, direct.MigrateMode(false)) - // Skip Finalize for empty plans to avoid creating a state file when nothing was destroyed. - if len(plan.Plan) > 0 { - if err := b.DeploymentBundle.StateDB.Finalize(); err != nil { - logdiag.LogError(ctx, err) - } - } } else { // Core destructive mutators for destroy. These require informed user consent. bundle.ApplyContext(ctx, b, terraform.Apply()) } + // Flush WAL to local state file before deleting remote files. + // Warn instead of hard-error: resources are already deleted, so proceed + // with file cleanup regardless of whether state flush succeeds.
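+ // A failed flush leaves the WAL on disk; the next Open with recovery enabled will attempt to replay it.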
+ if engine.IsDirect() { + if _, err := b.DeploymentBundle.StateDB.Finalize(ctx); err != nil { + diags := diag.WarningFromErr(err) + if len(diags) > 0 { + logdiag.LogDiag(ctx, diags[0]) + } + } + } + if logdiag.HasError(ctx) { return } @@ -168,6 +175,13 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { } if hasApproval { + if engine.IsDirect() { + // Upgrade from read (opened by process.go) to write mode + if err := b.DeploymentBundle.StateDB.UpgradeToWrite(); err != nil { + logdiag.LogError(ctx, err) + return + } + } destroyCore(ctx, b, plan, engine) } else { cmdio.LogString(ctx, "Destroy cancelled!") diff --git a/bundle/statemgmt/state_load.go b/bundle/statemgmt/state_load.go index 3345792c295..573c69126c2 100644 --- a/bundle/statemgmt/state_load.go +++ b/bundle/statemgmt/state_load.go @@ -9,9 +9,7 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config" - "github.com/databricks/cli/bundle/config/engine" "github.com/databricks/cli/bundle/config/resources" - "github.com/databricks/cli/bundle/deploy/terraform" "github.com/databricks/cli/bundle/statemgmt/resourcestate" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/dyn" @@ -26,8 +24,8 @@ type ( const ErrorOnEmptyState LoadMode = 0 type load struct { - modes []LoadMode - engine engine.EngineType + state ExportedResourcesMap + modes []LoadMode } func (l *load) Name() string { @@ -35,27 +33,16 @@ func (l *load) Name() string { } func (l *load) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - var err error - var state ExportedResourcesMap - - if l.engine.IsDirect() { - state = b.DeploymentBundle.ExportState(ctx) - } else { - var err error - state, err = terraform.ParseResourcesState(ctx, b) - if err != nil { - return diag.FromErr(err) - } - } + return applyState(ctx, b, l.state, l.modes) +} - err = l.validateState(state) - if err != nil { +// applyState merges the exported resource state into the bundle configuration. +func applyState(ctx context.Context, b *bundle.Bundle, state ExportedResourcesMap, modes []LoadMode) diag.Diagnostics { + if err := validateLoadedState(state, modes); err != nil { return diag.FromErr(err) } - // Merge state into configuration. - err = StateToBundle(ctx, state, &b.Config) - if err != nil { + if err := StateToBundle(ctx, state, &b.Config); err != nil { return diag.FromErr(err) } @@ -160,14 +147,14 @@ func StateToBundle(ctx context.Context, state ExportedResourcesMap, config *conf }) } -func (l *load) validateState(state ExportedResourcesMap) error { - if len(state) == 0 && slices.Contains(l.modes, ErrorOnEmptyState) { +func validateLoadedState(state ExportedResourcesMap, modes []LoadMode) error { + if len(state) == 0 && slices.Contains(modes, ErrorOnEmptyState) { return errors.New("resource not found or not yet deployed. Did you forget to run 'databricks bundle deploy'?") } - return nil } -func Load(engine engine.EngineType, modes ...LoadMode) bundle.Mutator { - return &load{modes: modes, engine: engine} +// Load returns a mutator that merges the provided resource state into the bundle configuration. 
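+// Callers now compute the state up front (StateDB.Finalize for the direct engine, +// terraform.ParseResourcesState for terraform) instead of Load reading it from the engine itself.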
+func Load(state ExportedResourcesMap, modes ...LoadMode) bundle.Mutator { + return &load{state: state, modes: modes} } diff --git a/bundle/statemgmt/upload_state_for_yaml_sync.go b/bundle/statemgmt/upload_state_for_yaml_sync.go index 74def3174f8..0399c7b31ff 100644 --- a/bundle/statemgmt/upload_state_for_yaml_sync.go +++ b/bundle/statemgmt/upload_state_for_yaml_sync.go @@ -141,12 +141,8 @@ func (m *uploadStateForYamlSync) convertState(ctx context.Context, b *bundle.Bun migratedDB := dstate.NewDatabase(tfState.Lineage, tfState.Serial+1) migratedDB.State = state - deploymentBundle := &direct.DeploymentBundle{ - StateDB: dstate.DeploymentState{ - Path: snapshotPath, - Data: migratedDB, - }, - } + deploymentBundle := &direct.DeploymentBundle{} + deploymentBundle.StateDB.OpenWithData(snapshotPath, migratedDB) // Apply SecretScopeFixups so the config matches what the direct engine expects. // This adds MANAGE ACL for the current user to all secret scopes, ensuring @@ -197,8 +193,12 @@ func (m *uploadStateForYamlSync) convertState(ctx context.Context, b *bundle.Bun } } + if err := deploymentBundle.StateDB.UpgradeToWrite(); err != nil { + return false, fmt.Errorf("upgrading state for apply: %w", err) + } + deploymentBundle.Apply(ctx, b.WorkspaceClient(ctx), plan, direct.MigrateMode(true)) - if err := deploymentBundle.StateDB.Finalize(); err != nil { + if _, err := deploymentBundle.StateDB.Finalize(ctx); err != nil { return false, err } diff --git a/cmd/bundle/deployment/migrate.go b/cmd/bundle/deployment/migrate.go index 5020d88e73a..77d95e3533e 100644 --- a/cmd/bundle/deployment/migrate.go +++ b/cmd/bundle/deployment/migrate.go @@ -227,12 +227,8 @@ To start using direct engine, set "engine: direct" under bundle in your databric migratedDB := dstate.NewDatabase(stateDesc.Lineage, stateDesc.Serial+1) migratedDB.State = state - deploymentBundle := &direct.DeploymentBundle{ - StateDB: dstate.DeploymentState{ - Path: tempStatePath, - Data: migratedDB, - }, - } + deploymentBundle := &direct.DeploymentBundle{} + deploymentBundle.StateDB.OpenWithData(tempStatePath, migratedDB) tempStatePathAutoRemove := true @@ -281,8 +277,12 @@ To start using direct engine, set "engine: direct" under bundle in your databric } } + if err := deploymentBundle.StateDB.UpgradeToWrite(); err != nil { + return fmt.Errorf("upgrading state for apply: %w", err) + } + deploymentBundle.Apply(ctx, b.WorkspaceClient(ctx), plan, direct.MigrateMode(true)) - if err := deploymentBundle.StateDB.Finalize(); err != nil { + if _, err := deploymentBundle.StateDB.Finalize(ctx); err != nil { logdiag.LogError(ctx, err) } if logdiag.HasError(ctx) { @@ -310,7 +310,7 @@ Validate the migration by running "databricks bundle plan%s", there should be no The state file is not synchronized to the workspace yet. To do that and finalize the migration, run "bundle deploy%s". 
To undo the migration, remove %s and rename %s to %s -`, len(deploymentBundle.StateDB.Data.State), localPath, extraArgsStr, extraArgsStr, localPath, localTerraformBackupPath, localTerraformPath)) +`, len(state), localPath, extraArgsStr, extraArgsStr, localPath, localTerraformBackupPath, localTerraformPath)) return nil } diff --git a/cmd/bundle/generate/dashboard.go b/cmd/bundle/generate/dashboard.go index 70de46225c2..71f4f573cf5 100644 --- a/cmd/bundle/generate/dashboard.go +++ b/cmd/bundle/generate/dashboard.go @@ -16,6 +16,8 @@ import ( "time" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/deploy/terraform" + "github.com/databricks/cli/bundle/direct/dstate" "github.com/databricks/cli/bundle/generate" "github.com/databricks/cli/bundle/phases" "github.com/databricks/cli/bundle/resources" @@ -391,14 +393,26 @@ func (d *dashboard) runForResource(ctx context.Context, b *bundle.Bundle) { if stateDesc.Engine.IsDirect() { _, localPath := b.StateFilenameDirect(ctx) - if err := b.DeploymentBundle.StateDB.Open(localPath); err != nil { + if err := b.DeploymentBundle.StateDB.Open(ctx, localPath, dstate.WithRecovery(true), dstate.WithWrite(false)); err != nil { + logdiag.LogError(ctx, err) + return + } + } + + var state statemgmt.ExportedResourcesMap + if stateDesc.Engine.IsDirect() { + state = b.DeploymentBundle.ExportState(ctx) + } else { + var err error + state, err = terraform.ParseResourcesState(ctx, b) + if err != nil { logdiag.LogError(ctx, err) return } } bundle.ApplySeqContext(ctx, b, - statemgmt.Load(stateDesc.Engine), + statemgmt.Load(state), ) if logdiag.HasError(ctx) { return diff --git a/cmd/bundle/utils/process.go b/cmd/bundle/utils/process.go index a6f48d99fa2..5f43cff6acd 100644 --- a/cmd/bundle/utils/process.go +++ b/cmd/bundle/utils/process.go @@ -11,8 +11,10 @@ import ( "github.com/databricks/cli/bundle/config/engine" "github.com/databricks/cli/bundle/config/mutator" "github.com/databricks/cli/bundle/config/validate" + "github.com/databricks/cli/bundle/deploy/terraform" "github.com/databricks/cli/bundle/deployplan" "github.com/databricks/cli/bundle/direct" + "github.com/databricks/cli/bundle/direct/dstate" "github.com/databricks/cli/bundle/phases" "github.com/databricks/cli/bundle/statemgmt" "github.com/databricks/cli/cmd/root" @@ -187,7 +189,7 @@ func ProcessBundleRet(cmd *cobra.Command, opts ProcessOptions) (b *bundle.Bundle needDirectState := stateDesc.Engine.IsDirect() && (opts.InitIDs || opts.ErrorOnEmptyState || opts.Deploy || opts.ReadPlanPath != "" || opts.PreDeployChecks || opts.PostStateFunc != nil) if needDirectState { _, localPath := b.StateFilenameDirect(ctx) - if err := b.DeploymentBundle.StateDB.Open(localPath); err != nil { + if err := b.DeploymentBundle.StateDB.Open(ctx, localPath, dstate.WithRecovery(true), dstate.WithWrite(false)); err != nil { logdiag.LogError(ctx, err) return b, stateDesc, root.ErrAlreadyPrinted } @@ -199,8 +201,19 @@ func ProcessBundleRet(cmd *cobra.Command, opts ProcessOptions) (b *bundle.Bundle if opts.ErrorOnEmptyState { modes = append(modes, statemgmt.ErrorOnEmptyState) } + var state statemgmt.ExportedResourcesMap + if stateDesc.Engine.IsDirect() { + state = b.DeploymentBundle.ExportState(ctx) + } else { + var err error + state, err = terraform.ParseResourcesState(ctx, b) + if err != nil { + logdiag.LogError(ctx, err) + return b, stateDesc, root.ErrAlreadyPrinted + } + } mutators := []bundle.Mutator{ - statemgmt.Load(stateDesc.Engine, modes...), + statemgmt.Load(state, modes...), } // InitializeURLs makes an extra 
API call; only run it when URLs are needed. if opts.InitIDs { diff --git a/databricks.yml b/databricks.yml new file mode 100644 index 00000000000..7cf210722a2 --- /dev/null +++ b/databricks.yml @@ -0,0 +1,19 @@ +bundle: + name: git + git: + # This is currently not supported + branch: ${var.deployment_branch} + +variables: + deployment_branch: + # By setting deployment_branch to "" we set bundle.git.branch to "" which is the same as unsetting it. + # This should make the CLI read the branch from git and update bundle.git.branch accordingly. It should + # also set bundle.git.inferred to true. + default: "" + +targets: + prod: + default: true + dev: + variables: + deployment_branch: dev-branch diff --git a/libs/testserver/kill.go b/libs/testserver/kill.go new file mode 100644 index 00000000000..e24b13a0f11 --- /dev/null +++ b/libs/testserver/kill.go @@ -0,0 +1,108 @@ +package testserver + +import ( + "encoding/json" + "net/http" + "os" + "sync" + "time" + + "github.com/databricks/cli/internal/testutil" +) + +type killRuleKey struct { + token string + pattern string // "METHOD /path" +} + +type killRule struct { + offset int + times int +} + +type killRules struct { + mu sync.Mutex + rules map[killRuleKey]*killRule +} + +func newKillRules() *killRules { + return &killRules{rules: make(map[killRuleKey]*killRule)} +} + +func (kr *killRules) set(token, pattern string, offset, times int) { + kr.mu.Lock() + defer kr.mu.Unlock() + kr.rules[killRuleKey{token: token, pattern: pattern}] = &killRule{offset: offset, times: times} +} + +// check returns true if the caller should be killed for this request. +// It also performs the kill. +func (kr *killRules) check(t testutil.TestingT, method, path, token string, headers http.Header) bool { + pattern := method + " " + path + key := killRuleKey{token: token, pattern: pattern} + + kr.mu.Lock() + rule, ok := kr.rules[key] + if !ok { + kr.mu.Unlock() + return false + } + if rule.offset > 0 { + rule.offset-- + kr.mu.Unlock() + return false + } + if rule.times <= 0 { + delete(kr.rules, key) + kr.mu.Unlock() + return false + } + rule.times-- + if rule.times == 0 { + delete(kr.rules, key) + } + kr.mu.Unlock() + + killProcess(t, pattern, headers) + return true +} + +func killProcess(t testutil.TestingT, pattern string, headers http.Header) { + pid := ExtractPidFromHeaders(headers) + if pid == 0 { + t.Errorf("kill rule matched %q but test-pid not found in User-Agent", pattern) + return + } + + process, err := os.FindProcess(pid) + if err != nil { + t.Errorf("Failed to find process %d: %s", pid, err) + return + } + + if err := process.Kill(); err != nil { + t.Errorf("Failed to kill process %d: %s", pid, err) + return + } + + if !waitForProcessExit(pid, 2*time.Second) { + t.Logf("kill: timed out waiting for PID %d to exit (pattern: %s)", pid, pattern) + } + t.Logf("kill: killed PID %d (pattern: %s)", pid, pattern) +} + +// killEndpointHandler returns a HandlerFunc for POST /__testserver/kill.
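+// The request body mirrors what kill_after.py sends, e.g. +// {"pattern": "POST /api/2.2/jobs/create", "offset": 2, "times": 1}; +// the rule is keyed by the caller's bearer token, so it stays scoped to a single test.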
+func killEndpointHandler(kr *killRules) HandlerFunc { + return func(req Request) any { + var body struct { + Pattern string `json:"pattern"` + Offset int `json:"offset"` + Times int `json:"times"` + } + if err := json.Unmarshal(req.Body, &body); err != nil { + return Response{StatusCode: 400, Body: map[string]string{"error": err.Error()}} + } + kr.set(req.Token, body.Pattern, body.Offset, body.Times) + return Response{StatusCode: 200} + } +} diff --git a/acceptance/internal/process_unix.go b/libs/testserver/process_unix.go similarity index 94% rename from acceptance/internal/process_unix.go rename to libs/testserver/process_unix.go index 1e0b0ead3e1..8b82187580e 100644 --- a/acceptance/internal/process_unix.go +++ b/libs/testserver/process_unix.go @@ -1,6 +1,6 @@ //go:build linux || darwin -package internal +package testserver import ( "syscall" diff --git a/acceptance/internal/process_windows.go b/libs/testserver/process_windows.go similarity index 96% rename from acceptance/internal/process_windows.go rename to libs/testserver/process_windows.go index fdad8b4f5e2..2a32fe4ede0 100644 --- a/acceptance/internal/process_windows.go +++ b/libs/testserver/process_windows.go @@ -1,6 +1,6 @@ //go:build windows -package internal +package testserver import ( "time" diff --git a/libs/testserver/server.go b/libs/testserver/server.go index 40556e55294..aa05aee5ab4 100644 --- a/libs/testserver/server.go +++ b/libs/testserver/server.go @@ -46,6 +46,8 @@ type Server struct { fakeOidc *FakeOidc mu sync.Mutex + kills *killRules + RequestCallback func(request *Request) ResponseCallback func(request *Request, response *EncodedResponse) } @@ -58,6 +60,7 @@ type Request struct { Vars map[string]string Workspace *FakeWorkspace Context context.Context + Token string } type Response struct { @@ -200,7 +203,19 @@ func getHeaders(value []byte) http.Header { func New(t testutil.TestingT) *Server { router := NewRouter() - server := httptest.NewServer(router) + kills := newKillRules() + + // Wrap the router so kill rules fire for ALL requests, including those with + // no registered handler that would otherwise bypass serve() entirely. + killMiddleware := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + token := getToken(r) + if kills.check(t, r.Method, r.URL.Path, token, r.Header) { + return + } + router.ServeHTTP(w, r) + }) + + server := httptest.NewServer(killMiddleware) t.Cleanup(server.Close) s := &Server{ @@ -209,6 +224,7 @@ func New(t testutil.TestingT) *Server { t: t, fakeWorkspaces: map[string]*FakeWorkspace{}, fakeOidc: &FakeOidc{url: server.URL}, + kills: kills, } router.Dispatch = s.serve @@ -258,6 +274,9 @@ Response.Body = '' }) router.NotFound = notFoundFunc + // Register a test-only endpoint for setting up kill rules from scripts. + s.Handle("POST", "/__testserver/kill", killEndpointHandler(s.kills)) + // Register a default handler for the SDK's host metadata discovery endpoint. // The SDK resolves this during config initialization (as of v0.126.0) to // determine workspace/account IDs, cloud, and OIDC endpoints. Without this @@ -289,12 +308,15 @@ func (s *Server) getWorkspaceForToken(token string) *FakeWorkspace { } func (s *Server) serve(w http.ResponseWriter, r *http.Request, handler HandlerFunc, vars map[string]string) { + token := getToken(r) + // Each test uses unique DATABRICKS_TOKEN, we simulate each token having // it's own fake fakeWorkspace to avoid interference between tests. 
- fakeWorkspace := s.getWorkspaceForToken(getToken(r)) + fakeWorkspace := s.getWorkspaceForToken(token) request := NewRequest(s.t, r, fakeWorkspace) request.Vars = vars + request.Token = token if s.RequestCallback != nil { s.RequestCallback(&request)
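Taken together, the state lifecycle this diff introduces can be summarized in a short sketch. This is a sketch of the API as shown above, not part of the change itself; the path and resource key are illustrative.

    package main

    import (
        "context"
        "log"

        "github.com/databricks/cli/bundle/direct/dstate"
    )

    func main() {
        ctx := context.Background()
        statePath := "/tmp/state.json" // illustrative

        var db dstate.DeploymentState
        // Read-only open; WithRecovery(true) replays and removes a WAL left
        // behind by a crashed deploy before anything reads the state.
        if err := db.Open(ctx, statePath, dstate.WithRecovery(true), dstate.WithWrite(false)); err != nil {
            log.Fatal(err)
        }

        // Before mutating, switch to write mode: this creates state.json.wal
        // and writes the header line (lineage, serial+1, versions).
        if err := db.UpgradeToWrite(); err != nil {
            log.Fatal(err)
        }

        // Each save appends one JSON line to the WAL instead of rewriting
        // state.json, e.g. {"k":"resources.jobs.x","v":{"__id__":"123",...}}.
        if err := db.SaveState("resources.jobs.x", "123", map[string]string{"name": "x"}, nil); err != nil {
            log.Fatal(err)
        }

        // Finalize closes the WAL, merges it into state.json, deletes the WAL
        // file, and returns the exported map that statemgmt.Load consumes.
        if _, err := db.Finalize(ctx); err != nil {
            log.Fatal(err)
        }
    }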