Message Transfer In Cluster Computing
--------------------------------------------------------------------------------
Description: This article presents an improved version of a new networking
protocol for distributed or parallel computations. In general, it is designed
for fast, reliable and feature-rich interchange of small messages. The
protocol's implementation and a demo project are provided.
Commands Transfer Protocol (CTP) - New Networking Protocol for Distributed or Parallel Computations
By dum
Introduction
A computational cluster is a specific kind of system that places special
requirements on network functionality. The main properties of the
networking mechanism of a good-quality cluster are:
1. Fast data interchange.
2. Reliable data interchange.
3. Broadcasting support. Usually all workstations inside the subnet take
part in the computational experiment, so broadcasting makes control easier.
4. Support for interchange of huge data blocks (such blocks are exchanged
rarely, so this interchange need not be extra fast).
5. Peer-to-peer networking. Any node can be a data source and a data
destination, so all nodes are clients and servers simultaneously.
In fact, the majority of parallel computation software toolkits are
implemented as libraries that use standard general-purpose networking
protocols (TCP/IP [1], in most cases). Using this protocol has a number of
disadvantages:
• Low speed of data interchange. The "reliability" and "universality" of
TCP come with a lot of overhead.
• TCP does not support broadcasting. UDP does, but it is not reliable, and
the size of a UDP datagram is limited to 65467 bytes [1].
• Creating a logical channel before data interchange is redundant for
cluster computations. First, a cluster is usually a well-tuned, well-working
network. Second, some cluster computing strategies lead to unordered
interchange between workstations.
• TCP is a stream-based protocol, but for a given task, interchange of
bounded blocks is preferable, because it makes it possible to say
definitely when all necessary data have arrived.
Of course, a specialized networking protocol can be adapted to the special
requirements that arise in cluster computations. CTP is a protocol intended
to satisfy the needs of an arbitrary distributed or parallel computation
platform.
Ideology
The majority of existing toolkits for parallel computations use so-called
"messages" as their basic abstraction. The basic abstraction used in CTP is
the "command". A command is an order from somebody to someone to do
something (in most cases, workstations in clusters communicate exactly in
this way) or the response to such an order. From the last sentence, it
follows that a command is characterized by the following parameters:
• "somebody" - sender.
• "someone" - recipient.
• "something" - command's description.
So sender and recipient have to be identified somehow. For this purpose,
IP addresses are used (because IP is used extremely widely). Commands are
identified by integer numbers.
In terms of this protocol, the words "command" and "message" are synonyms,
because a "command" is a "message" (but not always vice versa).
CTP needs to satisfy the properties of a cluster networking protocol. The
way this is achieved follows (in the same order as in the introduction):
1. To increase the speed of interchange, UDP is used as the basis of the
protocol (once IP addresses identify senders and recipients, it is natural
to build on either the TCP/IP or the UDP/IP approach).
2. Reliability of data interchange has to be implemented. Each sent packet
is stored until the receiver returns a confirmation of receipt. To implement
this mechanism, packets are provided with identifiers, which are integer
numbers assigned on the sender side. These IDs are not unique in general,
but they are unique for each sender.
3. Broadcasting support is one more argument for using UDP as the basic
protocol.
4. Support for huge data interchange has to be implemented. If a message
larger than some limit (65400 bytes by default) is going to be sent, it is
divided into smaller parts. These parts are enumerated and sent to the
recipient separately. On the recipient's side, they are united to rebuild
the initial command. The recipient application is informed about its
arrival only after all its parts have been received. Such commands are
called "large messages", but in practice the majority of commands are
carried by normal messages (which need a single packet).
5. For peer-to-peer interchange, CTP implementations include both client
and server functionality, united into a single whole.
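The splitting rule of item 4 can be sketched in C++ as follows. This is an illustration under stated assumptions, not code from the CTP sources: `kMaxPacketData`, `PartInfo` and `SplitMessage` are hypothetical names, and 65400 is the default data limit mentioned above.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Default maximum amount of message data carried by one packet (from the text).
constexpr std::uint64_t kMaxPacketData = 65400;

// Hypothetical description of one part of a message.
struct PartInfo {
    std::uint32_t number;  // packet's number inside the message, starting from zero
    std::uint64_t offset;  // where this part starts inside the whole message
    std::uint64_t length;  // amount of data bytes in this part
};

// Cuts a message of messageSize bytes into parts of at most maxData bytes.
// Every part except the last one is exactly maxData bytes long, so the
// position of a part inside the message follows from its number alone.
std::vector<PartInfo> SplitMessage(std::uint64_t messageSize,
                                   std::uint64_t maxData = kMaxPacketData) {
    assert(maxData > 0);
    std::vector<PartInfo> parts;
    if (messageSize <= maxData) {
        // Normal message: a single packet carries everything.
        parts.push_back({0, 0, messageSize});
        return parts;
    }
    // Large message: round the number of parts up.
    const std::uint64_t count = (messageSize + maxData - 1) / maxData;
    for (std::uint64_t i = 0; i < count; ++i) {
        const std::uint64_t offset = i * maxData;
        const std::uint64_t left = messageSize - offset;
        parts.push_back({static_cast<std::uint32_t>(i), offset,
                         left < maxData ? left : maxData});
    }
    return parts;
}
```

For example, a 200000-byte message yields four parts: three of 65400 bytes and a last one of 3800 bytes.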
The relationship of CTP/IP to the OSI model [2] and to the UDP/IP model is
shown in pic. 1.
Pic. 1. Relationship between OSI-model, UDP/IP-model and CTP/IP-model
Implementation from the Theoretical Point of View
As usual, each CTP packet consists of a header plus a body (data). The
structure of the header is shown in table 1 (fields in order of
appearance).
Name | Size (in bits) | Comment
Packet size | 16 | Unsigned integer: size of the packet, including the header.
Command's number | 16 | Unsigned integer: the command's number (from zero to 32767, the highest bit is not set). If the highest bit is set, the packet is a confirmation of receipt of the message with the specified command.
Packet's number inside the large message | 32 | Unsigned integer. For large messages: the packet's number, from zero to the amount of packets (given by the next header field) minus one. For normal messages: zero.
Amount of packets for the message | 32 | Unsigned integer. For large messages: the amount of packets needed to send it. For normal messages: zero or one.
ID | 32 | Unsigned integer: identifier of the message. Must be unique for each sender.
Message size | 64 (only 48 are used) | Unsigned integer: size of the whole message (without any headers).
Options | 8 | Set of bits; each bit determines whether an option is set. Options are discussed later.
Table 1. CTP packet header
So the biggest command is 2^32 * 65400 bytes (the maximal amount of packets
multiplied by the maximal amount of data in a packet), which equals
280890861158400 bytes, or more than 255 terabytes. That is close to an
unlimited message size. Obviously, 48 bits are enough to store this size
value (2^48 = 281474976710656), but 64 bits were allocated for alignment.
Total size of the header is 25 bytes.
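The 25-byte layout of table 1 can be written down as a plain C++ structure plus explicit serialization. This is only a sketch: the text does not state the byte order used on the wire, so little-endian (the native order of x86 Windows) is assumed, and `CtpHeader`, `kHeaderSize`, `WriteHeader` and `ReadHeader` are hypothetical names. Serializing field by field avoids relying on compiler-specific struct packing; the static assertions also re-check the size arithmetic above.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>

// Fields of the CTP packet header, in order of appearance (table 1).
struct CtpHeader {
    std::uint16_t packetSize = 0;    // size of the packet including the header
    std::uint16_t command = 0;       // command number; highest bit marks a confirmation
    std::uint32_t packetNumber = 0;  // part number inside a large message
    std::uint32_t packetCount = 0;   // amount of packets for the message
    std::uint32_t id = 0;            // message ID, unique for each sender
    std::uint64_t messageSize = 0;   // size of the whole message (48 bits used)
    std::uint8_t options = 0;        // option bits
};

constexpr std::size_t kHeaderSize = 2 + 2 + 4 + 4 + 4 + 8 + 1;
static_assert(kHeaderSize == 25, "CTP header is 25 bytes");

// Biggest command: 2^32 packets of 65400 data bytes; fits into 48 bits.
constexpr std::uint64_t kMaxMessage = (std::uint64_t{1} << 32) * 65400;
static_assert(kMaxMessage == 280890861158400ULL, "2^32 * 65400");
static_assert(kMaxMessage < (std::uint64_t{1} << 48), "48 bits are enough");

namespace detail {
// Appends the low `bytes` bytes of v in little-endian order (assumed wire order).
inline void Put(std::uint8_t* out, std::size_t& pos, std::uint64_t v, int bytes) {
    for (int i = 0; i < bytes; ++i)
        out[pos++] = static_cast<std::uint8_t>(v >> (8 * i));
}
// Reads `bytes` little-endian bytes starting at pos.
inline std::uint64_t Get(const std::uint8_t* in, std::size_t& pos, int bytes) {
    std::uint64_t v = 0;
    for (int i = 0; i < bytes; ++i)
        v |= static_cast<std::uint64_t>(in[pos++]) << (8 * i);
    return v;
}
}  // namespace detail

inline std::array<std::uint8_t, kHeaderSize> WriteHeader(const CtpHeader& h) {
    std::array<std::uint8_t, kHeaderSize> buf{};
    std::size_t pos = 0;
    detail::Put(buf.data(), pos, h.packetSize, 2);
    detail::Put(buf.data(), pos, h.command, 2);
    detail::Put(buf.data(), pos, h.packetNumber, 4);
    detail::Put(buf.data(), pos, h.packetCount, 4);
    detail::Put(buf.data(), pos, h.id, 4);
    detail::Put(buf.data(), pos, h.messageSize, 8);
    detail::Put(buf.data(), pos, h.options, 1);
    return buf;
}

inline CtpHeader ReadHeader(const std::uint8_t* in) {
    CtpHeader h;
    std::size_t pos = 0;
    h.packetSize = static_cast<std::uint16_t>(detail::Get(in, pos, 2));
    h.command = static_cast<std::uint16_t>(detail::Get(in, pos, 2));
    h.packetNumber = static_cast<std::uint32_t>(detail::Get(in, pos, 4));
    h.packetCount = static_cast<std::uint32_t>(detail::Get(in, pos, 4));
    h.id = static_cast<std::uint32_t>(detail::Get(in, pos, 4));
    h.messageSize = detail::Get(in, pos, 8);
    h.options = static_cast<std::uint8_t>(detail::Get(in, pos, 1));
    return h;
}
```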
An important note is that the ID identifies a message, not a packet. A
single packet can be fully identified by its sender, ID and number.
Messages need only the sender and ID for their identification. The
uniqueness of IDs for each sender is provided in the following way: the
ID of the first message to be sent is one, and it is incremented after
each message is sent. The value has to be saved after every change to
some local storage of the node (in the current implementation for
Windows, it is stored in the registry as the "LastID" value under the
running application's key "CTP") to protect the system from errors after
a restart or a failure.
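A portable sketch of this ID scheme is shown below. It is an assumption-laden illustration, not the Windows implementation: the hypothetical `MessageIdCounter` keeps the last used ID in a plain file instead of the registry's "LastID" value, and error handling for failed writes is omitted.

```cpp
#include <cstdint>
#include <fstream>
#include <string>
#include <utility>

// Hypothetical per-sender source of message IDs. The value is persisted
// after every change, so a restart or crash never reuses an old ID.
class MessageIdCounter {
public:
    explicit MessageIdCounter(std::string path) : path_(std::move(path)) {
        std::ifstream in(path_);
        if (!(in >> last_)) last_ = 0;  // nothing saved yet: the first ID will be 1
    }

    // Returns the ID for the next message and saves it immediately.
    std::uint32_t Next() {
        ++last_;
        std::ofstream out(path_, std::ios::trunc);  // overwrite the stored value
        out << last_;
        return last_;
    }

private:
    std::string path_;
    std::uint32_t last_ = 0;  // last ID handed out
};
```

Saving before the message leaves the node is the conservative order: if the node crashes right after sending, the next run still continues from a fresh ID.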
The flowchart that shows the sequence of operations for packet interchange
is shown in pic. 2.
Pic. 2.
To send a command, its packets have to be arranged: some memory is
allocated and filled with packet headers and data. This memory is not
freed just after sending; instead, it is kept in the sent commands storage
(a pointer to the sent buffer and the corresponding parameters). A record
can be removed from the sent commands storage only after the arrival of
all its packets has been confirmed.
This approach can be implemented not "per command" but "per packet" (as in
CTP v. 1.0), but the first variant is preferable. In this case, so-called
"smart buffers" avoid redundant memory allocations by reserving the memory
needed for headers while the packet data is being arranged.
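The smart buffer idea can be sketched as follows. The real class's definition is not reproduced in this text, so `SmartBufferSketch` and its members are hypothetical; the sketch only shows the core trick: while data is appended, a gap of header size is left in front of every maximum-size chunk, so headers can later be written in place without copying the data.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical smart buffer: lays out [header gap][data]... as data arrives.
class SmartBufferSketch {
public:
    SmartBufferSketch(std::size_t headerSize, std::size_t maxData)
        : header_(headerSize), maxData_(maxData) { assert(maxData > 0); }

    void Append(const std::uint8_t* data, std::size_t size) {
        while (size > 0) {
            // Current packet is full (or none exists yet): reserve the next header.
            if (dataSize_ % maxData_ == 0) bytes_.resize(bytes_.size() + header_);
            const std::size_t room = maxData_ - dataSize_ % maxData_;
            const std::size_t n = std::min(room, size);
            bytes_.insert(bytes_.end(), data, data + n);
            data += n;
            size -= n;
            dataSize_ += n;
        }
    }

    std::size_t DataSize() const { return dataSize_; }
    std::size_t PacketCount() const { return (dataSize_ + maxData_ - 1) / maxData_; }

    // Start of the header gap of packet i (all earlier packets are full).
    // The pointer stays valid only until the next Append.
    std::uint8_t* HeaderOf(std::size_t i) {
        assert(i < PacketCount());
        return bytes_.data() + i * (header_ + maxData_);
    }

    const std::vector<std::uint8_t>& Bytes() const { return bytes_; }

private:
    std::size_t header_;
    std::size_t maxData_;
    std::size_t dataSize_ = 0;
    std::vector<std::uint8_t> bytes_;  // headers and data, ready to be sent
};
```

With a 25-byte header and 65400 bytes of data per packet, the sending code would only have to fill each `HeaderOf(i)` gap and pass the consecutive slices to the socket.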
One of the key parameters of CTP's implementation is the so-called timeout.
If confirmation of a packet's arrival is not received within the timeout,
the packet is resent. If it is not confirmed within 5 timeouts multiplied
by the amount of packets used for this command, the error message "Command
is not confirmed too long" is generated. If the timeout is set to zero,
this feature is switched off.
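The timeout rule can be expressed as a small decision function. This is a sketch with hypothetical names (`PendingAction`, `CheckPending`); the text says that a zero timeout switches "this feature" off, and the sketch assumes that both resending and the error report are disabled in that case. Options such as NoResend are not modelled here.

```cpp
#include <chrono>
#include <cstdint>

// Hypothetical decision taken by the server thread for an unconfirmed command.
enum class PendingAction { Wait, Resend, ReportError };

// sinceLastSend  - time since the packets were (re)sent last
// sinceFirstSend - time since the command was sent for the first time
// timeout        - CTP timeout; zero switches the mechanism off (assumption)
// packetCount    - amount of packets used for the command
PendingAction CheckPending(std::chrono::milliseconds sinceLastSend,
                           std::chrono::milliseconds sinceFirstSend,
                           std::chrono::milliseconds timeout,
                           std::uint32_t packetCount) {
    if (timeout.count() == 0) return PendingAction::Wait;  // feature switched off
    // Normal messages may carry zero in the amount-of-packets field.
    const std::uint32_t packets = packetCount == 0 ? 1 : packetCount;
    if (sinceFirstSend >= timeout * 5 * packets)
        return PendingAction::ReportError;  // "Command is not confirmed too long"
    if (sinceLastSend >= timeout) return PendingAction::Resend;
    return PendingAction::Wait;
}
```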
The first thing the recipient does when it receives a packet is to check
whether the same packet was already received (a packet with the same ID
and number from the same sender). For this purpose, the receiver looks
through the received packets storage (a list of structures that store the
IP addresses of senders and the lists of IDs and numbers of received
packets). If such a packet was already received, the sender failed to get
the confirmation, so the confirmation has to be sent again and receiving
this packet can be skipped.
Note that the confirmation "has to be sent again", not "resent":
confirmations are not stored in the sent packets storage, but are
generated when needed.
If such a packet has not been received earlier, information about its
arrival is stored in the received packets storage. This storage is
limited, and when the limit is reached, the oldest record is removed from
it. If the received packet represents a normal message, the server informs
the recipient about data arrival.
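Duplicate detection with a bounded storage can be sketched like this. The text describes per-sender structures holding lists of IDs and numbers; the hypothetical `ReceivedStorage` below swaps in a flatter layout (an arrival-ordered `std::deque` for eviction plus a `std::set` for lookup) that gives the same behaviour: duplicates are detected, and the oldest record is dropped at the limit.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <set>
#include <tuple>

// Hypothetical received packets storage keyed by (sender, ID, number).
class ReceivedStorage {
public:
    explicit ReceivedStorage(std::size_t limit) : limit_(limit) { assert(limit > 0); }

    // Returns true and records the packet if it is new; returns false for a
    // duplicate (the sender missed our confirmation, so we only re-confirm).
    bool Register(std::uint32_t sender, std::uint32_t id, std::uint32_t number) {
        const Key key{sender, id, number};
        if (known_.count(key) != 0) return false;
        if (order_.size() == limit_) {  // limit reached: forget the oldest record
            known_.erase(order_.front());
            order_.pop_front();
        }
        order_.push_back(key);
        known_.insert(key);
        return true;
    }

private:
    using Key = std::tuple<std::uint32_t, std::uint32_t, std::uint32_t>;
    std::size_t limit_;
    std::deque<Key> order_;  // arrival order, oldest first
    std::set<Key> known_;    // fast duplicate lookup
};
```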
If it is part of a large message, the server stores it in the large
messages storage (a list of large message descriptors: sender, amount of
received parts, total amount of parts and a buffer for assembling the
whole message). Each part of the message except the last one has the
maximal datagram size, so parts can easily find their places in the
buffer from their numbers and the total amount of parts. After the next
part is put into the buffer, the amount of received parts is increased.
When the amount of received parts reaches the total amount of parts, the
message is considered complete and the recipient can be informed about
data arrival.
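The assembly step of a large message descriptor can be sketched as below. `LargeMessage` is a hypothetical name, the sender field is left out, and duplicates are assumed to have been filtered out earlier by the received packets storage, exactly as the text relies on.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical large message descriptor: collects all parts into one buffer.
class LargeMessage {
public:
    LargeMessage(std::uint64_t messageSize, std::uint32_t totalParts, std::size_t maxPartData)
        : buffer_(messageSize), total_(totalParts), maxPart_(maxPartData) {}

    // Copies one part to its place and returns true when the message is complete.
    // Every part except the last one is maxPartData bytes long, so the part
    // starts at number * maxPartData.
    bool AddPart(std::uint32_t number, const std::uint8_t* data, std::size_t size) {
        assert(number < total_);
        const std::size_t offset = static_cast<std::size_t>(number) * maxPart_;
        assert(offset + size <= buffer_.size());
        std::memcpy(buffer_.data() + offset, data, size);
        ++received_;
        return received_ == total_;
    }

    const std::vector<std::uint8_t>& Data() const { return buffer_; }

private:
    std::vector<std::uint8_t> buffer_;  // the whole message being assembled
    std::uint32_t total_;               // total amount of parts
    std::size_t maxPart_;               // data size of every part but the last
    std::uint32_t received_ = 0;        // amount of parts received so far
};
```

Parts may arrive in any order; only the counter decides when the recipient is informed.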
The received packets storage prevents the same part of a large message
from being put into the buffer twice. This means that the protocol is
stable and error-free when the limit of the received packets storage is
greater than the amount of parts in the longest message. In practice,
however, there is no need to set this limit to 2^32, because the first
parts become irrelevant once they have been confirmed, so a limit of
several thousand is usually enough.
After the packet has found its place, the confirmation is sent, and the
recipient begins to wait for the next packet.
When the sender receives any confirmation, it deletes the corresponding
record from the sent packets storage. The received packets storage is
freed only when the CTP server terminates. The large messages storage is
also freed when the CTP server terminates, and a single record is deleted
from it after the whole message has been assembled.
This means that the mechanism, like a living creature, aspires to minimize
its potential energy by freeing all storages as soon as possible.
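The lifecycle of the sent commands storage (one record per command, freed when the last packet is confirmed) can be sketched as follows. `SentStorage` is a hypothetical name; the key (recipient IPv4 address, message ID) and the treatment of a zero packet count as one packet are assumptions of this sketch, and timestamps for resending are left out.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <set>
#include <utility>
#include <vector>

// Hypothetical sent commands storage: keeps each command's packets until
// every one of them has been confirmed by the recipient.
class SentStorage {
public:
    using Key = std::pair<std::uint32_t, std::uint32_t>;  // (recipient IPv4, message ID)

    void Add(std::uint32_t recipient, std::uint32_t id, std::uint32_t packetCount,
             std::vector<std::uint8_t> packets) {
        Record& r = records_[{recipient, id}];
        r.packets = std::move(packets);
        // Normal messages may carry zero in the amount-of-packets field.
        const std::uint32_t count = packetCount == 0 ? 1 : packetCount;
        for (std::uint32_t n = 0; n < count; ++n) r.unconfirmed.insert(n);
    }

    // Marks one packet as confirmed. Returns true if it was the last
    // unconfirmed packet; the record and its buffer are then freed.
    bool Confirm(std::uint32_t recipient, std::uint32_t id, std::uint32_t packetNumber) {
        auto it = records_.find({recipient, id});
        if (it == records_.end()) return false;  // unknown or already freed
        it->second.unconfirmed.erase(packetNumber);
        if (!it->second.unconfirmed.empty()) return false;
        records_.erase(it);
        return true;
    }

    std::size_t Size() const { return records_.size(); }

private:
    struct Record {
        std::vector<std::uint8_t> packets;    // headers and data, kept for resending
        std::set<std::uint32_t> unconfirmed;  // packet numbers awaiting confirmation
    };
    std::map<Key, Record> records_;
};
```

Repeated confirmations of the same packet are harmless here, which matches the fact that the recipient may confirm a duplicate packet again.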
Questions that may arise after the explanations above are: what exactly is
a confirmation, and how is the recipient application informed about the
arrival of a message or about an error that occurred during networking?
Answer to the first question: a confirmation is a packet with an empty
body (header only). Its header differs from the header of the confirmed
packet in only three ways. In the confirmation's header:
• the packet size is set to the header size;
• the highest bit of the command's number is set;
• the message size is set to zero.
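Building a confirmation from a received header takes three assignments. The sketch repeats a minimal header structure so that it stands alone; `Header`, `MakeConfirmation` and `IsConfirmation` are hypothetical names, while the 25-byte size and the highest-bit rule come from table 1.

```cpp
#include <cstdint>

// Minimal copy of the header fields from table 1 (hypothetical names).
struct Header {
    std::uint16_t packetSize;
    std::uint16_t command;
    std::uint32_t packetNumber;
    std::uint32_t packetCount;
    std::uint32_t id;
    std::uint64_t messageSize;
    std::uint8_t options;
};

constexpr std::uint16_t kHeaderSize = 25;
constexpr std::uint16_t kConfirmationBit = 0x8000;  // highest bit of the command number

// Returns the header of the confirmation for received packet h. ID, packet
// number, amount of packets and options are copied unchanged, so the sender
// can match the confirmation with its stored packet.
Header MakeConfirmation(Header h) {
    h.packetSize = kHeaderSize;     // empty body: header only
    h.command |= kConfirmationBit;  // marks the packet as a confirmation
    h.messageSize = 0;
    return h;
}

inline bool IsConfirmation(const Header& h) { return (h.command & kConfirmationBit) != 0; }
```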
Answer to the second question: a whole received message or an error
description is called a "delivery". In terms of object-oriented
programming, the objects that implement the recipient application can
subscribe to deliveries for a given command. In this case, the
corresponding object receives information about the command's arrival and
about errors related to this command.
There is also a default receiver that gets information about commands
that have no subscribers and about errors that have no related commands.
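The routing rule can be sketched with `std::function` callbacks. This is not the CTP API (the real subscription interface is not shown in the text): `Delivery`, `DeliveryRouter` and `kNoCommand` are hypothetical, and using 0xFFFF to mark "no related command" is an assumption that works because real command numbers never have the highest bit set.

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical delivery: a whole received message or an error description.
struct Delivery {
    std::uint16_t command;  // related command, or kNoCommand
    bool isError;
    std::string payload;    // message data or error text
};

// Assumption: marks errors that are not related to any command.
constexpr std::uint16_t kNoCommand = 0xFFFF;

class DeliveryRouter {
public:
    using Handler = std::function<void(const Delivery&)>;

    explicit DeliveryRouter(Handler defaultReceiver) : default_(std::move(defaultReceiver)) {}

    void Subscribe(std::uint16_t command, Handler h) {
        subscribers_[command].push_back(std::move(h));
    }

    // Subscribers of the command get the delivery; otherwise the default receiver does.
    void Deliver(const Delivery& d) const {
        auto it = subscribers_.find(d.command);
        if (d.command == kNoCommand || it == subscribers_.end()) {
            default_(d);
            return;
        }
        for (const Handler& h : it->second) h(d);
    }

private:
    Handler default_;
    std::map<std::uint16_t, std::vector<Handler>> subscribers_;
};
```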
Finally, it is time to discuss the available packet options (the last
field of the packet header). There are three options:
1. ErrorOnce: if set, the information about sending this packet is removed
after the receiver has been informed that its arrival was not confirmed.
So this error is reported only once.
2. NoResend: if set, this packet is not resent even if it was not
confirmed.
3. UniqueCommand: if set, a confirmation of this packet's command confirms
all packets with the same command number that were sent to the given
recipient. This option is not allowed for large messages, to protect them
from integrity corruption.
These options can be used in any combination: separately, all together and
so on.
For example, the option set ErrorOnce + NoResend + UniqueCommand can be
useful for commands like "answer me if you are alive" (ping): commands
that are sent often, are small and carry no information themselves, while
the response or confirmation does (the recipient is working).
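The option field can be handled as an ordinary bit mask. The text names the options but not their bit positions, so the values below (and the names `CtpOption`, `HasOption`, `OptionsValid`, `kPingOptions`) are assumptions of this sketch.

```cpp
#include <cstdint>

// Option bits of the last header field; bit values are assumed, not documented.
enum CtpOption : std::uint8_t {
    kErrorOnce = 0x01,      // report "not confirmed" only once, then forget the packet
    kNoResend = 0x02,       // never resend the packet
    kUniqueCommand = 0x04,  // one confirmation confirms all packets with this command
};

inline bool HasOption(std::uint8_t options, CtpOption o) { return (options & o) != 0; }

// UniqueCommand is not allowed for large messages (integrity protection).
inline bool OptionsValid(std::uint8_t options, bool largeMessage) {
    return !(largeMessage && HasOption(options, kUniqueCommand));
}

// The combination suggested for ping-like commands.
constexpr std::uint8_t kPingOptions = kErrorOnce | kNoResend | kUniqueCommand;
```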
Implementation for Windows
CTP was implemented for the Windows operating system to become the basis
of the networking mechanism used in the Cellular Automata Modeling
Environment & Library project. Of course, it can also be used in arbitrary
applications.
The protocol's implementation is represented by a set of classes. The
class that contains CTP's main functionality is CCTPNet. A description of
all classes involved in CTP's implementation follows.
IPAddr Class
Objects of the IPAddr class represent the IP address of a workstation.
This class needs no explanation beyond its source:
#include <cstdio> // sprintf
// STRING comes from the project's headers (assumed to be a char* typedef)
union IPAddr
{
    // Constructors
    IPAddr() {SetLocalhost();}
    IPAddr(unsigned char b1,unsigned char b2,
           unsigned char b3,unsigned char b4)
        {Bytes.b1=b1;Bytes.b2=b2;Bytes.b3=b3;Bytes.b4=b4;}
    IPAddr(unsigned long l) {Solid=l;}
    // Returns true if this IP address refers to localhost
    inline bool IsLocalhost() const {return Bytes.b1==127&&Bytes.b2==0
        &&Bytes.b3==0&&Bytes.b4==1;}
    // Writes the dotted string representation of the IP address
    // to s (at least 16 bytes long) and returns s
    inline STRING GetString(STRING s) const
        {sprintf(s,"%d.%d.%d.%d",Bytes.b1,Bytes.b2,
                 Bytes.b3,Bytes.b4); return s;}
    // Sets the stored IP address to the value represented by string s
    // (in dot-separated format). Returns true if succeeded and false otherwise
    bool FromString(STRING s);
    // Sets the stored IP address to localhost
    inline void SetLocalhost() {Bytes.b1=127;Bytes.b2=0;
        Bytes.b3=0;Bytes.b4=1;}
    // Operators
    bool operator ==(unsigned long ip) const {return Solid==ip;}
    bool operator ==(IPAddr ip) const {return Solid==ip.Solid;}
    bool operator !=(IPAddr ip) const {return Solid!=ip.Solid;}
    IPAddr& operator =(const IPAddr ip) {Solid=ip.Solid; return *this;}
    // Actual data (unsigned long is 32 bits on Windows, matching Bytes)
    struct IPBytes {
        unsigned char b1,b2,b3,b4;
    } Bytes;
    unsigned long Solid;
};
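The body of `FromString` is not shown. One possible strict parser for the dot-separated format is sketched below as a free function with a hypothetical name, `ParseDottedQuad`, so that it stands alone; it accepts exactly four decimal numbers from 0 to 255 and leaves the output untouched on failure.

```cpp
#include <cctype>

// Parses "a.b.c.d" (each part 0..255, at most three digits) into out[0..3].
// Returns false and leaves out unchanged if s is not a valid dotted quad.
bool ParseDottedQuad(const char* s, unsigned char out[4]) {
    if (s == nullptr) return false;
    unsigned char tmp[4];
    for (int part = 0; part < 4; ++part) {
        if (!std::isdigit(static_cast<unsigned char>(*s))) return false;
        unsigned value = 0;
        int digits = 0;
        while (std::isdigit(static_cast<unsigned char>(*s))) {
            value = value * 10 + static_cast<unsigned>(*s - '0');
            if (++digits > 3 || value > 255) return false;
            ++s;
        }
        tmp[part] = static_cast<unsigned char>(value);
        if (part < 3) {
            if (*s != '.') return false;  // parts must be separated by dots
            ++s;
        }
    }
    if (*s != '\0') return false;  // no trailing characters allowed
    for (int i = 0; i < 4; ++i) out[i] = tmp[i];
    return true;
}
```

Inside `IPAddr::FromString`, the four parsed bytes would then be copied to `Bytes.b1` through `Bytes.b4`.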
Smart buffer objects save CTP's implementation from redundant memory
allocations. A smart buffer reserves room for a packet header once per
fixed amount of data (the maximum amount of data in a single packet), on
the fly. So the user just puts data into the smart buffer; then CTP's
sending function adds the headers, and the packets are ready to go. The
definition of this class follows.
CCTPNet Class
This class implements CTP's main functions (client and server
simultaneously). The following definition describes its members.
The protocol functionality is implemented with multiple threads. There are
three types of threads:
• The server thread provides confirmation support, error message
generation and the other necessary CTP-specific networking functions. If
data have arrived or error information has appeared, the thread adds it to
the deliveries list. On an idle loop, the thread can fall asleep for one
tenth of the timeout (or for 100 milliseconds if the timeout is zero).
Once per tenth of the timeout, this thread checks whether any packets need
resending and resends them if necessary.
• The delivery manager thread creates additional deliverer threads if all
existing deliverers are busy and the deliveries list is not empty. Of
course, the maximum amount of deliverers is limited by a CCTPNet
parameter. The protocol's mechanism aims to reduce load: on an idle loop,
this thread can fall asleep for 100 milliseconds.
• Deliverer threads check the deliveries list and, if it is not empty,
deliver the first delivery to the corresponding subscriber or to the
default recipient. If a deliverer does nothing for a minute, it is
terminated. On an idle loop, the thread can fall asleep for
(log10(number_of_idle_loops)+1)*50 milliseconds. Each command is an order
(or a response), and an order may require something difficult and
long-running to be done. Implementing deliverers as separate threads
therefore allows such work to be computed "by order". Nevertheless, it is
strongly recommended not to use AfxMessageBox or modal dialogs in
command-receiving handlers, because they keep the deliverer busy
uselessly.
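The idle sleep intervals quoted above can be written as two small functions. The names `ServerIdleSleep` and `DelivererIdleSleep` are hypothetical, and the sketch assumes the idle-loop counter starts at one (log10 of zero is undefined). The logarithmic growth means a long-idle deliverer polls less often while still reacting within a fraction of a second.

```cpp
#include <chrono>
#include <cmath>
#include <cstdint>

// Server thread: one tenth of the timeout, or 100 ms when the timeout is zero.
std::chrono::milliseconds ServerIdleSleep(std::chrono::milliseconds timeout) {
    return timeout.count() == 0 ? std::chrono::milliseconds(100) : timeout / 10;
}

// Deliverer thread: (log10(idleLoops) + 1) * 50 ms after idleLoops idle loops.
std::chrono::milliseconds DelivererIdleSleep(std::uint64_t idleLoops) {
    if (idleLoops < 1) idleLoops = 1;  // assumption: counting starts at one
    const double ms = (std::log10(static_cast<double>(idleLoops)) + 1.0) * 50.0;
    return std::chrono::milliseconds(std::lround(ms));
}
```

With these formulas, a deliverer sleeps 50 ms after the first idle loop, 100 ms after ten and 200 ms after a thousand.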
How to use all this?
First of all, add the files CTPNet.h, CTPNet.cpp, Network.h and Network.cpp
to the projects that are to use CTP. Then put the following directives
into StdAfx.h:
// Include MFC multithreading support
#include <afxmt.h>
// Include STL
#pragma warning(push)
// Disable STL-critical warnings
#pragma warning(disable: 4245)
#pragma warning(disable: 4100)
#pragma warning(disable: 4663)
#pragma warning(disable: 4018)
#pragma warning(disable: 4786)
#pragma warning(disable: 4097)
#include <list>
#include <vector>
#include <algorithm>
using namespace std;
// Enable STL-critical warnings
#pragma warning(pop)
// CRT library includes
#include <sys/timeb.h>
#include <time.h>
#include <math.h>
// Include Windows Sockets
#include <Winsock2.h>
#include <Ws2tcpip.h>
All of these includes except the last one are collected in the provided
file NetIncludes.h. It is also necessary to link the Windows Sockets
library ws2_32.lib to your project (choose "Project" | "Settings" |
"Link", then type "ws2_32.lib" in the "Object/library modules" edit field).
Then start Winsock up. For example, put the following code into the
initialization function of the project's main window:
WSADATA wsaData;
WSAStartup(MAKEWORD(2,2),&wsaData);
After this, place the following code to start the CTP server:
m_pCTP = new CCTPNet("CTP",m_pCTPReceiver,1515,1000);
// Server created suspended so it needs to be started manually
m_pCTP->SetSuspended(false);
This code assumes that m_pCTPReceiver points to an object of a class
derived from NetReceiver.
Demo Application
The demo application provided with this article shows how to use the
described CTP implementation. It also includes implementations of TCP and
UDP in the same framework, so all these protocols can be used together
and, naturally, compared (pic. 3).
Pic. 3. Time of interchange via CTP, TCP and UDP (where possible), in microseconds, versus message size. Results of protocol research using the provided demo project.
The similar results of CTP and UDP show that CTP's implementation does not
use a critical amount of resources. Its overhead is small enough to be
ignored.
CTP is up to 70% faster than TCP for normal messages and for messages
carried by several packets. That is a great result, because the
overwhelming majority of interactions in a cluster use normal messages:
orders to start computations, queries of some values and responses to
such queries are small.
TCP is better for large messages. Nevertheless, huge data blocks appear in
cluster computations rarely, for example at the stage of task separation
(and even there, not always). An important note is that CTP is not
critically slow for large blocks, so, bearing in mind the previous
paragraph, it can be used as the networking mechanism of a cluster.
Besides, the test was performed on two nodes, because pure protocol
throughput is what is interesting here. The comparison for rapid
interchange between a dozen nodes should be even more favorable for CTP,
because its activity stays the same, while TCP loses a lot on creating and
recreating channels. For CTP, it does not matter whom it sends to.
The reason why CTP cannot outperform TCP on large messages is that TCP
works at kernel level, while CTP is implemented in the application. On
the one hand, this is a disadvantage of CTP; on the other hand, it makes
CTP absolutely independent and complete.