Intorduction

现在的大数据应用都非常复杂,并不仅仅能够通过一个或者几个MapReduce任务来描述。更加复杂的描述大数据应用的是工作流 (workflow)。

WorkflowSim (http://www.workflowsim.org/)是由南加州大学(University of Southern California)的Weiwei Chen开发的一套开源工作流仿真软件。WorkflowSim是拓展自CloudSim的工作流仿真开源软件,可以提供工作流基础上的仿真,增加了模拟不同层次延时和故障的支持,与真实分布式环境更加接近。支持了各种task scheduling, clustering, resource provisioning的算法。广泛应用在故障容错研究,成本导向调度研究,资源调度研究,能耗研究等领域。

其工作原理是在暨有的CloudSim仿真软件基础上,提供workflow层次的仿真。工作流可以用有向图来描述(Directed Acyclic Graph) DAG,图的任何一个节点都是一个由用户制定执行的任务(task),节点之间有通过数据输入输出形成的依赖关系(dependency)。工作流引擎会依据其依赖关系按顺序执行,同时执行也会调用用户所选择的调度算法,比如HEFT,MINMIN等等算法已经在WorkflowSim里边实现并且测试通过了。

用户也可以添加自己的调度算法。

在实际运行过程中,因为很多工作流非常大,甚至到达几千上万个任务,而通常我们只有几十个计算节点。这个时候就需要使用task clustering这个技术来聚合相类似的任务从而形成任务群(clustered job),有时候也通用称之为job。每个job包含了多个task,然后提交给运行环境的时候是整体提交的,这样可以节省很多延迟(submission delay),等到了某一个计算节点的时候再打开任务群然后分别执行。

如何形成job就需要各种算法,有包括了balanced task clustering可以综合考虑依赖关系的,也有fault tolerant clustering可以综合考虑规避failure的。这些也都已经在WorkflowSim里边实现。

其行为最为接近Pegasus工作流管理系统。

WorkflowSim是一套开源软件,所有源代码和介绍都可以在http://www.github.com/WorkflowSim/找到,使用目前流行的Git做软件版本控制。

使用非常简单,下载完源代码之后,只需要在任何一个WorkflowSim example里边选择所需要的dax 文档,并且修改里边的daxPath变量,即可运行。比如选择:“examples/org/workflowsim/examples/WorkflowSimBasicExample1.java”这个例子,将里边的daxPath改成在你当前环境下的文件路径即可。

WorkflowSim在config/dax文件夹下面已经提供了很多DAX文件可供仿真。这些工作流的介绍可以参考:Documentation – Pegasus WMS

同时我们还提供WorkflowGenerator可以生成你所需要的任何工作流DAX文件https://confluence.pegasus.isi.edu/display/pegasus/WorkflowGenerator

如果对WorkflowSim感兴趣并且想进一步开发功能,可建议(但不限于)研究以下方面

  1. 基于价格的调度算法,CloudSim本身已经实现了每个task的cost,但是WorkflowSim还没有合适的调度算法。

  2. 动态资源调度算法。WorkflowSim目前还是静态的把计算节点建立好然后运行工作流。

  3. 多数据中心支持。目前只测试了单数据中心支持。

  4. 数据日志支持。将已有的工作流管理系统的log转换成为trace从而可以在WorkflowSim里边仿真?目前只测试过Pegasus。

  5. 可靠的调度算法,在仿真Failure的情况下调度算法应该如何变得更鲁棒?目前failure的生成已经开发完成,但是相应的算法还没有。

  6. 自适应算法或者分布式算法。目前仅有一个中心调度算法,分布式算法也可以在WorkflowSim上面实现。

  7. 基于duplication的算法。在计算节点有余的时候,可以自动复制一部分任务到空闲的计算节点上。

WorkflowSim中调度算法与规划算法的区别

WorkflowSim中有两种类型的算法:调度算法(Scheduling Algorithm)和计划算法(Planning Algorithm)。理论上它们是相同的,但是在实现方面它们是完全不同的。WorkflowSim 具有三层:Workflow Planner、Workflow Engine、Workflow Scheduler。

在 Workflow Planner 中,我们拥有整个工作流(所有任务及其依赖项)的全局视图,并且在每次迭代中,Workflow Engine 将free的任务(这意味着它们的父级已成功完成)发布到 Workflow Scheduler。Workflow Scheduler 将这些free的任务与资源(WorkflowSim 中的 Condor VM)进行匹配,并提交它们以供执行。分别来说,规划算法是一种全局调度算法,可以将任何任务绑定到任何资源(但实际执行顺序取决于资源可用性)。WorkflowSim 中的调度算法是一种本地调度算法,它只将空闲任务绑定到可用资源(并以某种方式保证它们的执行顺序)。

默认情况下,未设置规划算法。如果您已指定规划算法,则调度算法将被禁用,因为指定的规划算法强制工作流调度程序将任务映射到其分配的资源。

一些局部优化算法如MIN-MIN、MAX-MIN是调度算法,而一些全局优化算法如HEFT是WorkflowSim中的规划算法

SHARED和LOCAL文件系统之间的区别

SHARED文件系统只有一个用于一个数据中心的存储空间,而LOCAL文件系统也有一个用于每个VM的本地文件系统。

对于 stage-in,在 SHARED 模式下,我们在stage-in作业开始时将所有的输入文件移动到共享存储中。在 LOCAL 模式下,我们将每个任务的输入文件从最近的 VM(因为 VM 也具有本地文件系统)或共享文件系统(如果该任务分配到的 VM 可用)移动。

对于数据传输成本,在 SHARED 模式下,数据传输成本已经计入任务执行时间,因此我们不计算每个作业的数据传输成本。但我们计算了初始阶段工作的成本。在 LOCAL 模式下,增加每个作业的数据传输成本。

为什么我们需要区分它们?在实践中,我们要么拥有一个共享文件系统,例如 NFS,要么拥有一个分布式系统,例如 HDFS。如果您有一些数据感知算法来提高数据局部性,则需要使用 LOCAL。否则,如果您的算法不考虑数据,您可以使用 SHARED fs 来简化建模。

Task 和 Job 之间的区别(以及任务集群)

尽管在某些情况下我们可以互换使用Task和Job,但它们是完全不同的。任Task是用户指定运行的程序。它对应于 DAX 中的“job”,因为 DAX 是由用户创建的。WorkflowSim 中的Job是包含一个或多个Task的单个执行单元。但是,WorkflowSim 中的Job本身是从Task延伸扩展的,目的是简化一些代码。

任务聚类是一种将多个Task合并为一个Job的优化方法。根据不同的优化目的,我们最终可能会得到容错集群(最小化故障影响)、平衡任务集群(平衡数据传输成本和通信成本)等。

为什么任务集群即使它失去了一些并行性,也可以减少makespan?

任务集群只能在资源争用的情况下工作,这意味着我们没有足够的资源,必须将任务合并到任务中。例如,Montage工作流在每个级别最多可以有10,000个任务,而通常在一个小型数据集群中有20个节点。通过将这些任务合并为20或40个作业,我们仍然可以充分利用资源并提高整体运行时间。另一个重要问题是开销,作业提交、作业执行、作业准备都具有在现代分布式系统中非常重要的开销。单独执行10,000个任务最终可能会产生10,000倍的开销,而20个作业有20倍的开销。

有关任务聚类的好处的更多详细信息,请参阅任何任务聚类论文。

如何在类中增加属性?

如果你有一个新的属性要在WorkflowSim中使用,并且想要在配置文件中配置它,可以进行下面的几个步骤:

  1. 假设我们想要添加一个工作流的截止日期,并且它是一个很长的变量。在org.workflowsim.utils.Parameters.java中,添加:
private static long deadline;
 
public static void init(..., long dl) {
 
... //other parameters
 
deadline = dl;
 
}
 
public static long getDeadline(){
 
return deadline;
 
}

因为Parameters是一个静态对象,通过这种方式,我们可以在任何地方访问deadline

  1. org.workflowsim.utils.ArgumentParser.java中添加解析

    public ArgumentParser(String[] args) {
     
    long deadline = 0;
     
    if ...//other parameter parsing
     
    else if (key.equals("deadline")) {
     
    deadline = Long.parseLong(value);
     
    }
     
    Parameters.init(..., deadline);
     
    }

这意味着我们在解析配置文件时设置了参数的截止日期。

  1. config.txt中添加截止日期

config.txt**文件中添加:

deadline = 10000
  1. 然后你可以在你的调度程序中得到截止日期,比如在RandomPlanner.java中:
long deadline = Parameters.getDeadline();

类的介绍

Cloudlet

Cloudlet类对托管在云数据中心的容器中的应用程序进行建模。Cloudlet继承了CloudSim软件包中的功能。包括用户id,任务的大小,传入文件的大小。传出文件的大小,执行开始时间,完成时间和执行状态(CREATED,READY,QUEUED,CANCEL,PAUSED ,FAILED,SUCCESS和RESUMED等)等。详情见“sources\org\cloudbus\cloudsim\Cloudlet.java”。

Cloudlet类中包含一个静态内部类:Resource,它被用来跟踪Cloudlet在不同CloudResource中的移动。记录Cloudlet在一个CloudResource中的提交时间、等待时间、时间运行时间、成本等信息。

Task

Task类是CloudSim中Cloudlet类的一个扩展。它支持实现任务之间的依赖关系,其中包括一组父任务和一组其子任务。在WorkflowSim中,当所有的父任务都成功完成时,工作流引擎确保任务被释放到调度器(准备运行)。

相比Cloudlet类,Task新增了parentList,childList,taskFinishTime等属性。其中增加taskFinishTime是因为cloudlet不允许WorkflowSim更新任务的finish time。Task类中的getProcessingCost()方法用来获取处理或执行此任务的总成本。原始的Processing Cost = input data transfer + processing cost + output transfer cost

【可根据需要进行修改】。

Job

Job是Task的扩展。它基本上是一组任务。在WorkflowSim中,ClusteringEngine将任务合并为任务(任务组),并且job的总体运行时间是task运行时间的总和。

Vm

Vm 类对VM进行建模。虚拟机由主机管理和托管,它运行在一个主机内,与其他虚拟机共享hostList。VM的属性有内存,处理器及其存储大小。它根据CloudletScheduler定义的策略处理cloudlet。每个虚拟机都有一个所有者,由所有者将cloudlet提交给要执行的虚拟机。

CondorVM

Condor Vm扩展一个Vm,不同的是它有一个本地存储系统和一个状态来指示它是否忙碌:VM_STATUS_IDLE or VM_STATUS_READY (not used in workflowsim) or VM_STATUS_BUSY。

FileItem

这是WorkflowSim中的一个文件实现。由于CloudSim已经实现了File,我们在这里称之为FileItem。WorkflowSim有一个不同的文件视图。案例1:在org.cloudsim.File中,文件大小是integer类型,而在我们的例子中,它应该是double类型,因为我们有很多大的文件。此外,我们希望精确地估计传输延迟。案例2:我们指定与CloudSim中不同的文件的类型FileType (NONE,输入INPUT,输出OUTPUT)。

FileItem类中的isRealInputFile()方法,用于判断文件是否为输入文件。输入文件对应有一个输出文件,它不需要在工作流中阶段加入,我们有一个规则,一个文件只写一次,并多次读取,因此,如果一个文件是一个输出文件,这意味着它是在这个作业中生成的,然后由同一作业(或者其他作业)中的另一个任务使用。

Parameters

Parameters类包含用户可以在配置文件中指定的大多数参数。

类中定义了调度算法、规划算法、文件类型、成本模型等枚举类型,以及虚拟机个数、调度算法、规划算法、成本模型、最大深度等属性。

Datacenter

DataCenter类代表数据中心,提供虚拟化的网格资源,处理虚拟机信息的查询,包含虚拟机对资源的分配策略。数据中心类是一个CloudResource,它的主机列表是虚拟化的。它处理VM查询(即VM处理),而不是处理与cloudlet相关的查询。即使一个分配策略将被实例化(在超类的init()方法中),它也不会被使用,因为cloudlets的处理由CloudletScheduler处理,虚拟机的处理由VmAllocationPolicy处理。

类中包含DatacenterCharacteristics属性,DatacenterCharacteristics表示资源的静态属性,如资源架构、操作系统(OS)、管理策略(时间或空间共享)、成本和资源配置中所处的时区。

WorkflowDatacenter

WorkflowDatacenter扩展了Datacenter,这样我们就可以使用CondorVM和其他组件。

类中包含processCloudletSubmit()方法,它处理Cloudlet提交。cloudlet实际可以被cast到org.workflowsim.Job 。

updateTaskExecTime()方法更新job中的task的执行时间与完成时间。

stageInFile2FileSystem()方法,将处于stage-in 阶段的job所需文件加入存储。对于本地文件系统(如condor-io),将文件添加到本地存储;对于共享文件系统(如NFS),需要向共享存储添加文件。

processDataStageInForComputeJob()方法,返回job执行所需的文件传输时间个执行时间。

WorkflowParser

WorkflowParser将DAX解析为任务,以便WorkflowSim可以管理它们。

DAX的文件格式如下:

img

类中的parseXmlFile(String path)方法,根据参数DAX文件的地址path,将其转换为工作流。它遍历整个dax文件,将所有的<job></job>标签,转换为工作流中的任务task,其中id是该任务的唯一标识;runtime为该任务的长度;其内部的<uses></uses>标签代表该任务的文件。link属性表示文件的种类(input输入文件或output输出文件);size属性代表该文件的大小;其中file属性代表该文件的名字。【任务A的output输出文件名file和大小size需要与其对应的任务B的input输入文件名和大小相同!】

<child><parent></parent></child>标签表示任务之间的依赖关系,即某一个子任务child的父任务parent有哪些。其中ref属性的值对应<job>标签中的id属性。parseXmlFile()方法根据这些依赖关系,设置工作流中的某一个任务的子任务与父任务及其深度。如果一个任务没有父任务,则将其设置为root。【root可能有多个!】

SimEntity

该类表示一个模拟实体。实体可以处理事件,也可以发送事件给其他实体。当这个类被扩展时,有几个方法需要被实现:

①startEntity():在模拟启动时,由Simulation类调用。这个方法负责启动实体。

② processEvent(SimEvent):每当延迟队列中有需要由实体处理的事件时,仿真类就会调用processEvent(SimEvent)。

③ shutdownEntity():在模拟结束之前,仿真调用shutdownEntity()。如果你想将数据保存在日志文件中,这个方法将放置相应的代码。

WorkflowScheduler

WorkflowScheduler它隐藏了虚拟机管理,例如创建虚拟机、将任务交给虚拟机以及销毁虚拟机,并根据配置选择调度算法。

一个workflowScheduler代理中绑定一个workflowEngine,即其含有一个workflowEngineId属性,并通过bindSchedulerDatacenter(int datacenterId)方法将其与一个数据中心绑定。与此同时,为了保证每个任务仅提交一次,设置一个布尔类型的变量processCloudletSubmitHasShown来标记任务是否提交。除此之外,workflowScheduler代理根据不同的指令执行对应的事件,如下图:

img

WorkflowEngine

WorkflowEngine为代表用户的引擎。它隐藏了虚拟机管理,例如创建虚拟机、向虚拟机提交cloudlet和销毁虚拟机。由于WorkflowEngine类继承了SimEntity类,所以他重写了startEntity()、processEvent(SimEvent)、shutdownEntity()方法。一个workflowEngine引擎控制多个workflowScheduler代理。

hasJobListContainsID(List jobList, int id)方法:用来判断一个任务列表中是否存在某个任务。

submitJobs()方法:在提交获得的任务列表时,借助hasJobListContainsID(List jobList, int id)方法来判断某个要提交的任务的所有父任务是否已经全部执行完毕。

WorkflowPlanner

WorkflowPlanner根据配置选择规划算法。他有一个ClusteringEngine属性,即一个workflowPlanner代理与一个clusteringEngine引擎绑定,并通过clusteringEngine引擎绑定到一个workflowEngine引擎中。【ClusteringEngine类是WorkflowSim的一个可选组件,它可以将多个任务合并成一个大任务,由于目前本人还没有使用到,所以暂不介绍~】

WorkflowPlanner类中的getPlanningAlgorithm(PlanningAlgorithm name)方法可以根据需要选择规划算法,一般我们在使用WorkflowSim创建自己个人的算法时,写的算法是PlanningAlgorithm类型的规划算法。

workflowPlanner代理可以为任务添加影响因子,这对任务平衡的聚类算法很有用,我们可以根据研究需要,选择使用。方法为:processImpactFactors(List taskList) 与addImpact(Task task, double impact)。

BaseSchedulingAlgorithm

BaseSchedulingAlgorithm类继承了SchedulingAlgorithmInterface接口,它是一个基本调度器实现了基本功能。如:虚拟机的get与set,cloudlet类型的任务列表的get与set,以及主要的run()方法。其他调度方法应扩展自BaseSchedulingAlgorithm,但不应直接使用。

BasePlanningAlgorithm

BasePlanningAlgorithm类继承了PlanningAlgorithmInterface接口,它是一个基本规划器实现了基本功能。如:虚拟机的get与set,任务列表的get与set,以及主要的run()方法。其他调度方法应扩展自BasePlanningAlgorithm,但不应直接使用。

与BaseSchedulingAlgorithm不同,BasePlanningAlgorithm可以直接设置并获取Task类型的任务列表,并增加了数据中心列表DatacenterList的get与set。

【我们使用WorkflowSim创建个人的算法时,需要继承BasePlanningAlgorithm类】。

WorkflowSimBasicExample1

由其名可知,这是一个基本的WorkflowSimExample,它创建了一个工作流规划器、一个工作流引擎、一个调度器、一个数据中心和20个虚拟机。【根据实际情况在使用时更改daxPath与其他参数】。它包含4个方法:main()方法,创建虚拟机的方法createVM(),创建数据中心的方法createDatacenter(),以及输出结果的方法printJobList()。

main()方法实现了:①初始化WorkflowSim包;②初始化Parameters参数;③初始化cloudsim包;④创建数据中心;⑤创建调度算法代理;⑥创建WorkflowEngine引擎;⑦创建工作流;⑧创建虚拟机;⑨执行调度算法,完成任务到虚拟机的映射;⑩启动仿真程序、打印仿真结果以及关闭仿真程序。

我们在写好了自己的算法后,为了实现他,则需要创建一个继承了WorkflowSimExample的example来运行算法。

使用

如何创建并使用调度算法

当我们下载好WorkflowSim源码后,便可以实现一个自己的调度算法。具体步骤如下:

  1. 在source/org/planning中写一个继承BasePlanningAlgorithm的Class,类名如RKPNPlanningAlgorithm,重写run()方法。【可参考HEFTPlanningAlgorithm算法】,如图所示:

img

img

  1. 算法写完之后,需要在WorkflowPlanner的getPlanningAlgorithm()方法中增加该算法,具体操作如下:

img

img

其中的case:RKPN中的RKPN是自定义的算法名的缩写。

  1. 在参数类Parameters的枚举PlanningAlgorithm里面添加步骤2的case:RKPN,具体操作如图所示:

img

img

  1. 这样我们的一个调度算法就编写完了,如何使用呢?需要在”examples/org/workflowsim.examples/planning”中创建一个example来实现它。具体操作如下:

img

img

如图所示,在main()方法中需要根据自己的dax工作流文件的存放地址修改daxPath的值。【在“config/dax”中也有许多可供使用的工作流文件~】

img

上图修改Parameters的规划算法参数,值为在步骤(3)中设置的参数

除此之外,我们也可以根据需要自行设置虚拟机的个数vmNum等其他参数。具体操作见代码。

  1. 运行main()函数,即可在控制台获得工作流的运行结果。

参考

本文直接复制粘贴自:https://blog.csdn.net/LaraJean/article/details/123689433