欢迎来到山村网

python实现的文件同步服务器实例

2019-03-02 13:47:10浏览:539 来源:山村网   
核心摘要:  本文实例讲述了python实现的文件同步服务器。分享给大家供大家参考。具体实现方法如下:  服务端使用asyncore, 收到文件后

  本文实例讲述了python实现的文件同步服务器。分享给大家供大家参考。具体实现方法如下:

  服务端使用asyncore, 收到文件后保存到本地。

  客户端使用pyinotify监视目录的变化 ,把变动的文件发送到服务端。

  重点:

  1. 使用structs打包发送文件的信息,服务端收到后,根据文件信息来接收客户端传送过来的文件。

  2. 客户端使用多线程,pyinotify监视到文件变化,放到队列中,由另外一个线程发送。

  上代码:

  服务端:

  ?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 # receive file from client and store them into file use asyncore.# #/usr/bin/python #coding: utf-8 import asyncore import socket from socket import errno import logging import time import sys import struct import os import fcntl import threading from rrd_graph import MakeGraph try: import rrdtool except (importError, importWarnning): print "Hope this information can help you:" print "Can not find pyinotify module in sys path, just run [apt-get install python-rrdtool] in ubuntu." sys.exit(1) class RequestHandler(asyncore.dispatcher): def __init__(self, sock, map=None, chunk_size=1024): self.logger = logging.getLogger('%s-%s' % (self.__class__.__name__, str(sock.getsockname()))) self.chunk_size = chunk_size asyncore.dispatcher.__init__(self,sock,map) self.data_to_write = list() def readable(self): #self.logger.debug("readable() called.") return True def writable(self): response = (not self.connected) or len(self.data_to_write) #self.logger.debug('writable() -> %s data length -> %s' % (response, len(self.data_to_write))) return response def handle_write(self): data = self.data_to_write.pop() #self.logger.debug("handle_write()->%s size: %s",data.rstrip('rn'),len(data)) sent = self.send(data[:self.chunk_size]) if sent < len(data): remaining = data[sent:] self.data_to_write.append(remaining) def handle_read(self): self.writen_size = 0 nagios_perfdata = '../perfdata' head_packet_format = "!LL128s128sL" head_packet_size = struct.calcsize(head_packet_format) data = self.recv(head_packet_size) if not data: return filepath_len, filename_len, filepath,filename, filesize = struct.unpack(head_packet_format,data) filepath = os.path.join(nagios_perfdata, filepath[:filepath_len]) filename = filename[:filename_len] self.logger.debug("update file: %s" % filepath + '/' + filename) try: if not os.path.exists(filepath): os.makedirs(filepath) except OSError: pass self.fd = open(os.path.join(filepath,filename), 'w') #self.fd = open(filename,'w') if filesize > self.chunk_size: times = filesize / self.chunk_size first_part_size = times * self.chunk_size second_part_size = filesize % self.chunk_size while 1: try: data = self.recv(self.chunk_size) #self.logger.debug("handle_read()->%s size.",len(data)) except socket.error,e: if e.args[0] == errno.EWOULDBLOCK: print "EWOULDBLOCK" time.sleep(1) else: #self.logger.debug("Error happend while receive data: %s" % e) break else: self.fd.write(data) self.fd.flush() self.writen_size += len(data) if self.writen_size == first_part_size: break #receive the packet at last while 1: try: data = self.recv(second_part_size) #self.logger.debug("handle_read()->%s size.",len(data)) except socket.error,e: if e.args[0] == errno.EWOULDBLOCK: print "EWOULDBLOCK" time.sleep(1) else: #self.logger.debug("Error happend while receive data: %s" % e) break else: self.fd.write(data) self.fd.flush() self.writen_size += len(data) if len(data) == second_part_size: break elif filesize <= self.chunk_size: while 1: try: data = self.recv(filesize) #self.logger.debug("handle_read()->%s size.",len(data)) except socket.error,e: if e.args[0] == errno.EWOULDBLOCK: print "EWOULDBLOCK" time.sleep(1) else: #self.logger.debug("Error happend while receive data: %s" % e) break else: self.fd.write(data) self.fd.flush() self.writen_size += len(data) if len(data) == filesize: break self.logger.debug("File size: %s" % self.writen_size) class SyncServer(asyncore.dispatcher): def __init__(self,host,port): asyncore.dispatcher.__init__(self) self.debug = True self.logger = logging.getLogger(self.__class__.__name__) self.create_socket(socket.AF_INET,socket.SOCK_STREAM) self.set_reuse_addr() self.bind((host,port)) self.listen(2000) def handle_accept(self): client_socket = self.accept() if client_socket is None: pass else: sock, addr = client_socket #self.logger.debug("Incoming connection from %s" % repr(addr)) handler = RequestHandler(sock=sock) class RunServer(threading.Thread): def __init__(self): super(RunServer,self).__init__() self.daemon = False def run(self): server = SyncServer('',9999) asyncore.loop(use_poll=True) def StartServer(): logging.basicConfig(level=logging.DEBUG, format='%(name)s: %(message)s', ) RunServer().start() #MakeGraph().start() if __name__ == '__main__': StartServer()

  客户端:

  ?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 # monitor path with inotify(python module), and send them to remote server.# # use sendfile(2) instead of send function in socket, if we have python-sendfile installed.# import socket import time import os import sys import struct import threading import Queue try: import pyinotify except (importError, importWarnning): print "Hope this information can help you:" print "Can not find pyinotify module in sys path, just run [apt-get install python-pyinotify] in ubuntu." sys.exit(1) try: from sendfile import sendfile except (importError,importWarnning): pass filetype_filter = [".rrd",".xml"] def check_filetype(pathname): for suffix_name in filetype_filter: if pathname[-4:] == suffix_name: return True try: end_string = pathname.rsplit('.')[-1:][0] end_int = int(end_string) except: pass else: # means pathname endwith digit return False class sync_file(threading.Thread): def __init__(self, addr, events_queue): super(sync_file,self).__init__() self.daemon = False self.queue = events_queue self.addr = addr self.chunk_size = 1024 def run(self): while 1: event = self.queue.get() if check_filetype(event.pathname): print time.asctime(),event.maskname, event.pathname filepath = event.path.split('/')[-1:][0] filename = event.name filesize = os.stat(os.path.join(event.path, filename)).st_size sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) filepath_len = len(filepath) filename_len = len(filename) sock.connect(self.addr) offset = 0 data = struct.pack("!LL128s128sL",filepath_len, filename_len, filepath,filename,filesize) fd = open(event.pathname,'rb') sock.sendall(data) if "sendfile" in sys.modules: # print "use sendfile(2)" while 1: sent = sendfile(sock.fileno(), fd.fileno(), offset, self.chunk_size) if sent == 0: break offset += sent else: # print "use original send function" while 1: data = fd.read(self.chunk_size) if not data: break sock.send(data) sock.close() fd.close() class EventHandler(pyinotify.ProcessEvent): def __init__(self, events_queue): super(EventHandler,self).__init__() self.events_queue = events_queue def my_init(self): pass def process_IN_CLOSE_WRITE(self,event): self.events_queue.put(event) def process_IN_MOVED_TO(self,event): self.events_queue.put(event) def start_notify(path, mask, sync_server): events_queue = Queue.Queue() sync_thread_pool = list() for i in range(500): sync_thread_pool.append(sync_file(sync_server, events_queue)) for i in sync_thread_pool: i.start() wm = pyinotify.WatchManager() notifier = pyinotify.Notifier(wm,EventHandler(events_queue)) wdd = wm.add_watch(path,mask,rec=True) notifier.loop() def do_notify(): perfdata_path = '/var/lib/pnp4nagios/perfdata' mask = pyinotify.IN_CLOSE_WRITE|pyinotify.IN_MOVED_TO sync_server = ('127.0.0.1',9999) start_notify(perfdata_path,mask,sync_server) if __name__ == '__main__': do_notify()

  python监视线程池

  ?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 #!/usr/bin/python import threading import time class Monitor(threading.Thread): def __init__(self, *args,**kwargs): super(Monitor,self).__init__() self.daemon = False self.args = args self.kwargs = kwargs self.pool_list = [] def run(self): print self.args print self.kwargs for name,value in self.kwargs.items(): obj = value[0] temp = {} temp[name] = obj self.pool_list.append(temp) while 1: print self.pool_list for name,value in self.kwargs.items(): obj = value[0] parameters = value[1:] died_threads = self.cal_died_thread(self.pool_list,name) print "died_threads", died_threads if died_threads >0: for i in range(died_threads): print "start %s thread..." % name t = obj[0].__class__(*parameters) t.start() self.add_to_pool_list(t,name) else: break time.sleep(0.5) def cal_died_thread(self,pool_list,name): i = 0 for item in self.pool_list: for k,v in item.items(): if name == k: lists = v for t in lists: if not t.isAlive(): self.remove_from_pool_list(t) i +=1 return i def add_to_pool_list(self,obj,name): for item in self.pool_list: for k,v in item.items(): if name == k: v.append(obj) def remove_from_pool_list(self, obj): for item in self.pool_list: for k,v in item.items(): try: v.remove(obj) except: pass else: return

  使用方法:

  ?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 rrds_queue = Queue.Queue() make_rrds_pool = [] for i in range(5): make_rrds_pool.append(MakeRrds(rrds_queue)) for i in make_rrds_pool: i.start() make_graph_pool = [] for i in range(5): make_graph_pool.append(MakeGraph(rrds_queue)) for i in make_graph_pool: i.start() monitor = Monitor(make_rrds_pool=(make_rrds_pool, rrds_queue), make_graph_pool=(make_graph_pool, rrds_queue)) monitor.start()

  解析:

  1. 接受字典参数,value为一个元组,第一个元素是线程池,后面的都是参数。

  2. 每0.5秒监视线程池中的线程数量,如果线程死掉了,记录死掉线程的数目,再启动同样数量的线程。

  3. 如果没有线程死去,则什么也不做。

  从外部调用Django模块

  ?

1 2 3 4 5 6 7 8 import os import sys sys.path.insert(0,'/data/cloud_manage') from django.core.management import setup_environ import settings setup_environ(settings) from common.monitor import Monitor from django.db import connection, transaction

  前提就是,要新建一个django的project,这里我们新建了一个cloud_manage.

  这样不仅可以调用django自身的模块,还能调用project本身的东西。

  希望本文所述对大家的Python程序设计有所帮助。

(责任编辑:豆豆)
下一篇:

C++不使用变量求字符串长度strlen函数的实现方法

上一篇:

PowerShell在控制台输出特殊符号的方法

  • 信息二维码

    手机看新闻

  • 分享到
打赏
免责声明
• 
本文仅代表作者个人观点,本站未对其内容进行核实,请读者仅做参考,如若文中涉及有违公德、触犯法律的内容,一经发现,立即删除,作者需自行承担相应责任。涉及到版权或其他问题,请及时联系我们 xfptx@outlook.com