Pipelines

Data transformation and ingestion workflows

Compute Cluster · Healthy
All pipelines execute on the selected cluster
8 vCPUs
32 GB RAM
Max 6 concurrent
Utilization: 42%
Market Data ETL · Active
pipelines/pip-001.py
import pyarrow as pa
import pyarrow.parquet as pq
from kraken_sdk import Pipeline, Output
from datetime import datetime, timedelta, timezone

pipeline = Pipeline("market-data-etl")

@pipeline.transform
def extract_and_normalize(ctx):
    """Fetch pricing data from all marketplace sources and normalize."""
    amazon = ctx.source("amazon-sp-api")
    shopify = ctx.source("shopify")

    # Pull the last hour of pricing updates. datetime.utcnow() is
    # deprecated as of Python 3.12; use an aware UTC timestamp instead.
    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)

    amazon_prices = amazon.query(
        "SELECT asin, price, currency, marketplace, updated_at "
        "FROM pricing WHERE updated_at > :cutoff",
        params={"cutoff": cutoff},
    )

    shopify_prices = shopify.query(
        "SELECT product_id, variant_id, price, compare_at_price, updated_at "
        "FROM product_variants WHERE updated_at > :cutoff",
        params={"cutoff": cutoff},
    )

    # Normalize to a common schema
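    # Each record is reduced to the shape below (illustrative values):
    #   {"source": "amazon", "product_id": "B0A1", "price_usd": 19.99,
    #    "marketplace": "US", "timestamp": ...}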
    normalized = []
    for row in amazon_prices:
        normalized.append({
            "source": "amazon",
            "product_id": row["asin"],
            "price_usd": convert_currency(row["price"], row["currency"]),
            "marketplace": row["marketplace"],
            "timestamp": row["updated_at"],
        })

    for row in shopify_prices:
        normalized.append({
            "source": "shopify",
            # Keep the variant in the key so distinct variants of one
            # product are not collapsed during deduplication
            "product_id": f'{row["product_id"]}:{row["variant_id"]}',
            "price_usd": float(row["price"]),
            "marketplace": "shopify-direct",
            # Use the row's own update time, not the query cutoff, so
            # deduplication compares like-for-like timestamps
            "timestamp": row["updated_at"],
        })

    return normalized


@pipeline.transform
def deduplicate(ctx, records):
54 """Remove duplicate entries, keeping the most recent per product."""
55 seen = {}
56 for record in records:
57 key = (record["source"], record["product_id"])
58 if key not in seen or record["timestamp"] > seen[key]["timestamp"]:
59 seen[key] = record
60
61 ctx.log(f"Deduplicated {len(records)} -> {len(seen)} records")
62 return list(seen.values())
63
64
65@pipeline.output
66def write_parquet(ctx, records):
67 """Write normalized pricing data to Parquet."""
68 table = pa.Table.from_pylist(records)
69 output_path = ctx.storage.path("market-data", partition_by="date")
70
71 pq.write_table(table, output_path, compression="snappy")
72 ctx.log(f"Wrote {len(records)} records to {output_path}")
73
74 return Output(
75 records_written=len(records),
76 path=output_path,
77 schema=table.schema,
78 )
79
80
81def convert_currency(amount, currency, target="USD"):
82 rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.27, "CAD": 0.74}
83 return round(float(amount) * rates.get(currency, 1.0), 2)
Python 3.12 · Kraken SDK v0.0.4
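The transforms above are ordinary functions under the decorators, so their logic can be sanity-checked without a cluster. Below is a minimal local sketch, assuming @pipeline.transform returns the wrapped function unchanged and that deduplicate and convert_currency from pip-001.py are in scope; StubCtx is a hypothetical stand-in, not part of the Kraken SDK.

# dry_run.py - hypothetical local check, not part of the pipeline
from datetime import datetime, timezone

class StubCtx:
    """Bare-bones stand-in for the pipeline context."""
    def log(self, message):
        print(message)

ctx = StubCtx()

records = [
    {"source": "amazon", "product_id": "B0A1", "price_usd": 19.99,
     "marketplace": "US",
     "timestamp": datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc)},
    {"source": "amazon", "product_id": "B0A1", "price_usd": 18.49,
     "marketplace": "US",
     "timestamp": datetime(2024, 5, 1, 12, 30, tzinfo=timezone.utc)},
]

# The 12:30 row wins: deduplicate keeps the newest record per
# (source, product_id) key
deduped = deduplicate(ctx, records)
assert len(deduped) == 1 and deduped[0]["price_usd"] == 18.49

# Static reference rates: 10.00 EUR * 1.08 -> 10.8 USD
assert convert_currency("10.00", "EUR") == 10.8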