QuickStartΒΆ

Seaflow usage can be divided into three major steps. The first step is to configure the executor and generate instances, including scheduler configuration, data directory configuration, and plugin mounting; The second step is to register the object directly or register the node after objectification; Step 3: Process the registered callable objects; Step 4: Start the actuator directly.

The following is an example of Seaflow usage,

from seaflow.nodes.node import Node
from seaflow.runners.dagrunner import DAGRunnerNode
import time

### Declare Node Executor Instance
dagrunner = DAGRunnerNode()
### function
def test_a(a):
    time.sleep(3)
    return a + 2

def test_aa(aa):
    time.sleep(3)
    return aa + 3

def test_b(a1,aa1):
    time.sleep(2)
    return a1 + aa1 + 2

def test_bb(bb):
    time.sleep(3)
    return bb + 4

def test_c(b1,bb1):
    time.sleep(3)
    return b1 + bb1 + 2

def test_cc(cc):
    time.sleep(3)
    return cc + 5

def test_d(b1,c1,cc1,f1):
    time.sleep(3)
    return b1 + c1 + cc1 + f1

def first_func(a,b):
    time.sleep(3)
    c = a + b
    return c


### Node objectification
test_a_node = Node(func=test_a,inputs=['a'],outputs='a1',name='test_a') #
test_aa_node = Node(func=test_aa,inputs=['aa'],outputs='aa1',name='test_aa') #
test_b_node = Node(func=test_b,inputs=['a1','aa1'],outputs='b1',name='test_b') ##
test_bb_node = Node(func=test_bb,inputs=['bb'],outputs='bb1',name='test_bb') #
test_c_node = Node(func=test_c,inputs=['b1','bb1'],outputs='c1',name='test_c') ##
test_cc_node = Node(func=test_cc,inputs=['cc'],outputs='cc1',name='test_cc') #
test_d_node = Node(func=test_d,inputs=['b1','c1','cc1','f1'],outputs='d1',name='test_d') ####
first_node = Node(func=first_func,inputs=['f','cc1'],outputs='f1',name='first_func') ##


### Register on scheduler
test_a = dagrunner.scheduler.register(controller_obj=dagrunner.scheduler)(test_a_node)
test_aa = dagrunner.scheduler.register(controller_obj=dagrunner.scheduler)(test_aa_node)
test_b = dagrunner.scheduler.register(controller_obj=dagrunner.scheduler)(test_b_node)
test_bb = dagrunner.scheduler.register(controller_obj=dagrunner.scheduler)(test_bb_node)
test_c = dagrunner.scheduler.register(controller_obj=dagrunner.scheduler)(test_c_node)
test_cc = dagrunner.scheduler.register(controller_obj=dagrunner.scheduler)(test_cc_node)
test_d = dagrunner.scheduler.register(controller_obj=dagrunner.scheduler)(test_d_node)
first_func= dagrunner.scheduler.register(controller_obj=dagrunner.scheduler)(first_node)


### Registering target function dependencies to scheduler
test_a >> test_b >> test_c >> test_d
test_aa >> test_b >> test_d
test_bb >> test_c >> test_d
test_cc >> test_d
test_cc >> first_func >> test_d


### Test the call of the node
print('------>>>',first_node(100,10),first_node.run(100,10))
print(type(first_node)==Node)


### Show nodes and edges
print('~~~~~~',dagrunner.scheduler.get_graph_obj())
print('------',dagrunner.scheduler.show_nodes())
print('======',dagrunner.scheduler.show_edges())


### Show DAG diagram situation
# dagrunner.scheduler.draw()


### Calling the target function
tmp_func_a = dagrunner.scheduler.get_function_obj(function_name='test_aa')
print(type(tmp_func_a))
tmp_func_b = dagrunner.scheduler.get_function_obj(function_name='first_func')
print(tmp_func_a(10))


### Load initial parameter data into the catalog
dagrunner.catalog.save(data_name='a',data_obj=10)
dagrunner.catalog.save(data_name='aa',data_obj=10)
dagrunner.catalog.save(data_name='bb',data_obj=10)
dagrunner.catalog.save(data_name='cc',data_obj=10)
dagrunner.catalog.save(data_name='f',data_obj=10)
### Scheduler function testing
# ### Initialize Kahn work queue
# dagrunner.scheduler.init_job_queue_with_kahn()
# ### Using the Kahn algorithm to return the generator
# run_list = dagrunner.scheduler.dispath_algorithm_with_kahn()
# print(run_list,'--------------------------')
# for i in run_list:
#     print(i)
#     ray_result = dagrunner.execute_algorithm_with_ray(run_list=i)
#     print('==========',ray_result)


### Formal operation interface
start_time = time.time()
result = dagrunner.execute()
end_time = time.time()
print('cost time is {}'.format(end_time - start_time))
print(result)
print(dagrunner.catalog.load(data_name='d1'))