Source code for ulordapi.udfs.udfs

# coding=utf-8
# Copyright (c) 2016-2018 The Ulord Core Developers
# @File  : udfs.py
# @Author: Ulord_PuJi
# @Date  : 2018/5/18 0018

import sys, os, subprocess, platform, json, time, signal, logging, atexit
import copy
from uuid import uuid1

import ipfsapi

from ulordapi.utils import fileHelper
from ulordapi.config import ROOTPATH

[docs]class Udfs(): """ udfs class,including some operations about daemon program """ def __init__(self): """ add log and init daemon program """ self.log = logging.getLogger("Udfs:") # get some paths self.root_path = os.path.dirname(os.path.abspath(__file__)) self.config = os.path.join(self.root_path, 'config') self.lock = os.path.join(self.config, 'repo.lock') self.udfs_path = self.get_udfs() self.udfs_daemon_pid = self.get_pid() if not os.path.isdir(self.config): self.start_init() if self.udfs_init: self.modify_config() # self.start() # self.connect = UdfsHelper() # TODO: multi helper to download faster
[docs] def get_pid(self): """ get daemon pid :return: daemon pid """ if os.path.isfile(self.lock): self.log.debug('get udfs daemon pid') with open(self.lock, 'r') as target: return json.load(target).get("OwnerPID") else: self.log.debug("self.lock is {}.It doesn't exist".format(self.lock)) return None
[docs] def get_udfs(self): """ get udfs path according to the os :return: udfs path """ tools = os.path.join(self.root_path, 'tools') self.current_os = platform.system() self.log.info('Current os is {}'.format(self.current_os)) if self.current_os in ["Windows", "Win32"]: udfs = os.path.join(tools, 'udfs.exe') elif self.current_os in ["Mac OSX", "Darwin"]: udfs = os.path.join(tools, 'udfs') elif self.current_os == "Linux": udfs = os.path.join(tools, 'udfs') else: self.log.critical("Unknow operating system") sys.exit(0) return udfs
[docs] def start_command(self, cmd): """ start external command :param cmd: shell command :type cmd: str/list :return: popen """ self.log.debug("starting command,current command:{}".format(str(cmd))) FNULL = open(os.devnull, 'w') try: pl = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=FNULL) except Exception as e: self.log.error("start command \n{0}\n failed! Exception is {1}".format(str(cmd), e)) pl = None # self.log.info("end command,result is {}".format(pl.communicate())) time.sleep(3) return pl
[docs] def start(self, daemon=None): """ start udfs.if daemon is true, the udfs is a daemon,else the udfs is starting the current thread. :param daemon: if wait :type daemon: bool """ atexit.register(self.stop) # cmd = "{0} --config {1} daemon".format(self.udfs_path,self.config) cmd = [ self.udfs_path, "--config", self.config, "daemon" ] self.udfs_daemon = self.start_command(cmd) info = "Udfs has started!\nNow you can use it to download or upload!" print(info) self.log.info(info) if daemon and self.udfs_daemon: self.udfs_daemon.wait() self.udfs_daemon_pid = self.udfs_daemon.pid
[docs] def start_init(self): """ init udfs, sleep 3 microsecond to wait the command finished. """ # cmd = "{0} --config {1} init".format(self.udfs_path, self.config) cmd = [ self.udfs_path, "--config", self.config, "init" ] self.udfs_init = self.start_command(cmd) time.sleep(3)
[docs] def modify_config(self): """ modify udfs config """ self.udfs_config = os.path.join(self.config, 'config') if os.path.isfile(self.udfs_config): # modify config file self.log.debug("starting modify udfs config") with open(self.udfs_config) as target: self.udfs_json = json.load(target) self.udfs_json['Bootstrap'] = ["/ip4/114.67.37.2/tcp/20515/ipfs/QmctwnuHwE8QzH4yxuAPtM469BiCPK5WuT9KaTK3ArwUHu"] self.udfs_json['Datastore']['StorageMax'] = '0MB' with open(self.udfs_config, 'w') as target: json.dump(self.udfs_json, target, indent=2) self.log.debug("end modify udfs config") else: self.log.error("error get udfs config.Restart init...") self.start_init()
[docs] def stop(self): """ kill the udfs daemon """ if self.udfs_daemon_pid: self.log.info("stop daemon") if self.current_os in ["Windows", "Win32"]: self.log.debug("excute taskkill") try: os.popen('taskkill.exe /pid:{0} /F'.format(self.udfs_daemon_pid)) except Exception as e: self.log.error("Kill task error!Exception is {}".format(e)) elif self.current_os in ["Mac OSX", "Darwin"]: try: os.killpg(self.udfs_daemon_pid, signal.SIGTERM) except: self.log.error("Kill task error!Exception is {}".format(e)) elif self.current_os == "Linux": try: os.killpg(self.udfs_daemon_pid, signal.SIGTERM) except Exception as e: self.log.error("Kill task error!Exception is {}".format(e)) else: self.log.critical("Unknow operating system") sys.exit(1) # print(self.lock) # print(os.path.isfile(self.lock)) if os.path.isfile(self.lock): try: os.remove(self.lock) except Exception as e: self.log.error('remove self.lock({0}) failed!'.format(e))
[docs]class UdfsHelper(): """ download and upload files from Ulord """ def __init__(self, host='127.0.0.1', port='5001'): """ create a connection to the udfs :param host: udfs daemon host :type host: str :param port: udfs daemon port :type port: str/int """ self.udfs = Udfs() self.udfs_host = host self.udfs_port = port self.connect = None self.log = logging.getLogger("UdfsHelper:") self.chunks = {} self.objects = None self.links = [] self.downloadpath = os.path.join(ROOTPATH, 'download')
[docs] def update(self, host='127.0.0.1', port='5001'): """ update udfs dameon connection :param host: udfs daemon host :type host: str :param port: udfs daemon port :type port: str/int """ self.udfs_host = host self.udfs_port = port self.connect = None self.chunks = {} self.objects = None self.links = []
[docs] def cat(self, udfshash): """ Retrieves the contents of a file identified by hash. :param udfshash: udfs hash :type udfshash:str :return: str(File contents) """ if not self.connect: self.udfs.start(False) self.connect = ipfsapi.connect(self.udfs_host, self.udfs_port) return self.connect.cat(udfshash)
[docs] def upload_stream(self, stream): """ upload the stream to the ulord :param stream: stream data :type stream: unicode :return: """ if not self.connect: self.udfs.start(False) self.connect = ipfsapi.connect(self.udfs_host, self.udfs_port) try: # py-api doesn't support add stream.But the js-api supports.So sad.Maybe need to use HTTP-api. start = time.time() # TODO save stream to a file file_temp = os.path.join(ROOTPATH, 'temp', "{}.txt".format(uuid1())) if fileHelper.saveFile(file_temp, stream): result = self.connect.add(file_temp) try: os.remove(file_temp) except Exception as e: self.log.error("del temp file {0} error: {1}".format(file_temp, e)) end = time.time() self.log.info('upload stream cost:{}'.format(end - start)) return result.get('Hash') except Exception as e: logging.error("Failed upload.{}".format(e)) return None
[docs] def upload_file(self, local_file): """ upload the file to the udfs :param local_file: a local file path :type local_file: str :return: Hash or False """ if not self.connect: self.udfs.start(False) # self.log.error("You need to ") self.connect = ipfsapi.connect(self.udfs_host, self.udfs_port) try: if os.path.isfile(local_file): # start = time.time() result = self.connect.add(local_file) # end = time.time() # print('upload {0} ,size is {1}, cost:{2}'.format(local_file, FileHelper.getSize(local_file), (end - start))) return result.get('Hash') else: return False except Exception as e: # save e in the log self.log.error("upload file failed!Exception is {}".format(e)) return False
[docs] def list(self, filehash): """ list the udfshash chunks :param filehash: a udfs hash :type filehash: str :return: a list chunks of the udfs hash """ if not self.connect: self.udfs.start(False) self.connect = ipfsapi.connect(self.udfs_host, self.udfs_port) try: self.objects = self.connect.ls(filehash).get('Objects') if self.objects: for object in self.objects: if 'Links' in object.keys(): for link in object.get('Links'): self.links.append(link) else: self.links = "test" except Exception as e: logging.error("ls fail:{}".format(e))
[docs] def downloadfile(self, localfile): """ query the localfile from DB and then download from the udfs :param localfile: a file path :type localfile: str :return: True os False """ if not self.connect: self.udfs.start(False) self.connect = ipfsapi.connect(self.udfs_host, self.udfs_port) # TODO query the file hash from DB pass
[docs] def downloadhash(self, filehash, filepath=None, Debug=False): """ download file from the UDFS according to the udfs hash :param filehash: file udfs hash :type filehash: str :param filepath: the path to save the file :type filepath: str :param Debug: if Debug print the cost time :type Debug: bool :return: True or False """ if not self.connect: self.udfs.start(False) self.connect = ipfsapi.connect(self.udfs_host, self.udfs_port) try: if Debug: start = time.time() self.connect.get(filehash, filepath=filepath) if Debug: end = time.time() self.log.debug('download {0} cost:{1}'.format(filehash, (end - start))) print('download {0} cost:{1}'.format(filehash, (end - start))) self.log.info("download {} successfully!".format(filehash)) return True except Exception as e: logging.error("download fail:{}".format(e)) return False
[docs] def resumableDownload(self, filehash, filename=None): """ resumable download :param filehash: file UDFS hash :type filehash: str :param filename: file path to save the file :type filename: str :return: True or False """ if not self.connect: self.udfs.start(False) self.connect = ipfsapi.connect(self.udfs_host, self.udfs_port) # not thread safely.single thread filehash_path = os.path.join(self.downloadpath, filehash) tempjson = os.path.join(filehash_path, 'temp.json') if not os.path.isfile(tempjson): # save chunks result into the temp.json self.list(filehash) if self.links: i = 0 for link in self.links: if 'Hash' in link.keys(): self.chunks.update({ i: { 'filehash': link.get('Hash'), 'success': False } }) i += 1 fileHelper.saveFile(tempjson, json.dumps(self.chunks)) else: print("no chunks.Error get the {} chunks result".format(filehash)) # download chunk with open(tempjson) as target_file: self.chunks = json.load(target_file) if self.chunks: for chunk, chunk_result in self.chunks.items(): if not chunk_result.get('success'): chunk_result['success'] = self.downloadhash(chunk_result.get('filehash'), filehash_path) or chunk_result.get('success') fileHelper.saveFile(tempjson, json.dumps(self.chunks)) # merge chunks if filename: localfile = os.path.join(filehash_path, filename) else: localfile = os.path.join(filehash_path, filehash) with open(localfile, 'wb') as target_file: for i in range(len(self.chunks)): chunk = os.path.join(filehash_path, self.chunks.get(str(i)).get('filehash')) with open(chunk, 'rb') as source_file: for line in source_file: target_file.write(line) try: os.remove(chunk) # delete chunk to save the space except Exception as e: print("{0}:{1} remove failed:{2}".format(chunk, os.path.isfile(chunk), e)) try: os.remove(tempjson) except Exception as e: print("{0}:{1} remove failed:{2}".format(tempjson, os.path.isfile(tempjson), e))
if __name__ == '__main__': udfshelper = UdfsHelper() # udfshelper.upload_file(r'E:\ulord\py-ulord-api\ulordapi\udfs\config.json') with open(r'E:\ulord\py-ulord-api\ulordapi\udfs\config.json', 'r') as target: udfshelper.upload_stream(target)