Mara-pipelines 是一個輕量級的數(shù)據(jù)轉(zhuǎn)換框架,具有透明和低復(fù)雜性的特點。其他特點如下:
- 基于非常簡單的Python代碼就能完成流水線開發(fā)。
- 使用 PostgreSQL 作為數(shù)據(jù)處理引擎。
- 有Web界面可視化分析流水線執(zhí)行過程。
- 基于 Python 的 multiprocessing 單機流水線執(zhí)行。不需要分布式任務(wù)隊列。輕松調(diào)試和輸出日志。
- 基于成本的優(yōu)先隊列:首先運行具有較高成本(基于記錄的運行時間)的節(jié)點。
此外,在Mara-pipelines的Web界面中,你不僅可以查看和管理流水線及其任務(wù)節(jié)點,你還可以直接觸發(fā)這些流水線和節(jié)點,非常好用:
1.安裝
由于使用了大量的依賴,Mara-pipelines 并不適用于 Windows,如果你需要在 Windows 上使用 Mara-pipelines,請使用 Docker 或者 Windows 下的 linux 子系統(tǒng)。
使用pip安裝Mara-pipelines:
pip install mara-pipelines
或者:
pip install git+https://github.com/mara/mara-pipelines.git
2.使用示例
這是一個基礎(chǔ)的流水線演示,由三個相互依賴的節(jié)點組成,包括 任務(wù)1(ping_localhost), 子流水線(sub_pipeline), 任務(wù)2(sleep):
# 注意,這個示例中使用了部分國外的網(wǎng)站,如果無法訪問,請變更為國內(nèi)網(wǎng)站。
from mara_pipelines.commands.bash import RunBash
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.ui.cli import run_pipeline, run_interactively
pipeline = Pipeline(
id='demo',
description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands')
pipeline.add(Task(id='ping_localhost', description='Pings localhost',
commands=[RunBash('ping -c 3 localhost')]))
sub_pipeline = Pipeline(id='sub_pipeline', description='Pings a number of hosts')
for host in ['google', 'amazon', 'facebook']:
sub_pipeline.add(Task(id=f'ping_{host}', description=f'Pings {host}',
commands=[RunBash(f'ping -c 3 {host}.com')]))
sub_pipeline.add_dependency('ping_amazon', 'ping_facebook')
sub_pipeline.add(Task(id='ping_foo', description='Pings foo',
commands=[RunBash('ping foo')]), ['ping_amazon'])
pipeline.add(sub_pipeline, ['ping_localhost'])
pipeline.add(Task(id='sleep', description='Sleeps for 2 seconds',
commands=[RunBash('sleep 2')]), ['sub_pipeline'])
可以看到,Task包含了多個commands,這些 command s會用于真正地執(zhí)行動作。
而 pipeline.add 的參數(shù)中,第一個參數(shù)是其節(jié)點,第二個參數(shù)是此節(jié)點的上游。如:
pipeline.add(sub_pipeline, ['ping_localhost'])
則表明必須執(zhí)行完 ping_localhost 才會執(zhí)行 sub_pipeline.
為了運行這個流水線,需要配置一個 PostgreSQL 數(shù)據(jù)庫來存儲運行時信息、運行輸出和增量處理狀態(tài):
import mara_db.auto_migration
import mara_db.config
import mara_db.dbs
mara_db.config.databases
= lambda: {'mara': mara_db.dbs.PostgreSQLDB(host='localhost', user='root', database='example_etl_mara')}
mara_db.auto_migration.auto_discover_models_and_migrate()
如果 PostgresSQL 正在運行并且賬號密碼正確,輸出如下所示(創(chuàng)建了一個包含多個表的數(shù)據(jù)庫):
Created database "postgresql+psycopg2://root@localhost/example_etl_mara"
CREATE TABLE data_integration_file_dependency (
node_path TEXT[] NOT NULL,
dependency_type VARCHAR NOT NULL,
hash VARCHAR,
timestamp TIMESTAMP WITHOUT TIME ZONE,
PRIMARY KEY (node_path, dependency_type)
);
.. more tables
為了運行這個流水線,你需要:
from mara_pipelines.ui.cli import run_pipeline
run_pipeline(pipeline)

這將運行單個流水線節(jié)點及其 ( **sub_pipeline ** ) 所依賴的所有節(jié)點:
run_pipeline(sub_pipeline, nodes=[sub_pipeline.nodes['ping_amazon']], with_upstreams=True)
3.Web 界面
我認(rèn)為 mara-pipelines 最有用的是他們提供了基于Flask管控流水線的Web界面。
對于每條流水線,他們都有一個頁面顯示:
- 所有子節(jié)點的圖以及它們之間的依賴關(guān)系
- 流水線的總體運行時間圖表以及過去 30 天內(nèi)最昂貴的節(jié)點(可配置)
- 所有流水線節(jié)點及其平均運行時間和由此產(chǎn)生的排隊優(yōu)先級的表
- 流水線最后一次運行的輸出和時間線

對于每個任務(wù),都有一個頁面顯示
- 流水線中任務(wù)的上游和下游
- 最近 30 天內(nèi)任務(wù)的運行時間
- 任務(wù)的所有命令
- 任務(wù)最后運行的輸出

此外,流水線和任務(wù)可以直接從網(wǎng)頁端調(diào)用運行,這是非常棒的特點。
-
Web
+關(guān)注
關(guān)注
2文章
1302瀏覽量
73598 -
流水線
+關(guān)注
關(guān)注
0文章
127瀏覽量
27115 -
數(shù)據(jù)轉(zhuǎn)換
+關(guān)注
關(guān)注
0文章
97瀏覽量
18448 -
python
+關(guān)注
關(guān)注
57文章
4856瀏覽量
89526
發(fā)布評論請先 登錄
流水線基本結(jié)構(gòu)
周期精確的流水線仿真模型
流水線中的相關(guān)培訓(xùn)教程[1]
FPGA之流水線練習(xí)(3):設(shè)計思路
FPGA之為什么要進行流水線的設(shè)計
如何選擇合適的LED生產(chǎn)流水線輸送方式
嵌入式_流水線
基于非常簡單的Python代碼就能完成流水線開發(fā)
CPU流水線的問題
什么是流水線 Jenkins的流水線詳解
Mara-pipelines:輕量級的數(shù)據(jù)轉(zhuǎn)換框架

超級方便的輕量級Python流水線工具
評論