百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

Python实现算法:规划图技术(GraphPlanner)

toyiye 2024-07-02 02:50 18 浏览 0 评论

这篇文章涉及规划图技术分步算法实现:从伪代码和方程到Python代码。在本文中,我们将用Python实现规划图(Planning Graph)及其规划器GraphPlanner,以及AI规划的数据结构和搜索算法。

介绍

规划图是为了解决经典人工智能规划方法(又称STRIPS规划器)的复杂性问题而发展起来的。我们需要实施两个主要部分:

  • 规划图:数据结构
  • 图规划:搜索算法,为我们找到解决方案

如果你不熟悉规划图,想了解更多,请查看以下帖子:

https://towardsdatascience.com/improving-classical-ai-planning-complexity-with-planning-graph-c63d47f87018

领域和问题表示

在开始实现之前,我们需要知道如何表示规划域和规划问题。

规划图及其规划器与许多STRIPS规划器有相同的表示,因此我们将使用PDDL(规划域定义语言)来表示它们。下面是PDDL域文件的一个示例。

;; Specification in PDDL1 of the DWR domain

(define (domain dock-worker-robot-simple)
 (:requirements :strips :typing )
 (:types
  location      ; there are several connected locations in the harbor
  robot         ; holds at most 1 container, only 1 robot per location
  container)

 (:predicates
   (adjacent ?l1  ?l2 - location)       ; location ?l1 is adjacent ot ?l2
   (atl ?r - robot ?l - location)       ; robot ?r is at location ?l
   (loaded ?r - robot ?c - container )  ; robot ?r is loaded with container ?c
   (unloaded ?r - robot)                ; robot ?r is empty
   (in ?c - container ?l - location)    ; container ?c is within location ?l
   )

;; there are 3 operators in this domain:

;; moves a robot between two adjacent locations
 (:action move
     :parameters (?r - robot ?from ?to - location)
     :precondition (and (adjacent ?from ?to) (atl ?r ?from) )
     :effect (and (atl ?r ?to)
                    (not (atl ?r ?from)) ))

;; loads an empty robot with a container held by a nearby crane
 (:action load
     :parameters (?l - location ?c - container ?r - robot)
     :precondition (and (atl ?r ?l) (in ?c ?l) (unloaded ?r))
     :effect (and (loaded ?r ?c)
                    (not (in ?c ?l)) (not (unloaded ?r)) ))

;; unloads a robot holding a container with a nearby crane
 (:action unload
     :parameters (?l - location ?c - container ?r - robot)
     :precondition (and (atl ?r ?l) (loaded ?r ?c) )
     :effect (and (unloaded ?r) (in ?c ?l)
                    (not (loaded ?r ?c)) )) )

我们可以将PDDL看作JSON或XML之类的东西,这意味着我们需要一个解析器来反序列化用它编写的表示。当我在Github上搜索时,其中有几个出现了,但是有一个似乎很适合我们的项目pddlpy。

https://github.com/hfoffani/pddl-lib

但是,它在开发中不再活跃,我发现了一个bug和一些问题。因此,我决定使用它并编写一个适配器/包装器,这是一个薄层,我们添加它来修复错误并解决其他问题。

PDDL适配器

我们需要的代表如下:

  • 世界的初始状态:数据类型为set
  • 目标状态:数据类型为set
  • 运算符(也称为操作)的列表,这些运算符已用实变量实例化:数据类型为List[Operator]

我们将只使用pddlpy库中的一个接口,DomainProblem()类构造函数。

需要提供上面列出的三个接口:初始状态、目标状态和运算符列表。

我们创建了一个名为PlanningProblem的类:

class PlanningProblem(object):

    def __init__(self, dom_file: str, problem_file: str):
        self._domain_file = dom_file
        self._problem_file = problem_file
        self._domain_problem = DomainProblem(self._domain_file,
                                             self._problem_file)
        self._initial_state = self._to_set_of_tuples(self._domain_problem.
                                                     initialstate())
        self._goal_state = self._to_set_of_tuples(self._domain_problem.goals())
        self._actions = self._get_ground_operators()

库提供的状态不是我们想要的正确数据类型,因此需要将它们转换为一组元组。

我们使用set()数据类型,以便以后实现数据结构和算法。因为在经典的人工智能规划中,我们大量使用集合论,我们应该使用set()数据类型来利用内置函数来加速我们的实现。我们将在下一节看到更多内容。

我们还必须创建一个运算符列表,我们称之为操作。下面是适配器的最终代码。

class PlanningProblem(object):

    def __init__(self, dom_file: str, problem_file: str):
        self._domain_file = dom_file
        self._problem_file = problem_file
        self._domain_problem = DomainProblem(self._domain_file,
                                             self._problem_file)
        self._initial_state = self._to_set_of_tuples(self._domain_problem.
                                                     initialstate())
        self._goal_state = self._to_set_of_tuples(self._domain_problem.goals())
        self._actions = self._get_ground_operators()

    @staticmethod
    def _type_symbols(variable_type, world_objects: dict):
        # 如果变量类型是在world对象中找到的,
        # 返回对象名称列表,如robr, robq
        return (k for k, v in world_objects.items() if v == variable_type)

    def _instantiate(self, variables, world_objects: dict):
        variable_ground_space = []
        for variable_name, variable_type in variables:
            c = []
            for symbol in self._type_symbols(variable_type, world_objects):
                c.append((variable_name, symbol))
            variable_ground_space.append(c)
        return itertools.product(*variable_ground_space)

    def _get_ground_operators(self) -> List[Operator]:
        ground_operators = []
        for operator in self._domain_problem.operators():
            op = self._domain_problem.domain.operators[operator]
            for ground in self._instantiate(op.variable_list.items(),
                                            self._domain_problem.
                                            worldobjects()):
                st = dict(ground)
                gop = Operator(operator)
                gop.variable_list = st
                gop.precondition_pos = set(
                    [a.ground(st) for a in op.precondition_pos])
                gop.precondition_neg = set(
                    [a.ground(st) for a in op.precondition_neg])
                gop.effect_pos = set([a.ground(st) for a in op.effect_pos])
                gop.effect_neg = set([a.ground(st) for a in op.effect_neg])
                ground_operators.append(gop)
        return ground_operators

    @staticmethod
    def _to_set_of_tuples(state: Set[Atom]) -> Set[Tuple]:
        set_of_tuples = set()
        for atom in state:
            tup = tuple(atom.predicate)
            set_of_tuples.add(tup)
        return set_of_tuples

    @property
    def initial_state(self):
        return self._initial_state

    @property
    def goal_state(self):
        return self._goal_state

    @property
    def actions(self):
        return self._actions

我们现在可以使用这个类来解决规划领域和规划问题,并将重点放在数据结构和算法实现上。

规划图:数据结构

在这里我们将只看伪代码和方程,然后接下来的重点是将它们翻译成代码。

以下是我们如何构建规划图的伪代码:

有四个步骤,我们一个接一个地讲。

计算动作

这是指以下步骤:

其中包括两部分:

  • 对于PDDL适配器提供的所有操作,我们在当前状态下搜索可用的操作
  • 确保那些可应用的操作的前提条件不在前提条件的互斥体中
class PlanningGraph(object):

    def __init__(self, dom_file: str, problem_file: str):
        self._planning_problem = PlanningProblem(dom_file, problem_file)
        self._graph: Graph = Graph(visualize)
    
    def compute_actions(self, gr: Graph):
        graph_result = gr
        level = gr.num_of_levels
        
        # 计算 Ai
        action_list = []
        for action in self._planning_problem.actions:
            if self._applicable(action, graph_result.prop[level - 1],
                                graph_result.prop_mutexes[level - 1]):
                action_list.append(action)
        graph_result.act[level] = action_list
        
    @staticmethod
    def _applicable(action, state, preconditions_mutex) -> bool:
        if action.precondition_pos.issubset(state) and \
                action.precondition_neg.isdisjoint(state):
            applicable = True
            if preconditions_mutex is not None:
                for precondition in list(permutations(action.precondition_pos, 2)):
                    if precondition in preconditions_mutex:
                        applicable = False
                        break
        else:
            applicable = False

        return applicable)

计算前提条件

下一步是计算前提条件,也就是这一步:

这一步非常简单:

class PlanningGraph(object):

    def __init__(self, dom_file: str, problem_file: str):
        self._planning_problem = PlanningProblem(dom_file, problem_file)
        self._graph: Graph = Graph(visualize)
    
    def compute_preconditions(self, gr: Graph):
        graph_result = gr
        level = gr.num_of_levels
        
        proposition_list = set()
        for action in action_list:
            for effect in action.effect_pos:
                proposition_list.add(effect)
        graph_result.prop[level] = proposition_list
        

我们只存储计算出的动作效果。

计算操作互斥

该算法的下一步是计算Actions Mutex(操作互斥),这是一组相互抵消效果的操作。

在这个等式中有三个部分,对于动作中所有可能的排列,我们希望在列表中包括以下内容:

  • 行动的消极影响会干扰另一方的积极影响或先决条件
  • 第二部分是相同的,只是在另一个方向(b到a)
  • 第三部分是它们的前提条件是互斥
class PlanningGraph(object):

    def __init__(self, dom_file: str, problem_file: str):
        self._planning_problem = PlanningProblem(dom_file, problem_file)
        self._graph: Graph = Graph(visualize)
    
    def compute_actions_mutex(self, gr: Graph):
        graph_result = gr
        level = gr.num_of_levels
        
        action_mutex_list = []
        for action_pair in list(permutations(action_list, 2)):
            if self.compute_action_mutex(action_pair,
                                         graph_result.prop_mutexes[level - 1]):
                action_mutex_list.append(action_pair)
        graph_result.act_mutexes[level] = action_mutex_list
        
    @staticmethod
    def compute_action_mutex(pair, preconds_mutex) -> bool:
        a = pair[0]
        b = pair[1]

        # 两个动作是相互依存的
        if a.effect_neg.intersection(b.precondition_pos.union(b.effect_pos)) \
                != set():
            return True
        if b.effect_neg.intersection(a.precondition_pos.union(a.effect_pos)) \
                != set():
            return True

        # 它们的先决条件互斥
        if preconds_mutex is not None:
            for mutex in preconds_mutex:
                # (p, q)
                p = mutex[0]
                q = mutex[1]
                if p in a.precondition_pos and q in b.precondition_pos:
                    return True

        return False

计算互斥的前提条件

建立规划图的算法的最后一步是计算预条件互斥。

这意味着我们要寻找一对互斥的前提条件。它们是互斥的当且仅当:

  • 对于同时产生p和q的所有操作对,它们都在actions Mutex列表中
  • 没有同时产生p和q的单一操作
class PlanningGraph(object):

    def __init__(self, dom_file: str, problem_file: str):
        self._planning_problem = PlanningProblem(dom_file, problem_file)
        self._graph: Graph = Graph()
    
    def compute_preconditions_mutex(self, gr: Graph):
        proposition_mutex_list = []
        for proposition_pair in list(permutations(proposition_list, 2)):
            if self.compute_precondition_mutex(proposition_pair, action_list, action_mutex_list):
                if proposition_pair not in proposition_mutex_list:
                    swapped = (proposition_pair[1], proposition_pair[0])
                    if swapped not in proposition_mutex_list:
                        proposition_mutex_list.append(proposition_pair)
        graph_result.prop_mutexes[level] = proposition_mutex_list
        
    @staticmethod
    def compute_precondition_mutex(proposition_pair, action_list, action_mutex):
        p = proposition_pair[0]
        q = proposition_pair[1]

        for action in action_list:
            if p in action.effect_pos and q in action.effect_pos:
                # (p, q)不是互斥对象,如果它们都是由同一个动作产生的
                return False

        # 每一个产生p的动作
        actions_with_p = set()
        for action in action_list:
            if p in action.effect_pos:
                actions_with_p.add(action)

        # 每一个产生q的动作
        actions_with_q = set()
        for action in action_list:
            if q in action.effect_pos:
                actions_with_q.add(action)

        all_mutex = True
        for p_action in actions_with_p:
            for q_action in actions_with_q:
                if p_action == q_action:
                    return False
                if (p_action, q_action) not in action_mutex:
                    all_mutex = False
                    break
            if not all_mutex:
                break

        return all_mutex

我们现在已经完成了构建数据结构的代码,规划图。为了帮助调试,可以使用pydot扩充代码以生成图形可视化。下面是一个例子。

搜索算法:GraphPlanner

我们现在已经准备好数据结构,可以开始实现搜索算法,为计划问题找到解决方案。

该算法是递归的,分为三个部分:

  • 计划
  • 提取
  • 搜索

提取和搜索

这两个步骤是递归的,算法如下。第一部分是Extract:

下一部分是Search:

这是这两个函数如何递归工作的示例:

它递归地调用Search(),直到所有的命题都被解析,并调用Extract()进入规划图的下一级。

Python编写如下:

class GraphPlanner(object):

    def __init__(self):
        self._layered_plan: LayeredPlan = LayeredPlan()
        self._mutex = {}
        
    def _extract(self, gr: Graph, g: set, index: int):
        if index == 0:
            return Plan()
        return self._search(gr, g, Plan(), index)

    def _search(self, gr: Graph, g: set, plan: Plan, index: int):
        if g == set():
            new_goals = set()
            for action in plan.plan:
                for proposition in action.precondition_pos:
                    if 'adjacent' not in proposition:
                        new_goals.add(proposition)

            extracted_plan = self._extract(gr, new_goals, index-1)
            if extracted_plan is None:
                return None
            else:
                self._layered_plan[index-1] = extracted_plan
                self._layered_plan[index] = plan
                return plan
        else:
            # 选择g中的任意p
            proposition = g.pop()

            # 计算解析器
            resolvers = []
            for action in gr.act[index]:
                if proposition in action.effect_pos:
                    if plan.plan:
                        mutex = False
                        for action2 in plan.plan:
                            if (action, action2) in \
                                    gr.act_mutexes[index]:
                                mutex = True
                                break
                        if not mutex:
                            resolvers.append(action)
                    else:
                        resolvers.append(action)

            # 没有解析器
            if not resolvers:
                return None

            # 选择非确定性,如果失败则回溯
            while resolvers:
                resolver = resolvers.pop()
                plan.append(resolver)
                plan_result = self._search(gr, g - resolver.effect_pos,
                                           plan, index)
                if plan_result is not None:
                    return plan_result
                else:
                    plan.remove(resolver)
                    g.add(proposition)
            return None

主程序

我们到达了算法的最后一步、以下是主要步骤和入口点:

在某些情况下,我们需要计划更多的步骤来创建解决方案计划,需要展开规划图并重试搜索。

我们还需要添加一个额外的步骤,以确保算法在没有可能的解决方案时终止。下面是最后一段代码:

class GraphPlanner(object):

    def __init__(self):
        self._layered_plan: LayeredPlan = LayeredPlan()
        self._mutex = {}
       
    def plan(self, gr: Graph, g: set):
        index = gr.num_of_levels - 1

        if not g.issubset(gr.prop[index]):
            return None

        plan = self._extract(gr, g, index)
        if plan:
            return self._layered_plan

        if gr.fixed_point:
            n = 0
            try:
                props_mutex = self._mutex[gr.num_of_levels - 1]
            except KeyError:
                props_mutex = None
            if props_mutex:
                n = len(props_mutex)
        else:
            n = 0

        while True:
            index += 1
            gr = PlanningGraph.expand(gr)
            plan = self._extract(gr, g, index)
            if plan:
                return self._layered_plan
            elif gr.fixed_point:
                try:
                    props_mutex = self._mutex[gr.num_of_levels-1]
                except KeyError:
                    props_mutex = None
                if props_mutex:
                    if n == len(props_mutex):
                        # 这意味着它已经稳定下来
                        return None
                    else:
                        n = len(props_mutex)

结尾

我意识到要描述实现这个算法的思想过程并不容易。但希望至少你能对如何实现从等式、伪代码到Python代码的算法有一些基本的理解。

完整代码可在Github上获得,如下所示:

https://github.com/debbynirwan/planning_graph

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码