Paradigm (KISS | MPI | OpenMP | MapReduce | Spark)
Why distributed computational framework
Scalability Challenge
One machine with multi-threads | SSD | GPU
Parallel version of ML alg(impl with MPI/pthread/bsp...)
Distributed computing architecture (MapReduce/GraphLab/Spark/Pregel...)
Scalability(both data size and model size)
Simple communication model
Learning problem(iterative)
Optimized synchronization (straggler-worker problem / last-reducer problem)
Minimized model transformation cost
Parameter server
Asynchronous model
Learning problems
$$ \theta \leftarrow \theta + \delta $$
Predictable stragglers
Unpredictable stragglers
may get 'stale' parameters
but not too 'stale' (staleness is bounded)
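The "stale but not too stale" rule can be sketched as a per-worker clock check. This is a minimal illustration, not the framework's implementation: the `StalenessClock` class and its `can_proceed` method are hypothetical, and the meaning given to `limit_s` (the maximum number of phases a worker may run ahead of the slowest one) is an assumption based on the bounded-staleness idea.

```python
class StalenessClock(object):
    """Hypothetical helper: per-worker phase counters with a staleness bound."""

    def __init__(self, n_workers, limit_s):
        self.clocks = [0] * n_workers
        self.limit_s = limit_s  # assumed meaning: max phases ahead of the slowest worker

    def phase_done(self, worker):
        # The worker reports that it finished one phase.
        self.clocks[worker] += 1

    def can_proceed(self, worker):
        # A worker may start its next phase only while it is at most
        # limit_s phases ahead of the slowest worker.
        return self.clocks[worker] - min(self.clocks) <= self.limit_s


clock = StalenessClock(n_workers=2, limit_s=1)
clock.phase_done(0)                 # clocks = [1, 0]
ok_one_ahead = clock.can_proceed(0)
clock.phase_done(0)                 # clocks = [2, 0]
ok_two_ahead = clock.can_proceed(0)
clock.phase_done(1)                 # clocks = [2, 1], the straggler caught up
ok_after_catchup = clock.can_proceed(0)
```

With `limit_s = 0` the same check degenerates into a barrier after every phase, which is the fully synchronous case.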
text file | user-defined partition | restricted fmt
one socket per req/rep
k-v tables
replicas | paxos-like strategy
pattern: linesplit(default) | fmap | smap | fsmap | fvec
mix flag
# parasol loader usage
parser = lambda line: line.strip('\n').split()
ld = loader('./*.csv', 'fsmap', parser, False)
lines = ld.load()
graph = ld.create_graph(lines)
matrix = ld.create_matrix(lines)
estimation | 1/5000.
100-500 procs | 0.90-0.99
1000-3000 procs | 0.54-0.81
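One way to read the estimate above: if 1/5000 is taken as an independent per-process failure probability (an assumption, the slide does not say so), the chance that a run with n processes sees no failure is (1 - 1/5000)^n. The sketch below computes that figure; the function name is hypothetical.

```python
def no_failure_probability(n_procs, p_fail=1.0 / 5000):
    """Probability that none of n_procs independent processes fails.

    Assumes every process fails independently with probability p_fail.
    """
    return (1.0 - p_fail) ** n_procs


# Evaluate at the process counts that bound the two ranges above.
estimates = dict((n, no_failure_probability(n)) for n in (100, 500, 1000, 3000))
```

Under this assumption the values land close to the ranges listed above, which suggests the job as a whole becomes noticeably less reliable beyond about a thousand processes.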
ft api(TODO)
parser + pattern + mix_flag
one k-v table
user-defined update | update vs inc
no replicas | alg responsibility
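The "update vs inc" distinction can be illustrated with an in-memory stand-in for a single k-v table. This is a sketch under assumptions: `KVTable` and its method names are hypothetical and not the framework's API; `inc` is taken to mean an additive update, and `update` a user-supplied combining function.

```python
class KVTable(object):
    """Hypothetical in-memory stand-in for one parameter-server table."""

    def __init__(self):
        self._data = {}

    def write(self, key, value):
        self._data[key] = value

    def read(self, key):
        return self._data[key]

    def inc(self, key, delta):
        # Additive update: commutative, so the order in which
        # workers' increments arrive does not change the result.
        self._data[key] = self._data[key] + delta

    def update(self, key, delta, fn):
        # User-defined update: fn(old_value, delta) decides the new value.
        self._data[key] = fn(self._data[key], delta)


table = KVTable()
table.write('theta', 1.0)
table.inc('theta', 0.5)
table.inc('theta', -0.25)
after_inc = table.read('theta')

# A user-defined rule, here "keep the larger value".
table.update('theta', 2.0, lambda old, d: max(old, d))
after_update = table.read('theta')
```

Because there are no replicas in this design, recovering a lost table would be the algorithm's responsibility, as the slide notes.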
import numpy as np

def h(x, theta):
    # logistic (sigmoid) hypothesis
    return np.exp(np.dot(x, theta)) / (1. + np.exp(np.dot(x, theta)))

def log_reg_sgd(x, y, alpha, iter):
    m, n = x.shape
    theta = np.random.rand(n)
    z = np.arange(m)
    for it in range(iter):
        # visit the samples in a fresh random order each pass
        z = np.random.permutation(z)
        for i in z:
            theta += alpha * (y[i] - h(x[i], theta)) * x[i]
    return theta
class sgd(paralg):
    def __init__(self):
        ...

    def sgd_kernel(self):
        # x, y, alpha and iter are assumed to be set up in the elided __init__
        m, n = self.sample.shape
        # initialise theta once and publish it to the parameter server
        self.theta = np.random.rand(n)
        paralg.paralg_write(self, 'theta', self.theta)
        z = np.arange(m)
        for it in range(iter):
            z = np.random.permutation(z)
            for i in z:
                # pull the latest (possibly stale) theta
                self.theta = paralg.paralg_read(self, 'theta')
                delta = alpha * (y[i] - h(x[i], self.theta)) * x[i]
                self.theta += delta
                # push only the increment
                paralg.paralg_inc(self, 'theta', delta)
        self.theta = paralg.paralg_read(self, 'theta')
        return self.theta
class sgd(paralg):
    def __init__(self):
        ...

    def sgd_kernel(self):
        # x, y, alpha and iter are assumed to be set up in the elided __init__
        self.splits = paralg.get_decomp(self)
        m, n = self.sample.shape
        # initialise theta once and publish it to the parameter server
        self.theta = np.random.rand(n)
        paralg.paralg_write(self, 'theta', self.theta)
        z = np.arange(m)
        for it in range(iter):
            z = np.random.permutation(z)
            for i in z:
                # pull the latest (possibly stale) theta
                self.theta = paralg.paralg_read(self, 'theta')
                delta = alpha * (y[i] - h(x[i], self.theta)) * x[i]
                self.theta += delta
                # push the increment, scaled down by the number of splits
                paralg.paralg_inc(self, 'theta', delta / self.splits)
        self.theta = paralg.paralg_read(self, 'theta')
        return self.theta
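The reason for pushing `delta / self.splits` rather than `delta` can be seen in a toy calculation. The sketch assumes that `get_decomp` returns the number of workers sharing the data (the slide does not confirm this) and uses a hypothetical helper, `apply_incs`, in place of the parameter server.

```python
def apply_incs(theta, deltas, scale=1.0):
    """Apply every worker's increment to a shared scalar parameter."""
    for d in deltas:
        theta += scale * d
    return theta


splits = 4
deltas = [0.1] * splits  # every worker proposes the same step

# Without scaling, the shared parameter moves by the sum of all steps.
unscaled = apply_incs(0.0, deltas)

# With 1/splits scaling, it moves by the average step instead.
scaled = apply_incs(0.0, deltas, scale=1.0 / splits)
```

Summing unscaled increments makes the effective step size grow with the number of workers, whereas averaging keeps it comparable to the serial learning rate.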
limit_s = 3

class sgd(paralg):
    def __init__(self):
        ...

    def sgd_kernel(self):
        # x, y, alpha and iter are assumed to be set up in the elided __init__
        self.splits = paralg.get_decomp(self)
        m, n = self.sample.shape
        # initialise theta once and publish it to the parameter server
        self.theta = np.random.rand(n)
        paralg.paralg_write(self, 'theta', self.theta)
        z = np.arange(m)
        for it in range(iter):
            z = np.random.permutation(z)
            for i in z:
                # pull the latest (possibly stale) theta
                self.theta = paralg.paralg_read(self, 'theta')
                delta = alpha * (y[i] - h(x[i], self.theta)) * x[i]
                self.theta += delta
                paralg.paralg_inc(self, 'theta', delta / self.splits)
            # mark the end of this pass so the staleness limit can be enforced
            paralg.phase_done(self)
        self.theta = paralg.paralg_read(self, 'theta')
        return self.theta
class paralg:
    def get_nodeid():
    def get_nodesize():
    def set_phases():
    def sync():
    def loadinput():
    def getlines():
    def get_graph():
    def paralg_read():
    def paralg_write():
    def paralg_inc():
    def phase_done():
    ...
jump read/write/update
server 1 | k 50 | netflix_small | 0.005,0.02 | rounds 5 | limit_s 0 | beater7
workers | rmse | pull/round time | learning/round time | inc/round time | total time
1 | 0.8820 | 9s | 470-514s | 1-2s | 3109s
2 | 0.8435 | 6-16s | 275-310s | 6-9s | 1585s
4 | 0.8796 | 7-15s | 130-140s | 3-5s | 780s
8 | 0.8966 | 7-25s | 49-75s | 3-4s | 455s
16 | 0.9163 | 14-26s | 40-69s | 2-4s | 440s
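The total times above can be turned into speedup and parallel efficiency relative to the single-worker run. The numbers are copied from the table; the variable names are only illustrative, and the first column is assumed to be the worker count.

```python
# Total time in seconds, keyed by worker count (values from the table above).
total_time_s = {1: 3109, 2: 1585, 4: 780, 8: 455, 16: 440}

baseline = float(total_time_s[1])

# Speedup: how many times faster than the single-worker run.
speedup = dict((w, baseline / t) for w, t in total_time_s.items())

# Efficiency: speedup divided by the number of workers (1.0 is ideal).
efficiency = dict((w, speedup[w] / w) for w in speedup)

for w in sorted(total_time_s):
    print('%2d workers: speedup %.2f, efficiency %.2f' % (w, speedup[w], efficiency[w]))
```

Scaling is close to linear up to 4 workers and flattens between 8 and 16, where the learning time per round stops shrinking while the pull time per round grows.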
server 1 | k 50 | netflix_small | 0.005,0.02 | rounds 5 | worker 16 | beater7
limit_s | rmse | total time
0 | 0.9163 | 440s
1 | 0.9170 | 422s
3 | 0.9171 | 454s
5 | 0.9147 | 407s
server scalability
mf-v2 to test limit_s
pi-like(word count) | y
pagerank | graph algorithms | bsp | y
other | cos?
bug fix
C++ version
optimize hash function (load balance & key balance)
A distributed framework for learning algs
Distributed both dataset and parameter space
SSP (stale synchronous parallel) to solve the last-reducer problem
0 pain from serial to parallel
(0.5,1) - (0, 0.5) - (1,0.5) - (0.5, 1) - (0.5, 0.25) - (0.25, 0.25)
Talk from Jeff Dean, Jan 2013, Stanford
Large Scale Distributed Deep Networks, NIPS12
Solving the straggler problem with bounded staleness, HotOS13
MapReduce/Bigtable for distributed optimization, NIPS10 LCCC Workshop.
contact info