Pipelines
Data transformation and ingestion workflows
Compute Cluster: Healthy
All pipelines execute on the selected cluster (8 vCPUs, 32 GB RAM, max 6 concurrent pipelines, 42% utilization).
Market Data ETL: Active
pipelines/pip-001.py
from datetime import datetime, timedelta, timezone

import pyarrow as pa
import pyarrow.parquet as pq

from kraken_sdk import Pipeline, Output

pipeline = Pipeline("market-data-etl")


@pipeline.transform
def extract_and_normalize(ctx):
    """Fetch pricing data from all marketplace sources and normalize."""
    amazon = ctx.source("amazon-sp-api")
    shopify = ctx.source("shopify")

    # Pull the last hour of pricing updates. Use a timezone-aware timestamp;
    # datetime.utcnow() is deprecated as of Python 3.12.
    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)

    amazon_prices = amazon.query(
        "SELECT asin, price, currency, marketplace, updated_at "
        "FROM pricing WHERE updated_at > :cutoff",
        params={"cutoff": cutoff},
    )

    shopify_prices = shopify.query(
        "SELECT product_id, variant_id, price, compare_at_price, updated_at "
        "FROM product_variants WHERE updated_at > :cutoff",
        params={"cutoff": cutoff},
    )

    # Normalize to a common schema.
    normalized = []
    for row in amazon_prices:
        normalized.append({
            "source": "amazon",
            "product_id": row["asin"],
            "price_usd": convert_currency(row["price"], row["currency"]),
            "marketplace": row["marketplace"],
            "timestamp": row["updated_at"],
        })

    for row in shopify_prices:
        normalized.append({
            "source": "shopify",
            # Keep the variant in the identifier so distinct variants are
            # not collapsed into a single record during deduplication.
            "product_id": f"{row['product_id']}:{row['variant_id']}",
            "price_usd": float(row["price"]),
            "marketplace": "shopify-direct",
            # Use the row's own update time, not the query cutoff, so
            # deduplication keeps the genuinely newest record.
            "timestamp": row["updated_at"],
        })

    return normalized


@pipeline.transform
def deduplicate(ctx, records):
    """Remove duplicate entries, keeping the most recent per product."""
    seen = {}
    for record in records:
        key = (record["source"], record["product_id"])
        if key not in seen or record["timestamp"] > seen[key]["timestamp"]:
            seen[key] = record

    ctx.log(f"Deduplicated {len(records)} -> {len(seen)} records")
    return list(seen.values())


@pipeline.output
def write_parquet(ctx, records):
    """Write normalized pricing data to Parquet."""
    table = pa.Table.from_pylist(records)
    output_path = ctx.storage.path("market-data", partition_by="date")

    pq.write_table(table, output_path, compression="snappy")
    ctx.log(f"Wrote {len(records)} records to {output_path}")

    return Output(
        records_written=len(records),
        path=output_path,
        schema=table.schema,
    )


def convert_currency(amount, currency, target="USD"):
    """Convert amount to USD using fixed example rates.

    Only target="USD" is supported; unknown currencies silently fall
    back to a rate of 1.0 (treated as already being USD).
    """
    rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.27, "CAD": 0.74}
    return round(float(amount) * rates.get(currency, 1.0), 2)
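The keep-newest logic in deduplicate has no Kraken SDK dependency, so it can be checked in isolation. A minimal sketch; every record below is fabricated for illustration, and the string timestamps order correctly only because they all share one ISO-8601 format and timezone:

# Standalone check of the keep-newest dedupe loop from pip-001.py.
records = [
    {"source": "amazon", "product_id": "B0EXAMPLE",
     "price_usd": 19.99, "timestamp": "2024-01-01T10:00:00+00:00"},
    {"source": "amazon", "product_id": "B0EXAMPLE",
     "price_usd": 18.49, "timestamp": "2024-01-01T11:00:00+00:00"},
    {"source": "shopify", "product_id": "1001:2002",
     "price_usd": 21.00, "timestamp": "2024-01-01T10:30:00+00:00"},
]

seen = {}
for record in records:
    key = (record["source"], record["product_id"])
    # Uniformly formatted ISO-8601 strings compare correctly as strings.
    if key not in seen or record["timestamp"] > seen[key]["timestamp"]:
        seen[key] = record

assert len(seen) == 2
assert seen[("amazon", "B0EXAMPLE")]["price_usd"] == 18.49  # newest record wins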
Python 3.12 · Kraken SDK v0.0.4
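The ctx.storage.path("market-data", partition_by="date") call is Kraken-SDK-specific. With pyarrow alone, a comparable date-partitioned layout is typically produced by adding a date column and calling pyarrow.parquet.write_to_dataset. A sketch under that assumption; the records and the market-data output directory are illustrative only:

import pyarrow as pa
import pyarrow.parquet as pq

# Fabricated records in the shape extract_and_normalize emits.
records = [
    {"source": "amazon", "product_id": "B0EXAMPLE", "price_usd": 19.99,
     "marketplace": "US", "timestamp": "2024-01-01T10:00:00+00:00"},
    {"source": "shopify", "product_id": "1001:2002", "price_usd": 21.00,
     "marketplace": "shopify-direct", "timestamp": "2024-01-02T09:30:00+00:00"},
]
table = pa.Table.from_pylist(records)

# Derive a date column from the ISO timestamps so pyarrow can partition on it.
dates = pa.array([ts[:10] for ts in table.column("timestamp").to_pylist()])
table = table.append_column("date", dates)

# Produces one Hive-style directory per day, e.g. market-data/date=2024-01-01/.
pq.write_to_dataset(table, root_path="market-data", partition_cols=["date"])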