dask安装

 
准备要安装的机器节点,并在每个节点上执行 

pip install dask[complete]

启动调度器

 
多个节点由一台调度器控制,Python只连接调度器,调度器控制多节点资源的分配及使用

启动调度器:在一台服务器上启动调度器,通常称为“控制节点”,命令如下

dask-scheduler --host 0.0.0.0 --port 8786 --dashboard-address 8787
    

向调度器添加工作节点

 
启动工作节点:在每台工作节点上启动工作节点,连接到调度器,命令如下:

dask-worker tcp://scheduler-ip:8786 --nthreads 4 --memory-limit 4GB

其中scheduler-ip是调度器所在服务器的IP地址。启动成功后,调度器会显示工作节点的注册信息。

添加第二个节点

 
dask-worker tcp://scheduler-ip:8786 --nthreads 4 --memory-limit 4GB
    
--scheduler-ip: 调度器的IP地址。
--nthreads:     指定每个工作节点的线程数。根据你的机器配置调整此参数。
--memory-limit: 指定每个工作节点的内存限制。根据你的机器配置调整此参数。

验证新节点

 
查看调度器日志:
在调度器的控制台或日志中,你应该会看到新工作节点的注册信息。调度器会显示新节点的连接状态和资源信息。

使用Dask Dashboard:如果你启用了Dask Dashboard(默认端口为8787),
可以通过浏览器访问http://scheduler-ip:8787,查看集群的状态和资源使用情况。
在Dashboard中,你应该能看到新加入的工作节点。
    

 

    

 
from dask.distributed import Client

# 连接到集群
client = Client('scheduler-ip:8786')

# 查看集群信息
print(client)

# 示例:使用Dask DataFrame在集群上进行计算
import dask.dataframe as dd
import pandas as pd

# 创建一个Dask DataFrame
df = dd.from_pandas(pd.read_csv('large_data.csv'), npartitions=4)

# 执行筛选操作
filtered_df = df[df['column_name'] > 10]

# 执行聚合操作
result = filtered_df.groupby('group_column').mean().compute()

print(result)

    

 
运行上述代码后,Dask会自动将任务分发到所有工作节点上,包括新加入的节点。
你可以通过Dask Dashboard或调度器日志来观察任务的执行情况和资源使用情况。
    

注意事项

 
网络配置:
确保新节点可以访问调度器所在的服务器,并且调度器所在的服务器也可以访问新节点。
如果存在防火墙或网络限制,需要相应地配置网络规则。

资源匹配:
根据新节点的硬件配置(如CPU核心数、内存大小等),
合理设置--nthreads和--memory-limit参数,以充分利用新节点的资源。

动态扩展:
Dask支持动态扩展和收缩集群。
如果需要,你可以在运行时动态地添加或移除工作节点,而无需停止整个集群。

    

 

    

 


 

  

 


参考
    sklearn数据集分割方法汇总