百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

Greenplum Python专用库gppylib学习——base.py

toyiye 2024-08-30 02:48 5 浏览 0 评论

base.py依赖的python包(Queue,threading,os,signal,subprocess/subprocess32,sys,time,warnings,paramiko,getpass),依赖的gp包(gplog,gpsubprocess,pygresql)。pygresql导入语句的是from pygresql.pg import DB,主要使用的DB是SQLCommand类,这个类先不用关注。gpsubprocess是对subprocess的封装,可以看到这里使用了两个子进程包gpsubprocess和subprocess。

代码分析

WorkerPool类

先看WorkPool类定义,类实例包含了存放worker实例的列表、存放带执行Command的work_queue队列、存放执行完Command的completed_queue队列。WorkerPool中的worker实例的数量是在构造函数就给定的,初始化后worker实例会一直运行,图中的start就是在构造函数中完成的。Worker实例从work_queue队列中取工作项Command的函数是getNetWorkItem。

def getNextWorkItem(self):

return self.work_queue.get(block=True)

Worker实例处理完成work_queue中所有命令之后,取不到命令或取到的命令是halt_command,或者任务池标志了should_stop之后,使用markTaskDone函数告知WorkPool该任务完成。

def markTaskDone(self):

self.work_queue.task_done()

clsSystemState.py的GpSystemStateProgram类中run函数中有base文件中类的使用示例,简化如下,通过这些示例来学习任务池的使用:

dispatchCount = 0

pool = base.WorkerPool(parallelDegree) #parallelDegree给定worker个数

for hostName, segments in ...:

cmd = ...

hostNameToCmd[hostName] = cmd

pool.addCommand(cmd)

dispatchCount+=1

pool.wait_and_printdots(dispatchCount)

hostNameToResults = {}

for hostName, cmd in hostNameToCmd.iteritems():

hostNameToResults[hostName] = cmd.decodeResults() #取出结果集

pool.haltWork()

主要流程:通过addCommand函数向队列(work_queue)中添加工作负载(WorkerPool类可以通过构造函数向队列中添加多个命令(列表形式),或者通过addCommand函数添加单个命令)。针对work_queue队列由三种join的方法,代码如下。hostNameCmd是一个字典,键为hostName,值为cmd。通过cmd.decodeResults函数取出结果集。

def join(self):

self.work_queue.join()

return True

def _join_work_queue_with_timeout(self, timeout):

"""

Queue.join() unfortunately doesn't take a timeout (see

https://bugs.python.org/issue9634). Fake it here, with a solution

inspired by notes on that bug report.

XXX This solution uses undocumented Queue internals (though they are not

underscore-prefixed...).

"""

done_condition = self.work_queue.all_tasks_done

done_condition.acquire()

try:

while self.work_queue.unfinished_tasks:

if (timeout <= 0):

# Timed out.

return


start_time = time.time()

done_condition.wait(timeout)

timeout -= (time.time() - start_time)

finally:

done_condition.release()

def wait_and_printdots(self, command_count, quiet=True):

while self.completed_queue.qsize() < command_count:

time.sleep(1)


if not quiet:

sys.stdout.write(".")

sys.stdout.flush()

if not quiet:

print " "

self.join()


Queue/queue模块的类

属性 描述

Queue(maxsize=0) 创建一个先入先出队列。如果给定最大值,则在队列没有空间时阻塞;否则,为无限队列

LifoQueue(maxsize=0) 创建一个后入先出队列。如果给定最大值,则在队列没有空间时阻塞;否则,为无限队列

PriorityQueue(maxsize=0) 创建一个优先级队列。如果给定最大值,则在队列没有空间时阻塞,否则,为无限队列

Queue/queue异常

属性 描述

Empty 当对空队列调用get*()方法时抛出异常

Full 当对已满的队列调用put*()方法时抛出异常

Worker类

worker类继承自threading模块中的Thread类,run函数先使用getNextWorkItem函数取得command,总共有四种情况:任务池中没有command,该Worker示例需要向任务池标记任务完成;如果取得的命令是pool.halt_command,该Worker示例需要向任务池标记任务完成;如果任务池标记了should_stop,该Worker示例需要向任务池标记任务完成;下面是正常流程,执行命令,并将命令放入任务池完成队列。

class Woker(Thread):

...

def run(self):

while True:

try:

try:

self.cmd = self.pool.getNextWorkItem()

except TypeError:

# misleading exception raised during interpreter shutdown

return

# we must have got a command to run here

if self.cmd is None:

self.logger.debug("[%s] got a None cmd" % self.name)

self.pool.markTaskDone()

elif self.cmd is self.pool.halt_command:

self.logger.debug("[%s] got a halt cmd" % self.name)

self.pool.markTaskDone()

self.cmd = None

return

elif self.pool.should_stop:

self.logger.debug("[%s] got cmd and pool is stopped: %s" % (self.name, self.cmd))

self.pool.markTaskDone()

self.cmd = None

else:

self.logger.debug("[%s] got cmd: %s" % (self.name, self.cmd.cmdStr))

self.cmd.run()

self.logger.debug("[%s] finished cmd: %s" % (self.name, self.cmd))

self.pool.addFinishedWorkItem(self.cmd)

self.cmd = None

except Exception, e:

self.logger.exception(e)

if self.cmd:

self.logger.debug("[%s] finished cmd with exception: %s" % (self.name, self.cmd))

self.pool.addFinishedWorkItem(self.cmd)

self.cmd = None

def haltWork(self):

self.logger.debug("[%s] haltWork" % self.name)

c = self.cmd

if c is not None and isinstance(c, Command):

c.interrupt()

c.cancel()

threading模块的Thread类实例化表示一个执行线程的对象,拥有的数据属性name(线程名)、ident(线程的标识符)、daemon(布尔标志,表示这个线程是否是守护线程),方法如下:

Thread对象方法描述

_ini_(group=None, target=None, name=None, args=(), kwargs={}, verbose=None, daemon=None) 实例化一个线程对象,需要有一个可调用的target,以及其参数args或kwargs。还可以传递name或group参数,不过后者还未实现。此外,verbose标志也是可以接受的。而daemon的值将会设定thread.daemon属性/标志

使用Thread类可以有很多方法创建线程,这里介绍三种方法:1. 创建Thread的实例,传给它一个函数;2. 创建Thread的实例,传给它一个可调用的类实例;3.派生Thread的子类,并创建子类的实例。

Command类

Command类有两个执行函数runNoWait和run函数,runNoWait函数通过调用exec_context.execute(self,wait=False)函数执行命令,并返回proc;run函数直接调用exec_context.execute(self)函数。

def runNoWait(self):

faultPoint = os.getenv('GP_COMMAND_FAULT_POINT')

if not faultPoint or (self.name and not self.name.startswith(faultPoint)):

self.exec_context.execute(self, wait=False)

return self.exec_context.proc

def run(self, validateAfter=False):

faultPoint = os.getenv('GP_COMMAND_FAULT_POINT')

if not faultPoint or (self.name and not self.name.startswith(faultPoint)):

self.exec_context.execute(self)

else:

# simulate error

self.results = CommandResult(1, 'Fault Injection', 'Fault Injection', False, True)

if validateAfter:

self.validate()

pass

ExecutionContext类

以RemoteExecutionContext执行上下文类为例,参数是以以下方法处理的

keys = sorted(cmd.propagate_env_map.keys(), reverse=True) for k in keys: cmd.cmdStr = "%s=%s && %s" % (k, cmd.propagate_env_map[k], cmd.cmdStr),将参数序列化到cmdStr中。对于LocalExecutionContext来说,调用如下命令执行命令:self.proc = gpsubprocess.Popen(cmd.cmdStr, env=None, shell=True,executable='/bin/bash',stdin=subprocess.PIPE,stderr=subprocess.PIPE,stdout=subprocess.PIPE, close_fds=True),对GP封装的subprocess模式请参见相应其他系列的博客。如果需要等待子进程,则调用(rc, stdout_value, stderr_value) = self.proc.communicate2(input=self.stdin),然后使用cmd.set_results(CommandResult( rc, "".join(stdout_value), "".join(stderr_value), self.completed, self.halt)) def cancel(self, cmd):封装命令返回的结果。

class LocalExecutionContext(ExecutionContext):

proc = None

halt = False

completed = False

def __init__(self, stdin):

ExecutionContext.__init__(self)

self.stdin = stdin

pass

def execute(self, cmd, wait=True):

# prepend env. variables from ExcecutionContext.propagate_env_map

# e.g. Given {'FOO': 1, 'BAR': 2}, we'll produce "FOO=1 BAR=2 ..."

# also propagate env from command instance specific map

keys = sorted(cmd.propagate_env_map.keys(), reverse=True)

for k in keys:

cmd.cmdStr = "%s=%s && %s" % (k, cmd.propagate_env_map[k], cmd.cmdStr)

# executable='/bin/bash' is to ensure the shell is bash. bash isn't the

# actual command executed, but the shell that command string runs under.

self.proc = gpsubprocess.Popen(cmd.cmdStr, env=None, shell=True,executable='/bin/bash',stdin=subprocess.PIPE,stderr=subprocess.PIPE,stdout=subprocess.PIPE, close_fds=True)

cmd.pid = self.proc.pid

if wait:

(rc, stdout_value, stderr_value) = self.proc.communicate2(input=self.stdin)

self.completed = True

cmd.set_results(CommandResult(

rc, "".join(stdout_value), "".join(stderr_value), self.completed, self.halt))

def cancel(self, cmd):

if self.proc:

try:

os.kill(self.proc.pid, signal.SIGTERM)

except OSError:

pass

def interrupt(self, cmd):

self.halt = True

if self.proc:

self.proc.cancel()

class RemoteExecutionContext(LocalExecutionContext):

trail = set()

def __init__(self, targetHost, stdin, gphome=None):

LocalExecutionContext.__init__(self, stdin)

self.targetHost = targetHost

if gphome:

self.gphome = gphome

else:

self.gphome = GPHOME

def execute(self, cmd):

# prepend env. variables from ExcecutionContext.propagate_env_map

# e.g. Given {'FOO': 1, 'BAR': 2}, we'll produce "FOO=1 BAR=2 ..."

self.__class__.trail.add(self.targetHost)

# also propagate env from command instance specific map

keys = sorted(cmd.propagate_env_map.keys(), reverse=True)

for k in keys:

cmd.cmdStr = "%s=%s && %s" % (k, cmd.propagate_env_map[k], cmd.cmdStr)

# Escape " for remote execution otherwise it interferes with ssh

cmd.cmdStr = cmd.cmdStr.replace('"', '\\"')

cmd.cmdStr = "ssh -o StrictHostKeyChecking=no -o ServerAliveInterval=60 " \

"{targethost} \"{gphome} {cmdstr}\"".format(targethost=self.targetHost,

gphome=". %s/greenplum_path.sh;" % self.gphome,

cmdstr=cmd.cmdStr)

LocalExecutionContext.execute(self, cmd)

if (cmd.get_results().stderr.startswith('ssh_exchange_identification: Connection closed by remote host')):

self.__retry(cmd)

pass

def __retry(self, cmd, count=0):

if count == SSH_MAX_RETRY:

return

time.sleep(SSH_RETRY_DELAY)

LocalExecutionContext.execute(self, cmd)

if (cmd.get_results().stderr.startswith('ssh_exchange_identification: Connection closed by remote host')):

self.__retry(cmd, count + 1)

而RemoteExecutionContext先对参数进行格式化后,还需要对双引号使用双斜线进行替换后,然后添加ssh相关命令选项,代码如下所示:cmd.cmdStr = cmd.cmdStr.replace('"', '\\"') cmd.cmdStr = "ssh -o StrictHostKeyChecking=no -o ServerAliveInterval=60 {targethost} \"{gphome} {cmdstr}\"".format(targethost=self.targetHost, gphome=". %s/greenplum_path.sh;" % self.gphome, cmdstr=cmd.cmdStr),然后调用LocalExecutionContext父类的execute函数。如果返回的结果包含ssh_exchange_identification: Connection closed by remote host,则需要进行等待相应的时间,然后进行重试。

Controller-Worker架构模式

辅助说明:

Controller-Worker是一种组合架构模式,Controller基于Client的参数动态生成Woker数量,并控制Woker的生命周期,如创建和终止。

Controller属性:

Controller事先知道自身拥有的Woker类型。

Controller依赖一个工作任务池,通过工作任务池Controller监控整体任务执行情况。

Worker属性:

Worker并行消费工作任务池中任务,并把执行结果返回到任务池中。

Worker彼此间没有任何耦合。

辅助说明:

Controller通过WorkerPool和Worker进行命令传递。

Controller通过超时机制,保证最后一定有命令结果返回给Client

Controller通过halt命令,停止所有的Woker

Worker采用Thread方式来实现。

Worker1、Worker2、WorkerN无差别,根据获取的Cmd,通过ssh方式在对应的Host执行命令。

相关推荐

# Python 3 # Python 3字典Dictionary(1)

Python3字典字典是另一种可变容器模型,且可存储任意类型对象。字典的每个键值(key=>value)对用冒号(:)分割,每个对之间用逗号(,)分割,整个字典包括在花括号({})中,格式如...

Python第八课:数据类型中的字典及其函数与方法

Python3字典字典是另一种可变容器模型,且可存储任意类型对象。字典的每个键值...

Python中字典详解(python 中字典)

字典是Python中使用键进行索引的重要数据结构。它们是无序的项序列(键值对),这意味着顺序不被保留。键是不可变的。与列表一样,字典的值可以保存异构数据,即整数、浮点、字符串、NaN、布尔值、列表、数...

Python3.9又更新了:dict内置新功能,正式版十月见面

机器之心报道参与:一鸣、JaminPython3.8的热乎劲还没过去,Python就又双叒叕要更新了。近日,3.9版本的第四个alpha版已经开源。从文档中,我们可以看到官方透露的对dic...

Python3 基本数据类型详解(python三种基本数据类型)

文章来源:加米谷大数据Python中的变量不需要声明。每个变量在使用前都必须赋值,变量赋值以后该变量才会被创建。在Python中,变量就是变量,它没有类型,我们所说的"类型"是变...

一文掌握Python的字典(python字典用法大全)

字典是Python中最强大、最灵活的内置数据结构之一。它们允许存储键值对,从而实现高效的数据检索、操作和组织。本文深入探讨了字典,涵盖了它们的创建、操作和高级用法,以帮助中级Python开发...

超级完整|Python字典详解(python字典的方法或操作)

一、字典概述01字典的格式Python字典是一种可变容器模型,且可存储任意类型对象,如字符串、数字、元组等其他容器模型。字典的每个键值key=>value对用冒号:分割,每个对之间用逗号,...

Python3.9版本新特性:字典合并操作的详细解读

处于测试阶段的Python3.9版本中有一个新特性:我们在使用Python字典时,将能够编写出更可读、更紧凑的代码啦!Python版本你现在使用哪种版本的Python?3.7分?3.5分?还是2.7...

python 自学,字典3(一些例子)(python字典有哪些基本操作)

例子11;如何批量复制字典里的内容2;如何批量修改字典的内容3;如何批量修改字典里某些指定的内容...

Python3.9中的字典合并和更新,几乎影响了所有Python程序员

全文共2837字,预计学习时长9分钟Python3.9正在积极开发,并计划于今年10月发布。2月26日,开发团队发布了alpha4版本。该版本引入了新的合并(|)和更新(|=)运算符,这个新特性几乎...

Python3大字典:《Python3自学速查手册.pdf》限时下载中

最近有人会想了,2022了,想学Python晚不晚,学习python有前途吗?IT行业行业薪资高,发展前景好,是很多求职群里严重的香饽饽,而要进入这个高薪行业,也不是那么轻而易举的,拿信工专业的大学生...

python学习——字典(python字典基本操作)

字典Python的字典数据类型是基于hash散列算法实现的,采用键值对(key:value)的形式,根据key的值计算value的地址,具有非常快的查取和插入速度。但它是无序的,包含的元素个数不限,值...

324页清华教授撰写【Python 3 菜鸟查询手册】火了,小白入门字典

如何入门学习python...

Python3.9中的字典合并和更新,了解一下

全文共2837字,预计学习时长9分钟Python3.9正在积极开发,并计划于今年10月发布。2月26日,开发团队发布了alpha4版本。该版本引入了新的合并(|)和更新(|=)运算符,这个新特性几乎...

python3基础之字典(python中字典的基本操作)

字典和列表一样,也是python内置的一种数据结构。字典的结构如下图:列表用中括号[]把元素包起来,而字典是用大括号{}把元素包起来,只不过字典的每一个元素都包含键和值两部分。键和值是一一对应的...

取消回复欢迎 发表评论:

请填写验证码