Source code for score.netfs._connection

import fcntl
import hashlib
import socket
import logging
import os
import shutil
import struct
from ._exceptions import CommitFailed, UploadFailed, DownloadFailed
from .constants import Constants
from transaction.interfaces import IDataManager
from zope.interface import implementer

log = logging.getLogger('score.netfs')


[docs]class NetfsConnection: CHUNK_SIZE = 1024 * 1024 def __init__(self, conf): self.conf = conf if self.conf.host: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect((self.conf.host, self.conf.port)) self.socket.settimeout(None)
[docs] def put(self, path, file, ctx=None, *, move=True): """ Uploads a file with given *path* to the server and moves it into the cache folder. The *file* is either a string (denoting a file system path to the file), or a :term:`file object`. In case the *file* parameter was a string, it is possible to keep that original file in place by specifying a falsy value for *move*. """ realpath = os.path.realpath(os.path.join(self.conf.cachedir, path)) path_prefix = os.path.commonprefix((self.conf.cachedir, realpath)) if path_prefix != self.conf.cachedir: raise ValueError('Invalid path: ' + path) dirname = os.path.dirname(realpath) if dirname: os.makedirs(dirname, exist_ok=True) if isinstance(file, str): if move: shutil.move(file, realpath) else: shutil.copy2(file, realpath) file = open(realpath, 'rb') else: shutil.copyfileobj(file, open(realpath, 'wb')) if self.conf.host: self.upload(path, file, ctx)
[docs] def get(self, path): """ Returns the local path to a file, downloading it from the server, if it does not already exist in the local cache folder. """ realpath = os.path.realpath(os.path.join(self.conf.cachedir, path)) path_prefix = os.path.commonprefix((self.conf.cachedir, realpath)) if path_prefix != self.conf.cachedir: raise ValueError('Invalid path: ' + path) if os.path.exists(realpath): return realpath tmpfile = realpath + '.tmp' dirname = os.path.dirname(realpath) if dirname: os.makedirs(dirname, exist_ok=True) file = open(tmpfile, 'wb') try: fcntl.flock(file, fcntl.LOCK_EX) if os.path.exists(realpath): # another process downloaded the file return realpath time = self.download(path, file) os.rename(tmpfile, realpath) os.utime(realpath, (time, time)) finally: fcntl.flock(file, fcntl.LOCK_UN) file.close() return realpath
[docs] def upload(self, path, file, ctx=None): """ Puts the contents of given :term:`file object` *file* with given *path* onto the server. You must call :meth:`.commit` to actually persist the upload. If you are using the ctx module, though, you should pass a :term:`context object` as *ctx*. This will automatically commit the upload if the transaction was successful. """ if self.conf.host is None: raise UploadFailed('No server configured') if not isinstance(path, bytes): path = path.encode('UTF-8') data = struct.pack('b', Constants.REQ_UPLOAD) data += struct.pack('!i', len(path)) data += path file.seek(0, 2) data += struct.pack('!q', file.tell()) file.seek(0, 0) sha = hashlib.sha512() self._send(data) chunk = file.read(self.CHUNK_SIZE) while chunk: self._send(chunk) sha.update(chunk) chunk = file.read(self.CHUNK_SIZE) self._send(sha.digest()) response = struct.unpack('b', self._read(1))[0] if response != Constants.RESP_OK: raise UploadFailed() if ctx: _CtxDataManager.join(self, ctx.tx_manager)
def prepare(self): """ Prepares the current transaction. Raises *CommitFailed* if the server responded with an error code. """ if self.conf.host is None: return self._send(struct.pack('b', Constants.REQ_PREPARE)) response = struct.unpack('b', self._read(1))[0] if response != Constants.RESP_OK: raise CommitFailed()
[docs] def commit(self): """ Instructs the server to persist all uploaded files, so that other clients can find them. """ if self.conf.host is None: return self._send(struct.pack('b', Constants.REQ_COMMIT)) response = struct.unpack('b', self._read(1))[0] if response != Constants.RESP_OK: raise CommitFailed()
def rollback(self): """ Sends a rollback command to the server. """ if self.conf.host is None: return self._send(struct.pack('b', Constants.REQ_ROLLBACK))
[docs] def download(self, path, file, retry=1): """ Downloads the file with given *path* from the server and writes it into the :term:`file object` *file*. """ if self.conf.host is None: raise DownloadFailed('No server configured') if not isinstance(path, bytes): path = path.encode('UTF-8') data = struct.pack('b', Constants.REQ_DOWNLOAD) data += struct.pack('!i', len(path)) data += path self._send(data) response = struct.unpack('b', self._read(1))[0] if response != Constants.RESP_OK: raise DownloadFailed(path) length = struct.unpack('!q', self._read(8))[0] sha = hashlib.sha512() while length: chunk_size = min(self.CHUNK_SIZE, length) chunk = self._read(chunk_size) sha.update(chunk) file.write(chunk) length -= chunk_size hash = self._read(512 // 8) if sha.digest() != hash: if retry > 0: return self.download(path, file, retry - 1) raise DownloadFailed(path) return struct.unpack('!i', self._read(4))[0]
def _send(self, data): log.debug('sending: {}'.format(data)) totalsent = 0 while totalsent < len(data): sent = self.socket.send(data[totalsent:]) if sent == 0: raise RuntimeError("socket connection broken") totalsent = totalsent + sent def _read(self, length): chunks = [] bytes_recd = 0 while bytes_recd < length: chunk = self.socket.recv(min(length - bytes_recd, 2048)) if chunk == b'': raise RuntimeError("socket connection broken") chunks.append(chunk) bytes_recd = bytes_recd + len(chunk) log.debug('receiving {}: {}'.format(length, b''.join(chunks))) return b''.join(chunks)
@implementer(IDataManager) class _CtxDataManager: """ An :interface:`IDataManager <transaction.interfaces.IDataManager>`, which will commit all uploaded files at the end of the transaction. """ _instances = [] @classmethod def join(cls, connection, tx_manager): tx = tx_manager.get() if (connection, tx) not in cls._instances: tx.join(cls(connection, tx_manager)) def __init__(self, connection, tx_manager): self.transaction_manager = tx_manager self.connection = connection self.__class__._instances.append((self.connection, tx_manager.get())) def abort(self, transaction): self.connection.rollback() self.__class__._instances.remove((self.connection, transaction)) def tpc_begin(self, transaction): pass def commit(self, transaction): pass def tpc_vote(self, transaction): self.connection.prepare() def tpc_finish(self, transaction): self.connection.commit() self.__class__._instances.remove((self.connection, transaction)) def tpc_abort(self, transaction): self.connection.rollback() self.__class__._instances.remove((self.connection, transaction)) def sortKey(self): return 'score.netfs(%d)' % id(self)