Skip to content

Python API

Terminal window
pip install ixtract

Profile the source, consult run history, and produce an ExecutionPlan.

from ixtract import plan, ExtractionIntent, RuntimeContext, CostConfig
result = plan(
intent,
runtime_context=None, # optional RuntimeContext
cost_config=None, # optional CostConfig
)

Returns a PlanResult whose plan attribute holds the ExecutionPlan. Planning only — nothing is extracted.

Execute a PlanResult returned by plan().

from ixtract import execute
if result.is_safe:
execution = execute(result)
print(f"{execution.rows_extracted:,} rows in {execution.duration_seconds:.1f}s")

Raises NotRecommendedError if the verdict is NOT RECOMMENDED and force=False.

Planner-free execution path: takes a stored ExecutionPlan directly, with no re-planning, no profiling, and no enrichment. Used internally by replay.

from ixtract import execute_plan
execute_plan(stored_plan, intent)

Load a stored plan by run ID, verify its fingerprint, and re-execute it.

from ixtract import replay
replay_result = replay(run_id="run_001", intent=intent)

Raises PlanCorruptionError if the stored fingerprint does not match. Raises UnsupportedPlanVersion if the plan version is incompatible.

Profile a source without planning.

from ixtract import profile
source_profile = profile(intent)
print(source_profile.row_count_estimate)
print(source_profile.cv) # coefficient of variation (skew indicator)

Return the full explanation for the last plan: worker resolution chain, context applied, chunk boundaries.

from ixtract import explain
explanation = explain(intent)

from ixtract import ExtractionIntent
intent = ExtractionIntent(
source_type="postgresql", # "postgresql" | "mysql" | "sqlserver"
source_config={
"host": "localhost",
"port": 5432,
"database": "mydb",
"user": "app",
"password": "secret",
},
object_name="orders",
output_dir="./output", # default: "./output"
output_format="parquet", # "parquet" | "csv" | "s3" | "gcs"
)

All fields are optional. Each defaults to None (no constraint applied).

from ixtract import RuntimeContext
ctx = RuntimeContext(
# Hard caps — override controller
max_workers=4,
min_workers=1,
# Soft multipliers — scale the base worker count
source_load="high", # "low" | "normal" | "high"
network_quality="degraded", # "good" | "normal" | "degraded"
priority="low", # "low" | "normal" | "high"
# Advisories — informational, produce warnings not blocks
target_duration_seconds=60,
maintenance_window_minutes=30,
disk_budget_gb=10.0,
egress_budget_gb=5.0,
)

RuntimeContext constrains the plan but never feeds the controller. A run with RuntimeContext is excluded from controller learning.

from ixtract import CostConfig
cost_cfg = CostConfig(
compute_rate=0.05, # cost per worker-second
egress_rate=0.01, # cost per GB egressed
connection_rate=0.0, # cost per connection-second
)
# From file
cost_cfg = CostConfig.from_file("rates.json")
# From CLI args dict
cost_cfg = CostConfig.from_cli_args(args)

All rates default to zero. Cost is an estimate, not a guarantee.


result.is_safe # True if verdict is SAFE TO RUN
result.is_not_recommended # True if verdict is NOT RECOMMENDED
result.plan # ExecutionPlan dataclass
result.verdict # "SAFE TO RUN" | "SAFE WITH WARNINGS" | "NOT RECOMMENDED"
result.advisories # list of advisory strings
result.cost_estimate # CostEstimate or None
execution.rows_extracted # int
execution.duration_seconds # float
execution.throughput_rows_sec # float
execution.run_id # str
execution.output_files # list[str]

from ixtract import (
IxtractError, # base
ValidationError, # bad intent or config
NotRecommendedError, # verdict NOT RECOMMENDED, force=False
ExecutionError, # runtime failure during extraction
PlanCorruptionError, # replay: fingerprint mismatch
UnsupportedPlanVersion, # replay: incompatible plan version
)

from ixtract import (
plan, execute,
ExtractionIntent, RuntimeContext, CostConfig,
NotRecommendedError,
)
intent = ExtractionIntent(
source_type="postgresql",
source_config={"host": "db.prod", "database": "analytics", "user": "etl"},
object_name="events",
output_dir="/data/output",
)
ctx = RuntimeContext(
source_load="high",
network_quality="degraded",
priority="low",
)
cost = CostConfig(compute_rate=0.05, egress_rate=0.01)
result = plan(intent, runtime_context=ctx, cost_config=cost)
print(f"Verdict: {result.verdict}")
if result.cost_estimate:
print(f"Estimated cost: ${result.cost_estimate.total:.2f}")
try:
execution = execute(result)
print(f"{execution.rows_extracted:,} rows · {execution.throughput_rows_sec:,.0f}/s")
except NotRecommendedError as e:
print(f"Not recommended: {e}")