Background & Challenge
Motivation & Goal
System Design & Implementation
Paracel in Action
Background & Challenge
5+ follows
100+ note activities
200+ movie actions
500+ dongxi purchases
1000+ status activities
10000+ fm records
...
Distributed - Parallel - Optimize
Scalability T(n, p) (see the speedup definition below)
Load Balance
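As a working definition of the T(n, p) metric above (my reading; the slide only names it): T(n, p) is the running time on input size n with p workers, and scalability is measured by the speedup

$$ S(n, p) = \frac{T(n, 1)}{T(n, p)}, \qquad \text{ideally } S(n, p) \approx p $$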
Motivation & Goal
Scalability Challenge
Performance Challenge
Machine Learning Algorithms
Scalability
High-level Communication Model
Adaptive to Machine Learning Problems
Easy Programming
System Design & Implementation
Machine Learning Problems
$$ \theta \leftarrow \theta + \delta $$
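As a concrete instance of this iterative update (a generic example, not specific to any one algorithm): in gradient descent the increment is the scaled negative gradient of the loss,

$$ \delta = -\eta \, \nabla_{\theta} L(\theta) $$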
Jeff Dean's talk @ Stanford, 2013
Split Model with a Global Distributed Key-Value Storage
Simple Communication Model
Predictable stragglers
Unpredictable stragglers
Load Imbalance (a little tricky)
Asynchronous Control for Iterative Tasks
spend less time waiting for each other
spend less time communicating with parameter server
help slow/straggling workers to catch up
currently only applies to:
C++11
Eigen3
MPI
ZeroMQ
Msgpack-C: an incremental version
Parasol (prototype) finished on 2013-10-25
First internal release on 2014-03-31
paracel_load
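A quick sketch of the call (the pattern is taken from the word-count and logistic-regression examples later in this deck; the glob path is illustrative):

// each worker loads only its own partition of the matched files
auto lines = paracel_load("/alg/data/*.csv");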
Load Balance
Client
Server
RPC/Communication
bool ssp_switch = true;
int limit_s = 0;  // staleness bound, must be > 0 when ssp_switch is on
int clock_server = 0;
int stale_cache, clock, total_iters;
paracel::dict_type<string, V> cached_para;  // local cache of pulled parameters

bool paracel_write(const string & k, const V & v) {
  cached_para[k] = v;
  return remote_push(k, v);
}

void paracel_update(const string & k, const V & delta) {
  V v = remote_update(k, delta);
  cached_para[k] = v;
}

bool paracel_read(const string & k, V & v) {
  if(stale_cache + limit_s > clock) {
    // cache is fresh enough: e.g. with limit_s = 3, a worker at clock 10
    // may serve reads from a cache written at clock 8
    v = cached_para[k];
  } else {
    // block until the slowest worker is within limit_s clocks
    while(stale_cache + limit_s < clock) {
      stale_cache = pull_int("server_clock");
    }
    cached_para[k] = remote_pull<V>(k);
    v = cached_para[k];
  }
  return true;
}

void set_total_iters(int n) { total_iters = n; }

void iter_commit() {
  string clock_key = "client_clock_" + std::to_string(clock % limit_s);
  remote_incr_int(clock_key, 1);  // tell the clock server we finished an iteration
  clock++;
}
...
clock server
...
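The server side is elided above; here is a minimal sketch of the clock-server logic, assuming (from the client code) that the global clock advances once every worker commits the current iteration. This is an illustration, not Paracel's actual server code:

int server_clock = 0;                     // workers pull this as "server_clock"
paracel::dict_type<string, int> commits;  // per-iteration commit counters

// hypothetical handler behind remote_incr_int(clock_key, 1)
void on_iter_commit(const string & clock_key, int num_workers) {
  commits[clock_key] += 1;
  if(commits[clock_key] == num_workers) {
    server_clock += 1;       // slowest worker finished: advance the global clock
    commits[clock_key] = 0;  // slot is reused, since keys cycle modulo limit_s
  }
}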
Paracel in Action
On Douban's Dev Machine
xunzhang@balin:/mfs/user/xunzhang$ git clone https://github.com/douban/paracel.git
xunzhang@balin:/mfs/user/xunzhang$ cd paracel; mkdir build
xunzhang@balin:/mfs/user/xunzhang/paracel$ cd build
xunzhang@balin:/mfs/user/xunzhang/paracel/build$ cmake ..; make -j 2; make install
1. Your class must inherit the paralg base class
#include "ps.hpp" class HOF : public paralg { public: HOF(paracel::Comm comm, string hosts_dct_str, string input, string output, int a) : paralg(hosts_dct_str, comm, output, 1) { this.a = a; } private: int a; };
2. Write an update function
#include "proxy.hpp" #include "paracel_types.hpp" int reduce(int a, int b) { return a + b; } // register your updater into parcel framework update_result updater = update_proxy(reduce);
3. Implement the solve member function
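The slide shows no code for this step; a minimal sketch, assuming the HOF class from step 1 and the paracel_load/sync calls used in the examples later in this deck:

virtual void solve() {
  auto lines = paracel_load(input);  // each worker loads its partition
  sync();                            // barrier: wait for all workers
  // ... learning logic via paracel_read / paracel_write / paracel_bupdate ...
  sync();
}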
4. Create a driver
#include "utils.hpp" #include "wc.hpp" DEFINE_string(server_info); DEFINE_string(cfg_file); int main(int argc, char *argv[]) { // init worker environment paracel::main_env comm_main_env(argc, argv); paracel::Comm comm(MPI_COMM_WORLD); // init parameter server environment google::SetUsageMessage("[options]\n\t--server_info\n"); google::ParseCommandLineFlags(&argc, &argv, true); string input = "/alg/data/*.csv"; string output = "result.txt"; int a = 7; // create a instance HOF solver(comm, FLAGS_server_info, input, output, a); // solve problems solver.solve(); return 0; }
Specify LD_LIBRARY_PATH, then use prun.py (see the example after the option list below)
Options:
  -h, --help            show this help message and exit
  -p PARASRV_NUM, --snum=PARASRV_NUM
                        number of parameter servers
  --m_server=local|mesos|mpi
                        running method for parameter servers. If not given,
                        set with the same value of -m or --method
  --ppn_server=PPN_SERVER
                        mesos case: procs number per node of parameter
                        servers. If not given, set with the same value of --ppn
  --mem_limit_server=MEM_LIMIT_SERVER
                        mesos case: memory size of each task in parameter
                        servers. If not given, set with the same value of
                        --mem_limit
  --hostfile_server=HOSTFILE_SERVER
                        mpi case: hostfile for mpirun of parameter servers
  -w WORKER_NUM, --wnum=WORKER_NUM
                        number of workers for learning
  -m local|mesos|mpi, --method=local|mesos|mpi
                        running method for workers
  --ppn=PPN             mesos case: procs number per node for workers
  --mem_limit=MEM_LIMIT
                        mesos case: memory size of each task of workers
  --hostfile=HOSTFILE   mpi case: hostfile for mpirun for workers
  -c CONFIG, --cfg_file=CONFIG
                        config file in json fmt, for alg usage
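A hypothetical invocation combining the options above (the flag names come from the help text; the paths, counts, and the way the worker program is passed are my assumptions for illustration):

$ export LD_LIBRARY_PATH=/usr/local/lib
$ python prun.py -p 2 -w 8 -m mpi --hostfile hosts -c demo_cfg.json ./demo_driver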
class WordCount : public paralg {
 public:
  void learning(const vector<string> & lines);
  virtual void solve();
 private:
  vector<pair<string, int> > result;
};

void WordCount::learning(const vector<string> & lines) {
  for(auto & line : lines) {
    auto word_lst = parser(line);
    for(auto & word : word_lst) {
      paracel_bupdate(word, 1, "/local/lib/libwc_update.so", "wc_updater");
    } // word_lst
  } // lines
  sync();
  paracel_read_topk(topk, result);  // read the top-k counted words
}

void WordCount::solve() {
  auto lines = paracel_load(input);
  sync();
  learning(lines);
  sync();
}

// update function, compiled into /local/lib/libwc_update.so
int local_update(int a, int b) { return a + b; }
paracel::update_result wc_updater = paracel::update_proxy(local_update);
void logistic_regression::solve() {
  auto lines = paracel_load(input);
  local_parser(lines);
  sync(); // init data
  set_total_iters(rounds);
  dgd_learning();
}

void logistic_regression::dgd_learning() {
  theta = paracel::random_double_list(data_dim);
  paracel_write("theta", theta); // init push
  for(int rd = 0; rd < rounds; ++rd) {
    for(int i = 0; i < data_dim; ++i) { delta[i] = 0.; }
    random_shuffle(idx.begin(), idx.end());
    theta = paracel_read<vector<double> >("theta"); // pull theta
    for(auto sample_id : idx) {
      for(int i = 0; i < data_dim; ++i) {
        delta[i] += coff1 * samples[sample_id][i] - coff2 * theta[i];
      }
    } // traverse
    paracel_bupdate("theta", delta, "update.so", "lg_theta_update"); // update with delta
    iter_commit();
  }
  theta = paracel_read<vector<double> >("theta"); // last pull
}
// filename: update.so
// funcname: lg_theta_update
vector<double> local_update(const vector<double> & a,
                            const vector<double> & b) {
  vector<double> r;
  for(size_t i = 0; i < a.size(); ++i) {
    r.push_back(a[i] + b[i]);
  }
  return r;
}

paracel::update_result lg_theta_update = paracel::update_proxy(local_update);
$$ pr(A) = \frac{1-d}{N} + d \times \sum_{it \in \mathrm{inlinks}(A)} \frac{pr(it)}{\mathrm{outdegree}(it)} $$
class PageRankVertex : public Vertex<double, void, double> {
  virtual void compute(MessageIterator *msgs) {
    if(superstep() >= 1) {
      double sum = 0.;
      for(; !msgs->Done(); msgs->Next()) {
        sum += msgs->Value();
      }
      // d is the damping factor
      *MutableValue() = (1 - d) / NumVertices() + d * sum;
    } // if
    if(superstep() < 30) {
      const int64 n = GetOutEdgeIterator().size();
      SendMessageToAllNeighbors(GetValue() / n);
    } else {
      VoteToHalt();
    }
  } // compute
};
10+ algorithms, 15+ users
User applications:
Communication Efficient Distributed Machine Learning with the Parameter Server, NIPS 2014
Scaling Distributed Machine Learning with the Parameter Server, OSDI 2014
Parameter Server for Distributed Machine Learning, Big Learning Workshop at NIPS 2013
Talk by Jeff Dean at Stanford, Jan 2013
Solving the Straggler Problem with Bounded Staleness, HotOS 2013
More Effective Distributed ML via a Stale Synchronous Parallel Parameter Server, NIPS 2013
Large Scale Distributed Deep Networks, NIPS 2012
MapReduce/Bigtable for Distributed Optimization, NIPS 2010 LCCC Workshop
Piccolo: Building Fast, Distributed Programs with Partitioned Tables, OSDI 2010
Fault Tolerance
Data Flow Interface
Streaming Paracel
A Distributed Optimization Framework
Distributes Both Dataset and Parameter Space
SSP to Solve the Straggler Problem
Easy Programming
Try it out at: https://github.com/douban/paracel
About Me