Cluster computing can be used in esProc to perform complicated analysis and processing tasks. A cluster system consists of multiple parallel servers running on independent computers in a network. Any computer in this network can send a parallel computing request to the cluster.
By pooling the computational power of multiple servers, a cluster system improves performance significantly. It can also be expanded whenever necessary without the high cost associated with a mainframe.
1. Perform cluster computation with the callx function
The article esProc Parallel Computing: The Server explained how to launch parallel servers. Once multiple parallel servers have been started to form a cluster system, cluster computation can be performed with the callx function. Each server carries out its share of the parallel task by computing the specified dfx file and returning the result.
Cluster computing requires a cellset file that performs each subtask, such as CalcStock.dfx shown below:
 | A | B |
1 | >output@t("calc begin, SID="+string(arg1)+". ") | =file("StockRecord.txt").cursor@t() |
2 | =B1.select(SID==arg1) | =A2.fetch() |
3 | =B2.count() | =B2.max(Closing) |
4 | =B2.min(Closing) | =round(B2.avg(Closing),2) |
5 | >output@t("calc finish, SID="+string(arg1)+". ") | result A3,B3,A4,B4 |
The file StockRecord.txt contains the closing prices of some stocks over a certain period. Each subtask computes the trading statistics of one specified stock code: the total number of trading days, the highest and lowest closing prices, and the average closing price. The average is a simple mean over trading days, not weighted by the number of transactions. To trace how the subtasks execute, output@t is called at the start of the computation and again just before the result is returned, printing the time along with a message. In the cellset file, arg1 is the parameter that passes in the stock code, and it must be defined before the parallel computation. Select Set arguments before run to set the parameter:
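To make the subtask's statistics concrete, here is a minimal Python sketch of what CalcStock.dfx computes for one stock code. It is an illustration, not esProc code: it works on an in-memory list of (SID, Closing) pairs instead of reading StockRecord.txt, and the function name stock_stats and the record layout are hypothetical.

```python
from typing import Iterable, Optional, Tuple

def stock_stats(records: Iterable[Tuple[int, float]], sid: int
                ) -> Tuple[int, Optional[float], Optional[float], Optional[float]]:
    """Return (count, max, min, average) of the closing prices of one stock.

    Mirrors cells A3, B3, A4 and B4 of CalcStock.dfx: the average is a plain
    mean over trading days, not weighted by the number of transactions.
    """
    closings = [closing for code, closing in records if code == sid]
    if not closings:
        # Assumption for this sketch: report no statistics for an unknown code.
        return 0, None, None, None
    average = round(sum(closings) / len(closings), 2)
    return len(closings), max(closings), min(closings), average
```

Python's round() rounds halves to even, so tie cases may not match esProc's round exactly.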
For parallel computing, the cellset file CalcStock.dfx must exist on every computer that runs a server, in the mainPath directory specified in each server's configuration.
This example configures three servers on two computers, with the following IP addresses and port numbers: Server I (192.168.0.86:4001), Server II (192.168.0.66:4001) and Server III (192.168.0.86:4004). Their callxTask values, the number of parallel subtasks each server can run, are set to 2, 1 and 4 respectively, and nodeTask, the number of parallel machines, is 4.
Then we can start the parallel computation using the callx function in the main program on any computer in the network:
 | A |
1 | [192.168.0.86:4001,192.168.0.66:4001,192.168.0.86:4004] |
2 | [124051,128857,131893,136760,139951,145380] |
3 | =callx("CalcStock.dfx",A2;A1) |
4 | =A3.new(A2(#):SID,~(1):Count,~(2):Maximum,~(3):Minimum,~(4):Average) |
In A3's expression, the parameters of callx before the semicolon specify the cellset file to compute and the arguments passed to it. The parallel computation splits the task into subtasks according to the sequence argument: the length of the sequence is the number of subtasks, and each subtask receives one member of it. Multiple arguments are separated by commas, and a single-value argument is copied to every subtask. The parameter after the semicolon is the sequence of servers, each written as a string of the form "IP:port number", for example "192.168.0.86:4001". When a subtask finishes on its server, its result is returned through the return or result statement; a subtask that returns several values returns them as a sequence.
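The argument rule above (sequences are split member by member, single values are copied to every subtask) can be sketched in Python. This is a hypothetical helper, not part of esProc; it assumes at least one argument is a sequence and that all sequence arguments have the same length, neither of which the text above specifies.

```python
def expand_args(*args):
    """Build one argument list per subtask from callx-style arguments.

    Sequence arguments (list/tuple) are split element-wise; single values
    are copied to every subtask. ASSUMPTIONS: at least one argument is a
    sequence, and all sequence arguments have the same length.
    """
    seqs = [a for a in args if isinstance(a, (list, tuple))]
    if not seqs:
        raise ValueError("at least one argument must be a sequence")
    n = len(seqs[0])  # number of subtasks
    if any(len(s) != n for s in seqs):
        raise ValueError("sequence arguments must have equal lengths")
    return [[a[i] if isinstance(a, (list, tuple)) else a for a in args]
            for i in range(n)]

# "daily" is a hypothetical extra single-value argument.
print(expand_args([124051, 128857, 131893], "daily"))
# [[124051, 'daily'], [128857, 'daily'], [131893, 'daily']]
```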
In this example we need the data of six stocks, with codes 124051, 128857, 131893, 136760, 139951 and 145380, which correspond to subtasks a, b, c, d, e and f. These subtasks are distributed among the three servers in the order in which the servers are listed in the function: a subtask goes to the next server in the list if that server is available; if the server hasn't been started or is already running its maximum number of parallel subtasks, the server after it is tried instead. The distribution and execution messages of the subtasks can be read in each server's System Output window:
As the output shows, Server I handles subtasks a and d, Server II handles subtask b, and Server III handles subtasks c, e and f. After a, b and c have been distributed in order, the second round of distribution begins by giving d to Server I. Subtask e goes to Server III because Server II's callxTask is 1 and it is already running a subtask. Server III also receives f, because Server I and Server II are both fully loaded.
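The distribution rule can be simulated in a few lines of Python to check the assignment traced in the previous paragraph. This is a simplified model, not esProc's scheduler: the Server class and distribute function are hypothetical, and the simulation assumes no subtask finishes while distribution is in progress.

```python
from dataclasses import dataclass, field

@dataclass
class Server:
    name: str
    capacity: int           # callxTask: max parallel subtasks on this server
    started: bool = True    # a server that is not running accepts nothing
    running: list = field(default_factory=list)

def distribute(tasks, servers):
    """Assign tasks round-robin in list order, skipping servers that are
    not started or already full. Returns the tasks that must wait.

    ASSUMPTION: no subtask finishes while this pass is running.
    """
    pos = 0  # where the search for the next subtask starts
    waiting = []
    for task in tasks:
        for step in range(len(servers)):
            i = (pos + step) % len(servers)
            s = servers[i]
            if s.started and len(s.running) < s.capacity:
                s.running.append(task)
                pos = (i + 1) % len(servers)  # continue after this server
                break
        else:
            waiting.append(task)  # every server is down or full
    return waiting

servers = [Server("I", 2), Server("II", 1), Server("III", 4)]
waiting = distribute(list("abcdef"), servers)
print({s.name: s.running for s in servers}, waiting)
# {'I': ['a', 'd'], 'II': ['b'], 'III': ['c', 'e', 'f']} []
```

With 20 subtasks on the same three servers, the model places the first 7 (2 + 1 + 4) and leaves the remaining 13 waiting.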
After the computation is completed, we can see the result in A4:
This example shows that by splitting a data-intensive task into parts and having each server process one part, cluster computing avoids the memory overflow that processing all the data at once could cause.
Note that because subtasks run separately on each server, they are independent of each other. The dfx file must exist on every server, and any data sources or data files the computation needs must be configured on, or stored on, the servers concerned.
Let's look at another example. Again we compute statistics for the six stocks, but this time each subtask handles only one year's data. The corresponding cellset file, CalcStock2.dfx, is as follows:
 | A | B |
1 | >output@t("calc begin, year: "+string(arg1)+". ") | =file("StockRecord"+string(arg1)+".txt") |
2 | if !(B1.exists()) | end "File of year "+string(arg1)+" is not found!" |
3 | =B1.cursor@t() | [124051,128857,131893,136760,139951,145380] |
4 | =A3.select(B3.pos(SID)>0).fetch() | =A4.groups(SID;count(~):Count,max(Closing):Maximum,min(Closing):Minimum,sum(Closing):Sum) |
5 | >output@t("calc finish, year: "+string(arg1)+". ") | result B4 |
Stock data for each year are stored in a separate file; for example, the 2010 data are in StockRecord2010.txt. The five data files used here are spread across the three servers: the 2010 and 2014 files are on Server I, the 2011 and 2013 files on Server II, and the 2012 file on Server III. Each subtask computes the trading statistics of the six stocks for one year. B4 returns the sum of the closing prices rather than their average, so that the main program can later combine the years into a correct overall average. As in the previous example, output@t prints the time and a message when the computation starts and just before the result is returned. Because each data file exists on only one server, A2 checks whether the file exists on the current server. If it doesn't, the end statement in B2 terminates the subtask with an exception, and the subtask has to be redistributed.
In the cellset file, arg1 is the parameter that passes in the year whose data are to be computed:
Then perform cluster computation in the main program as follows:
 | A |
1 | [192.168.0.86:4001,192.168.0.66:4001,192.168.0.86:4004] |
2 | [2010,2011,2012,2013,2014] |
3 | =callx("CalcStock2.dfx",A2;A1) |
4 | =A3.merge(SID).groups(SID;sum(Count):Count,max(Maximum):Maximum,min(Minimum):Minimum,sum(Sum):Sum) |
5 | =A4.new(SID,Count,Maximum,Minimum,round(Sum/Count,2):Average) |
A3 performs the cluster computation, A4 merges the per-year results and re-aggregates them by SID, and A5 generates the final result, computing the average closing price as Sum/Count. To cover five years, the cluster computation distributes five subtasks among the three servers. The distribution and execution messages of the subtasks are displayed in each server's System Output window:
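The re-aggregation in A4 and A5 can be sketched in Python to show why each subtask returns Sum instead of an average. This is a hypothetical helper, not esProc's merge/groups: partial results are modeled as lists of dicts with the column names of B4, and a dictionary replaces the ordered merge, so the inputs need not be sorted by SID.

```python
def combine_yearly(partials):
    """Merge per-year partial results into overall per-stock statistics.

    `partials` holds one table per subtask; each table is a list of dicts
    with keys SID, Count, Maximum, Minimum, Sum (the columns of B4 in
    CalcStock2.dfx). Counts and sums are added, maxima/minima are combined,
    and the average is recomputed from the totals.
    """
    totals = {}
    for table in partials:
        for row in table:
            t = totals.setdefault(row["SID"], {
                "Count": 0, "Maximum": row["Maximum"],
                "Minimum": row["Minimum"], "Sum": 0.0})
            t["Count"] += row["Count"]
            t["Maximum"] = max(t["Maximum"], row["Maximum"])
            t["Minimum"] = min(t["Minimum"], row["Minimum"])
            t["Sum"] += row["Sum"]
    return [{"SID": sid, "Count": t["Count"], "Maximum": t["Maximum"],
             "Minimum": t["Minimum"],
             "Average": round(t["Sum"] / t["Count"], 2)}
            for sid, t in sorted(totals.items())]
```

Averaging the yearly averages instead would give every year the same weight regardless of how many trading days it had, which is why the totals of Sum and Count are carried through.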
As these messages show, the first three subtasks (2010, 2011 and 2012) are distributed to the servers in order and run normally. The 2013 subtask then goes to Server I, and the 2014 subtask goes to Server III because Server II is already running its maximum number of parallel subtasks. But Server I does not have the 2013 data file and Server III does not have the 2014 file, so both subtasks fail to find the files they need, throw exceptions through the end statement, and are redistributed by the main program. Server II has not finished its computation at this point, so the 2013 subtask is sent to Server III and the 2014 subtask to Server I. The 2014 subtask now runs normally, while the 2013 subtask throws an exception again. Since neither Server I nor Server III can run it, the redistribution of the 2013 subtask is suspended until Server II finishes its current subtask; only then does distribution resume.
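The waiting behavior can be modeled as a server choice that skips servers on which the subtask has already failed. This is a hypothetical Python sketch: the rule that failed servers are not retried is inferred from the behavior described above, not taken from esProc documentation, and the load figures in the usage lines are illustrative.

```python
def pick_server(order, running, capacity, failed_on, start=0):
    """Choose a server for a redistributed subtask, scanning round-robin.

    order:     server names in the order given to callx
    running:   name -> number of subtasks currently running there
    capacity:  name -> callxTask limit
    failed_on: servers on which this subtask has already failed
    Returns a server name, or None if the subtask has to wait.
    ASSUMPTION: servers in failed_on are never retried.
    """
    for step in range(len(order)):
        name = order[(start + step) % len(order)]
        if name not in failed_on and running[name] < capacity[name]:
            return name
    return None

order = ["I", "II", "III"]
capacity = {"I": 2, "II": 1, "III": 4}
running = {"I": 1, "II": 1, "III": 1}   # illustrative load; II still busy
print(pick_server(order, running, capacity, {"I", "III"}))  # None: 2013 waits
running["II"] = 0                       # Server II finishes its subtask
print(pick_server(order, running, capacity, {"I", "III"}))  # II
```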
After all computations are completed, A5’s result is as follows:
Thus, when each server can perform only certain subtasks, exceptions may force subtasks to be redistributed. In this case, use the @a option of callx to assign each subtask to a specified server strictly in order, as shown in the following cellset:
 | A |
1 | [192.168.0.86:4001,192.168.0.66:4001,192.168.0.86:4004,192.168.0.66:4001,192.168.0.86:4001] |
2 | [2010,2011,2012,2013,2014] |
3 | =callx@a("CalcStock2.dfx",A2;A1) |
4 | =A3.merge(SID).groups(SID;sum(Count):Count,max(Maximum):Maximum,min(Minimum):Minimum,sum(Sum):Sum) |
5 | =A4.new(SID,Count,Maximum,Minimum,round(Sum/Count,2):Average) |
With callx@a, the sequence of servers must have the same length as the number of subtasks, and a server may appear more than once. Each subtask is then sent to the server at the same position in the sequence; if that server is already running its maximum number of parallel subtasks, the subtask waits until a slot is free, and distribution then continues. The System Output window displays information about the distribution and execution of the subtasks:
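Because callx@a pairs subtasks with servers by position, the server sequence can be checked against the data layout before running. The Python sketch below is hypothetical (assign_strict and FILES are not esProc names); FILES encodes where each year's file is stored in this example.

```python
def assign_strict(tasks, servers):
    """Pair the i-th subtask with the i-th server, as described for callx@a."""
    if len(tasks) != len(servers):
        raise ValueError("callx@a needs one server entry per subtask")
    return list(zip(tasks, servers))

# Which server holds which year's data file in this example.
FILES = {
    "192.168.0.86:4001": {2010, 2014},  # Server I
    "192.168.0.66:4001": {2011, 2013},  # Server II
    "192.168.0.86:4004": {2012},        # Server III
}

servers = ["192.168.0.86:4001", "192.168.0.66:4001", "192.168.0.86:4004",
           "192.168.0.66:4001", "192.168.0.86:4001"]
years = [2010, 2011, 2012, 2013, 2014]
plan = assign_strict(years, servers)
misplaced = [(y, s) for y, s in plan if y not in FILES[s]]
print(misplaced)  # [] : every year goes to a server that has its file
```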
Using the @a option avoids the efficiency loss caused by redistributing subtasks.
This example shows that problems arise when a subtask can run on only one particular server: if that server fails, the whole task may fail. The problem can be solved by storing each data file on multiple servers, which makes the system more robust. See esProc Parallel Computing: Data Redundancy for related information.
2. Handle large-scale computation
In the previous examples, cluster computing handles data-intensive tasks: each subtask processes only part of the data, making the best use of limited memory. In another scenario, the task is computation-intensive rather than data-intensive. Cluster computing can likewise split such a task into multiple subtasks, distribute them among multiple servers for execution, and merge their results in the main program.
Take the following subroutine CalcPi.dfx as an example:
 | A | B | C |
1 | 1000000 | 0 | >output@t("Task "+string(arg1)+" start…") |
2 | for A1 | =rand() | =rand() |
3 | | =B2*B2+C2*C2 | |
4 | | if B3<1 | >B1=B1+1 |
5 | >output@t("Task "+string(arg1)+" finish.") | result B1 | |
The parameter arg1 holds the serial number of the subtask:
This subroutine estimates π, the ratio of a circle's circumference to its diameter, using probability. Look at the following figure:
The figure shows a quarter circle of radius 1 inside a square with side length 1. The area of the square is 1 and the area of the quarter circle is π/4, so a point chosen at random in the square falls inside the quarter circle with probability π/4, the ratio of the two areas. The subroutine generates 1,000,000 random points whose x and y coordinates lie in the interval [0,1), and counts the points inside the quarter circle, that is, those with x²+y²<1 (comparing the squared distance from the origin with 1 avoids computing a square root). It returns this count, from which the main program estimates π.
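The estimate follows from the area ratio. With h the number of points that fall inside the quarter circle among n random points, and, when the work is split into k subtasks of n points each with counts h_1, ..., h_k, the pooled estimate is the quantity the main program's B2 computes (n = 1,000,000 and k = 20 in this example):

```latex
\begin{aligned}
P\left(x^2 + y^2 < 1\right) &= \frac{\text{area of quarter circle}}{\text{area of square}}
  = \frac{\pi/4}{1} = \frac{\pi}{4},\\
\pi &\approx \frac{4h}{n},\\
\pi &\approx \frac{4\sum_{i=1}^{k} h_i}{k\,n}.
\end{aligned}
```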
The main program is as follows:
 | A | B |
1 | [192.168.0.86:4001,192.168.0.66:4001,192.168.0.86:4004] | 20 |
2 | =callx("CalcPi.dfx",to(B1);A1) | =A2.sum()*4/(1000000d*B1) |
By invoking the subroutine 20 times, once for each serial number in to(B1), the cluster computation spreads the work of computing 20,000,000 points across the servers used in the previous section. The distribution and execution information of the subtasks can be seen in each server's System Output window:
Here the number of subtasks exceeds the number of subtasks the servers can accept at once. After the first 7 subtasks are distributed (2 + 1 + 4, the sum of the callxTask values), all three servers have reached their maximum number of parallel subtasks. The remaining subtasks wait until a server becomes available, and each is given to whichever server has an idle slot, so the distribution may differ from run to run. The output of Server III shows that subtasks on the same server run in separate threads, each as an independent computation. A subtask started later may finish earlier; this does not affect the result, because the main program simply adds up the counts returned by all subtasks.
B2 computes the approximate value of π. Because the computation is based on random sampling, each run gives a slightly different result.
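The whole scheme, subtasks that count hits and a main program that sums them, can be sketched in plain Python. Threads stand in for the cluster servers here, so under CPython's global interpreter lock this shows the structure rather than any speedup. calc_pi_subtask and estimate_pi are hypothetical names, and seeding each subtask's generator with its serial number is an assumption made for reproducibility; CalcPi.dfx simply calls rand().

```python
import random
from concurrent.futures import ThreadPoolExecutor

def calc_pi_subtask(task_no: int, points: int) -> int:
    """Count random points in [0,1) x [0,1) with x*x + y*y < 1, like CalcPi.dfx.

    Each subtask uses its own generator seeded with its serial number
    (an assumption of this sketch), so subtasks are independent.
    """
    rng = random.Random(task_no)
    hits = 0
    for _ in range(points):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            hits += 1
    return hits

def estimate_pi(subtasks: int, points: int, workers: int = 4) -> float:
    """Run the subtasks concurrently and combine them as B2 does:
    4 * total hits / total points. The sum does not depend on which
    subtask finishes first."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        hits = pool.map(calc_pi_subtask, range(1, subtasks + 1),
                        [points] * subtasks)
        return 4 * sum(hits) / (points * subtasks)

if __name__ == "__main__":
    print(estimate_pi(20, 10_000))
```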