Computer Science Technical Report 95-03
Barry Dwyer
Department of Computer Science
University of Adelaide
G.P.O. Box 498, Adelaide
South Australia 5005
October 30, 1995

Abstract
This report describes how a very high throughput of transactions can be supported on a parallel processor architecture by processing batches of transactions. The resulting speed-up is strictly proportional to the number of processors, and is obtained because access to the database is contention free. The possibility of contention is eliminated by factorising the system specification into independent parts. Each part has its own database, which is consistent with the others only before and after each batch of updates. The factorisation is not possible for an arbitrary system specification, and requires its structure to have the properties of `separability' and `independence'. The techniques used are directly analogous to those traditionally used in sequential file batch processing.

Keywords
Transaction processing, parallel algorithms, batch processing, speed up, contention-free, inconsistent databases, separability, independence, sequential access.

1. Transaction Processing Systems
This report is concerned with the implementation of batch transaction processing systems. The key aspects of a transaction processing system are a database, and transactions that update or query the state of the database. Transaction processing systems should guarantee the four so-called ACID properties [5,6]:
Atomicity: Failed transactions do not produce side-effects: either a transaction completes its action on the database, or it has no effect.
Consistency: A transaction moves the database from one consistent state to another.
Isolation: A transaction depends on other transactions only through the changes they make to the database.
Durability: After a transaction completes, the changes it made to the system state persist indefinitely.

The isolation property greatly simplifies the semantics of transaction processing by guaranteeing that the effect of a transaction depends only on the state of the database, and not on any concurrent transactions. Of course, transactions may still interact. If one person books the last seat on an aeroplane, a second person cannot also book it. However, the interaction occurs purely because the first transaction makes the number of free seats in the database become zero. So, to a first approximation, neither transaction need be aware of the other; but it is only to a first approximation, because, if the two transactions are concurrent, they must actively avoid interaction in some way. Typically, the first transaction will lock part of the database, and the second transaction will detect the lock and either wait for the first to complete or, if a deadlock would result, roll back. Record locking and roll-back add to the overheads of database management--even if nothing is ever rolled back, it is still necessary to log the records that make roll-back possible. However, if it is logically impossible for concurrent transactions to interact, we say that the database is contention-free, and the overheads are avoided.

2. Separable Systems
It has long been known that productivity can be raised through the division of labour. Consider a manual library system that records the numbers of copies of books remaining on the shelves, and the number borrowed by each library patron. Each book and each patron is associated with a card index. When a patron borrows a book, the book record must be adjusted to decrease the number of books on the shelves, and the patron record must be adjusted to increase the number of books borrowed. In the simplest possible implementation of the system, a library clerk adjusts these records directly after each loan is made. Now suppose that the library becomes busier, and that adjusting the file cards causes queues of patrons to build up.
The clerk therefore simply notes the identifiers of the book and the patron, and updates the file cards later during quiet times, in batches. The clerk will quickly find that it is faster to sort the loans into card file order, first by book to update the book card index, then by patron to update the patron cards, rather than have to search the book and patron files at random.

Now suppose the library becomes busier still, and it is necessary to employ two clerks. They may return to the direct method of recording loans, but since they share the use of the same card indexes, they will need to co-operate in using them, and will sometimes even contend for the same book record. Again, they will find it easier to record loans and update the files separately. There are several ways they can do this. One option is for one clerk to control the book file, and the second clerk to control the patron file, as symbolised in Fig. 2.1. The `C' process represents the first clerk, and the `D' process represents the second. Loans are recorded as pairs of values: `titles' and `users', (t, u). The `C' process updates the number of copies of each book, and the `D' process updates the number of books drawn by each user. The (t, u) pairs enter the `C' update process, which decrements the number of copies for title `t'. The `u' values are passed to the `D' process, which increments the number of books drawn by patron `u'. It is a property of this system that the `C' and `D' attributes stored in the database will typically be inconsistent, with the state of `D' lagging behind the state of `C'.
Fig. 2.1: A Process Pipeline

Suppose that the library becomes so busy that more clerks are required. How can they operate most efficiently? One option is for several clerks to record loans, while two clerks continue to update the files. But what if this is not enough? If two clerks are allocated to update the books file, they will need to share its use, and it may prove that two clerks can work no more quickly than one. But suppose that the books file is split A-N, O-Z. Then two clerks can work independently, with double the speed. The same trick can be used to speed up access to the patrons file, and by splitting the files into more and more parts, the speed-up can be increased as much as needed.

The `C' and `D' processes of Fig. 2.1 each then have the internal structure of Fig. 2.2. The transactions are distributed to a number of update processes, each of which has access to a partition of the `C' or `D' data. When a transaction inspects or updates more than one title or user, the results have to be collected again. It is possible for the states of the partitions to be inconsistent with one another, because each has an independent history. This causes a problem when the state of the whole database must be checked, e.g., to count the number of books on loan, but, as will be shown, the problem is easily resolved.
Fig. 2.2: A Model of Independent Access

The principles underlying this report are `separability' and `independence'. There are three ways in which two processes can interact: If messages need to be passed between them in both directions, they are said to be `closely coupled'. Two processes are `separable' if messages need to be passed between them in one direction only. Two processes are `independent' if no messages need to be passed between them at all. If two processes are separable, they can be connected by a queue. If they are independent, they need no connection. If processes are separable or independent, their databases can be inconsistent.

There is a close analogy between the design of a parallel batch transaction processing system and a traditional batch sequential file updating system. Separability is the property needed to link processes using transfer files. Independence allows files to be updated sequentially, rather than by random access. In a sequential update, a single process updates each record independently, simulating the effect of many parallel processes. A major difference between sequential and parallel processing is the means of distribution and collection of transactions. In a sequential update process, transactions are distributed and collected by sorting work files. In a parallel update, they are distributed and collected by a message network.

A generalisation of the algorithm given below is applicable to many systems that update database states in response to a stream of transactions unfolding over a period of time. Such systems arise in information processing, databases, and discrete-event simulations. A newly important area is that of `reactive' programs, i.e., interactive programs (often with mouse-driven graphical user-interfaces) that respond to almost arbitrary sequences of user commands.
Another emerging area of importance is that of `Workflow Management' or `Groupware' [1], which allows users at several work-stations to co-operate in the completion of a task by sending messages over a network. This technology is a direct automation of manual office procedures in which messages used to be passed as memos.

3. Specifying The Library System
void benchmark ()
{ int now, time, kind, title, user, stock, loans;
  for (now = 1; now <= n_events; now++)
  { time = event_0[now].time; kind = event_0[now].kind;
    title = event_0[now].title; user = event_0[now].user;
    switch (kind)
    { case BUY:
        C[title] = C[title] + 1; break;
      case BORROW:
        if (C[title] > 0)       /* the loan succeeds only if a copy is in stock */
        { C[title] = C[title] - 1;
          D[user] = D[user] + 1;
        }
        break;
      case RESHELVE:
        C[title] = C[title] + 1; D[user] = D[user] - 1; break;
      case AUDIT:               /* inspects every book and patron record */
        stock = 0; loans = 0;
        for (title = 1; title <= n_titles; title++)
          stock = stock + C[title];
        for (user = 1; user <= n_users; user++)
          loans = loans + D[user];
        printf ("At time %d, %d books are in stock, and %d on loan.\n",
                now, stock, loans);
        break;
      default: break;
    }
  }
}
Fig. 3.1: The Benchmark Implementation

It is one thing to state that parallelism is possible in theory; it is another to demonstrate a useful speed up over a single process. The overheads of message passing may easily outweigh the advantages of parallelism. To verify the basic idea, a model of the library system was implemented on a 32-processor CM-5, using the `C' programming language and the CM-5's CMMD message-passing library. It was decided to compare the parallel implementation with a single process benchmark, shown in Fig. 3.1. This benchmark may also be regarded as a system specification.

In addition to the `borrow' transactions discussed in the previous section, the system makes provision for new books to be bought, books to be reshelved, and the total numbers of books in stock and on loan to be audited. The database is modelled by two arrays: `C', which records the number of copies in stock of each book, and `D', which records the number of books drawn by each patron. A single structure, `event', is used for all transaction types, containing four fields: the kind, time, title and user. The `kind' is a tag that distinguishes `buy', `borrow', `reshelve' and `audit' transactions from one another. The `time' is used to ensure that transactions are uniquely identified and are processed in the right order. The `title' identifies a book, and the `user' identifies a patron. In the case of a `buy' transaction, the `user' field is ignored, and in the case of an `audit' both `user' and `title' are ignored.

The array `event_0' models a batch of transactions. The `benchmark' function steps through the terms of this sequence, branching to a different function for each kind of transaction. The three kinds of update (`buy', `borrow', and `reshelve') affect at most one book and one patron. In contrast, the `audit' query inspects all the book and patron records. An important aspect of the specification is the condition `C[title] > 0' in the `borrow' function.
This makes the updating of the `D' array contingent on the state of the `C' array--but the updating of the `C' array is not contingent on the state of the `D' array. This makes it possible to use the pipe-line architecture of Fig. 2.1. Such a factorisation, and its converse, substitution, is not possible in general: it requires that information flows only from the `C update' to the `D update', and that none flows the other way. If the condition `C[title] > 0' were changed to read `D[user] < 10'--limiting each patron's borrowings to a maximum of 10 books--the `D update' would have to precede the `C update'. If both conditions had to be tested, the separation of the two processes would not be possible at all.

Figs. 3.2 and 3.3 show how the benchmark implementation of Fig. 3.1 can be factorised into separate `C' and `D' processes in the manner of Fig. 2.1. The `C update' process accesses only the `C' array, and the `D update' process accesses only the `D' array. The `C update' may be allowed to complete before the `D update' begins. The two processes communicate through the `event_0' array. Typically, the `C update' does some processing, and leaves the transaction record for the `D update' to find. However, in the case of `borrow' transactions for which the number of copies is zero, the `C update' sets the kind of the transaction to `done', so that the `D update' will ignore it. In the case of an `audit' transaction, the `C update' passes a partial result (the number of books in stock) to the `D update'.

The important property of this two-step implementation is that it preserves the correctness of the specification of Fig. 3.1. One way to see this is to consider each transaction as a procedure call: as each transaction is examined by the loop, it causes a specific procedure to be executed. This is true of the system as a whole, and is also true for the communication between `C update' and `D update'.
Therefore, if the `C update' passes a transaction record to the `D update', it is equivalent to a call on a procedure in the `D update'. The effect is the same as if the corresponding segment of the `D update' procedure is substituted for the `call' in the `C update' procedure. These substitutions yield the procedures of Fig. 3.1.
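The substitution argument can also be checked mechanically on small batches. The following is a minimal sketch, not the report's program: it restates the one-step and two-phase versions over a tiny database, records audit results in the event instead of printing them, and compares the outcomes. The names `one_step', `c_phase', `d_phase' and `phases_agree', the sizes `N' and `MAXB', and the extra `stock' and `loans' fields are all assumptions made for illustration.

```c
#include <string.h>

enum { DONE, BUY, BORROW, RESHELVE, AUDIT };
#define N    4     /* titles and users are numbered 1..N (hypothetical size) */
#define MAXB 32    /* largest batch the checker accepts (hypothetical) */

struct event { int kind, title, user, stock, loans; };

static int total(const int *a)
{   int i, sum = 0;
    for (i = 1; i <= N; i++) sum += a[i];
    return sum;
}

/* One-step specification: C and D are updated together. */
static void one_step(struct event *e, int n, int *C, int *D)
{   int i;
    for (i = 0; i < n; i++)
        switch (e[i].kind)
        {   case BUY:      C[e[i].title]++; break;
            case BORROW:   if (C[e[i].title] > 0)
                           {   C[e[i].title]--; D[e[i].user]++; }
                           break;
            case RESHELVE: C[e[i].title]++; D[e[i].user]--; break;
            case AUDIT:    e[i].stock = total(C); e[i].loans = total(D); break;
            default:       break;
        }
}

/* C phase: reads and writes only C; a refused borrow is marked DONE. */
static void c_phase(struct event *e, int n, int *C)
{   int i;
    for (i = 0; i < n; i++)
        switch (e[i].kind)
        {   case BUY:      C[e[i].title]++; break;
            case BORROW:   if (C[e[i].title] > 0) C[e[i].title]--;
                           else e[i].kind = DONE;
                           break;
            case RESHELVE: C[e[i].title]++; break;
            case AUDIT:    e[i].stock = total(C); break;
            default:       break;
        }
}

/* D phase: reads and writes only D, driven by what the C phase left behind. */
static void d_phase(struct event *e, int n, int *D)
{   int i;
    for (i = 0; i < n; i++)
        switch (e[i].kind)
        {   case BORROW:   D[e[i].user]++; break;
            case RESHELVE: D[e[i].user]--; break;
            case AUDIT:    e[i].loans = total(D); break;
            default:       break;
        }
}

/* Returns 1 if the two-phase version matches the one-step version. */
int phases_agree(const struct event *batch, int n, const int *C0, const int *D0)
{   struct event a[MAXB], b[MAXB];
    int Ca[N + 1], Da[N + 1], Cb[N + 1], Db[N + 1], i;
    if (n < 0 || n > MAXB) return 0;
    memcpy(a, batch, n * sizeof *batch); memcpy(b, batch, n * sizeof *batch);
    memcpy(Ca, C0, sizeof Ca); memcpy(Cb, C0, sizeof Cb);
    memcpy(Da, D0, sizeof Da); memcpy(Db, D0, sizeof Db);
    one_step(a, n, Ca, Da);
    c_phase(b, n, Cb);            /* the whole C phase runs first ... */
    d_phase(b, n, Db);            /* ... then the whole D phase */
    if (memcmp(Ca, Cb, sizeof Ca) != 0 || memcmp(Da, Db, sizeof Da) != 0)
        return 0;
    for (i = 0; i < n; i++)
        if (batch[i].kind == AUDIT &&
            (a[i].stock != b[i].stock || a[i].loans != b[i].loans))
            return 0;
    return 1;
}
```

Calling `phases_agree' on a batch that includes a refused `borrow' and an `audit' exercises both of the special cases described above: the cancelled transaction and the partial result handed from the `C' phase to the `D' phase.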
void C_update ()
{ int title, now, stock;
  for (now = 1; now <= events_0; now++)
  { title = event_0[now].title;
    switch (event_0[now].kind)
    { case BUY: C[title] = C[title] + 1; break;
      case BORROW:
        if (C[title] > 0) C[title] = C[title] - 1;
        else event_0[now].kind = DONE;
        break;
      case RESHELVE: C[title] = C[title] + 1; break;
      case AUDIT:
        stock = 0;
        for (title = 1; title <= max_title; title++)
          stock = stock + C[title];
        event_0[now].stock = stock;
        break;
      default: break;
    }
  }
}
Fig. 3.2: The `C' Update Phase
void D_update ()
{ int user, now, loans, stock;
  for (now = 1; now <= events_0; now++)
  { user = event_0[now].user;
    switch (event_0[now].kind)
    { case BORROW: D[user] = D[user] + 1; break;
      case RESHELVE: D[user] = D[user] - 1; break;
      case AUDIT:
        loans = 0;
        for (user = 1; user <= max_user; user++)
          loans = loans + D[user];
        stock = event_0[now].stock;
        printf ("At time %d, %d books are in stock, and %d on loan.\n",
                now, stock, loans);
        break;
      default: break;
    }
  }
}
Fig. 3.3: The `D' Update Phase

Note, too, that the updates to each element of either the `C' or `D' arrays are independent of the values of the other elements. This independence allows the elements of the arrays to be updated in parallel, but it is only revealed after first separating the `C' and `D' processes. Without the separation, transactions affect title-user pairs. Parallel processes for each title would contend for patron records, and parallel processes for each user would contend for book records. Parallel processes for each title-user pair would contend for both book and patron records. Thus, although the factorisation of Fig. 3.1 into Figs. 3.2 and 3.3 achieves little in itself, it opens the way to massive contention-free parallelism.

Separability and independence are properties of a system that may be deduced from its specification. This important topic is not considered here, as it is the subject of an earlier Technical Report [2].

4. Implementation
This section describes some experiments using a CM-5 parallel processor that show how the decomposition of the library system can be implemented as a parallel computer program. To understand them, it is first necessary to understand the architecture of the CM-5.

The CM-5 contains a number of identical parallel processors--32 in the configuration used in the experiment. A separate front-end `host' processor loads the parallel processors with identical copies of the same program. This means they have identical memory maps, which simplifies inter-process communication. The processors need not execute the same instructions, however; because each processor has its own identification number, it can learn its own identity and execute code specific to itself. The processors of a CM-5 are interconnected by two message switching networks: the data network, and the control network. The data network is used for large message packets, and the control network is used for short messages, mainly to control synchronisation.
The control network has some built-in logical capability. It is organised as a tree, and, among other things, can find a global sum using a logarithmic time reduction algorithm. The network structures are invisible to an application program, which may send messages from point to point between any pair of processors.

A problem in any parallel algorithm is to obtain sufficient granularity; the size of the task given to each processor has to be big enough to justify the overhead of inter-processor communication. This proved to be a major issue. The CM-5 has a 50 MHz clock (20 ns per cycle), and the latency for message passing proved to be about 30 µs, a ratio of over 1,000:1. Because all the transactions except `audit' cause only a few instructions to be executed, their execution times are easily swamped by the message passing overhead. For the sake of argument, say that processing a `borrow' event takes 200 clock cycles, or 4 µs on a single processor. If each event involved passing just two messages as in Fig. 2.1, it would take about 60 µs. If the work of processing many events were equally shared across 32 processors, 32 events could be processed in the same time, so each event could be said to take about 2 µs. This is a very modest speed up. On the other hand, since they access the whole database, `audit' events have much bigger granularity. Suppose there are 1,000 titles and 1,000 users. Then it is hard to imagine how an `audit' event could be processed in fewer than about 5,000 clock cycles, or 100 µs. This is greater than the latency of message passing, so a useful speed-up should be obtained, and indeed this proved to be the case.

The main challenge in modelling the library system was to avoid the bottleneck of having a single process distribute transactions to the update processes. This was avoided by assigning a collecting processor to each transaction, and distributing the transactions evenly across the collectors. (The reader should imagine Fig.
2.2 with 32 distribution and collection processes, as well as 32 update processes.) The collectors then distributed messages to each update process as needed. The same collectors were used to collect results, as when accumulating the results of an `audit'. (An alternative would be to have the CM-5's control network compute sums, but unfortunately this option was not technically compatible with other aspects of the implementation. Instead, partial sums were accumulated on each processor, then forwarded to the collector.) The collector for each update was chosen on the basis of the `title' or `user' concerned. An `audit' concerns all titles and users, so a different collecting processor was assigned cyclically to each `audit' transaction in turn. In this way, each collector was given a roughly equal load. To set up the test conditions, each processor generated an identical pseudo-random batch of transactions. The number of transactions in the batch could be varied from one experiment to another. Each processor also generated feasible initial values for the database (e.g., to ensure that a `reshelve' transaction did not leave a patron with a negative number of books). However, because `audit' transactions have such a big effect on the overall performance, random variations in their numbers made it difficult to compare experiments, so they were generated at regular intervals--to snapshot the database after every 100 updates, say.
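The granularity estimates made earlier in this section can be reproduced with a few lines of arithmetic. The sketch below is illustrative only: the helper names `cycles_to_us' and `shared_us_per_event' are hypothetical, and the clock period and message latency are the round figures quoted in the text, not new measurements.

```c
#define CLOCK_NS    20.0   /* 50 MHz clock: 20 nanoseconds per cycle */
#define LATENCY_US  30.0   /* approximate latency of one message, in microseconds */

/* Convert a cycle count into microseconds on one processor. */
double cycles_to_us(double cycles)
{   return cycles * CLOCK_NS / 1000.0;
}

/* Effective time per event, in microseconds, when every event costs
   `messages' message latencies and the work is shared evenly over
   `nprocs' processors. */
double shared_us_per_event(int messages, int nprocs)
{   return messages * LATENCY_US / nprocs;
}
```

With these figures, a 200-cycle `borrow' costs 4 microseconds on one processor, two messages per event shared over 32 processors cost a little under 2 microseconds per event, and a 5,000-cycle `audit' costs 100 microseconds, which exceeds the latency of a single message.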
Fig. 4.1: A Model of the Implementation

The execution of the algorithm was divided into six well-defined phases corresponding to the distribution, update, and collection phases (of Fig. 2.2) of the `C' and `D' processes (of Fig. 2.1). Fig. 4.1 shows how the implementation was organised. Each processor assumed the roles of `C distributor', `C update', `C collector', `D distributor', `D update' and `D collector' in turn. (Fig. 4.1 shows 3 rather than 32 processors.) Messages can be passed between any two successive processes in the pipe-line, except between a `C collector' and a `D distributor'. This is because each `C collector' assumes the role of `D distributor' without the need for message passing. For ease of programming, no processor started a new phase until every processor had completed its current one. This ensured that all outstanding messages had been received before each phase was wound up. The phase boundaries were enforced by a `synchronise' function, described later.

The initial values of the `C' and `D' arrays were generated on all the processors. `Title' and `user' identifiers were represented by positive integers, and were mapped onto the processors modulo 32, so that Processor 1 had access to C(1), C(33), C(65), etc., and to D(1), D(33), D(65), etc., Processor 2 had access to C(2), C(34), C(66), etc., and to D(2), D(34), D(66), etc., and so on for each processor. However, elements C(0) and D(0) were unused, `0' being used as a dummy title or patron. After the arrays were initialised, each processor repacked the `C' and `D' elements allocated to it so that they were in successive locations. The processor number and array index could then be computed as follows. Consider the number of copies of title 100. 100 divided by 32 is 3, remainder 4. Therefore the number of copies of title 100 is actually stored in location C(3) of Processor 4.
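Because 32 is a power of two, the mapping just described can be written with a mask and a shift. This is a minimal sketch: the constants follow the `WHICH' and `WHERE' definitions that appear in Fig. 5.1a, but the helper names `owner_of' and `slot_of' are hypothetical and are not taken from the report's program.

```c
#define NPROCS 32   /* number of processors */
#define WHICH  31   /* mask:  id & WHICH  equals id % 32 for id >= 0 */
#define WHERE  5    /* shift: id >> WHERE equals id / 32 for id >= 0 */

/* Hypothetical helper: the processor that owns a title or user identifier. */
int owner_of(int id)
{   return id & WHICH;
}

/* Hypothetical helper: the index of that identifier within the owner's
   repacked `C' or `D' array. */
int slot_of(int id)
{   return id >> WHERE;
}
```

For title 100, `owner_of' gives 4 and `slot_of' gives 3, matching the worked example; titles 1, 33 and 65 all map to Processor 1, in slots 0, 1 and 2 respectively.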
A similar mapping was used for transactions, so that the sequence of input transactions was initially distributed evenly across processors 0-31. Once the test conditions had been set up, a timer was started, and each processor functioned as a `C distributor' by executing its `C distribution' phase, sending the test data it had been allocated to the `C update' process concerned. In the case of all except the `audit' transactions, the update processor was computed from the value of `title', modulo 32. The `audit' transactions had to be broadcast to all the `C update' processes. Since `audit' transactions appeared at regular intervals in the list of transactions, the time of the transaction itself, modulo 32, determined the collecting processor. (This was done for simplicity. It does not guarantee an equal load on each processor unless the interval is an odd number.) The second, `C update' phase, consisted of two steps: first the transactions received by each `C update' process were sorted into `time' sequence, then used to update or inspect the `C' array. The sort was needed because inputs could arrive from 32 sources. It is possible that one `borrow' transaction could overtake an earlier one, and the wrong patron could then withdraw the last copy of a book. In processing an `audit' transaction, each processor could only form a partial sum, since it had access to only 1/32 of the entire `C' array. The third phase, which began only when all the `C updates' had completed, was called `C collection'. In this phase each update processor sent the partial sums it had sampled for `audit' transactions to their `C collector' processes. Each collector then added the sums it received to its own sum, computing the stock of books at the time of the audit. No action was needed for other transactions, because the `C collector' was assigned to the same physical processor as the `C update'. 
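The parenthetical remark about odd intervals can be checked directly. The sketch below assumes, as described above, that audits fall at times that are multiples of a fixed gap and that the collector is the time modulo 32; the function name `collectors_used' is hypothetical and is not part of the report's program.

```c
#define NPROCS 32   /* number of processors */

/* Count how many distinct collectors receive at least one audit when
   audits occur at times gap, 2*gap, ..., n_audits*gap and the collector
   for an audit is its time modulo NPROCS. */
int collectors_used(int gap, int n_audits)
{   int used[NPROCS] = {0};
    int k, p, count = 0;
    for (k = 1; k <= n_audits; k++)
        used[(k * gap) % NPROCS] = 1;      /* collector chosen by time */
    for (p = 0; p < NPROCS; p++)
        count += used[p];
    return count;
}
```

For example, a gap of 100 reaches only 8 of the 32 collectors, because 100 and 32 share the factor 4, whereas a gap of 101 reaches all 32. In general an even gap leaves some collectors idle, which is why an odd interval is needed for an equal load.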
The fourth phase, `D distribution', is as follows: For a successful `borrow' or `reshelve' transaction, each `D distributor' sends a message to the `D update' process determined by the `user', modulo 32. (The `D distributor' for a transaction is always the same processor as its `C collector' so no messages have to be passed between the third and fourth phases.) For `buy' transactions and unsuccessful `borrow' transactions, no message is sent (or needed). For an `audit' transaction, its `C collector' broadcasts to each `D' process a message containing the number of books in stock. It is therefore similar to the `C distribution' phase. The fifth phase, called the `D update', is essentially similar to the `C update'. It is followed by the sixth, `D collection', phase, which is similar to the `C collection' phase, and which computes the total number of books on loan. Following the sixth phase, the timer was stopped, and the elapsed time displayed. Each processor then displayed the results of its `audit' transactions. The time required to write these outputs was excluded from the experiment. This was because all output had to pass through the bottleneck of a single processor. Actually, the treatment of the `audit' transaction in the `D distribution' phase is more general than this particular problem demands. The `C collector' and `D distribution' processes are allocated to the same processors, so that each `D distribution' process already has a list of all the `audit' transactions, because its processor has just finished executing the `C collection' phase. Furthermore, the processor allocated to the `D collector' of an `audit' transaction is the same one as the `C collector', so it already knows the number of books in stock. It is therefore unnecessary for the `audit' transactions to be broadcast. 
However, for global transactions of a more general type it is likely that processing during the `D update' phase would depend on the result of the `C update' phase (as it does for `borrow' transactions), so a broadcast would be needed. Including the broadcast was considered a fairer measure of speed up.

5. Program Notes
Figs. 5.1a-k show the details of the implementation. (The reader may skip this section at first reading.) For convenience in exposition, the program is described top down. The order in which its functions are described is not the correct order for compilation. (The correct order is shown by each figure being labelled `(Part 1)', `(Part 2)', and so on.)

Fig. 5.1a declares constants and global variables. Different experiments were conducted by changing the values of `n_titles', `n_users', `n_events' and `audit_gap'.
#include <stdio.h> /* standard input-output */
#include <cm/cmmd.h> /* command level inter-process communication */
#include <sys/file.h> /* C input-output */
#define NPROCS 32 /* number of processors in our CM-5 */
#define WHICH 31 /* to find processor number, i.e. x%32 */
#define WHERE 5 /* to find user or title, i.e. x/32 */
#define TITLES 1000000 /* highest title in any experiment */
#define USERS 1000000 /* highest user in any experiment */
#define EVENTS 100000 /* number of transactions in any experiment */
#define DONE 0 /* code for transactions that are finished */
#define BUY 1 /* code for Buy transactions */
#define BORROW 2 /* code for Borrow transactions */
#define RESHELVE 3 /* code for Reshelve transactions */
#define AUDIT 4 /* code for Audit transactions */
struct event{int time, kind, title, user;};
/* all transactions have this structure */
int process; /* identifies the process instance */
int events_0,events_1,events_2 = 0; /* lengths of the transaction lists */
int sends, receives = 0; /* numbers of messages sent and received */
int n_titles = 1000; /* number of titles in an experiment */
int n_users = 1000; /* number of users in an experiment */
int n_events = 1000; /* transactions in batch */
int audit_gap = 1; /* gap between audit transactions */
int max_title = 0; /* index of biggest title assigned to process */
int max_user = 0; /* index of biggest user assigned to process */
static int seed = 1; /* seed for random number generator */
static int C[TITLES+1], D[USERS+1]; /* the C and D arrays */
struct event event_0[EVENTS+1], event_1[EVENTS+1], event_2[EVENTS+1];
/* event lists */
Fig. 5.1a: Parallel Implementation (Part 1)

A single structure, `event', is used for all transaction types, containing the `kind', `time', `title' and `user'. There are three transaction lists: `event_0', `event_1' and `event_2'. The first is generated by `initialise' and `remap' to contain the test data. The second is the list received by each `C update' process. During the `C update' phase, the `C' processes modify their lists so that they become outputs to be sent to the `D update' processes. Modifying the lists rather than creating new ones was done for simplicity. In this example, at most one output is written by any transaction; but in general, each `C' process might need to write its outputs to a separate list. The third list, `event_2', is the list received by each `D' process.

Fig. 5.1b shows the main procedure of the program. All 32 processes engage in a common initialisation phase; all generate the same test data and initialise their `C' and `D' arrays to the same values. (Initialisation is not included in the measured execution time.) After initialisation, all the processes are synchronised. Process 0 then starts a timer, which it displays when the experiment is complete. Processes 0-31 then execute the phases already outlined, synchronising with each other between phases.

Using global synchronisation between phases is a convenience rather than a necessity. It means, for example, that any message received during the `C collection' phase must be a partial sum sent to a collector; it cannot be a message from a faster process that has already reached the `C send to D' phase. This makes the program more procedural, less event driven, and easier to understand--but since a process that is waiting to synchronise is not doing useful work, the convenience entails a small loss of efficiency.
main ()
{ int now;
  process = CMMD_self_address();  /* find identifier of this process */
  CMMD_fset_io_mode(stdout, CMMD_independent);  /* merge all .. */
  fcntl(fileno(stdout),F_SETFL,O_APPEND);       /* ... processes' outputs */
  initialise ();       /* generate test data */
  remap ();            /* allocate test data to processors */
  if (process == 0) CMMD_node_timer_clear (0);
  synchronise ();      /* synchronise all processors */
  if (process == 0) CMMD_node_timer_start (0);
  C_distribution ();   /* start C distribution */
  synchronise ();      /* end distribution */
  C_update ();         /* start C update */
  synchronise ();      /* end of C update */
  C_collection ();     /* start C collection */
  synchronise ();      /* end of C collection */
  D_distribution ();   /* start C send to D */
  synchronise ();      /* End of D_distribution */
  D_update ();         /* Start D_update */
  synchronise ();      /* End of D_update */
  D_collection ();     /* start D collection */
  synchronise ();      /* End of D collection */
  if (process == 0)
  { CMMD_node_timer_stop (0);
    printf ("Parallel: %f secs.\n", CMMD_node_timer_elapsed (0));
  }
  for (now = 1; now <= events_2; now++)
    if (event_2[now].kind == AUDIT)
      printf ("At time %d, %d books are in stock, and %d on loan.\n",
              event_2[now].time, event_2[now].title, event_2[now].user);
}
Fig. 5.1b: Parallel Implementation (Part 11)

After the final `D collection' phase, each process reports the results of the `Audit' transactions it has collected. To verify the correctness of the program, these reports were compared with those produced by the benchmark.

Fig. 5.1c shows the low-level routines associated with message passing. The function `send_event' sends a message containing an event record. It has three parameters: the identifier of the receiving process, the `reader' function in the receiving process that will accept the message, and the event to be sent. First, it increments the number of messages that have been sent, then it calls the CM-5's `CMAML_request' library function, which sends a short message of at most four words. (It is fortunate that an event contains only four fields, or message passing would be more complex.) When the message is received, the current activity of the receiving process is interrupted, and the message is passed as a set of parameters to the specified `reader' function. The reader function itself cannot be interrupted. Every reader function must call `acknowledge', which echoes the original message back to the sender. The `received' function in the sender then increments the number of messages received. (The reader functions will be described later.)

The `synchronise' function is used to bring all the processors into step, clearing the message switching network. When a message is sent, it is immediately placed in an output buffer. The buffer may then take time to clear before the message is transmitted over the network. When it arrives at the receiver, the message is placed in an input buffer until its reader function can be activated. `Synchronise' begins by waiting until all pending messages in the process's output buffer have been sent. It then waits until the process's own input buffer is clear, meaning that all received messages have been read.
Unfortunately, these two tests are not enough to ensure that the message switching network is also clear of traffic. This is tested by waiting until all messages that have been sent have been acknowledged. Finally, the `CMMD_sync_with_nodes' library function waits until all the processors have reached a similar state. By the time `synchronise' completes, every processor has had all its messages acknowledged, so there can be no messages in the network. (Several alternative means were tried to check when all messages were received, because echoing each message is costly. Unfortunately, it is not possible for a receiving process to know in advance how many messages it should receive. It could learn retrospectively, for example, by each sender passing it a termination message specifying the total number of messages it has sent to it. However, such a message would be needed for each sender-receiver pair, so the number of termination messages would equal the square of the number of processors, and the scheme would not scale properly to a large number of processors. No reliable alternative was found that was any faster than that of Fig. 5.1c.)
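The acknowledgment counting described above can be illustrated without the CM-5 libraries. The following is only a sketch, written in present-day C rather than the style of the listings. It assumes a single in-memory queue in place of the message switching network, and the names `network', `send_value', `deliver_one', `in_flight' and `synchronise_sim' are hypothetical and do not appear in the report. It shows the one property that matters: once `sends' equals `receives', nothing remains in flight.

```c
#define QUEUE_MAX 64                /* capacity of the simulated network (assumed) */

struct message { int is_ack; int value; };

static struct message network[QUEUE_MAX];   /* messages still in flight */
static unsigned head = 0, tail = 0;         /* ring-buffer positions */
int sends = 0, receives = 0;                /* counters, as in Fig. 5.1c */
int delivered_total = 0;                    /* sum of the values seen by the reader */

/* Number of messages (data or echoes) currently in the simulated network. */
int in_flight(void) { return (int)(tail - head); }

static void enqueue(int is_ack, int value)
{
    network[tail % QUEUE_MAX].is_ack = is_ack;
    network[tail % QUEUE_MAX].value = value;
    tail++;
}

/* Count the message as sent, then hand it to the network.
   Returns 0 without sending if the simulated network is full. */
int send_value(int value)
{
    if (in_flight() == QUEUE_MAX) return 0;
    sends++;
    enqueue(0, value);
    return 1;
}

/* Deliver the oldest message: a data message is read and echoed back,
   an echo increments the sender's `receives' counter. */
static void deliver_one(void)
{
    struct message m = network[head % QUEUE_MAX];
    head++;
    if (m.is_ack)
        receives++;
    else {
        delivered_total += m.value;   /* the reader function's work */
        enqueue(1, m.value);          /* acknowledge by echoing */
    }
}

/* Drain the network until every send has been acknowledged. */
void synchronise_sim(void)
{
    while (sends > receives)
        deliver_one();
}
```

Calling `send_value' ten times and then `synchronise_sim' leaves `sends' and `receives' both equal to ten and `in_flight()' equal to zero. The real program needs the extra library calls because each processor sees only its own counters.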
void send_event (receiver, reader, e)
int receiver; void reader (); struct event *e;
{ sends++;
CMAML_request (receiver, reader,
e->time, e->kind, e->title, e->user);
}
void received (time, kind, title, user)
int time, kind, title, user;
{ receives++;
}
void acknowledge (sender, time, kind, title, user)
int sender, time, kind, title, user;
{ CMAML_reply (sender, received, time, kind, title, user);
}
void synchronise ()
{ while (CMMD_all_sends_done () == 0);
while (CMMD_all_msgs_done () == 0);
while (sends > receives);
CMMD_sync_with_nodes ();
}
Fig. 5.1c: Parallel Implementation (Part 2)
Fig. 5.1d shows the details of generating the test data. The `initialise' function generates an `audit' transaction whenever the serial number of the event is exactly divisible by `audit_gap'. Otherwise, a `buy', `borrow' or `reshelve' transaction is generated with equal probability. Where a transaction requires a `title' or `user' parameter, values are chosen randomly and uniformly. The test transactions are stored in the `event_0' array. Each `reshelve' transaction that is generated also increments the `D' array for the `user', so that its value cannot become negative during updating. Similarly, one-half of the `borrow' transactions increment the `C' array, so that some `borrows' will prove acceptable, and others will not. The `initialise' function is executed on all the processors to ensure that all the copies of the arrays are identically initialised.
int at_most (n) int n;
{ return ((seed = seed * 4093 % 524261) % n + 1);
}
void initialise ()
{ int now, title, user;
for (title = 0; title <= n_titles; title++) C[title] = 0;
for (user = 0; user <= n_users; user++) D[user] = 0;
seed = 1; events_1 = 0; events_2 = 0;
for (now = 1; now <= n_events; now++)
{ event_0[now].time = now;
if (now % audit_gap == 0)
{ event_0 [now].kind = AUDIT;
event_0 [now].title = 0;
event_0 [now].user = 0;
}
else
{ event_0 [now].kind = at_most(RESHELVE);
event_0 [now].title = at_most(n_titles);
event_0 [now].user = at_most(n_users);
if (event_0 [now].kind == RESHELVE)
D[event_0 [now].user]++;
if (event_0 [now].kind == BORROW && event_0[now].user % 2)
C[event_0 [now].title]++;
}
}
}
void remap ()
{ int i;
max_title = max_user = events_0 = 0;
for (i = process; i <= n_titles; i+=NPROCS)
{ max_title++; C[max_title] = C[i]; };
for (i = process; i <= n_users; i+=NPROCS)
{ max_user++; D[max_user] = D[i]; };
for (i = process; i <= n_events; i+=NPROCS)
if (i != 0)
{ events_0++;
event_0[events_0].kind = event_0[i].kind;
event_0[events_0].time = event_0[i].time;
event_0[events_0].title = event_0[i].title;
event_0[events_0].user = event_0[i].user;
}
}
Fig. 5.1d: Parallel Implementation (Part 4)
Once the test data have been set up, each processor calls `remap' to compact the elements it accesses into contiguous locations. (This is needed to allow the loops of `audit' transactions to run as quickly as they do in the benchmark.)
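The compaction performed by `remap' has to agree with the address arithmetic used later by the update phases, which select a process with `& WHICH' and a local element with `>> WHERE'. The following sketch checks that agreement. It assumes 32 processors, with `WHICH' equal to 31 and `WHERE' equal to 5; these values are assumptions, since the definitions are not in the listings shown here. The function names `owner_of', `local_index' and `remap_position' are hypothetical.

```c
#define NPROCS 32            /* assumed: number of processors, a power of two */
#define WHICH  (NPROCS - 1)  /* assumed: mask selecting the owning process */
#define WHERE  5             /* assumed: log2(NPROCS) */

/* Process that owns a global title or user number. */
int owner_of(int key) { return key & WHICH; }

/* Index of the key within its owner's compacted array (counting from 1). */
int local_index(int key) { return (key >> WHERE) + 1; }

/* Position that a loop like the one in `remap' gives to global element i
   on process p: it visits p, p+NPROCS, p+2*NPROCS, ... and numbers them
   1, 2, 3, ...  Returns 0 if element i is not owned by process p. */
int remap_position(int p, int i)
{
    int pos = 0, j;
    for (j = p; j <= i; j += NPROCS)
        pos++;
    return (j - NPROCS == i) ? pos : 0;
}
```

For example, under these assumptions global element 67 belongs to process 3 and occupies its third local element, whichever of the two routes is used to compute the position.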
void receive_C (time, kind, title, user) int time, kind, title, user;
{ events_1++;
event_1[events_1].time = time; event_1[events_1].kind = kind;
event_1[events_1].user = user; event_1[events_1].title = title;
acknowledge(time & WHICH, time, kind, title, user);
}
void C_distribution()
{ int now, receiver;
for (now = 1; now <= events_0; now++)
if (event_0[now].kind != AUDIT)
send_event(event_0[now].title & WHICH,receive_C,&event_0[now]);
else
for (receiver = 0; receiver < NPROCS; receiver++)
send_event (receiver, receive_C, &event_0[now]);
}
Fig. 5.1e: Parallel Implementation (Part 5)
Fig. 5.1e shows the `C_distribution' function that causes each processor to send each update transaction in its own `event_0' array to the `C' update process determined by the `title' field. In contrast, `audit' transactions are broadcast to every update process. The `receive_C' function in the receiving process appends the transactions it receives to its own `event_1' array. (`Receive_C' is one of the `reader' functions needed by `send_event'.)
void C_update ()
{ int title, now, stock;
sort (event_1, 1, events_1);
for (now = 1; now <= events_1; now++)
{ title = (event_1[now].title >> WHERE) + 1;
switch (event_1[now].kind)
{ case BUY: C[title] = C[title] + 1; break;
case BORROW:
if (C[title] > 0) C[title] = C[title] - 1;
else event_1[now].kind = DONE;
break;
case RESHELVE: C[title] = C[title] + 1; break;
case AUDIT:
stock = 0;
for (title = 1; title <= max_title; title++)
stock = stock + C[title];
event_1[now].title = stock;
break;
default: break;
}
}
}
Fig. 5.1f: Parallel Implementation (Part 6)
Fig. 5.1f shows the `C update' phase, which is a parallel form of the `C_update' process of Fig. 3.2. Because an unsorted list results from merging the message streams from each distributor, `C_update' begins by sorting its transactions into time order. Each `C_update' process then steps through its batch of transactions calling the proper transaction procedures, adjusting the values of `title' to map them to the correct array elements. (There is a programming short-cut here: the `title' field of an `audit' transaction is used to accumulate the total stock held by the process. Several such short-cuts were used because the low-level message passing functions of the CM-5 support only four-word messages. This is just enough to hold the four fields of an event. When any other information has to be transmitted, it is done by using a spare event field. As a result, the names of the event fields sometimes disguise their real contents. Sorry!)
Fig. 5.1g shows the `C collection' phase. At its start, each `audit' transaction contains a partial sum in its `title' field. If a process holding a partial sum is not the transaction's collector, `C_collection' sends the sum to the collecting process. (It also overwrites the `kind' of the event with the identifier of the sending process, so that the receiver will know which sender to acknowledge. Once the message has been sent, the `kind' of the sender's copy is set to zero to prevent it from being distributed to a `D' process.) The `receive_C_collection' reader function in the collecting process adds the partial sums as they are received, so that at the end of the phase the collectors contain the whole totals. To do this, the reader function has to search for the `audit' event record in the sorted transaction list.
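The partial-sum scheme of the `C collection' phase can be checked with a single-process simulation. The sketch below is not the report's code. It assumes 32 processes, a stock array indexed by global title number, and a collector chosen by the low-order bits of the `audit' time. The names `partial_stock' and `collected_stock' are hypothetical. Each simulated process sums the titles it owns, and the collector adds one incoming partial sum for every other process.

```c
#define NPROCS 32            /* assumed: number of processes */
#define WHICH  (NPROCS - 1)  /* assumed: mask selecting a process */

/* Partial stock held by process p: the sum over the titles it owns,
   that is, titles p, p+NPROCS, p+2*NPROCS, ... up to n_titles. */
long partial_stock(const int stock[], int n_titles, int p)
{
    long sum = 0;
    int t;
    for (t = p; t <= n_titles; t += NPROCS)
        sum += stock[t];
    return sum;
}

/* Total held by the collector at the end of the collection phase:
   its own partial sum plus one message from each other process.
   *messages receives the number of partial-sum messages needed. */
long collected_stock(const int stock[], int n_titles, int audit_time,
                     int *messages)
{
    int collector = audit_time & WHICH;
    long total = partial_stock(stock, n_titles, collector);
    int p;
    *messages = 0;
    for (p = 0; p < NPROCS; p++)
        if (p != collector) {
            total += partial_stock(stock, n_titles, p);
            (*messages)++;
        }
    return total;
}
```

Whatever the `audit' time, the collected total equals the sum over the whole array, and 31 partial-sum messages are needed on 32 processes.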
void receive_C_collection (time, kind, title, user)
int time, kind, title, user;
{ int tmp;
tmp = find_time (event_1, events_1, time);
event_1[tmp].title += title;
acknowledge(kind, time, kind, title, user);
}
void C_collection ()
{ int now, collector, time; struct event tmp;
for (now = 1; now <= events_1; now++)
if (event_1[now].kind == AUDIT)
{ collector = event_1[now].time & WHICH;
if (process != collector)
{ event_1[now].kind = process;
send_event (collector, receive_C_collection, &event_1[now]);
event_1[now].kind = DONE;
}
}
}
Fig. 5.1g: Parallel Implementation (Part 7)
Fig. 5.1h implements the `D distribution' phase. `Borrow' and `reshelve' transactions are sent to the process that accesses the `user'. An `audit' transaction is sent by its collector to all 32 `D' processes. Transactions that have had their `kind' set to zero are ignored. The `receive_D' reader function in the receiving process appends each incoming transaction to its `event_2' array.
void receive_D (time, kind, title, user)
int time, kind, title, user;
{ events_2++;
event_2[events_2].time = time; event_2[events_2].kind = kind;
event_2[events_2].user = user; event_2[events_2].title = title;
if (kind == AUDIT)
{ event_2[events_2].title = user;
event_2[events_2].user = 0;
}
acknowledge (title, time, kind, title, user);
}
void D_distribution ()
{ int now, receiver;
for (now = 1; now <= events_1; now++)
{ if (event_1[now].kind != DONE)
{ if (event_1[now].kind == AUDIT)
{ event_1[now].user = event_1[now].title;
event_1[now].title = process;
for (receiver = 0; receiver < NPROCS; receiver++)
send_event (receiver, receive_D, &event_1[now]);
}
else
{ event_1[now].title = process;
send_event (event_1[now].user & WHICH, receive_D,
&event_1[now]);
}
}
}
}
Fig. 5.1h: Parallel Implementation (Part 8)
Fig. 5.1i shows the functions associated with the `D update' phase, which implements the procedure of Fig. 3.3 in parallel. It is analogous to the `C update' phase.
void D_update ()
{ int user, now, loans;
sort (event_2, 1, events_2);
for (now=1; now <= events_2; now++)
{ user = (event_2[now].user >> WHERE) + 1;
switch (event_2[now].kind)
{ case BORROW: D[user] = D[user] + 1; break;
case RESHELVE: D[user] = D[user] - 1; break;
case AUDIT:
loans = 0;
for (user = 1; user <= max_user; user++)
loans = loans + D[user];
event_2[now].user = loans;
break;
default: break;
}
}
}
Fig. 5.1i: Parallel Implementation (Part 9)
Fig. 5.1j shows the `D collection' phase. Its logic is similar to the `C collection' phase.
void receive_D_collection (time, kind, title, user)
int time, kind, title, user;
{ int tmp;
tmp = find_time (event_2, events_2, time);
event_2[tmp].user += user;
acknowledge (kind, time, kind, title, user);
}
void D_collection ()
{ int now, collector, time; struct event tmp;
for (now = 1; now <= events_2; now++)
if (event_2[now].kind == AUDIT)
{ collector = event_2[now].time & WHICH;
if (process != collector)
{ event_2[now].kind = process;
send_event (collector, receive_D_collection, &event_2[now]);
event_2[now].kind = DONE;
}
}
}
Fig. 5.1j: Parallel Implementation (Part 10)
void swap (e1, e2) struct event *e1, *e2;
{ int time1, kind1, title1, user1;
time1 = e1->time; e1->time = e2->time; e2->time = time1;
kind1 = e1->kind; e1->kind = e2->kind; e2->kind = kind1;
user1 = e1->user; e1->user = e2->user; e2->user = user1;
title1 = e1->title; e1->title = e2->title;e2->title = title1;
}
void sort (e, lb, ub) struct event e[]; int lb, ub;
{ int l, r, pivot;
if (ub - lb == 1) { if (e[lb].time > e[ub].time) swap(&e[lb], &e[ub]); }
else if (ub - lb > 1)
{ pivot = e[(lb + ub)/2].time;
l = lb; r = ub;
while (l <= r)
{ while (e[l].time < pivot) l++;
while (e[r].time > pivot) r--;
if (l <= r) { swap (&e[l], &e[r]); l++; r--; }
};
sort (e, lb, r); sort (e, l, ub);
};
}
int find_time (e, len, time) struct event e[]; int len, time;
{ int lo = 1, hi = len, mid;
while (lo <= hi)
{ mid = (hi + lo) / 2;
if (time < e[mid].time) hi = mid - 1;
else if (time > e[mid].time) lo = mid + 1;
else return (mid);
}
return (0); /* no match: element 0 of the event lists holds no transaction */
}
Fig. 5.1k: Parallel Implementation (Part 3)
Fig. 5.1k shows the utility functions connected with event lists. The `sort' function uses the `Quicksort' algorithm. It causes transactions to be sorted by `time'. When a partial sum message is received in the `C collection' or `D collection' phases, the `find_time' function uses binary search to find the index of the collector's `audit' transaction, so that the partial sum can be added into it.
6. Analysis of the Parallel Algorithm
The program of the previous section was compared with the single-thread benchmark program over a range of sizes of the `C' and `D' arrays from 100 to 1,000,000 elements, a number of transactions from 1 to 100,000, and a proportion of `Audit' transactions from 0 to 100%. (The benchmark function is shown in Fig. 3.1.) It was given identical test data and timed in the same way as the parallel implementation. Experiment confirmed that the parallel implementation produces identical outputs to the benchmark, proving that it is feasible to update inconsistent, independent databases correctly.
`Audit' and update transactions have different characteristics, and are best considered separately. The update transactions are discussed first. The results of the experiments are tabulated in Figs. 6.1 and 6.3. Fig. 6.1 shows times in micro-seconds per update (when no `audit' transactions are present), and Fig. 6.3 shows times in micro-seconds per `audit' per element. (Although mixtures of `audit' and update transactions were tested, the results are not shown. They can be closely approximated by interpolation.)
Array Size                     Number of Updates
                            1       10      100    1,000   10,000  100,000
      100 benchmark      27.0      6.4      4.2      3.9      4.3      4.3
          parallel    1,389.0    155.6     19.0      5.0      4.1      4.1
    1,000 benchmark      23.0      6.4      4.8      4.0      4.6      4.6
          parallel    1,392.0    150.4     18.7      4.6      3.3      3.4
   10,000 benchmark      31.0      9.0      5.6      4.7      5.0      5.2
          parallel    1,444.0    165.2     20.3      4.6      3.3      3.3
  100,000 benchmark      29.0      9.4      7.3      6.9      7.2      7.4
          parallel    1,521.0    173.2     20.7      4.9      3.3      3.2
1,000,000 benchmark      36.0      9.9      8.1      7.7      8.2      8.2
          parallel    1,493.0    173.7     20.8      4.9      3.3      3.3
Fig. 6.1: Elapsed Time per Update (uS)
Fig. 6.1 shows the values for different numbers of transactions (horizontally) and numbers of `C' and `D' array elements (vertically). (The `C' and `D' arrays were always the same size.) Each cell shows two values: the upper value is the time for the benchmark of Fig. 3.1, the lower value is the time for the parallel implementation of Figs. 5.1a-k. The benchmark program was executed on a single processor of the CM-5, so that the times directly measure the speed-up due to parallelism. The CM-5 timer measures only the elapsed time when the program is able to run (i.e., when it is not swapped out), and the times shown are reproducible to about 1% accuracy. However, only one set of test data was used, so sampling effects caused some inconsistencies between experiments.
Since the benchmark processes each transaction independently, its execution time should increase linearly with the number of transactions. There should also be a constant overhead independent of the number of transactions. Fig. 6.1 (upper figures) shows that this expectation is met in practice, the overhead being about 25uS. However, the cost per transaction depends on the size of the `C' and `D' arrays, rising from about 4.2uS to about 8.2uS per transaction as their sizes are increased, with most of the rise occurring for array sizes between 10,000 and 100,000 elements. It is believed that this effect is due to memory caches within the CM-5 processors, whose effective size seems to be about 32K words. The results for 1,000 transactions seem anomalously fast; the only reasonable explanation is that the test data especially favoured this case.
The parallel algorithm should be expected to display a much larger overhead. First, updates require one or two messages to be sent between processors, each of which is acknowledged by echoing. There are about 3 messages per update on average.
Second, the transaction lists must be sorted, and the cost per transaction should grow with the logarithm of the number of transactions. Third, the loads on each processor will not be exactly equal, an extreme case being when there is only 1 transaction. However, Fig. 6.1 (lower figures) shows that the caching effect has disappeared, presumably because even when there are 1,000,000 array elements, each processor stores only 31,250 of them. The cost per transaction is about 3.3uS over 32 processors--about 100uS on one processor, suggesting a latency of about 30uS per message. For very small array sizes the time per update rises because the load on each processor is not well balanced. There is no increase in the time per update for long event lists, so the cost of sorting must be negligible. The fixed overhead appears to be about 1,300uS, apparently due to globally synchronising the processors between phases of the algorithm. (The consistency of the results seems poorest in the first column of Fig. 6.1, for the case of one update. The particular update in each case differed only in the values of `title' and `user'. The measured time is dominated by the time taken to synchronise the processors between phases.) Fig. 6.2 compares the parallel and benchmark algorithms graphically. The curve `P6' shows the time per update for the parallel algorithm, when the `C' and `D' arrays each contained 1,000,000 elements. `P2' shows the time per update for the parallel algorithm, when the `C' and `D' arrays each contained 100 elements. Curves `B6' and `B2' show the corresponding times for the benchmark algorithm. It will be seen that about 1,000 updates are needed for the parallel algorithm to amortise its overhead sufficiently to break even with the benchmark.
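The break-even point quoted above follows from treating each program's elapsed time as a fixed overhead plus a constant cost per update. The sketch below works that model through in C. It is an illustration only: the function names `batch_time' and `break_even' are hypothetical, and the linear model ignores the load-balancing and sorting effects discussed in the text. The figures used in the example (overheads of 25uS and 1,300uS, and costs of 4.6uS and 3.3uS per update) are taken from the text and from the 1,000-element row of Fig. 6.1.

```c
/* Linear cost model: a fixed overhead plus a cost per update, in uS. */
double batch_time(double overhead, double per_update, long n)
{
    return overhead + per_update * (double)n;
}

/* Smallest batch size for which the parallel program is faster than
   the benchmark under the linear model, or -1 if it never is.
   b_over, b_per: benchmark overhead and cost per update (uS).
   p_over, p_per: parallel overhead and cost per update (uS). */
long break_even(double b_over, double b_per, double p_over, double p_per)
{
    if (p_per >= b_per)
        return -1;                    /* no saving per update to recover the overhead */
    double n = (p_over - b_over) / (b_per - p_per);
    long k = n > 0.0 ? (long)n : 0;   /* start just below the crossing point */
    while (batch_time(p_over, p_per, k) >= batch_time(b_over, b_per, k))
        k++;
    return k;
}
```

With the figures above, `break_even(25.0, 4.6, 1300.0, 3.3)' gives a batch of a little under 1,000 updates, in line with the estimate read from Fig. 6.2. The result is sensitive to the saving per update, which is the small difference between two measured costs.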
Fig. 6.2: Parallel Updating v. Benchmark
The elapsed times for `audit' transactions are tabulated in Fig. 6.3. Because `audit' transactions examine every element of the `C' and `D' arrays, their execution times should increase linearly with the sizes of the arrays. Therefore Fig. 6.3 shows the elapsed times per array element, per transaction. The upper values are the times for the benchmark, the lower values are the times for the parallel algorithm. The empty cells correspond to experiments where the number of `audits' times the number of array elements would exceed 10,000,000. The smallest of the missing results would take over 40 minutes to measure.
Array Size                     Number of Audits
                            1       10      100    1,000   10,000  100,000
      100 benchmark     2.780    2.262    2.235    2.241    2.237    2.235
          parallel     29.390    6.028    2.576    2.213    2.414    2.743
    1,000 benchmark     2.442    2.300    2.291    2.293    2.294
          parallel      3.005    0.688    0.326    0.287    0.302
   10,000 benchmark     2.407    2.390    2.396    2.394
          parallel      0.371    0.131    0.094    0.090
  100,000 benchmark     2.404    2.406    2.408
          parallel      0.106    0.076    0.071
1,000,000 benchmark     2.409    2.407
          parallel      0.079    0.076
Fig. 6.3: Elapsed Time per Audit per Element (uS)
For a sufficiently large number of array elements, the times per element per transaction tend to constant values, which are the times for one iteration of each loop within the `audit' transaction--about 2.4uS for the single-processor benchmark and 0.075uS for the parallel algorithm. The parallel program is about 32 times faster because its work is divided between 32 processors. However, the parallel program does not enjoy such a big advantage when the arrays are small, even when the total number of iterations of the loops is the same. (Consider a diagonal stripe from top right to bottom left in Fig. 6.3.) `Audit' transactions have a much higher message passing cost than updates; each sends a total of 126 messages, and their acknowledgments make this 252 messages in all. The measured latency appears to be about 220uS over 32 processors, again about 30uS per message. Since `audit' transactions are broadcast to every processor, their event lists are 32 times longer than those for update transactions, and there is some evidence that sorting them increases the cost per transaction by about 0.3uS for every factor of 10 increase in the number of transactions. There is also an increase in cost for small numbers of transactions because, although most aspects of processing `audit' transactions are well balanced, the loads on the collector processors are not.
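The count of 126 messages per `audit' can be reconstructed from the listings of Section 5. The breakdown below is inferred from those listings rather than stated in the report, so it should be read as an assumption: each distribution phase broadcasts to every process, including the sender itself, and in each collection phase every process except the collector sends one partial sum. The function names `audit_messages' and `per_message_us' are hypothetical.

```c
/* Messages sent for one `audit' on nprocs processors, not counting
   acknowledgments (breakdown inferred from the listings). */
int audit_messages(int nprocs)
{
    int c_distribution = nprocs;       /* broadcast to every `C' process */
    int c_collection   = nprocs - 1;   /* partial sums to the collector */
    int d_distribution = nprocs;       /* collector broadcasts to every `D' process */
    int d_collection   = nprocs - 1;   /* partial sums to the collector */
    return c_distribution + c_collection + d_distribution + d_collection;
}

/* Latency per message (uS) implied by a measured cost per `audit',
   assuming every message is echoed and the work is shared evenly. */
double per_message_us(double measured_us, int nprocs)
{
    return measured_us * nprocs / (2.0 * audit_messages(nprocs));
}
```

For 32 processors `audit_messages' gives 126, and the measured 220uS per `audit' corresponds to a little under 30uS per message, which agrees with the latency estimated for updates.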
Fig. 6.4: Parallel Updating v. Benchmark
Fig. 6.4 compares the parallel and benchmark algorithms graphically. Curve `P' shows the times per `audit' per element for the parallel algorithm, for a batch of 10 transactions, as the number of array elements is varied from 100 to 1,000,000. Curve `B' shows the comparable curve for the benchmark algorithm.
Finally, Fig. 6.5 shows the speed-ups due to parallel processing. (A `speed up' of less than 1.0 is actually a `slow down'.) The upper figure in each cell is the speed up for update transactions and the lower figure is the speed up for `audit' transactions. The best speed up is a factor of 33.7 for 100 `audits' and 100,000 array elements. (Presumably, this exceeds 32 because the benchmark function is slowed by caching effects.) There is every reason to believe that this speed up would be bettered for greater numbers of transactions and array elements. The worst speed up is the 50 times slow down for a single update transaction.
Array Size                   Number of Transactions
                            1       10      100    1,000   10,000  100,000
      100 updates       0.019    0.041    0.221    0.783    1.046    1.029
          audits        0.094    0.375    0.867    1.017    0.923    0.817
    1,000 updates       0.017    0.043    0.255    0.865    1.369    1.352
          audits        0.813    3.341    7.021    8.003    7.589
   10,000 updates       0.021    0.054    0.277    1.034    1.515    1.576
          audits        6.487   18.248   25.611   26.551
  100,000 updates       0.019    0.054    0.351    1.408    2.182    2.313
          audits       22.694   31.761   33.717
1,000,000 updates       0.024    0.057    0.389    1.571    2.485    2.485
          audits       30.671   31.889
Fig. 6.5: Speed Up Due to Parallelism
The speed-up for updates depends more strongly on the number of transactions than on the sizes of the arrays. The break even between the benchmark and the parallel processes is around 1,000 updates. The time for the parallel version is dominated by message latency, about 100uS in all, but only 3.3uS when shared across 32 processors. This is marginally less (typically by about 1.3uS) than the time taken by a single processor. The break even occurs when this saving outweighs the fixed overhead of 1,300uS, i.e., at about 1,000 updates. (If the CM-5 used in the experiments had had 64 processors rather than 32, the parallel execution times for large numbers of updates would have been halved, the time saving would have been about 3uS, and the break even would have come much earlier, at about 45 updates.)
The speed-up for `audits' depends more on the sizes of the arrays than on the number of transactions. The break even lies somewhere near 100 elements. Here, the saving per transaction per element is about 2.3uS, and the overhead is about 220uS per transaction.
Although mixtures of update and `audit' transactions have not been tabulated here, the time taken to process them can be estimated fairly accurately by interpolation. For example, a batch containing 999 updates and 1 audit gives, for 10,000 array elements, an estimated time of (999 × 4.7uS + 10,000 × 2.407uS) = (4,695 + 24,070)uS = 28,765uS for the benchmark, and (999 × 4.6uS + 10,000 × 0.371uS) = (4,595 + 3,710)uS = 8,305uS for the parallel implementation. These estimates compare with experimentally measured times of 28,287uS and 6,882uS respectively. Note too that, for this example, the single audit takes roughly as long as the 999 updates.
In the experiment, `titles' and `users' were uniformly distributed across the database. In practice, conditions may be less favourable. First, because of the `synchronise' function, the process time for each phase will be determined by the longest time of any process.
This problem could be reduced by removing the synchronisations. This would greatly complicate the program, which would need to be able to deal successfully with asynchronous interrupts caused by messages arriving from senders that had reached different phases.
Second, the input transactions were initially distributed evenly over the 32 processors. This simulated a situation where messages have arrived from a large number of sources direct to each processor--something that the CM-5 architecture does not support. What would happen if the transactions had been presented to the system as a single queue? It is not hard to predict. In terms of the architecture of Fig. 4.1, there would now be a single `C distribution' process, which would have to distribute all the incoming messages to the `C update' processes. The time to send and acknowledge each transaction would be about 60uS, and this work could not be shared. Alone, this would increase the time per update by a factor of 10-20, and in no case could the parallel algorithm offer a speed up over the benchmark, even with an unlimited number of processors. The overhead for an `audit' is the same, but it can be amortised over many array elements, so that a good speed up is still possible given enough titles or users.
Since caching effects were significant, is it possible to improve the cache hit rate by accessing the arrays sequentially (after sorting the transactions into `title' or `user' order), leading to greater speed? The short answer is `No'. The snag is that to correctly interleave the updates and `audits', the read loops in the `C update' and `D update' phases have to be more intricate. This extra complexity easily outweighs any advantage that results from more orderly access to the arrays.
7. Persistent Databases
A defect of the experimental situation is that a real-world transaction processing application is likely to need a persistent database, so that the arrays should be either stored or shadowed in disk files. This would always favour a parallel implementation, because typical disk access times (about 20mS) easily exceed the latency of message passing (about 30uS). Unfortunately, the CM-5 handles disk input-output centrally, passing all data through the message network. Disk input-output becomes a bottleneck that makes parallelism irrelevant. Only if the CM-5 had disk drives attached to each processor would a good speed up be obtainable. Clearly, the CM-5 architecture is not suited to this situation, but a suitable test machine was not available. However, it was possible to simulate disks attached to each processor by time delays.
Although, with current technology, one would expect access times of the order of 20mS, the experimentally measured time per disk access was about 2mS, irrespective of whether the access mode was random or sequential. Such a low average access time resulted because files were stored on a disk server with a very large internal cache. Most accesses involved only the cache. The server was connected to the CM-5 by an Ethernet network, so 2mS is really a measure of the latency of the network. Given that an average update needs about 3 disk accesses, the resulting 6mS overhead is enough to completely swamp the 100uS overhead of message passing. As might be expected, the speed up from using 32 processors was close to 32 under a wide range of conditions--a throughput of about 5,000 transactions per second.
Despite the random and sequential access times being equal, sequential access still has the advantage that no record is inspected or updated more than once. Given enough transactions in a batch--especially `audits'--the speed-up due to sequential access could be made as great as desired.
For example, if each record is inspected by an average of 3 transactions, sequential access is 3 times faster than random access. Combined with the speed up due to parallelism, sequential access resulted in simulated speed-ups exceeding 100.
Another possibility relevant to high-volume transaction processing is to use a main memory database [3]. This technique stores the complete database in main memory, but also records a persistent copy on disk. This means that no input-output is needed for queries, but that updates must be sent to a log. Log records may be buffered in a small non-volatile memory, so that logging can occur lazily, a disk access being required only when a buffer is full. This is a very efficient option, which becomes more attractive if each processor stores a partition of the database in its own persistent disk store. It is difficult to estimate the performance of this configuration, because its speed ultimately depends on how often the log buffer needs to be emptied. Pessimistically, we may assume that a log record is written after each update--an average of 1.5 disk accesses, or 30mS per update. Spread over 32 processors, the time per update would be about 1mS, or 1,000 transactions per second. Since one log record per disk access is very pessimistic, it should be possible for 32 processors to easily exceed a rate of 1,000 transactions per second. Since an `audit' query requires no disk access, its execution time is given by Fig. 6.3. For 1,000,000 records this is nearly 80mS--which is comparatively slow.
Sequential access and parallel processing are independent ways of improving performance, and can be used separately or in combination. But both hinge on the structure of the system specification in the same way. Neither sequential access nor partitioned parallel access is possible unless accesses to the `C' and `D' arrays can be separated. But once that separation is made, all their elements can be processed independently.
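The throughput figures quoted in this section follow from simple arithmetic, which the sketch below makes explicit. It assumes, as the text does, that disk accesses dominate the cost of an update and that the work is shared evenly between the processors. The function names `throughput_tps' and `audit_ms' are hypothetical.

```c
/* Transactions per second when each update costs `accesses' disk
   accesses of `access_ms' milliseconds each, and the updates are
   shared evenly between `nprocs' processors. */
double throughput_tps(double accesses, double access_ms, int nprocs)
{
    double ms_per_update = accesses * access_ms / nprocs;
    return 1000.0 / ms_per_update;
}

/* Time in milliseconds for one `audit' over `elements' records,
   given the cost per element in uS from Fig. 6.3. */
double audit_ms(double elements, double us_per_element)
{
    return elements * us_per_element / 1000.0;
}
```

With 3 accesses of 2mS on 32 processors, `throughput_tps' gives a little over 5,000 transactions per second. With 1.5 accesses of 20mS it gives a little over 1,000. An `audit' of 1,000,000 records at 0.079uS per element takes about 79mS.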
Suppose the separation of the `C' and `D' array accesses had not been made. Would some other form of parallel processing be possible? Consider first the possibility that the processes are parallel with respect to the `C' array, but must therefore access the `D' array randomly. Then, there is no guarantee that the history of a `D' element will be correct. It is possible that it could be updated by a message sent from a fast running `C' process before a logically earlier update message arrived from a slower running `C' process. A similar objection applies to making the processes parallel with respect to the `D' array. A third possibility is for each processor to process the transactions initially allocated to it. This is open to both objections. Of course, correct results could still be obtained if some protocol were used to ensure that updates occurred in the correct order. There are many ways this could be done. The most familiar is the two-phase locking protocol used by many database systems. Suppose the arrays were partitioned between the processors as before. Using the two-phase locking protocol, each processor would process the transactions originally allocated to it, locking the `C' and `D' array elements it needed to access. Locking an element would cause any other update that wanted to access the same element to wait until the lock was released. This would be done at the completion of each update procedure. (By each transaction always locking `C' elements before `D' elements, we may guarantee that this particular system will not deadlock.) On its own, locking does not guarantee correctness, because a later transaction may cause the procedure for an earlier transaction to wait if they happen to access the same element. However, the system satisfies the serialisability criterion: the state of the system is consistent with some order of transactions, even if the particular order cannot be predicted [4]. 
Even after relaxing the correctness requirement in this way, the system would be no faster than the algorithm given here, because each access to a `C' or `D' element would require a message to be sent from the client process to the server controlling access to the element, and back again. There would be no less message traffic, and indeed, there could easily be more: a message to read the value of the element, and a second message to update it. (The alternative, to package reading and updating within a single procedure, saves a message, but it is essentially the solution presented here.) In addition, one must remember the cost of implementing record locking, which must be done by the server processors. If a read message is not followed by an update message, it must be followed by an unlock message instead.
`Audit' transactions would complicate the issue further. It would no longer be possible to update the database during an `audit'. `Borrow' and `reshelve' transactions cause changes to both the `C' and `D' arrays. A `borrow' occurring when an `audit' had finished counting the stock but had not yet counted the loans would be reflected in the loans total, but not in the stock total, causing the total number of books to be over-estimated. It is therefore necessary for an `audit' transaction to lock the entire database. This would effectively divide execution into parallel update phases and single-thread query phases. In effect, the updates between successive queries would constitute a batch. If 1% of all transactions were `audits', the average batch would contain 100 transactions. As Fig. 6.1 shows, small batches lead to less than optimal performance.
So far, we have only considered the possibility that the database is partitioned between the processors. An alternative is for it to be replicated. Suppose each processor has a copy of the whole of the `C' and `D' arrays. This certainly simplifies the `audit' transactions, which require no message passing at all.
But now each update must broadcast every change to a `C' or `D' element to every processor. This makes the processing of updates much slower. (There is an additional problem in keeping the copies of the database consistent. The updates must be made to each copy in a consistent order.) However, if the ratio of queries to updates is high, this is clearly an optimal arrangement. Conversely, when the ratio of queries to updates is low, a partitioned database is better. For intermediate ratios, it is possible for a mixture of replication and partitioning to work best; for example, there could be 4 copies of the database, each distributed over 8 processors.
8. Conclusions
The report has shown how a useful speed-up can be obtained by using parallel processors to implement batch processing. There is no easy way to estimate a break-even point between parallel and single-thread implementations without detailed knowledge of the performance of the computer concerned, in particular, the ratio of the latency of message passing to the time taken to execute a transaction procedure. For transactions involving one or a few records, parallel processing pays off roughly when the message latency divided by the number of processors is less than the processing time for one transaction. This condition is easily met for transactions that update a disk store. For global queries or updates, parallel processing pays off whenever the database is big enough. Given reasonably big arrays, parallel processing should be faster for any batch that contains at least one global transaction.
The report has also demonstrated that it is possible to avoid database contention problems by exploiting independence and separability, which are properties that may or may not be inherent in a system specification. Avoiding contention simplifies and speeds the implementation, because it is not necessary to lock the database or provide mechanisms for transaction roll-back.
Independence and separability allow the state of a database to be inconsistent, without sacrificing the ability to make global queries that inspect a consistent state.
Although the report has discussed a single example, the same approach could be generalised to any system specification that has independence and separability properties. Briefly, this requires that accesses to arrays or file attributes can be pipe-lined, with transaction data flowing in a single direction along the pipe line. An earlier report by the author [2] explains how to discover whether this is possible for a given specification. Fortunately, the conditions are often (but not universally) met in practice. This good fortune usually results from business practices that have evolved around similar pipe-lined architectures, first with manual systems, and later, with computerised batch processing systems.
The reader may question whether the techniques have any practical application. There are two possibilities: a main memory database, or a very large disk database distributed across many processors. Using the CM-5 as a guide, each processor could support a main memory database of nearly 5MB, or about 150MB over 32 processors. A global query would take only 0.15 seconds, and the system could easily handle 1,000 updates per second. If the same database were stored on disk, a global query would take about 10 seconds, and the system could handle about 500 transactions per second. Such systems could collect transactions over a period of 1 or 2 seconds to form efficient batches. Possible applications include message switching networks (including telephony), air-traffic control, large-scale banking and booking systems, and military defence systems.
9. References
1. Baecker, Readings in Groupware & Computer-Supported Cooperative Work: Assisting Human-Human Collaboration, Morgan Kaufmann, 1994.
2. B. Dwyer, "The Data Dependency Method of Systems Design", Computer Science Technical Report 92-05, University of Adelaide, 1992.
3. M. H. Eich, "Main Memory Databases: Current and Future Research Issues", IEEE Transactions on Knowledge and Data Engineering, Vol. 4, No. 6, Dec. 1992, pp. 506-508.
4. K. P. Eswaran, J. Gray, R. Lorie & I. L. Traiger, "The Notions of Consistency and Predicate Locks in a Database System", Communications of the ACM, Vol. 19, No. 11, pp. 624-633.
5. J. Gray & A. Reuter, Transaction Processing: Concepts and Techniques, Morgan Kaufmann, 1992.
6. T. Haerder & A. Reuter, "Principles of Transaction-Oriented Database Recovery", Computing Surveys, Vol. 15, No. 4, Dec. 1983, pp. 287-317.