Distributed storage. Security attributes and documentation update.

To: <netdev@...>
Cc: <linux-kernel@...>, <linux-fsdevel@...>
Date: Friday, August 31, 2007 - 12:06 pm

On Tue, Jul 31, 2007 at 09:13:47PM +0400, Evgeniy Polyakov (johnpol@2ka.mipt.ru) wrote:
Hi.

I'm pleased to announce the third release of the distributed storage
subsystem, which allows forming a storage on top of remote and local
nodes. That storage can in turn be exported to another storage as a
node, forming tree-like storages.
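
As a rough illustration of what "tree-like" means for sizes, here is a
minimal userspace sketch. It is not code from this patch: the names
model_node and model_size are hypothetical, and the only behaviour taken
from the patch is that the linear algorithm adds node sizes together while
the mirror algorithm limits the storage to its smallest node.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical userspace model; none of these names exist in DST. */
enum model_kind { MODEL_LEAF, MODEL_LINEAR, MODEL_MIRROR };

struct model_node {
	enum model_kind kind;
	uint64_t size;			/* sectors, used for leaves only */
	const struct model_node **child;	/* used for non-leaf nodes */
	size_t nchild;
};

/* Size in sectors of a (possibly nested) storage. */
static uint64_t model_size(const struct model_node *n)
{
	uint64_t total = 0;
	size_t i;

	if (n->kind == MODEL_LEAF)
		return n->size;

	for (i = 0; i < n->nchild; i++) {
		uint64_t s = model_size(n->child[i]);

		if (n->kind == MODEL_LINEAR)
			total += s;	/* concatenation: sizes add up */
		else if (i == 0 || s < total)
			total = s;	/* mirror: limited by the smallest node */
	}

	return total;
}
```

In this model, a mirror of two linear storages is itself just another node,
so it could be placed under a further linear or mirror storage.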

This release includes the following changes:
* security attributes (a permission mask assigned to the addresses that
	are allowed to connect to a given local export node)
* big documentation update (userspace documentation on the site also
	includes various usage examples and a description of the
	configuration utility, protocols and userspace target)
* mirror algorithm has been moved from per-page to per-sector dirty
	bitmask
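
To make the per-sector dirty bitmask concrete, here is a small userspace
sketch of the idea. It is only an illustration under stated assumptions:
the sketch_* names are hypothetical and are not part of the patch, and the
kernel code uses its own bit operations and allocation. What it mirrors
from the patch is the rule that a failed request sets one bit per affected
sector, a successful one clears those bits, and a node counts as
synchronized once no bit is set.

```c
#include <limits.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical helper names; this is a model, not the DST implementation. */
#define SKETCH_BITS_PER_LONG (sizeof(unsigned long) * CHAR_BIT)

struct sketch_bitmap {
	unsigned long *word;
	uint64_t nbits;		/* one bit per sector */
};

/* Number of words needed to hold nbits, rounded up. */
static size_t sketch_words(uint64_t nbits)
{
	return (nbits + SKETCH_BITS_PER_LONG - 1) / SKETCH_BITS_PER_LONG;
}

static int sketch_init(struct sketch_bitmap *b, uint64_t sectors)
{
	size_t words = sketch_words(sectors);

	b->word = calloc(words ? words : 1, sizeof(unsigned long));
	if (!b->word)
		return -1;
	b->nbits = sectors;
	return 0;
}

/*
 * Mark sectors [start, start + num) dirty when failed is non-zero,
 * clean otherwise. Returns -1 if the range does not fit the bitmap.
 */
static int sketch_mark(struct sketch_bitmap *b, uint64_t start,
		uint64_t num, int failed)
{
	uint64_t i;

	if (start >= b->nbits || num > b->nbits - start)
		return -1;

	for (i = start; i < start + num; i++) {
		unsigned long mask = 1UL << (i % SKETCH_BITS_PER_LONG);

		if (failed)
			b->word[i / SKETCH_BITS_PER_LONG] |= mask;
		else
			b->word[i / SKETCH_BITS_PER_LONG] &= ~mask;
	}
	return 0;
}

/* Returns 1 when no sector is dirty, 0 otherwise. */
static int sketch_in_sync(const struct sketch_bitmap *b)
{
	size_t i, words = sketch_words(b->nbits);

	for (i = 0; i < words; i++)
		if (b->word[i])
			return 0;
	return 1;
}
```

With one bit per sector, a failed write dirties only the sectors it covered,
so a later resync has to copy only those sectors.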

Further TODO list includes:
* implement optional saving of mirroring/linear information on the remote
	nodes (simple)
* implement netlink based setup (simple)
* new redundancy algorithm (complex)

Homepage:
http://tservice.net.ru/~s0mbre/old/?section=projects&item=dst

Signed-off-by: Evgeniy Polyakov <johnpol@2ka.mipt.ru>

diff --git a/Documentation/dst/algorithms.txt b/Documentation/dst/algorithms.txt
new file mode 100644
index 0000000..bfc6984
--- /dev/null
+++ b/Documentation/dst/algorithms.txt
@@ -0,0 +1,115 @@
+Each storage by itself is just a set of contiguous logical blocks with
+a number of allowed operations. Nodes, each of which has its own start and
+size, are placed into the storage by the appropriate algorithm, which remaps
+a logical sector number into the real node's sector. One can create
+new algorithms, since DST has a pluggable interface for that.
+Currently mirrored and linear algorithms are supported.
+
+Let's briefly describe how they work.
+
+Linear algorithm.
+This algorithm uses the simple approach of concatenating storages into
+a single device of increased size. Essentially, the new device
+has a size equal to the sum of the sizes of the underlying nodes, and
+the nodes are placed one after another.
+
+  /----- Node 1 ---\                         /------ Node 3 ----\
+start              end                     start               end
+ |==================|========================|==================|
+ |                start                     end                 |
+ |                  \------- Node 2 ---------/                  |
+ |                                                              |
+start                                                          end
+ \-------------------------- DST storage ----------------------/
+
+			        /\
+			        ||
+			        ||
+
+			   IO operations
+
+			    Figure 1. 
+     3 nodes combined into single storage using linear algorithm.
+
+Mirror algorithm.
+In this algorithm nodes are placed under each other, so when an
+operation comes to the first one, it can be mirrored to all
+underlying nodes. In case of reading, actual data is obtained from
+the nearest node: the algorithm keeps track of the previous operation
+and knows where it stopped, so that the subsequent seek to the
+start of the new request will take the shortest time.
+Writing is always mirrored to all underlying nodes.
+
+                  IO operations
+                       ||
+                       ||
+                       \/
+
+|---------------- DST storage -------------------|
+|      prev position                             |
+|-------|------------ Node 1 --------------------|
+|                              prev pos          |
+|-------------------- Node 2 -----|--------------|
+|prev pos                                        |
+|---|---------------- Node 3 --------------------|
+
+		Figure 2.
+   3 nodes combined into single storage using mirror algorithm.
+
+Each algorithm must implement a number of callbacks,
+which must be registered at initialization time.
+
+struct dst_alg_ops
+{
+	int			(*add_node)(struct dst_node *n);
+	void			(*del_node)(struct dst_node *n);
+	int 			(*remap)(struct dst_request *req);
+	int			(*error)(struct kst_state *state, int err);
+	struct module 		*owner;
+};
+
+@add_node.
+This callback is invoked when a new node is being added to the storage,
+but before the node is actually added and becomes accessible through
+the storage. When it is called, all appropriate initialization
+of the underlying device is already completed (the system has connected
+to the remote node or got a reference to the local block device). At this
+stage the algorithm can add the node into its private map.
+It must return zero on success or a negative value otherwise.
+
+@del_node.
+This callback is invoked when a node is being deleted from the storage,
+i.e. when its reference counter hits zero. It is called before
+any cleanup is performed.
+It returns no value (the callback has a void return type).
+
+@remap.
+This callback is invoked each time a new bio hits the storage.
+The request structure contains the BIO itself, a pointer to the node which
+originally stores the whole region under the given IO request, and various
+parameters used by the storage core to process this block request.
+It must return zero on success or a negative value otherwise. It is up to
+this method to perform all cleanup if remapping failed; for example, it must
+call kst_bio_endio() for the given request in case of error, which in turn
+will call bio_endio(). Note that the dst_request structure provided to this
+callback is allocated on the stack, so if there is a need to use it outside
+of the given function, it must be cloned (this happens automatically
+in the state's push callback, but that copy will not be shared with any
+other user).
+
+@error.
+This callback is invoked for each error that happens while processing
+requests for remote nodes or when talking to the remote side
+of the local export node (the state contains data related to data
+transfers over the network).
+If this function has fixed the given error, it must return 0; otherwise
+it must return a negative error value.
+
+@owner.
+This is the module reference counter, updated automatically by the DST core.
+
+An algorithm must provide its name and the above structure to the
+dst_alloc_alg() function, which will return a reference to the newly
+created algorithm.
+To remove it, one needs to call dst_remove_alg() with the given algorithm
+pointer.
diff --git a/Documentation/dst/dst.txt b/Documentation/dst/dst.txt
new file mode 100644
index 0000000..3b326aa
--- /dev/null
+++ b/Documentation/dst/dst.txt
@@ -0,0 +1,66 @@
+Distributed storage. Design and implementation.
+http://tservice.net.ru/~s0mbre/old/?section=projects&item=dst
+
+	     Evgeniy Polyakov
+
+This document briefly describes the design and
+implementation details of the distributed storage project,
+which aims to group physically and/or logically
+distributed storages into a single device.
+
+The main operational unit in the storage is a node. A node can represent
+a remote storage connected to the local machine, a local
+device, or a storage exported to the outside of the system.
+Here is a short explanation of the basic terms.
+
+Local node.
+This node is just a logical link between a block device (with given
+major and minor numbers) and a structure in the DST hierarchy
+which represents the number of sectors in the area corresponding to the
+given block device. It can be a disk, a device mapper node or a stacked
+block device on top of other underlying DST nodes.
+
+Local export node.
+Essentially the same as a local node, but it allows its data to be
+accessed over the network. Remote clients can connect to a given local
+export node and read or write blocks within its size.
+Blocks are then forwarded to the underlying local node and processed
+there according to the nature of the local node.
+
+Remote node.
+Nodes of this type represent remotely accessible devices. One can think
+of remote nodes as remote disks, which can be connected to the
+local system and combined into a single storage. Remote nodes
+are presented as a number of sectors accessed over the network
+by the local machine, where the distributed storage is being formed.
+
+
+Each node or set of nodes can be formed into a single array, which
+in turn becomes a local node that can be exported further by stacking
+a local export node on top of it.
+
+Each storage by itself is just a set of contiguous logical blocks with
+a number of allowed operations. Nodes, each of which has its own start and
+size, are placed into the storage by the appropriate algorithm, which remaps
+a logical sector number into the real node's sector. One can create
+new algorithms, since DST has a pluggable interface for that.
+Currently mirrored and linear algorithms are supported.
+One can find more details in the Documentation/dst/algorithms.txt file.
+
+The main goal of the distributed storage is to combine remote nodes into
+a single device, so each block IO request is sent over the network
+(in contrast, requests for local nodes are handled by the generic block
+layer). Each network connection has a number of variables which
+describe it (socket, list of requests, error handling and so on),
+which form the kst_state structure. This network state is added to a
+per-socket polling state machine and can be processed by a dedicated thread
+when it becomes ready. This system forms asynchronous IO for the given block
+requests. If a block request can be processed without blocking, then
+no new structures are allocated and the async part of the state is not used.
+
+When the connection to the remote peer breaks, the DST core tries to
+reconnect to the failed node and no requests are marked as erroneous;
+instead they stay in the queue until the connection is reestablished.
+
+Userspace code, setup documentation and examples can be found on the
+project's homepage above.
diff --git a/drivers/block/Kconfig b/drivers/block/Kconfig
index b4c8319..ca6592d 100644
--- a/drivers/block/Kconfig
+++ b/drivers/block/Kconfig
@@ -451,6 +451,8 @@ config ATA_OVER_ETH
 	This driver provides Support for ATA over Ethernet block
 	devices like the Coraid EtherDrive (R) Storage Blade.
 
+source "drivers/block/dst/Kconfig"
+
 source "drivers/s390/block/Kconfig"
 
 endmenu
diff --git a/drivers/block/Makefile b/drivers/block/Makefile
index dd88e33..fcf042d 100644
--- a/drivers/block/Makefile
+++ b/drivers/block/Makefile
@@ -29,3 +29,4 @@ obj-$(CONFIG_VIODASD)		+= viodasd.o
 obj-$(CONFIG_BLK_DEV_SX8)	+= sx8.o
 obj-$(CONFIG_BLK_DEV_UB)	+= ub.o
 
+obj-$(CONFIG_DST)		+= dst/
diff --git a/drivers/block/dst/Kconfig b/drivers/block/dst/Kconfig
new file mode 100644
index 0000000..9c5eba2
--- /dev/null
+++ b/drivers/block/dst/Kconfig
@@ -0,0 +1,19 @@
+config DST
+	tristate "Distributed storage"
+	depends on NET
+	---help---
+	This driver allows creating a distributed storage.
+
+config DST_ALG_LINEAR
+	tristate "Linear distribution algorithm"
+	depends on DST
+	---help---
+	This module allows creating a linear mapping of the nodes
+	in the distributed storage.
+
+config DST_ALG_MIRROR
+	tristate "Mirror distribution algorithm"
+	depends on DST
+	---help---
+	This module allows creating a mirror of the nodes in the
+	distributed storage.
diff --git a/drivers/block/dst/Makefile b/drivers/block/dst/Makefile
new file mode 100644
index 0000000..1400e94
--- /dev/null
+++ b/drivers/block/dst/Makefile
@@ -0,0 +1,6 @@
+obj-$(CONFIG_DST) += dst.o
+
+dst-y := dcore.o kst.o
+
+obj-$(CONFIG_DST_ALG_LINEAR) += alg_linear.o
+obj-$(CONFIG_DST_ALG_MIRROR) += alg_mirror.o
diff --git a/drivers/block/dst/alg_linear.c b/drivers/block/dst/alg_linear.c
new file mode 100644
index 0000000..584f99e
--- /dev/null
+++ b/drivers/block/dst/alg_linear.c
@@ -0,0 +1,99 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/dst.h>
+
+static struct dst_alg *alg_linear;
+
+/*
+ * This callback is invoked when node is removed from storage.
+ */
+static void dst_linear_del_node(struct dst_node *n)
+{
+}
+
+/*
+ * This callback is invoked when node is added to storage.
+ */
+static int dst_linear_add_node(struct dst_node *n)
+{
+	struct dst_storage *st = n->st;
+
+	n->start = st->disk_size;
+	st->disk_size += n->size;
+
+	return 0;
+}
+
+static int dst_linear_remap(struct dst_request *req)
+{
+	int err;
+
+	if (req->node->bdev) {
+		generic_make_request(req->bio);
+		return 0;
+	}
+
+	err = kst_check_permissions(req->state, req->bio);
+	if (err)
+		return err;
+
+	return req->state->ops->push(req);
+}
+
+/*
+ * Failover callback - it is invoked each time error happens during
+ * request processing.
+ */
+static int dst_linear_error(struct kst_state *st, int err)
+{
+	if (err)
+		set_bit(DST_NODE_FROZEN, &st->node->flags);
+	else
+		clear_bit(DST_NODE_FROZEN, &st->node->flags);
+	return 0;
+}
+
+static struct dst_alg_ops alg_linear_ops = {
+	.remap		= dst_linear_remap,
+	.add_node 	= dst_linear_add_node,
+	.del_node 	= dst_linear_del_node,
+	.error		= dst_linear_error,
+	.owner		= THIS_MODULE,
+};
+
+static int __init alg_linear_init(void)
+{
+	alg_linear = dst_alloc_alg("alg_linear", &alg_linear_ops);
+	if (!alg_linear)
+		return -ENOMEM;
+
+	return 0;
+}
+
+static void __exit alg_linear_exit(void)
+{
+	dst_remove_alg(alg_linear);
+}
+
+module_init(alg_linear_init);
+module_exit(alg_linear_exit);
+
+MODULE_LICENSE("GPL");
+MODULE_AUTHOR("Evgeniy Polyakov <johnpol@2ka.mipt.ru>");
+MODULE_DESCRIPTION("Linear distributed algorithm.");
diff --git a/drivers/block/dst/alg_mirror.c b/drivers/block/dst/alg_mirror.c
new file mode 100644
index 0000000..be42350
--- /dev/null
+++ b/drivers/block/dst/alg_mirror.c
@@ -0,0 +1,765 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ * 
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/poll.h>
+#include <linux/dst.h>
+
+#define DST_MIRROR_MAX_CHUNKS		4096
+
+struct dst_mirror_priv
+{
+	unsigned int		chunk_num;
+
+	u64			last_start;
+
+	spinlock_t		backlog_lock;
+	struct list_head	backlog_list;
+
+	unsigned long		*chunk;
+};
+
+static struct dst_alg *alg_mirror;
+static struct bio_set *dst_mirror_bio_set;
+
+static ssize_t dst_mirror_chunk_mask_show(struct device *dev,
+		struct device_attribute *attr, char *buf)
+{
+	struct dst_node *n = container_of(dev, struct dst_node, device);
+	struct dst_mirror_priv *priv = n->priv;
+	unsigned int i;
+	int rest = PAGE_SIZE;
+
+	for (i = 0; i < priv->chunk_num/BITS_PER_LONG; ++i) {
+		int bit, j;
+
+		for (j = 0; j < BITS_PER_LONG; ++j) {
+			bit = (priv->chunk[i] >> j) & 1;
+			sprintf(buf, "%c", (bit)?'+':'-');
+			buf++;
+		}
+
+		rest -= BITS_PER_LONG;
+
+		if (rest < BITS_PER_LONG)
+			break;
+	}
+
+	return PAGE_SIZE - rest;
+}
+
+static DEVICE_ATTR(chunks, 0444, dst_mirror_chunk_mask_show, NULL);
+
+/*
+ * This callback is invoked when node is removed from storage.
+ */
+static void dst_mirror_del_node(struct dst_node *n)
+{
+	struct dst_mirror_priv *priv = n->priv;
+
+	vfree(priv->chunk);
+	kfree(priv);
+	n->priv = NULL;
+
+	if (n->device.parent == &n->st->device)
+		device_remove_file(&n->device, &dev_attr_chunks);
+}
+
+static void dst_mirror_handle_priv(struct dst_node *n)
+{
+	if (n->priv) {
+		int err;
+		err = device_create_file(&n->device, &dev_attr_chunks);
+	}
+}
+
+/*
+ * This callback is invoked when node is added to storage.
+ */
+static int dst_mirror_add_node(struct dst_node *n)
+{
+	struct dst_storage *st = n->st;
+	struct dst_mirror_priv *priv;
+
+	if (st->disk_size)
+		st->disk_size = min(n->size, st->disk_size);
+	else
+		st->disk_size = n->size;
+
+	priv = kzalloc(sizeof(struct dst_mirror_priv), GFP_KERNEL);
+	if (!priv)
+		return -ENOMEM;
+
+	priv->chunk_num = st->disk_size;
+
+	priv->chunk = vmalloc(BITS_TO_LONGS(priv->chunk_num) * sizeof(long));
+	if (!priv->chunk)
+		goto err_out_free;
+
+	memset(priv->chunk, 0, BITS_TO_LONGS(priv->chunk_num) * sizeof(long));
+
+	spin_lock_init(&priv->backlog_lock);
+	INIT_LIST_HEAD(&priv->backlog_list);
+
+	dprintk("%s: %llu:%llu, chunk_num: %u, disk_size: %llu.\n",
+			__func__, n->start, n->size,
+			priv->chunk_num, st->disk_size);
+
+	n->priv_callback = &dst_mirror_handle_priv;
+	n->priv = priv;
+
+	return 0;
+
+err_out_free:
+	kfree(priv);
+	return -ENOMEM;
+}
+
+static void dst_mirror_sync_destructor(struct bio *bio)
+{
+	struct bio_vec *bv;
+	int i;
+
+	bio_for_each_segment(bv, bio, i)
+		__free_page(bv->bv_page);
+	bio_free(bio, dst_mirror_bio_set);
+}
+
+static void dst_mirror_sync_requeue(struct dst_node *n)
+{
+	struct dst_mirror_priv *p = n->priv;
+	struct dst_request *req;
+	unsigned int num, idx, i;
+	u64 start;
+	unsigned long flags;
+	int err;
+
+	while (!list_empty(&p->backlog_list)) {
+		req = NULL;
+		spin_lock_irqsave(&p->backlog_lock, flags);
+		if (!list_empty(&p->backlog_list)) {
+			req = list_entry(p->backlog_list.next,
+					struct dst_request,
+					request_list_entry);
+			list_del(&req->request_list_entry);
+		}
+		spin_unlock_irqrestore(&p->backlog_lock, flags);
+
+		if (!req)
+			break;
+
+		start = req->start - to_sector(req->orig_size - req->size);
+
+		idx = start;
+		num = to_sector(req->orig_size);
+
+		for (i=0; i<num; ++i)
+			if (test_bit(idx+i, p->chunk))
+				break;
+
+		dprintk("%s: idx: %u, num: %u, i: %u, req: %p, "
+				"start: %llu, size: %llu.\n",
+				__func__, idx, num, i, req, 
+				req->start, req->orig_size);
+
+		err = -1;
+		if (i != num) {
+			err = kst_enqueue_req(n->state, req);
+			if (err) {
+				printk("%s: congestion [%c]: req: %p, "
+						"start: %llu, size: %llu.\n",
+					__func__,
+					(bio_rw(req->bio) == WRITE)?'W':'R',
+					req, req->start, req->size);
+				kst_del_req(req);
+			}
+		}
+		if (err) {
+			req->bio_endio(req, err);
+			dst_free_request(req);
+		}
+	}
+
+	kst_wake(n->state);
+}
+
+static void dst_mirror_mark_sync(struct dst_node *n)
+{
+	if (test_bit(DST_NODE_NOTSYNC, &n->flags)) {
+		clear_bit(DST_NODE_NOTSYNC, &n->flags);
+		printk("%s: node: %p, %llu:%llu synchronization "
+				"has been completed.\n",
+			__func__, n, n->start, n->size);
+	}
+}
+
+static void dst_mirror_mark_notsync(struct dst_node *n)
+{
+	if (!test_bit(DST_NODE_NOTSYNC, &n->flags)) {
+		set_bit(DST_NODE_NOTSYNC, &n->flags);
+		printk("%s: not synced node n: %p.\n", __func__, n);
+	}
+}
+
+/*
+ * Without errors it is always called under node's request lock,
+ * so it is safe to requeue them.
+ */
+static void dst_mirror_bio_error(struct dst_request *req, int err)
+{
+	int i;
+	struct dst_mirror_priv *priv = req->node->priv;
+	unsigned int num, idx;
+	void (*process_bit[])(int nr, volatile void *addr) =
+		{&__clear_bit, &__set_bit};
+	u64 start = req->start - to_sector(req->orig_size - req->size);
+
+	if (err)
+		dst_mirror_mark_notsync(req->node);
+	else
+		dst_mirror_sync_requeue(req->node);
+
+	priv->last_start = req->start;
+
+	idx = start;
+	num = to_sector(req->orig_size);
+
+	dprintk("%s: req_priv: %p, chunk %p, %llu:%llu start: %llu, size: %llu, "
+		"chunk_num: %u, idx: %d, num: %d, err: %d.\n",
+		__func__, req->priv, priv->chunk, req->node->start, 
+		req->node->size, start, req->orig_size, priv->chunk_num, 
+		idx, num, err);
+
+	if (unlikely(idx >= priv->chunk_num || idx + num > priv->chunk_num)) {
+		printk("%s: %llu:%llu req: %p, start: %llu, orig_size: %llu, "
+			"req_start: %llu, req_size: %llu, "
+			"chunk_num: %u, idx: %d, num: %d, err: %d.\n",
+			__func__, req->node->start, req->node->size, req,
+			start, req->orig_size, 
+			req->start, req->size,
+			priv->chunk_num, idx, num, err);
+		return;
+	}
+
+	for (i=0; i<num; ++i)
+		process_bit[!!err](idx+i, priv->chunk);
+}
+
+static void dst_mirror_sync_req_endio(struct dst_request *req, int err)
+{
+	unsigned long notsync = 0;
+	struct dst_mirror_priv *priv = req->node->priv;
+	int i;
+
+	dst_mirror_bio_error(req, err);
+
+	printk("%s: freeing bio: %p, bi_size: %u, "
+			"orig_size: %llu, req: %p, node: %p.\n",
+		__func__, req->bio, req->bio->bi_size, req->orig_size, req,
+		req->node);
+
+	bio_put(req->bio);
+
+	for (i = 0; i < priv->chunk_num/BITS_PER_LONG; ++i) {
+		notsync = priv->chunk[i];
+
+		if (notsync)
+			break;
+	}
+
+	if (!notsync)
+		dst_mirror_mark_sync(req->node);
+}
+
+static int dst_mirror_sync_endio(struct bio *bio, unsigned int size, int err)
+{
+	struct dst_request *req = bio->bi_private;
+	struct dst_node *n = req->node;
+	struct dst_mirror_priv *priv = n->priv;
+	unsigned long flags;
+
+	printk("%s: bio: %p, err: %d, size: %u, req: %p.\n",
+			__func__, bio, err, bio->bi_size, req);
+
+	if (bio->bi_size)
+		return 1;
+
+	bio->bi_rw = WRITE;
+	bio->bi_size = req->orig_size;
+	bio->bi_sector = req->start;
+
+	if (!err) {
+		spin_lock_irqsave(&priv->backlog_lock, flags);
+		list_add_tail(&req->request_list_entry, &priv->backlog_list);
+		spin_unlock_irqrestore(&priv->backlog_lock, flags);
+		kst_wake(req->state);
+	} else {
+		req->bio_endio(req, err);
+		dst_free_request(req);
+	}
+	return 0;
+}
+
+static int dst_mirror_sync_block(struct dst_node *n,
+		int bit_start, int bit_num)
+{
+	u64 start = to_bytes(bit_start);
+	struct bio *bio;
+	unsigned int nr_pages = to_bytes(bit_num)/PAGE_SIZE, i;
+	struct page *page;
+	int err = -ENOMEM;
+	struct dst_request *req;
+
+	printk("%s: bit_start: %d, bit_num: %d, start: %llu, nr_pages: %u, "
+			"disk_size: %llu.\n",
+			__func__, bit_start, bit_num, start, nr_pages,
+			n->st->disk_size);
+
+	while (nr_pages) {
+		req = dst_clone_request(NULL, n->w->req_pool);
+		if (!req)
+			return -ENOMEM;
+
+		bio = bio_alloc_bioset(GFP_NOIO, nr_pages, dst_mirror_bio_set);
+		if (!bio)
+			goto err_out_free_req;
+
+		bio->bi_rw = READ;
+		bio->bi_private = req;
+		bio->bi_sector = to_sector(start);
+		bio->bi_bdev = NULL;
+		bio->bi_destructor = dst_mirror_sync_destructor;
+		bio->bi_end_io = dst_mirror_sync_endio;
+
+		for (i = 0; i < nr_pages; ++i) {
+			err = -ENOMEM;
+
+			page = alloc_page(GFP_NOIO);
+			if (!page)
+				break;
+
+			err = bio_add_pc_page(n->st->queue, bio,
+					page, PAGE_SIZE, 0);
+			if (err <= 0)
+				break;
+			err = 0;
+		}
+
+		if (err && !bio->bi_vcnt)
+			goto err_out_put_bio;
+
+		req->node = n;
+		req->state = n->state;
+		req->start = bio->bi_sector;
+		req->size = req->orig_size = bio->bi_size;
+		req->bio = bio;
+		req->idx = bio->bi_idx;
+		req->num = bio->bi_vcnt;
+		req->bio_endio = &dst_mirror_sync_req_endio;
+		req->callback = &kst_data_callback;
+
+		dprintk("%s: start: %llu, size(pages): %u, bio: %p, "
+				"size: %u, cnt: %d, req: %p, size: %llu.\n",
+				__func__, bio->bi_sector, nr_pages, bio,
+				bio->bi_size, bio->bi_vcnt, req, req->size);
+
+		err = n->st->queue->make_request_fn(n->st->queue, bio);
+		if (err)
+			goto err_out_put_bio;
+
+		nr_pages -= bio->bi_vcnt;
+		start += bio->bi_size;
+	}
+
+	return 0;
+
+err_out_put_bio:
+	bio_put(bio);
+err_out_free_req:
+	dst_free_request(req);
+	return err;
+}
+
+/*
+ * Resync logic.
+ *
+ * System allocates and queues requests for number of regions.
+ * Each request initially is reading from the one of the nodes.
+ * When it is completed, system checks if given region was already
+ * written to, and in such case just drops read request, otherwise
+ * it writes it to the node being updated. Any write clears not-uptodate
+ * bit, which is used as a flag that region must be synchronized or not.
+ * Reading is never performed from the node under resync.
+ */
+static int dst_mirror_resync(struct dst_node *n)
+{
+	int err = 0, sync = 0;
+	struct dst_mirror_priv *priv = n->priv;
+	unsigned int i;
+
+	printk("%s: node: %p, %llu:%llu synchronization has been started.\n",
+			__func__, n, n->start, n->size);
+
+	for (i = 0; i < priv->chunk_num/BITS_PER_LONG; ++i) {
+		int bit, num, start;
+		unsigned long word = priv->chunk[i];
+
+		if (!word)
+			continue;
+
+		num = 0;
+		start = -1;
+		while (word && num < BITS_PER_LONG) {
+			bit = __ffs(word);
+			if (start == -1)
+				start = bit;
+			num++;
+			word >>= (bit+1);
+		}
+
+		if (start != -1) {
+			err = dst_mirror_sync_block(n, start + i*BITS_PER_LONG,
+					num);
+			if (err)
+				break;
+			sync++;
+		}
+	}
+
+	if (!sync && !err)
+		dst_mirror_mark_sync(n);
+
+	return err;
+}
+
+static void dst_mirror_destructor(struct bio *bio)
+{
+	dprintk("%s: bio: %p.\n", __func__, bio);
+	bio_free(bio, dst_mirror_bio_set);
+}
+
+static int dst_mirror_end_io(struct bio *bio, unsigned int size, int err)
+{
+	struct dst_request *req = bio->bi_private;
+
+	if (bio->bi_size)
+		return 0;
+
+	dprintk("%s: req: %p, bio: %p, req->bio: %p, err: %d.\n",
+			__func__, req, bio, req->bio, err);
+	req->bio_endio(req, err);
+	bio_put(bio);
+	return 0;
+}
+
+static void dst_mirror_read_endio(struct dst_request *req, int err)
+{
+	dst_mirror_bio_error(req, err);
+
+	if (!err)
+		kst_bio_endio(req, 0);
+}
+
+static void dst_mirror_write_endio(struct dst_request *req, int err)
+{
+	dst_mirror_bio_error(req, err);
+
+	req = req->priv;
+
+	dprintk("%s: req: %p, priv: %p err: %d, bio: %p, "
+			"cnt: %d, orig_size: %llu.\n",
+		__func__, req, req->priv, err, req->bio,
+		atomic_read(&req->refcnt), req->orig_size);
+
+	if (atomic_dec_and_test(&req->refcnt)) {
+		dprintk("%s: freeing bio %p.\n", __func__, req->bio);
+		bio_endio(req->bio, req->orig_size, 0);
+		dst_free_request(req);
+	}
+}
+
+static int dst_mirror_process_request(struct dst_request *req,
+		struct dst_node *n)
+{
+	int err = 0;
+
+	/*
+	 * Block layer requires to clone a bio.
+	 */
+	if (n->bdev) {
+		struct bio *clone = bio_alloc_bioset(GFP_NOIO,
+			req->bio->bi_max_vecs, dst_mirror_bio_set);
+
+		__bio_clone(clone, req->bio);
+
+		clone->bi_bdev = n->bdev;
+		clone->bi_destructor = dst_mirror_destructor;
+		clone->bi_private = req;
+		clone->bi_end_io = &dst_mirror_end_io;
+
+		dprintk("%s: clone: %p, bio: %p, req: %p.\n",
+				__func__, clone, req->bio, req);
+
+		generic_make_request(clone);
+	} else {
+		struct dst_request nr;
+		/*
+		 * Network state processing engine will clone request 
+		 * by itself if needed. We can not use the same structure
+		 * here, since number of its fields will be modified.
+		 */
+		memcpy(&nr, req, sizeof(struct dst_request));
+
+		nr.node = n;
+		nr.state = n->state;
+		nr.priv = req;
+
+		err = kst_check_permissions(n->state, req->bio);
+		if (!err)
+			err = req->state->ops->push(&nr);
+	}
+
+	dprintk("%s: req: %p, n: %p, bdev: %p, err: %d.\n",
+			__func__, req, n, n->bdev, err);
+	return err;
+}
+
+static int dst_mirror_write(struct dst_request *oreq)
+{
+	struct dst_request *req;
+	struct dst_node *n, *node = oreq->node;
+	int num, err = 0, err_num = 0, orig_num;
+	req = dst_clone_request(oreq, oreq->node->w->req_pool);
+	if (!req) {
+		kst_bio_endio(oreq, -ENOMEM);
+		return -ENOMEM;
+	}
+
+	req->priv = req;
+
+	/*
+	 * This logic is pretty simple - req->bio_endio will not
+	 * call bio_endio() until all mirror devices completed
+	 * processing of the request (no matter with or without error).
+	 * Mirror's req->bio_endio callback will take care of that.
+	 */
+	orig_num = num = atomic_read(&req->node->shared_num) + 1;
+	atomic_set(&req->refcnt, num);
+
+	req->bio_endio = &dst_mirror_write_endio;
+
+	dprintk("\n%s: req: %p, mirror to %d nodes.\n",
+			__func__, req, num);
+
+	err = dst_mirror_process_request(req, node);
+	if (err)
+		err_num++;
+
+	if (--num) {
+		list_for_each_entry_rcu(n, &node->shared, shared) {
+			dprintk("\n%s: req: %p, start: %llu, size: %llu, "
+					"num: %d, n: %p.\n",
+				__func__, req, req->start, 
+				req->size, num, n);
+
+			err = dst_mirror_process_request(req, n);
+			if (err)
+				err_num++;
+
+			if (--num <= 0)
+				break;
+		}
+	}
+
+	if (err_num == orig_num) {
+		dprintk("%s: req: %p, num: %d, err: %d.\n",
+				__func__, req, num, err);
+		return -ENODEV;
+	}
+
+	return 0;
+}
+
+static int dst_mirror_read(struct dst_request *req)
+{
+	struct dst_node *node = req->node, *n, *min_dist_node;
+	struct dst_mirror_priv *priv = node->priv;
+	u64 dist, d;
+	int err;
+
+	req->bio_endio = &dst_mirror_read_endio;
+
+	do {
+		err = -ENODEV;
+		min_dist_node = NULL;
+		dist = -1ULL;
+ 
+		/*
+		 * Reading is never performed from the node under resync.
+		 * If this will cause any troubles (like all nodes must be
+		 * resynced between each other), this check can be removed
+		 * and per-chunk dirty bit can be tested instead.
+		 */
+
+		if (!test_bit(DST_NODE_NOTSYNC, &node->flags)) {
+			priv = node->priv;
+			if (req->start > priv->last_start)
+				dist = req->start - priv->last_start;
+			else
+				dist = priv->last_start - req->start;
+			min_dist_node = req->node;
+		}
+
+		list_for_each_entry_rcu(n, &node->shared, shared) {
+			if (test_bit(DST_NODE_NOTSYNC, &n->flags))
+				continue;
+
+			priv = n->priv;
+
+			if (req->start > priv->last_start)
+				d = req->start - priv->last_start;
+			else
+				d = priv->last_start - req->start;
+			if (d < dist) {
+				dist = d;
+				min_dist_node = n;
+			}
+		}
+		if (!min_dist_node)
+			break;
+
+		req->node = min_dist_node;
+		req->state = req->node->state;
+
+		if (req->node->bdev) {
+			req->bio->bi_bdev = req->node->bdev;
+			generic_make_request(req->bio);
+			err = 0;
+			break;
+		}
+
+		err = req->state->ops->push(req);
+		if (err) {
+			printk("%s: 1 req: %p, bio: %p, node: %p, err: %d.\n",
+				__func__, req, req->bio, min_dist_node, err);
+			dst_mirror_mark_notsync(req->node);
+		}
+	} while (err && min_dist_node);
+
+	if (err) {
+		printk("%s: req: %p, bio: %p, node: %p, err: %d.\n",
+			__func__, req, req->bio, min_dist_node, err);
+		kst_bio_endio(req, err);
+	}
+	return err;
+}
+
+/*
+ * This callback is invoked from block layer request processing function,
+ * its task is to remap block request to different nodes.
+ */
+static int dst_mirror_remap(struct dst_request *req)
+{
+	int (*remap[])(struct dst_request *) = 
+		{&dst_mirror_read, &dst_mirror_write};
+
+	return remap[bio_rw(req->bio) == WRITE](req);
+}
+
+static int dst_mirror_error(struct kst_state *st, int err)
+{
+	struct dst_request *req, *tmp;
+	unsigned int revents = st->socket->ops->poll(NULL, st->socket, NULL);
+
+	if (err == -EEXIST)
+		return err;
+
+	if (!(revents & (POLLERR | POLLHUP))) {
+		if (test_bit(DST_NODE_NOTSYNC, &st->node->flags)) {
+			return dst_mirror_resync(st->node);
+		}
+		return 0;
+	}
+
+	dst_mirror_mark_notsync(st->node);
+
+	mutex_lock(&st->request_lock);
+	list_for_each_entry_safe(req, tmp, &st->request_list,
+					request_list_entry) {
+		kst_del_req(req);
+		dprintk("%s: requeue [%c], start: %llu, idx: %d,"
+				" num: %d, size: %llu, offset: %u, err: %d.\n",
+			__func__, (bio_rw(req->bio) == WRITE)?'W':'R',
+			req->start, req->idx, req->num, req->size,
+			req->offset, err);
+
+		if (bio_rw(req->bio) == READ) {
+			req->start -= to_sector(req->orig_size - req->size);
+			req->size = req->orig_size;
+			req->flags &= ~DST_REQ_HEADER_SENT;
+			req->idx = 0;
+			if (dst_mirror_read(req))
+				kst_complete_req(req, err);
+			else
+				dst_free_request(req);
+		} else {
+			kst_complete_req(req, err);
+		}
+	}
+	mutex_unlock(&st->request_lock);
+	return err;
+}
+
+static struct dst_alg_ops alg_mirror_ops = {
+	.remap		= dst_mirror_remap,
+	.add_node	= dst_mirror_add_node,
+	.del_node	= dst_mirror_del_node,
+	.error		= dst_mirror_error,
+	.owner		= THIS_MODULE,
+};
+
+static int __devinit alg_mirror_init(void)
+{
+	int err = -ENOMEM;
+
+	dst_mirror_bio_set = bioset_create(256, 256);
+	if (!dst_mirror_bio_set)
+		return -ENOMEM;
+
+	alg_mirror = dst_alloc_alg("alg_mirror", &alg_mirror_ops);
+	if (!alg_mirror)
+		goto err_out;
+
+	return 0;
+
+err_out:
+	bioset_free(dst_mirror_bio_set);
+	return err;
+}
+
+static void __devexit alg_mirror_exit(void)
+{
+	dst_remove_alg(alg_mirror);
+	bioset_free(dst_mirror_bio_set);
+}
+
+module_init(alg_mirror_init);
+module_exit(alg_mirror_exit);
+
+MODULE_LICENSE("GPL");
+MODULE_AUTHOR("Evgeniy Polyakov <johnpol@2ka.mipt.ru>");
+MODULE_DESCRIPTION("Mirror distributed algorithm.");
diff --git a/drivers/block/dst/dcore.c b/drivers/block/dst/dcore.c
new file mode 100644
index 0000000..2bf7fc1
--- /dev/null
+++ b/drivers/block/dst/dcore.c
@@ -0,0 +1,1526 @@
+/*
+ * 2007+ Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/blkdev.h>
+#include <linux/bio.h>
+#include <linux/slab.h>
+#include <linux/miscdevice.h>
+#include <linux/socket.h>
+#include <linux/dst.h>
+#include <linux/device.h>
+#include <linux/in.h>
+#include <linux/in6.h>
+#include <linux/buffer_head.h>
+
+#include <net/sock.h>
+
+static LIST_HEAD(dst_storage_list);
+static LIST_HEAD(dst_alg_list);
+static DEFINE_MUTEX(dst_storage_lock);
+static DEFINE_MUTEX(dst_alg_lock);
+static int dst_major;
+static struct kst_worker *kst_main_worker;
+
+struct kmem_cache *dst_request_cache;
+
+/*
+ * DST sysfs tree. For device called 'storage' which is formed
+ * on top of two nodes this looks like this:
+ *
+ * /sys/devices/storage/
+ * /sys/devices/storage/alg : alg_linear
+ * /sys/devices/storage/n-800/type : R: 192.168.4.80:1025
+ * /sys/devices/storage/n-800/size : 800
+ * /sys/devices/storage/n-800/start : 800
+ * /sys/devices/storage/n-0/type : R: 192.168.4.81:1025
+ * /sys/devices/storage/n-0/size : 800
+ * /sys/devices/storage/n-0/start : 0
+ * /sys/devices/storage/remove_all_nodes
+ * /sys/devices/storage/nodes : sectors (start [size]): 0 [800] | 800 [800]
+ * /sys/devices/storage/name : storage
+ */
+
+static int dst_dev_match(struct device *dev, struct device_driver *drv)
+{
+	return 1;
+}
+
+static void dst_dev_release(struct device *dev)
+{
+}
+
+static struct bus_type dst_dev_bus_type = {
+	.name 		= "dst",
+	.match 		= &dst_dev_match,
+};
+
+static struct device dst_dev = {
+	.bus 		= &dst_dev_bus_type,
+	.release 	= &dst_dev_release
+};
+
+static void dst_node_release(struct device *dev)
+{
+}
+
+static struct device dst_node_dev = {
+	.release 	= &dst_node_release
+};
+
+static struct bio_set *dst_bio_set;
+
+static void dst_destructor(struct bio *bio)
+{
+	bio_free(bio, dst_bio_set);
+}
+
+/*
+ * Internal callback for local requests (i.e. for local disk),
+ * which are split between nodes (part with local node destination
+ * ends up with this ->bi_end_io() callback).
+ */
+static int dst_end_io(struct bio *bio, unsigned int size, int err)
+{
+	struct bio *orig_bio = bio->bi_private;
+
+	if (bio->bi_size)
+		return 0;
+
+	dprintk("%s: bio: %p, orig_bio: %p, size: %u, orig_size: %u.\n",
+		__func__, bio, orig_bio, size, orig_bio->bi_size);
+
+	bio_endio(orig_bio, size, 0);
+	bio_put(bio);
+	return 0;
+}
+
+/*
+ * This function sends processing request down to block layer (for local node)
+ * or to network state machine (for remote node).
+ */
+static int dst_node_push(struct dst_request *req)
+{
+	int err = 0;
+	struct dst_node *n = req->node;
+
+	if (n->bdev) {
+		struct bio *bio = req->bio;
+
+		dprintk("%s: start: %llu, num: %d, idx: %d, offset: %u, "
+				"size: %llu, bi_idx: %d, bi_vcnt: %d.\n",
+			__func__, req->start, req->num, req->idx,
+			req->offset, req->size,	bio->bi_idx, bio->bi_vcnt);
+
+		if (likely(bio->bi_idx == req->idx &&
+					bio->bi_vcnt == req->num)) {
+			bio->bi_bdev = n->bdev;
+			bio->bi_sector = req->start;
+		} else {
+			struct bio *clone = bio_alloc_bioset(GFP_NOIO,
+					bio->bi_max_vecs, dst_bio_set);
+			struct bio_vec *bv;
+
+			err = -ENOMEM;
+			if (!clone)
+				goto out_put;
+
+			__bio_clone(clone, bio);
+
+			bv = bio_iovec_idx(clone, req->idx);
+			bv->bv_offset += req->offset;
+			clone->bi_idx = req->idx;
+			clone->bi_vcnt = req->num;
+			clone->bi_bdev = n->bdev;
+			clone->bi_sector = req->start;
+			clone->bi_destructor = dst_destructor;
+			clone->bi_private = bio;
+			clone->bi_size = req->orig_size;
+			clone->bi_end_io = &dst_end_io;
+			req->bio = clone;
+
+			dprintk("%s: start: %llu, num: %d, idx: %d, "
+				"offset: %u, size: %llu, "
+				"bi_idx: %d, bi_vcnt: %d, req: %p, bio: %p.\n",
+				__func__, req->start, req->num, req->idx,
+				req->offset, req->size,
+				clone->bi_idx, clone->bi_vcnt, req, req->bio);
+
+		}
+	}
+
+	err = n->st->alg->ops->remap(req);
+
+out_put:
+	dst_node_put(n);
+	return err;
+}
+
+/*
+ * This function is invoked from block layer request processing function,
+ * its task is to remap block request to different nodes.
+ */
+static int dst_remap(struct dst_storage *st, struct bio *bio)
+{
+	struct dst_node *n;
+	int err = -EINVAL, i, cnt;
+	unsigned int bio_sectors = bio->bi_size>>9;
+	struct bio_vec *bv;
+	struct dst_request req;
+	u64 rest_in_node, start, total_size;
+
+	mutex_lock(&st->tree_lock);
+	n = dst_storage_tree_search(st, bio->bi_sector);
+	mutex_unlock(&st->tree_lock);
+
+	if (!n) {
+		dprintk("%s: failed to find a node for bio: %p, "
+				"sector: %llu.\n",
+				__func__, bio, bio->bi_sector);
+		return -ENODEV;
+	}
+
+	dprintk("%s: bio: %llu-%llu, dev: %llu-%llu, in sectors.\n",
+			__func__, bio->bi_sector, bio->bi_sector+bio_sectors,
+			n->start, n->start+n->size);
+
+	memset(&req, 0, sizeof(struct dst_request));
+
+	start = bio->bi_sector;
+	total_size = bio->bi_size;
+
+	req.flags = (test_bit(DST_NODE_FROZEN, &n->flags))?
+				DST_REQ_ALWAYS_QUEUE:0;
+	req.start = start - n->start;
+	req.offset = 0;
+	req.state = n->state;
+	req.node = n;
+	req.bio = bio;
+
+	req.size = bio->bi_size;
+	req.orig_size = bio->bi_size;
+	req.idx = bio->bi_idx;
+	req.num = bio->bi_vcnt;
+
+	req.bio_endio = &kst_bio_endio;
+
+	/*
+	 * Common fast path - block request does not cross
+	 * boundaries between nodes.
+	 */
+	if (likely(bio->bi_sector + bio_sectors <= n->start + n->size))
+		return dst_node_push(&req);
+
+	req.size = 0;
+	req.idx = 0;
+	req.num = 1;
+
+	cnt = bio->bi_vcnt;
+
+	rest_in_node = to_bytes(n->size - req.start);
+
+	for (i = 0; i < cnt; ++i) {
+		bv = bio_iovec_idx(bio, i);
+
+		if (req.size + bv->bv_len >= rest_in_node) {
+			unsigned int diff = req.size + bv->bv_len -
+				rest_in_node;
+
+			req.size += bv->bv_len - diff;
+			req.start = start - n->start;
+			req.orig_size = req.size;
+			req.bio = bio;
+			req.bio_endio = &kst_bio_endio;
+
+			dprintk("%s: split: start: %llu/%llu, size: %llu, "
+					"total_size: %llu, diff: %u, idx: %d, "
+					"num: %d, bv_len: %u, bv_offset: %u.\n",
+					__func__, start, req.start, req.size,
+					total_size, diff, req.idx, req.num,
+					bv->bv_len, bv->bv_offset);
+
+			err = dst_node_push(&req);
+			if (err)
+				break;
+
+			total_size -= req.orig_size;
+
+			if (!total_size)
+				break;
+
+			start += to_sector(req.orig_size);
+
+			req.flags = (test_bit(DST_NODE_FROZEN, &n->flags))?
+				DST_REQ_ALWAYS_QUEUE:0;
+			req.orig_size = req.size = diff;
+
+			if (diff) {
+				req.offset = bv->bv_len - diff;
+				req.idx = req.num - 1;
+			} else {
+				req.idx = req.num;
+				req.offset = 0;
+			}
+
+			dprintk("%s: next: start: %llu, size: %llu, "
+				"total_size: %llu, diff: %u, idx: %d, "
+				"num: %d, offset: %u, bv_len: %u, "
+				"bv_offset: %u.\n",
+				__func__, start, req.size, total_size, diff,
+				req.idx, req.num, req.offset,
+				bv->bv_len, bv->bv_offset);
+
+			mutex_lock(&st->tree_lock);
+			n = dst_storage_tree_search(st, start);
+			mutex_unlock(&st->tree_lock);
+
+			if (!n) {
+				err = -ENODEV;
+				dprintk("%s: failed to find a split node for "
+				  "bio: %p, sector: %llu, start: %llu.\n",
+						__func__, bio, bio->bi_sector,
+						req.start);
+				break;
+			}
+
+			req.state = n->state;
+			req.node = n;
+			req.start = start - n->start;
+			rest_in_node = to_bytes(n->size - req.start);
+
+			dprintk("%s: req.start: %llu, start: %llu, "
+					"dev_start: %llu, dev_size: %llu, "
+					"rest_in_node: %llu.\n",
+				__func__, req.start, start, n->start,
+				n->size, rest_in_node);
+		} else {
+			req.size += bv->bv_len;
+			req.num++;
+		}
+	}
+
+	dprintk("%s: last request: start: %llu, size: %llu, "
+			"total_size: %llu.\n", __func__,
+			req.start, req.size, total_size);
+	if (total_size) {
+		req.orig_size = req.size;
+		req.bio = bio;
+		req.bio_endio = &kst_bio_endio;
+
+		dprintk("%s: last: start: %llu/%llu, size: %llu, "
+				"total_size: %llu, idx: %d, num: %d.\n",
+			__func__, start, req.start, req.size,
+			total_size, req.idx, req.num);
+
+		err = dst_node_push(&req);
+		if (!err) {
+			total_size -= req.orig_size;
+
+			BUG_ON(total_size != 0);
+		}
+	}
+
+	dprintk("%s: end bio: %p, err: %d.\n", __func__, bio, err);
+	return err;
+}
+
+
+/*
+ * Distributed storage request processing function.
+ * It calls algorithm specific remapping code only.
+ */
+static int dst_request(request_queue_t *q, struct bio *bio)
+{
+	struct dst_storage *st = q->queuedata;
+	int err;
+
+	dprintk("\n%s: start: st: %p, bio: %p, cnt: %u.\n",
+			__func__, st, bio, bio->bi_vcnt);
+
+	err = dst_remap(st, bio);
+
+	dprintk("%s: end: st: %p, bio: %p, err: %d.\n",
+			__func__, st, bio, err);
+	return 0;
+}
+
+static void dst_unplug(request_queue_t *q)
+{
+}
+
+static int dst_flush(request_queue_t *q, struct gendisk *disk, sector_t *sec)
+{
+	return 0;
+}
+
+static struct block_device_operations dst_blk_ops = {
+	.owner =	THIS_MODULE,
+};
+
+/*
+ * Block layer binding - disk is created when array is fully configured
+ * by userspace request.
+ */
+static int dst_create_disk(struct dst_storage *st)
+{
+	int err = -ENOMEM;
+
+	st->queue = blk_alloc_queue(GFP_KERNEL);
+	if (!st->queue)
+		goto err_out_exit;
+
+	st->queue->queuedata = st;
+	blk_queue_make_request(st->queue, dst_request);
+	blk_queue_bounce_limit(st->queue, BLK_BOUNCE_ANY);
+	st->queue->unplug_fn = dst_unplug;
+	st->queue->issue_flush_fn = dst_flush;
+
+	err = -EINVAL;
+	st->disk = alloc_disk(1);
+	if (!st->disk)
+		goto err_out_free_queue;
+
+	st->disk->major = dst_major;
+	st->disk->first_minor = (((unsigned long)st->disk) ^
+		(((unsigned long)st->disk) >> 31)) & 0xff;
+	st->disk->fops = &dst_blk_ops;
+	st->disk->queue = st->queue;
+	st->disk->private_data = st;
+	snprintf(st->disk->disk_name, sizeof(st->disk->disk_name),
+			"dst-%s-%d", st->name, st->disk->first_minor);
+
+	return 0;
+
+err_out_free_queue:
+	blk_cleanup_queue(st->queue);
+err_out_exit:
+	return err;
+}
+
+static void dst_remove_disk(struct dst_storage *st)
+{
+	del_gendisk(st->disk);
+	put_disk(st->disk);
+	blk_cleanup_queue(st->queue);
+}
+
+/*
+ * Shows storage name in sysfs.
+ */
+static ssize_t dst_name_show(struct device *dev,
+		struct device_attribute *attr, char *buf)
+{
+	struct dst_storage *st = container_of(dev, struct dst_storage, device);
+
+	return sprintf(buf, "%s\n", st->name);
+}
+
+static void dst_remove_all_nodes(struct dst_storage *st)
+{
+	struct dst_node *n, *node, *tmp;
+	struct rb_node *rb_node;
+
+	mutex_lock(&st->tree_lock);
+	while ((rb_node = rb_first(&st->tree_root)) != NULL) {
+		n = rb_entry(rb_node, struct dst_node, tree_node);
+		dprintk("%s: n: %p, start: %llu, size: %llu.\n",
+				__func__, n, n->start, n->size);
+		rb_erase(&n->tree_node, &st->tree_root);
+		if (!n->shared_head && atomic_read(&n->shared_num)) {
+			list_for_each_entry_safe(node, tmp, &n->shared, shared) {
+				list_del_rcu(&node->shared);
+				atomic_dec(&node->shared_head->refcnt);
+				node->shared_head = NULL;
+				dst_node_put(node);
+			}
+		}
+		dst_node_put(n);
+	}
+	mutex_unlock(&st->tree_lock);
+}
+
+/*
+ * Shows node layout in sysfs.
+ */
+static ssize_t dst_nodes_show(struct device *dev,
+		struct device_attribute *attr, char *buf)
+{
+	struct dst_storage *st = container_of(dev, struct dst_storage, device);
+	int size = PAGE_CACHE_SIZE, sz;
+	struct dst_node *n;
+	struct rb_node *rb_node;
+
+	sz = sprintf(buf, "sectors (start [size]): ");
+	size -= sz;
+	buf += sz;
+
+	mutex_lock(&st->tree_lock);
+	for (rb_node = rb_first(&st->tree_root); rb_node;
+			rb_node = rb_next(rb_node)) {
+		n = rb_entry(rb_node, struct dst_node, tree_node);
+		if (size < 32)
+			break;
+		sz = sprintf(buf, "%llu [%llu]", n->start, n->size);
+		buf += sz;
+		size -= sz;
+
+		if (!rb_next(rb_node))
+			break;
+
+		sz = sprintf(buf, " | ");
+		buf += sz;
+		size -= sz;
+	}
+	mutex_unlock(&st->tree_lock);
+	size -= sprintf(buf, "\n");
+	return PAGE_CACHE_SIZE - size;
+}
+
+/*
+ * Algorithm currently being used by given storage.
+ */
+static ssize_t dst_alg_show(struct device *dev,
+		struct device_attribute *attr, char *buf)
+{
+	struct dst_storage *st = container_of(dev, struct dst_storage, device);
+	return sprintf(buf, "%s\n", st->alg->name);
+}
+
+/*
+ * Writing to this sysfs file removes all nodes
+ * and storage itself automatically.
+ */
+static ssize_t dst_remove_nodes(struct device *dev,
+		struct device_attribute *attr,
+		const char *buf, size_t count)
+{
+	struct dst_storage *st = container_of(dev, struct dst_storage, device);
+	dst_remove_all_nodes(st);
+	return count;
+}
+
+static DEVICE_ATTR(name, 0444, dst_name_show, NULL);
+static DEVICE_ATTR(nodes, 0444, dst_nodes_show, NULL);
+static DEVICE_ATTR(alg, 0444, dst_alg_show, NULL);
+static DEVICE_ATTR(remove_all_nodes, 0644, NULL, dst_remove_nodes);
+
+static int dst_create_storage_attributes(struct dst_storage *st)
+{
+	int err;
+
+	err = device_create_file(&st->device, &dev_attr_name);
+	err = device_create_file(&st->device, &dev_attr_nodes);
+	err = device_create_file(&st->device, &dev_attr_alg);
+	err = device_create_file(&st->device, &dev_attr_remove_all_nodes);
+	return 0;
+}
+
+static void dst_remove_storage_attributes(struct dst_storage *st)
+{
+	device_remove_file(&st->device, &dev_attr_name);
+	device_remove_file(&st->device, &dev_attr_nodes);
+	device_remove_file(&st->device, &dev_attr_alg);
+	device_remove_file(&st->device, &dev_attr_remove_all_nodes);
+}
+
+static void dst_storage_sysfs_exit(struct dst_storage *st)
+{
+	dst_remove_storage_attributes(st);
+	device_unregister(&st->device);
+}
+
+static int dst_storage_sysfs_init(struct dst_storage *st)
+{
+	int err;
+
+	memcpy(&st->device, &dst_dev, sizeof(struct device));
+	snprintf(st->device.bus_id, sizeof(st->device.bus_id), "%s", st->name);
+
+	err = device_register(&st->device);
+	if (err) {
+		dprintk(KERN_ERR "Failed to register dst device %s, err: %d.\n",
+			st->name, err);
+		goto err_out_exit;
+	}
+
+	dst_create_storage_attributes(st);
+
+	return 0;
+
+err_out_exit:
+	return err;
+}
+
+/*
+ * These functions show size and start of the appropriate node.
+ * Both are in sectors.
+ */
+static ssize_t dst_show_start(struct device *dev,
+		struct device_attribute *attr, char *buf)
+{
+	struct dst_node *n = container_of(dev, struct dst_node, device);
+
+	return sprintf(buf, "%llu\n", n->start);
+}
+
+static ssize_t dst_show_size(struct device *dev,
+		struct device_attribute *attr, char *buf)
+{
+	struct dst_node *n = container_of(dev, struct dst_node, device);
+
+	return sprintf(buf, "%llu\n", n->size);
+}
+
+/*
+ * Shows type of the node - device major/minor number
+ * for local nodes and address (af_inet ipv4/ipv6 only) for remote nodes.
+ */
+static ssize_t dst_show_type(struct device *dev,
+		struct device_attribute *attr, char *buf)
+{
+	struct dst_node *n = container_of(dev, struct dst_node, device);
+	struct sockaddr addr;
+	struct socket *sock;
+	int addrlen;
+
+	if (!n->state && !n->bdev)
+		return 0;
+
+	if (n->bdev)
+		return sprintf(buf, "L: %d:%d\n",
+				MAJOR(n->bdev->bd_dev), MINOR(n->bdev->bd_dev));
+
+	sock = n->state->socket;
+	if (sock->ops->getname(sock, &addr, &addrlen, 2))
+		return 0;
+
+	if (sock->ops->family == AF_INET) {
+		struct sockaddr_in *sin = (struct sockaddr_in *)&addr;
+		return sprintf(buf, "R: %u.%u.%u.%u:%d\n",
+			NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
+	} else if (sock->ops->family == AF_INET6) {
+		struct sockaddr_in6 *sin = (struct sockaddr_in6 *)&addr;
+		return sprintf(buf,
+			"R: %04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%d\n",
+			NIP6(sin->sin6_addr), ntohs(sin->sin6_port));
+	}
+	return 0;
+}
+
+static DEVICE_ATTR(start, 0444, dst_show_start, NULL);
+static DEVICE_ATTR(size, 0444, dst_show_size, NULL);
+static DEVICE_ATTR(type, 0444, dst_show_type, NULL);
+
+static int dst_create_node_attributes(struct dst_node *n)
+{
+	int err;
+
+	err = device_create_file(&n->device, &dev_attr_start);
+	err = device_create_file(&n->device, &dev_attr_size);
+	err = device_create_file(&n->device, &dev_attr_type);
+	return 0;
+}
+
+static void dst_remove_node_attributes(struct dst_node *n)
+{
+	device_remove_file(&n->device, &dev_attr_start);
+	device_remove_file(&n->device, &dev_attr_size);
+	device_remove_file(&n->device, &dev_attr_type);
+}
+
+static void dst_node_sysfs_exit(struct dst_node *n)
+{
+	if (n->device.parent == &n->st->device) {
+		dst_remove_node_attributes(n);
+		device_unregister(&n->device);
+		n->device.parent = NULL;
+	}
+}
+
+static int dst_node_sysfs_init(struct dst_node *n)
+{
+	int err;
+
+	memcpy(&n->device, &dst_node_dev, sizeof(struct device));
+
+	n->device.parent = &n->st->device;
+
+	snprintf(n->device.bus_id, sizeof(n->device.bus_id),
+			"n-%llu-%p", n->start, n);
+	err = device_register(&n->device);
+	if (err) {
+		dprintk(KERN_ERR "Failed to register node, err: %d.\n", err);
+		goto err_out_exit;
+	}
+
+	dst_create_node_attributes(n);
+
+	return 0;
+
+err_out_exit:
+	n->device.parent = NULL;
+	return err;
+}
+
+/*
+ * Gets a reference for the given storage. If a storage with
+ * the given name and algorithm does not exist and @alloc is set,
+ * it is created.
+ */
+static struct dst_storage *dst_get_storage(char *name, char *aname, int alloc)
+{
+	struct dst_storage *st, *rst = NULL;
+	int err;
+	struct dst_alg *alg;
+
+	mutex_lock(&dst_storage_lock);
+	list_for_each_entry(st, &dst_storage_list, entry) {
+		if (!strcmp(name, st->name) && !strcmp(st->alg->name, aname)) {
+			rst = st;
+			atomic_inc(&st->refcnt);
+			break;
+		}
+	}
+	mutex_unlock(&dst_storage_lock);
+
+	if (rst || !alloc)
+		return rst;
+
+	st = kzalloc(sizeof(struct dst_storage), GFP_KERNEL);
+	if (!st)
+		return NULL;
+
+	mutex_init(&st->tree_lock);
+	/*
+	 * One for storage itself,
+	 * another one for attached node below.
+	 */
+	atomic_set(&st->refcnt, 2);
+	snprintf(st->name, DST_NAMELEN, "%s", name);
+	st->tree_root.rb_node = NULL;
+
+	err = dst_storage_sysfs_init(st);
+	if (err)
+		goto err_out_free;
+
+	err = dst_create_disk(st);
+	if (err)
+		goto err_out_sysfs_exit;
+
+	mutex_lock(&dst_alg_lock);
+	list_for_each_entry(alg, &dst_alg_list, entry) {
+		if (!strcmp(alg->name, aname)) {
+			atomic_inc(&alg->refcnt);
+			try_module_get(alg->ops->owner);
+			st->alg = alg;
+			break;
+		}
+	}
+	mutex_unlock(&dst_alg_lock);
+
+	if (!st->alg)
+		goto err_out_disk_remove;
+
+	mutex_lock(&dst_storage_lock);
+	list_add_tail(&st->entry, &dst_storage_list);
+	mutex_unlock(&dst_storage_lock);
+
+	return st;
+
+err_out_disk_remove:
+	dst_remove_disk(st);
+err_out_sysfs_exit:
+	dst_storage_sysfs_exit(st);
+err_out_free:
+	kfree(st);
+	return NULL;
+}
+
+/*
+ * Allows external modules to allocate and add a new algorithm.
+ */
+struct dst_alg *dst_alloc_alg(char *name, struct dst_alg_ops *ops)
+{
+	struct dst_alg *alg;
+
+	alg = kzalloc(sizeof(struct dst_alg), GFP_KERNEL);
+	if (!alg)
+		return NULL;
+	snprintf(alg->name, DST_NAMELEN, "%s", name);
+	atomic_set(&alg->refcnt, 1);
+	alg->ops = ops;
+
+	mutex_lock(&dst_alg_lock);
+	list_add_tail(&alg->entry, &dst_alg_list);
+	mutex_unlock(&dst_alg_lock);
+
+	return alg;
+}
+EXPORT_SYMBOL_GPL(dst_alloc_alg);
+
+static void dst_free_alg(struct dst_alg *alg)
+{
+	dprintk("%s: alg: %p.\n", __func__, alg);
+	kfree(alg);
+}
+
+/*
+ * Algorithm is never freed directly,
+ * since its module reference counter is increased
+ * by storage when it is created - just like network protocols.
+ */
+static inline void dst_put_alg(struct dst_alg *alg)
+{
+	dprintk("%s: alg: %p, refcnt: %d.\n",
+			__func__, alg, atomic_read(&alg->refcnt));
+	module_put(alg->ops->owner);
+	if (atomic_dec_and_test(&alg->refcnt))
+		dst_free_alg(alg);
+}
+
+/*
+ * Removing algorithm from main list of supported algorithms.
+ */
+void dst_remove_alg(struct dst_alg *alg)
+{
+	mutex_lock(&dst_alg_lock);
+	list_del_init(&alg->entry);
+	mutex_unlock(&dst_alg_lock);
+
+	dst_put_alg(alg);
+}
+EXPORT_SYMBOL_GPL(dst_remove_alg);
+
+static void dst_cleanup_node(struct dst_node *n)
+{
+	struct dst_storage *st = n->st;
+
+	dprintk("%s: node: %p.\n", __func__, n);
+
+	n->st->alg->ops->del_node(n);
+
+	if (n->shared_head) {
+		mutex_lock(&st->tree_lock);
+		list_del_rcu(&n->shared);
+		mutex_unlock(&st->tree_lock);
+
+		atomic_dec(&n->shared_head->refcnt);
+		dst_node_put(n->shared_head);
+		n->shared_head = NULL;
+	}
+
+	if (n->cleanup)
+		n->cleanup(n);
+	dst_node_sysfs_exit(n);
+	kfree(n);
+}
+
+static void dst_free_storage(struct dst_storage *st)
+{
+	dprintk("%s: st: %p.\n", __func__, st);
+
+	BUG_ON(rb_first(&st->tree_root) != NULL);
+
+	dst_put_alg(st->alg);
+	kfree(st);
+}
+
+static inline void dst_put_storage(struct dst_storage *st)
+{
+	dprintk("%s: st: %p, refcnt: %d.\n",
+			__func__, st, atomic_read(&st->refcnt));
+	if (atomic_dec_and_test(&st->refcnt))
+		dst_free_storage(st);
+}
+
+void dst_node_put(struct dst_node *n)
+{
+	dprintk("%s: node: %p, start: %llu, size: %llu, refcnt: %d.\n",
+			__func__, n, n->start, n->size,
+			atomic_read(&n->refcnt));
+
+	if (atomic_dec_and_test(&n->refcnt)) {
+		struct dst_storage *st = n->st;
+
+		dprintk("%s: freeing node: %p, start: %llu, size: %llu, "
+				"refcnt: %d.\n",
+				__func__, n, n->start, n->size,
+				atomic_read(&n->refcnt));
+
+		dst_cleanup_node(n);
+		dst_put_storage(st);
+	}
+}
+EXPORT_SYMBOL_GPL(dst_node_put);
+
+static inline int dst_compare_id(struct dst_node *old, u64 new)
+{
+	if (old->start + old->size <= new)
+		return 1;
+	if (old->start > new)
+		return -1;
+	return 0;
+}
+
+/*
+ * Tree of the nodes, which form the storage.
+ * Tree is indexed via start of the node and its size.
+ * Comparison function above.
+ */
+struct dst_node *dst_storage_tree_search(struct dst_storage *st, u64 start)
+{
+	struct rb_node *n = st->tree_root.rb_node;
+	struct dst_node *dn;
+	int cmp;
+
+	while (n) {
+		dn = rb_entry(n, struct dst_node, tree_node);
+
+		cmp = dst_compare_id(dn, start);
+		dprintk("%s: tree: %llu-%llu, new: %llu.\n",
+			__func__, dn->start, dn->start+dn->size, start);
+		if (cmp < 0)
+			n = n->rb_left;
+		else if (cmp > 0)
+			n = n->rb_right;
+		else {
+			return dst_node_get(dn);
+		}
+	}
+	return NULL;
+}
+EXPORT_SYMBOL_GPL(dst_storage_tree_search);
+
+/*
+ * This function removes a node with given start address
+ * from the storage.
+ */
+static struct dst_node *dst_storage_tree_del(struct dst_storage *st, u64 start)
+{
+	struct dst_node *n = dst_storage_tree_search(st, start);
+
+	if (!n)
+		return NULL;
+
+	rb_erase(&n->tree_node, &st->tree_root);
+	dst_node_put(n);
+	return n;
+}
+
+/*
+ * This function adds given node to the storage.
+ * Returns the node already covering the same area if there is one, else NULL.
+ * This return value must be checked for redundancy algorithms.
+ */
+static struct dst_node *dst_storage_tree_add(struct dst_node *new,
+		struct dst_storage *st)
+{
+	struct rb_node **n = &st->tree_root.rb_node, *parent = NULL;
+	struct dst_node *dn;
+	int cmp;
+
+	while (*n) {
+		parent = *n;
+		dn = rb_entry(parent, struct dst_node, tree_node);
+
+		cmp = dst_compare_id(dn, new->start);
+		dprintk("%s: tree: %llu-%llu, new: %llu.\n",
+				__func__, dn->start, dn->start+dn->size,
+				new->start);
+		if (cmp < 0)
+			n = &parent->rb_left;
+		else if (cmp > 0)
+			n = &parent->rb_right;
+		else {
+			return dn;
+		}
+	}
+
+	rb_link_node(&new->tree_node, parent, n);
+	rb_insert_color(&new->tree_node, &st->tree_root);
+
+	return NULL;
+}
+
+/*
+ * This function finds device's major/minor numbers for given pathname.
+ */
+static int dst_lookup_device(const char *path, dev_t *dev)
+{
+	int err;
+	struct nameidata nd;
+	struct inode *inode;
+
+	err = path_lookup(path, LOOKUP_FOLLOW, &nd);
+	if (err)
+		return err;
+
+	inode = nd.dentry->d_inode;
+	if (!inode) {
+		err = -ENOENT;
+		goto out;
+	}
+
+	if (!S_ISBLK(inode->i_mode)) {
+		err = -ENOTBLK;
+		goto out;
+	}
+
+	*dev = inode->i_rdev;
+
+out:
+	path_release(&nd);
+	return err;
+}
+
+/*
+ * Cleanup routines for local, local exporting and remote nodes.
+ */
+static void dst_cleanup_remote(struct dst_node *n)
+{
+	if (n->state) {
+		kst_state_exit(n->state);
+		n->state = NULL;
+	}
+}
+
+static void dst_cleanup_local(struct dst_node *n)
+{
+	if (n->bdev) {
+		sync_blockdev(n->bdev);
+		blkdev_put(n->bdev);
+		n->bdev = NULL;
+	}
+}
+
+static void dst_cleanup_local_export(struct dst_node *n)
+{
+	dst_cleanup_local(n);
+	dst_cleanup_remote(n);
+}
+
+/*
+ * Setup routines for local, local exporting and remote nodes.
+ */
+static int dst_setup_local(struct dst_node *n, struct dst_ctl *ctl,
+		struct dst_local_ctl *l)
+{
+	dev_t dev;
+	int err;
+
+	err = dst_lookup_device(l->name, &dev);
+	if (err)
+		return err;
+
+	n->bdev = open_by_devnum(dev, FMODE_READ|FMODE_WRITE);
+	if (!n->bdev)
+		return -ENODEV;
+
+	if (!n->size)
+		n->size = get_capacity(n->bdev->bd_disk);
+
+	return 0;
+}
+
+static int dst_setup_local_export(struct dst_node *n, struct dst_ctl *ctl,
+		struct dst_le_template *tmp)
+{
+	int err;
+
+	err = dst_setup_local(n, ctl, &tmp->le.lctl);
+	if (err)
+		goto err_out_exit;
+
+	n->state = kst_listener_state_init(n, tmp);
+	if (IS_ERR(n->state)) {
+		err = PTR_ERR(n->state);
+		goto err_out_cleanup;
+	}
+
+	return 0;
+
+err_out_cleanup:
+	dst_cleanup_local(n);
+err_out_exit:
+	return err;
+}
+
+static int dst_request_remote_config(struct dst_node *n, struct socket *sock)
+{
+	struct dst_remote_request cfg;
+	struct msghdr msg;
+	struct kvec iov;
+	int err;
+
+	memset(&cfg, 0, sizeof(struct dst_remote_request));
+	cfg.cmd = cpu_to_be32(DST_REMOTE_CFG);
+
+	iov.iov_base = &cfg;
+	iov.iov_len = sizeof(struct dst_remote_request);
+
+	msg.msg_iov = (struct iovec *)&iov;
+	msg.msg_iovlen = 1;
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+	msg.msg_flags = MSG_WAITALL;
+
+	err = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
+	if (err <= 0) {
+		if (err == 0)
+			err = -ECONNRESET;
+		return err;
+	}
+
+	iov.iov_base = &cfg;
+	iov.iov_len = sizeof(struct dst_remote_request);
+
+	msg.msg_iov = (struct iovec *)&iov;
+	msg.msg_iovlen = 1;
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+	msg.msg_flags = MSG_WAITALL;
+
+	err = kernel_recvmsg(sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags);
+	if (err <= 0) {
+		if (err == 0)
+			err = -ECONNRESET;
+		return err;
+	}
+
+	if (be32_to_cpu(cfg.cmd) != DST_REMOTE_CFG)
+		return -EINVAL;
+
+	n->size = be64_to_cpu(cfg.sector);
+
+	return 0;
+}
+
+static int dst_setup_remote(struct dst_node *n, struct dst_ctl *ctl,
+		struct dst_remote_ctl *r)
+{
+	int err;
+	struct socket *sock;
+
+	err = sock_create(r->addr.sa_family, r->type, r->proto, &sock);
+	if (err < 0)
+		goto err_out_exit;
+
+	sock->sk->sk_sndtimeo = sock->sk->sk_rcvtimeo =
+		msecs_to_jiffies(DST_DEFAULT_TIMEO);
+
+	err = sock->ops->connect(sock, (struct sockaddr *)&r->addr,
+			r->addr.sa_data_len, 0);
+	if (err)
+		goto err_out_destroy;
+
+	if (!n->size) {
+		err = dst_request_remote_config(n, sock);
+		if (err)
+			goto err_out_destroy;
+	}
+
+	n->state = kst_data_state_init(n, sock);
+	if (IS_ERR(n->state)) {
+		err = PTR_ERR(n->state);
+		goto err_out_destroy;
+	}
+
+	return 0;
+
+err_out_destroy:
+	sock_release(sock);
+err_out_exit:
+	return err;
+}
+
+/*
+ * This function inserts node into storage.
+ */
+static int dst_insert_node(struct dst_node *n)
+{
+	int err;
+	struct dst_storage *st = n->st;
+	struct dst_node *dn;
+
+	err = st->alg->ops->add_node(n);
+	if (err)
+		return err;
+
+	err = dst_node_sysfs_init(n);
+	if (err)
+		goto err_out_remove_node;
+
+	mutex_lock(&st->tree_lock);
+	dn = dst_storage_tree_add(n, st);
+	if (dn) {
+		err = -EINVAL;
+		dn->size = st->disk_size;
+		if (dn->start == n->start) {
+			err = 0;
+			n->shared_head = dst_node_get(dn);
+			atomic_inc(&dn->shared_num);
+			list_add_tail_rcu(&n->shared, &dn->shared);
+		}
+	}
+	mutex_unlock(&st->tree_lock);
+	if (err)
+		goto err_out_sysfs_exit;
+
+	if (n->priv_callback)
+		n->priv_callback(n);
+
+	return 0;
+
+err_out_sysfs_exit:
+	dst_node_sysfs_exit(n);
+err_out_remove_node:
+	st->alg->ops->del_node(n);
+	return err;
+}
+
+static struct dst_node *dst_alloc_node(struct dst_ctl *ctl,
+		void (*cleanup)(struct dst_node *))
+{
+	struct dst_storage *st;
+	struct dst_node *n;
+
+	st = dst_get_storage(ctl->st, ctl->alg, 1);
+	if (!st)
+		goto err_out_exit;
+
+	n = kzalloc(sizeof(struct dst_node), GFP_KERNEL);
+	if (!n)
+		goto err_out_put_storage;
+
+	n->w = kst_main_worker;
+	n->st = st;
+	n->cleanup = cleanup;
+	n->start = ctl->start;
+	n->size = ctl->size;
+	INIT_LIST_HEAD(&n->shared);
+	n->shared_head = NULL;
+	atomic_set(&n->shared_num, 0);
+	atomic_set(&n->refcnt, 1);
+
+	return n;
+
+err_out_put_storage:
+	mutex_lock(&dst_storage_lock);
+	list_del_init(&st->entry);
+	mutex_unlock(&dst_storage_lock);
+
+	dst_put_storage(st);
+err_out_exit:
+	return NULL;
+}
+
+/*
+ * Control callback for userspace commands to setup
+ * different nodes and start/stop array.
+ */
+static int dst_add_remote(struct dst_ctl *ctl, void __user *data)
+{
+	struct dst_node *n;
+	int err;
+	struct dst_remote_ctl rctl;
+
+	if (copy_from_user(&rctl, data, sizeof(struct dst_remote_ctl)))
+		return -EFAULT;
+
+	n = dst_alloc_node(ctl, &dst_cleanup_remote);
+	if (!n)
+		return -ENOMEM;
+
+	err = dst_setup_remote(n, ctl, &rctl);
+	if (err < 0)
+		goto err_out_free;
+
+	err = dst_insert_node(n);
+	if (err)
+		goto err_out_free;
+
+	return 0;
+
+err_out_free:
+	dst_node_put(n);
+	return err;
+}
+
+static int dst_add_local_export(struct dst_ctl *ctl, void __user *data)
+{
+	struct dst_node *n;
+	int err;
+	struct dst_le_template tmp;
+
+	if (copy
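
For readers following dst_compare_id() and dst_storage_tree_search() in the
patch, the range lookup can be sketched in plain userspace C. This is an
illustration only, not code from the patch: struct range_node,
range_compare() and range_search() are hypothetical names, and a sorted
array with binary search stands in for the kernel rbtree. The comparison
contract itself is copied from dst_compare_id().

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for struct dst_node: only the range fields. */
struct range_node {
	uint64_t start;	/* first sector covered by the node */
	uint64_t size;	/* number of sectors covered by the node */
};

/*
 * Same contract as dst_compare_id() in the patch:
 *   1 if the node ends at or before 'sector' (continue to the right),
 *  -1 if the node starts after 'sector' (continue to the left),
 *   0 if 'sector' falls inside [start, start + size).
 */
static int range_compare(const struct range_node *old, uint64_t sector)
{
	if (old->start + old->size <= sector)
		return 1;
	if (old->start > sector)
		return -1;
	return 0;
}

/*
 * Binary search over an array sorted by 'start' with non-overlapping
 * ranges. The array replaces the rbtree walk purely for illustration.
 */
static const struct range_node *range_search(const struct range_node *nodes,
		size_t num, uint64_t sector)
{
	size_t lo = 0, hi = num;

	assert(nodes != NULL || num == 0);

	while (lo < hi) {
		size_t mid = lo + (hi - lo) / 2;
		int cmp = range_compare(&nodes[mid], sector);

		if (cmp < 0)
			hi = mid;
		else if (cmp > 0)
			lo = mid + 1;
		else
			return &nodes[mid];
	}
	return NULL;
}
```

With the two 800-sector nodes from the sysfs example in dcore.c (starting at
0 and at 800), a lookup for sector 799 lands in the first node, sector 800
in the second, and sector 1600 finds nothing, which corresponds to the
-ENODEV path in dst_remap().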

Messages in current thread:
Distributed storage. Security attributes and ducumentation u..., Evgeniy Polyakov, (Fri Aug 31, 12:06 pm)
Re: Distributed storage. Security attributes and ducumentati..., Paul E. McKenney, (Thu Sep 13, 11:03 am)