esProc Parallel Computing: Memory Cluster

Course 1151 0

Through esProc Parallel Computing: Data Redundancy, we know that for cluster computation data files can be stored in separate servers, with data redundancy solution adopted. Thus the parallel program will find the appropriate node for a subtask according to where the needed files are stored. The fact is, during real world cases, some data files may be frequently accessed. But performance will be compromised if they need to be loaded by the server for each use. To cope with this, esProc offers the scheme of storing those data files in each server’s memory during the cluster computation, thereby forming a memory cluster.

1. Memory partitions and shared resource

In the memory of every server performing the cluster computation, you can place the resources, or the shared data, which will be accessed repeatedly. In order to better sort out the shared resources, they will be stored in multiple memory partitions on each server. You can create memory partitions with specified names on each node using zone(z) function; and retrieve the shared resource r from partition z, or assign value to r, with prc(r:z,x).

Take the following file testZone.dfx as an example:

  A B
1 =file(“StockRecord”+string(arg1)+”.txt”)  
2 if !(A1.exists()) end “File of year “+string(arg1)+” is not found!”
3 =A1.cursor@t() [124051,128857,131893,136760,139951,145380]
4 =A3.select(B3.pos(SID)>0).fetch() =A4.groups(SID;count(~):Count,max(Closing):Maximum, min(Closing):Minimum,sum(Closing):Sum)
5 =zone(“z1”) =prc(Total:”z1″)
6 >prc(Total:”z1″,B5+B4.sum(Count)) >output(“Total records:”+string(prc(Total:”z1”)))
7 return B4  

The cellset file computes the transaction data of 6 stocks, with each subtask handling data of a certain year. The stock data of each year is stored in a separate file – StockRecord2010.txt, for instance, contains the data of 2010, and each server holds data files of different years. A subtask computes the transaction data of stocks with specified codes, including the total trading days, the highest and the lowest closing price, and the average closing price. Here we calculate the average simply according to trading days, without taking into consideration the number of transactions. In the cellset, arg1 is a parameter for passing into the year. 

esProc_parallel_memory_2

A similar example has been cited by esProc Parallel Computing: Cluster Computing. But here we’ll use it in a slightly different way. After the statistical computation is finished, each server will have count the transactions involved. A5 defines a memory partition – z1- with zone function; B5 retrieves the parameter Total from z1; and A6 adds the count to Total. B6 outputs the value of Total to the console using the output function.

The parallel computation uses 3 servers, whose IPs and port numbers are as follows: Server A (192.168.0.86:4001),Server B (192.168.0.66:4001) and Server C (192.168.0.86:4004). The numbers of parallel tasks, callxTask, set for the 3 servers at runtime are different – 2, 1 and 4 respectively; while the number of parallel machines, nodeTask, is 4.

The main program performs the parallel computation with callx function:

  A
1 [192.168.0.86:4001,192.168.0.66:4001,192.168.0.86:4004]
2 [2010,2011,2012,2013,2014]
3 =callx(“testZone.dfx”,A2;A1)
4 =A3.merge(SID).groups(SID;sum(Count):Count,max(Maximum):Maximum,min(Minimum):Minimum,sum(Sum):Sum)
5 =A4.new(SID,Count,Maximum,Minimum,round(Sum/Count,2):Average)

On the server’s system information output window, you know how the task is distributed and executed:

esProc_parallel_memory_4

esProc_parallel_memory_5

esProc_parallel_memory_6

During the cluster computation, callx doesn’t use @a option to designate servers for subtasks, which thus will be distributed in order. If a subtask can’t find the data files it needs in a certain server, it will be redistributed. Here let’s take a special look at the Total value. According to the output information, Server A processes the data of 2010 and 2013, with the Total values being 1506 and 3018; Server B processes the data of 2012, with a Total value of 1500; Server C processes the data of 2011 and 2014, with Total values being 1512 and 3024. We also notice that the servers get their own data independently and unmistakably, even though they use the same memory partition and parameter. A server may have more than one subtask run on it, but the real-time lock mode makes sure that data in the memory partition can be accessed separately without clashing.

Before the server is stopped, the data stored in the memory partitions won’t be deleted unless it is get re-assigned. In the above example, The Total value recorded by each server will continue to accumulate if the main program is executed again.

In addition to using shared resources in the memory partitions as the common parameter to compute the total running time and the number of subtasks being performed, we could also put table sequences or other data files that need to be accessed repeatedly in the memory partitions to get rid of the unnecessary repetitive retrievals.

Here’s an example that requires using parallel processing to count employees from each state in every department, while listing the states in abbreviations. Since the EMPLOYEE table holds only the full names of the states, the abbreviations and other information of the states need to be obtained from the STATES table. We can use the following dfx file to perform the parallel computation:

  A
1 =connect(“demo”)
2 =A1.query(“select ABBR,NAME from STATES”)
3 =A1.query(“select * from EMPLOYEE where DEPT=?”, dept)
4 =A3.switch(STATE,A2:NAME)
5 =A4.group(STATE)
6 >A1.close()
7 return A5.new(STATE.ABBR:State,dept:Dept,~.count():Count)

A cellset parameter – dept – is used in this cellset file to represent the department name:

esProc_parallel_memory_8

Then the program retrieves from the database the records of the states and those of the employees in the current department, groups the employee records and returns the desired table sequence. The following, for instance, is the table sequence of Finance department:

esProc_parallel_memory_9

The first thing during the computation is to retrieve the information of the sates and employees from the database. And we notice that for each subtask the table sequence of retrieved state information is the same. In this case the database data retrievals are repetitive. So we would store the state information in each server’s memory partitions as shared resource.

To do this we’ll execute the dfx file above in two steps. Load_State.dfx will store the state information as the shared resource:

  A
1 =connect(“demo”)
2 =A1.query(“select ABBR,NAME from STATES”)
3 =zone(“z1”)
4 >prc(States:”z1″,A2)
5 >A1.close()

Since we have created a memory partition z1 in the preceding computation, statement A3 can be omitted.

CalcEmp_Zone.dfx performs the computation and returns a table sequence of statistical result for each department:

  A
1 =connect(“demo”)
2 =A1.query(“select * from EMPLOYEE where DEPT=?”, dept)
3 =prc(States:”z1″)
4 =A2.switch(STATE,A3:NAME)
5 =A4.group(STATE)
6 >A1.close()
7 return A5.new(STATE.ABBR:State,dept:Dept,~.count():Count)

This cellset file uses a cellset parameter dept to specify the department name and the servers in the preceding file – Server A (192.168.0.86:4001) and Server B (192.168.0.66:4001) – to carry out the cluster computation:

  A
1 [192.168.0.86:4001,192.168.0.66:4001]
2 [Finance,Sales,HR,Administration,Marketing,Production,R&D,Technology]
3 =callx(“Load_State.dfx”,to(2);A1)
4 =callx(“CalcEmp_Zone.dfx”,A2;A1)
5 =A4.conj().sort(State)

After the execution, we can check the result in A5:

esProc_parallel_memory_13

Note that as the parallel computation is performed on several parallel servers, and each server has its own memory partitions. So we need to have in mind that the shared resource on a server is only locally available and thus inaccessible from any other server. That’s why parameter to(2) is set for the spatial variable in A3 to assign values to the shared resources in the memory partitions on both the two servers. Then when a server performs the same computational tasks, it can use the shared resource, thereby avoiding the repetitive operations, such as the retrieval of table sequence as with this example. We can also make the database connection object a shared resource to better control the connection to and disconnection from a database.

2. Shared resource stored on separate servers

During cluster computation, the shared resource which each server uses can be different; and in some other cases the data to be loaded is so huge that storing it on a single server may cause serious performance problem and even memory overflow. In these situations, we would break the data table apart to have it loaded by separate servers.

Suppose if we want to find the date on which a stock has its highest closing price in each year and to get the average stock price in each year, we need each stock’s statistical data in every year for the query. To perform the computation, we would place the data in the memory partitions, as the following Load_Stock.dfx does:

  A B
1 =file(“StockRecord”+string(arg1)+”.txt”)  
2 if !(A1.exists()) end “File of year “+string(arg1)+” is not found!”
3 =A1.cursor@t() =A3.groups(SID;count(~):Count,max(Closing):Maximum, sum(Closing):Sum)
4 =B3.new(SID, Maximum, round(Sum/Count,2):Average)  
5 =zone(arg1) >prc(StockInfo:arg1,A4)

If the current server lacks the needed data files, B2 will return the information of their absence and the main program will reassign the subtask. A4 computes the highest closing price and the average price for each stock in the current year. The result is stored in a corresponding memory partition, whose name will be passed in through the parameter arg1:

esProc_parallel_memory_15

Statistical data of different years will be stored in different memory partitions, for instance, the data of 2010 will be placed in a partition named z2010. Different shared resources loaded by servers should be able to distinguish themselves from one another with different names.

After this is done, the computation will be carried out by retrieving data from the memory partitions, as is shown by FindStock.dfx:

  A B
1 =file(“StockRecord”+string(arg1)+”.txt”)  
2 if !(A1.exists()) end “File of year “+string(arg1)+” is not found!”
3 =A1.cursor@t() =prc(StockInfo:”z”+string(arg1))
4 =B3.select@1(SID==arg2) =A3.select(SID==arg2 && Closing==A4.Maximum).fetch()
5 =B4.new(SID;Date,Closing, A4.Average:AnnuaAvg) return A5

Of which arg1 is the year whose data is being handled and arg2 is the stock code:

esProc_parallel_memory_17

When callx function is invoked from the host machine to perform the parallel computation, the program needs to distribute tasks according to the data stored in the memory partitions. For instance, query of the data of 2010 should execute on the sever on which memory partition z2010 exists. esProc supports using hosts(h,zs,dfx) to search the server list h to find the server where each partition in the partition sequence zs has been loaded and return a list of servers. With hosts@i used, if a partition can’t be found, the program will load it by executing the dfx, for example:

  A
1 [192.168.0.86:4001,192.168.0.66:4001,192.168.0.86:4004]
2 [2010,2014]
3 124051
4 =hosts@i(A1,A2.(“z”+string(~)),”Load_Stock.dfx.dfx”)
5 =callx@a(“Find_Stock.dfx”,A2,A3;A4)
6 =A5.conj()

As the table to be split and stored in the memory partitions usually holds a great amount of data, we would not load more than one partition on a single server. So host@i requires that the servers shouldn’t be outnumbered by the partitions. Here we’ll load the data of 2010 and 2014 to separate servers – Server A and Server B to perform the computation. Below is the output information from the servers:

esProc_parallel_memory_19

esProc_parallel_memory_20

You see that there is a corresponding relationship between the loading of shared resource and its access. That explains why A5 uses @a option in performing callx to assign the query of data of a certain year to a server with corresponding partition loaded.

After the computation is finished, A6 gets result as follows:

esProc_parallel_memory_21

Higher efficiency can be achieved with the statistical stock data having been loaded onto the memory partitions on servers, when the stock data needs to be accessed repeatedly and frequently.

FAVOR (0)
Leave a Reply
Cancel
Icon

Hi,You need to fill in the Username and Email!

  • Username (*)
  • Email (*)
  • Website