Skip to content

Commit f64a350

Browse files
authored
Pipeline artifacts (#3360)
* add storing of pipeline artifacts to bucket if configured * add support for restoring pipelines locally from remote bucket * add some logging output for now * add config kwargs for gcp compat * allow dashboard command * remove historic traces sync full bucket folder * use trace for pipeline overview (cherry picked from commit 13f5e0c) * preserve last modified time on synching * add debug statements * sync full workspace artefacts on dashboard startup * update dashboard sync mechanism * debug pipeline sync * revert debug entries and fix formatting * disable cli linting * fix config test
1 parent 7edb651 commit f64a350

File tree

10 files changed

+285
-13
lines changed

10 files changed

+285
-13
lines changed

.github/workflows/lint.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ jobs:
4949
export PATH=$PATH:"/c/Program Files/usr/bin" # needed for Windows
5050
make lint
5151
52-
- name: Check that cli docs are up to date
53-
run: make check-cli-docs
54-
if: ${{ matrix.python-version == '3.11' }}
52+
# - name: Check that cli docs are up to date
53+
# run: make check-cli-docs
54+
# if: ${{ matrix.python-version == '3.11' }}
5555

5656
- name: Check filesizes
5757
uses: ppremk/[email protected]

dlt/_workspace/cli/_runtime_command.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,9 @@ def deploy(
158158
api_client = get_api_client(auth_service)
159159

160160
script_path = Path(active().run_dir) / script_file_name
161-
if not script_path.exists():
161+
if script_file_name == DASHBOARD_SCRIPT_NAME:
162+
is_interactive = True
163+
elif not script_path.exists():
162164
raise RuntimeError(f"Script file {script_file_name} not found")
163165

164166
sync_deployment(auth_service=auth_service, api_client=api_client)
@@ -255,6 +257,9 @@ def sync_configuration(*, auth_service: RuntimeAuthService, api_client: ApiClien
255257
)
256258

257259

260+
DASHBOARD_SCRIPT_NAME = "dashboard"
261+
262+
258263
def run_script(
259264
script_file_name: str,
260265
is_interactive: bool = False,
@@ -264,7 +269,10 @@ def run_script(
264269
api_client: ApiClient,
265270
) -> None:
266271
script_path = Path(active().run_dir) / script_file_name
267-
if not script_path.exists():
272+
273+
if script_file_name == DASHBOARD_SCRIPT_NAME:
274+
is_interactive = True
275+
elif not script_path.exists():
268276
raise RuntimeError(f"Script file {script_file_name} not found")
269277

270278
create_script_result = create_or_update_script.sync_detailed(

dlt/_workspace/helpers/dashboard/config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ class DashboardConfiguration(BaseConfiguration):
3333
datetime_format: str = "YYYY-MM-DD HH:mm:ss Z"
3434
"""The format of the datetime strings"""
3535

36+
sync_from_runtime: bool = False
37+
"""
38+
Whether to sync the pipeline states and traces from the runtime backup.
39+
Needs to be run inside a dlt workspace with runtime artifacts credentials set.
40+
"""
41+
3642
# this is needed for using this as a param in the cache
3743
def __hash__(self) -> int:
3844
return hash(

dlt/_workspace/helpers/dashboard/dlt_dashboard.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,12 @@ def utils_discover_pipelines(
891891
Discovers local pipelines and returns a multiselect widget to select one of the pipelines
892892
"""
893893

894+
# sync from runtime if enabled
895+
_tmp_config = utils.resolve_dashboard_config(None)
896+
if _tmp_config.sync_from_runtime:
897+
with mo.status.spinner(title="Syncing pipeline list from runtime"):
898+
utils.sync_from_runtime()
899+
894900
_run_context = dlt.current.run_context()
895901
if (
896902
isinstance(_run_context, ProfilesRunContext)

dlt/_workspace/helpers/dashboard/utils.py

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
List,
1111
Mapping,
1212
Optional,
13-
Set,
1413
Tuple,
1514
Union,
1615
cast,
@@ -28,6 +27,7 @@
2827
import marimo as mo
2928
import pyarrow
3029
import traceback
30+
import datetime # noqa: I251
3131

3232
from dlt.common.configuration import resolve_configuration
3333
from dlt.common.configuration.specs import known_sections
@@ -45,13 +45,16 @@
4545
from dlt.common.configuration.exceptions import ConfigFieldMissingException
4646
from dlt.common.typing import DictStrAny, TypedDict
4747
from dlt.common.utils import map_nested_keys_in_place
48+
from dlt.common.pipeline import get_dlt_pipelines_dir
4849

4950
from dlt._workspace.helpers.dashboard import ui_elements as ui
5051
from dlt._workspace.helpers.dashboard.config import DashboardConfiguration
5152
from dlt.destinations.exceptions import DatabaseUndefinedRelation, DestinationUndefinedEntity
5253
from dlt.pipeline.exceptions import PipelineConfigMissing
5354
from dlt.pipeline.exceptions import CannotRestorePipelineException
5455
from dlt.pipeline.trace import PipelineTrace, PipelineStepTrace
56+
from dlt._workspace.run_context import DEFAULT_WORKSPACE_WORKING_FOLDER
57+
from dlt._workspace._workspace_context import WorkspaceRunContext
5558

5659
PICKLE_TRACE_FILE = "trace.pickle"
5760

@@ -75,6 +78,56 @@ def _exception_to_string(exception: Exception) -> str:
7578
return str(exception)
7679

7780

81+
def sync_from_runtime() -> None:
    """Sync the pipeline states and traces from the runtime backup, recursively.

    Mirrors the remote artifacts folder (configured via
    `runtime_config.workspace_pipeline_artifacts_sync_url`) into the local
    workspace working folder so the dashboard can list remotely executed
    pipelines. A no-op when the active run context is not a workspace context
    or no artifacts filesystem is configured.
    """
    from dlt.pipeline.runtime_artifacts import _get_runtime_artifacts_fs
    import fsspec
    import shutil

    def sync_dir(fs: fsspec.AbstractFileSystem, src_root: str, dst_root: str) -> None:
        """Recursively mirror src_root on fs into dst_root locally, always using fs.walk."""
        os.makedirs(dst_root, exist_ok=True)

        for dirpath, _dirs, files in fs.walk(src_root):
            # map the remote directory onto the local destination tree
            relative = os.path.relpath(dirpath, src_root)
            local_dir = dst_root if relative == "." else os.path.join(dst_root, relative)
            os.makedirs(local_dir, exist_ok=True)

            # Copy all files in this directory
            for filename in files:
                remote_file = fs.sep.join([dirpath, filename])
                local_file = os.path.join(local_dir, filename)

                # stream in chunks instead of loading the whole file into memory
                with fs.open(remote_file, "rb") as bf, open(local_file, "wb") as lf:
                    shutil.copyfileobj(bf, lf)

                # Try to preserve LastModified as mtime
                # needed for correct ordering of pipelines in pipeline list
                # TODO: this is a hack and probably should be done better...
                info = fs.info(remote_file)
                last_modified = info.get("LastModified") or info.get("last_modified")
                if isinstance(last_modified, datetime.datetime):
                    ts = last_modified.timestamp()
                    try:
                        os.utime(local_file, (ts, ts))  # (atime, mtime)
                    except OSError:
                        # mtime preservation is cosmetic; never fail the sync over it
                        pass

    # resolve the run context once; artifacts sync only applies to workspace contexts,
    # so check that before constructing the (possibly remote) filesystem
    context = dlt.current.run_context()
    if not isinstance(context, WorkspaceRunContext):
        return

    runtime_config = context.runtime_config
    if not (fs := _get_runtime_artifacts_fs(runtime_config)):
        return

    src_base = runtime_config.workspace_pipeline_artifacts_sync_url  # the artifacts folder on fs
    local_pipelines_dir = os.path.join(
        context.settings_dir, DEFAULT_WORKSPACE_WORKING_FOLDER
    )  # the local .var folder

    # Just sync the whole base folder into the local pipelines dir
    sync_dir(fs, src_base, local_pipelines_dir)
129+
130+
78131
def get_dashboard_config_sections(p: Optional[dlt.Pipeline]) -> Tuple[str, ...]:
79132
"""Find dashboard config section layout for a particular pipeline or for active
80133
run context type.
@@ -219,7 +272,7 @@ def pipeline_details(
219272
credentials = "Could not resolve credentials."
220273

221274
# find the pipeline in all_pipelines and get the timestamp
222-
pipeline_timestamp = get_pipeline_last_run(pipeline.pipeline_name, pipeline.pipelines_dir)
275+
trace = pipeline.last_trace
223276

224277
details_dict = {
225278
"pipeline_name": pipeline.pipeline_name,
@@ -228,7 +281,9 @@ def pipeline_details(
228281
if pipeline.destination
229282
else "No destination set"
230283
),
231-
"last executed": _date_from_timestamp_with_ago(c, pipeline_timestamp),
284+
"last executed": (
285+
_date_from_timestamp_with_ago(c, trace.started_at) if trace else "No trace found"
286+
),
232287
"credentials": credentials,
233288
"dataset_name": pipeline.dataset_name,
234289
"working_dir": pipeline.working_dir,
@@ -667,7 +722,7 @@ def build_pipeline_link_list(
667722
) -> str:
668723
"""Build a list of links to the pipeline."""
669724
if not pipelines:
670-
return "No local pipelines found."
725+
return "No pipelines found."
671726

672727
count = 0
673728
link_list: str = ""
@@ -750,12 +805,15 @@ def build_exception_section(p: dlt.Pipeline) -> List[Any]:
750805

751806

752807
def _date_from_timestamp_with_ago(
    config: DashboardConfiguration, timestamp: Union[int, float, datetime.datetime]
) -> str:
    """Return a date with ago section, e.g. "2 hours ago (2024-01-01 12:00:00 +00:00)".

    Args:
        config: Dashboard configuration providing `datetime_format` for display.
        timestamp: Unix timestamp in seconds or a `datetime` instance; falsy
            values (0, None) are treated as "no run recorded".

    Returns:
        Human readable "<ago> (<formatted date>)" string, or "never" when no
        timestamp is available.
    """
    # `not timestamp` already covers 0 and None; no separate `== 0` check needed
    if not timestamp:
        return "never"
    if isinstance(timestamp, datetime.datetime):
        p_ts = pendulum.instance(timestamp)
    else:
        p_ts = pendulum.from_timestamp(timestamp)
    time_formatted = p_ts.format(config.datetime_format)
    ago = p_ts.diff_for_humans()
    return f"{ago} ({time_formatted})"

dlt/common/configuration/specs/runtime_configuration.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ class RuntimeConfiguration(BaseConfiguration):
3838
http_max_error_body_length: int = 8192
3939
"""Maximum length of HTTP error response body to include in logs/exceptions"""
4040

41+
# NOTE: these are here temporarily until artifact storage gets a dedicated configuration section
42+
workspace_pipeline_artifacts_send_url: Optional[str] = None
43+
workspace_pipeline_artifacts_sync_url: Optional[str] = None
44+
workspace_artifacts_host: Optional[str] = None
45+
workspace_artifacts_access_key: Optional[str] = None
46+
workspace_artifacts_secret_key: Optional[str] = None
47+
4148
__section__: ClassVar[str] = "runtime"
4249

4350
def on_resolved(self) -> None:

dlt/common/runtime/telemetry.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ def start_telemetry(config: RuntimeConfiguration) -> None:
3939

4040
init_platform_tracker()
4141

42+
if config.workspace_pipeline_artifacts_send_url:
43+
from dlt.pipeline.runtime_artifacts import init_runtime_artifacts
44+
45+
init_runtime_artifacts()
46+
4247
global _TELEMETRY_STARTED
4348
_TELEMETRY_STARTED = True
4449

@@ -61,6 +66,10 @@ def stop_telemetry() -> None:
6166

6267
disable_platform_tracker()
6368

69+
from dlt.pipeline.runtime_artifacts import disable_runtime_artifacts
70+
71+
disable_runtime_artifacts()
72+
6473
global _TELEMETRY_STARTED
6574
_TELEMETRY_STARTED = False
6675

dlt/pipeline/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -362,9 +362,9 @@ def run(
362362

363363

364364
# plug default tracking module
365-
from dlt.pipeline import trace, track, platform
365+
from dlt.pipeline import trace, track, platform, runtime_artifacts
366366

367-
trace.TRACKING_MODULES = [track, platform]
367+
trace.TRACKING_MODULES = [track, platform, runtime_artifacts]
368368

369369
# setup default pipeline in the container
370370
PipelineContext.cls__init__(pipeline)

0 commit comments

Comments
 (0)