diff mbox

[RFC,5/8] Add nbd server Python module

Message ID 1362867748-30528-6-git-send-email-stefanha@redhat.com
State New
Headers show

Commit Message

Stefan Hajnoczi March 9, 2013, 10:22 p.m. UTC
The nbd module works like this:

  server = nbd.Server(sock)
  server.add_export('drive0', handler0)
  server.add_export('drive1', handler1)
  server.run()

The user must provide a handler object which defines the behavior of an
export:

  class MyNBDHandler(nbd.ExportHandler):
      def write(self, offset, data):
          pass # do something

      def size(self):
          return 10 * 1024 * 1024 * 1024

Note that the handler is invoked from a thread.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 nbd.py | 124 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 124 insertions(+)
 create mode 100644 nbd.py
diff mbox

Patch

diff --git a/nbd.py b/nbd.py
new file mode 100644
index 0000000..2528531
--- /dev/null
+++ b/nbd.py
@@ -0,0 +1,124 @@ 
+# NBD server module
+#
+# Copyright 2013 Red Hat, Inc. and/or its affiliates
+#
+# Authors:
+#   Stefan Hajnoczi <stefanha@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or later.
+# See the COPYING file in the top-level directory.
+
+import struct
+import collections
+import threading
+
+__all__ = ['ExportHandler', 'Server']
+
+NBD_CMD_WRITE = 1
+NBD_CMD_DISC = 2
+NBD_REQUEST_MAGIC = 0x25609513
+NBD_REPLY_MAGIC = 0x67446698
+NBD_PASSWD = 0x4e42444d41474943
+NBD_OPTS_MAGIC = 0x49484156454F5054
+NBD_OPT_EXPORT_NAME = 1 << 0
+
+neg1_struct = struct.Struct('>QQH')
+export_tuple = collections.namedtuple('Export', 'reserved magic opt len')
+export_struct = struct.Struct('>IQII')
+neg2_struct = struct.Struct('>QH124x')
+request_tuple = collections.namedtuple('Request', 'magic type handle from_ len')
+request_struct = struct.Struct('>IIQQI')
+reply_struct = struct.Struct('>IIQ')
+
+def recvall(sock, bufsize):
+    received = 0
+    chunks = []
+    while received < bufsize:
+        chunk = sock.recv(bufsize - received)
+        if len(chunk) == 0:
+            raise Exception('unexpected disconnect')
+        chunks.append(chunk)
+        received += len(chunk)
+    return ''.join(chunks)
+
+class ExportHandler(object):
+    def write(self, offset, data):
+        pass
+
+    def size(self):
+        return 0
+
+def negotiate(conn, exports):
+    '''Negotiate export with client'''
+    # Send negotiation part 1
+    buf = neg1_struct.pack(NBD_PASSWD, NBD_OPTS_MAGIC, 0)
+    conn.sendall(buf)
+
+    # Receive export option
+    buf = recvall(conn, export_struct.size)
+    export = export_tuple._make(export_struct.unpack(buf))
+    assert export.magic == NBD_OPTS_MAGIC
+    assert export.opt == NBD_OPT_EXPORT_NAME
+    name = recvall(conn, export.len)
+
+    if name not in exports:
+        print 'name "%s" not in exports' % name
+        return None
+    handler = exports[name]
+
+    # Send negotiation part 2
+    buf = neg2_struct.pack(handler.size(), 0)
+    conn.sendall(buf)
+    return handler
+
+def read_request(conn):
+    '''Parse NBD request from client'''
+    buf = recvall(conn, request_struct.size)
+    req = request_tuple._make(request_struct.unpack(buf))
+    assert req.magic == NBD_REQUEST_MAGIC
+    return req
+
+def write_reply(conn, error, handle):
+    buf = reply_struct.pack(NBD_REPLY_MAGIC, error, handle)
+    conn.sendall(buf)
+
+def server_connection_thread(conn, exports):
+    handler = negotiate(conn, exports)
+    if handler is None:
+        conn.close()
+        return
+
+    while True:
+        req = read_request(conn)
+        if req.type == NBD_CMD_WRITE:
+            # Reply immediately, don't propagate internal errors to client
+            write_reply(conn, 0, req.handle)
+
+            data = recvall(conn, req.len)
+            handler.write(req.from_, data)
+        elif req.type == NBD_CMD_DISC:
+            break
+        else:
+            print 'unrecognized command type %#02x' % req.type
+            break
+    conn.close()
+
+class Server(object):
+    def __init__(self, sock):
+        self.sock = sock
+        self.exports = {}
+
+    def add_export(self, name, handler):
+        self.exports[name] = handler
+
+    def run(self):
+        threads = []
+        for i in range(len(self.exports)):
+            conn, _ = self.sock.accept()
+            t = threading.Thread(target=server_connection_thread,
+                                 args=(conn, self.exports))
+            t.daemon = True
+            t.start()
+            threads.append(t)
+        for t in threads:
+            t.join()