Look Back


Concept/Terminology


Summation (plus)


Paradigms (KISS | MPI | OpenMP | MapReduce | Spark)


Why a distributed computing framework


Motivation

Scalability Challenge

  • massive data size
  • massive model size

Some Solutions

One machine with multiple threads | SSD | GPU

  • hardware solution | expensive | must sample the data

Parallel versions of ML algorithms (implemented with MPI/pthreads/BSP...)

  • trivial | less readable | no code reuse

Distributed computing architectures (MapReduce/GraphLab/Spark/Pregel...)

  • MapReduce | model transformation cost | no communication API
  • Spark | wide dependencies | iterative tasks
  • BSP/Pregel | synchronization overhead | GraphLab?

Challenge/Goal

Scalability (both data size and model size)

Simple communication model

Learning problems (iterative)

Mitigated synchronization overhead / straggler-worker problem / last-reducer problem

Minimal model transformation cost

Parasol

Parameter server

  • motivated by DistBelief (Google's deep learning system)
  • a distributed but logically centralized key-value store
  • parameters shared by the workers

Asynchronous model

  • stale synchronous parallel (SSP)
  • lazy update | bounded staleness

Abstraction

    Learning problems

    $$ \theta \leftarrow \theta + \delta $$

Communication Model

[figure: parameter-server communication model, built up over several animation steps]
source: infolab.stanford.edu/infoseminar/dean.pdf

Communication Api

read

write

update

read_all

...
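
A minimal local sketch of this key-value interface, using a toy ps_client class; the class, its constructor, and the in-memory table are illustrative assumptions, not the actual Parasol API:

# Toy, in-process stand-in for the parameter-server k-v interface (illustrative only).
import numpy as np

class ps_client(object):
    def __init__(self, table):
        self.table = table            # local stand-in for the remote k-v table

    def read(self, key):              # blocking: fetch one parameter
        return self.table[key]

    def write(self, key, value):      # blocking: overwrite one parameter
        self.table[key] = value

    def update(self, key, delta):     # non-blocking in the real system: send an increment
        self.table[key] = self.table[key] + delta

    def read_all(self):               # fetch the whole table
        return dict(self.table)

client = ps_client({'theta': np.zeros(3)})
client.update('theta', np.array([0.1, 0.0, -0.1]))
print(client.read('theta'))           # -> [ 0.1  0.  -0.1]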

Last-Reducer Problem

Predictable stragglers

  • Slow/old machine
  • Bad network card
  • Load imbalance

Unpredictable stragglers

  • Hardware: disk seeks | network | CPU interrupts
  • Software: garbage collection | virtualization
  • Algorithmic: Calculating objectives | stop conditions

SSP

phase/clock

limit_s

workers may read 'stale' parameters

but never more than limit_s clocks stale (see the sketch below)
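
A minimal sketch of the bounded-staleness rule behind limit_s; the helper below is an illustration under the assumption that each worker advances an integer clock per phase, not Parasol's server logic:

# Illustrative SSP bounded-staleness check (not Parasol's actual server code).
def can_read(worker_clock, all_worker_clocks, limit_s):
    # a worker at clock c may read only if the slowest peer is within limit_s clocks
    slowest = min(all_worker_clocks)
    return worker_clock - slowest <= limit_s

print(can_read(5, [5, 4, 2], 3))   # True: at most 3 clocks ahead of the slowest
print(can_read(6, [6, 4, 2], 3))   # False: would exceed the staleness bound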

SSP

[figure: SSP execution illustration]

First Design

text files | user-defined partitioning | restricted format

one socket per request/reply

k-v tables

replicas | Paxos-like strategy

Loader

pattern: linesplit (default) | fmap | smap | fsmap | fvec

mix flag

# parasol loader usage
parser = lambda line: line.strip('\n').split()     # split each line into fields
ld = loader('./*.csv', 'fsmap', parser, False)     # pattern 'fsmap', mix flag off
lines = ld.load()                                  # read and partition the input files
graph = ld.create_graph(lines)                     # build a graph view of the records
matrix = ld.create_matrix(lines)                   # or a matrix view
      

Socket/Threading Model

Client

  • lazy socket
  • block: write, read
  • nonblock: update

Server

  • multiple threads per k-v process
  • pull-like
  • push-like
  • delete/clear
  • update
  • x

Multi2multi
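
A rough sketch of the blocking-read vs. non-blocking-update distinction using pyzmq and msgpack (both listed under Implementation); the socket types, endpoints, and message layout are assumptions, not Parasol's wire protocol:

# Rough sketch: blocking read vs. fire-and-forget update over ZeroMQ (illustrative only).
import zmq
import msgpack

ctx = zmq.Context()

# REQ socket: a read blocks until the server replies with the value
req = ctx.socket(zmq.REQ)
req.connect('tcp://localhost:5555')     # endpoint is an assumption

def blocking_read(key):
    req.send(msgpack.packb({'op': 'read', 'key': key}))
    return msgpack.unpackb(req.recv())

# PUSH socket: an update is fired off without waiting for a reply
push = ctx.socket(zmq.PUSH)
push.connect('tcp://localhost:5556')    # endpoint is an assumption

def nonblocking_update(key, delta):
    push.send(msgpack.packb({'op': 'update', 'key': key, 'delta': delta}))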

Fault Tolerance

estimated per-process failure probability | 1/5000

100-500 procs | P(no failure) ≈ 0.90-0.99

1000-3000 procs | P(no failure) ≈ 0.54-0.81
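
A quick back-of-envelope check of these figures, assuming independent failures with a per-process failure probability of 1/5000:

# Back-of-envelope: probability that no process fails, assuming independent failures.
p_fail = 1.0 / 5000

for n in (100, 500, 1000, 3000):
    p_all_survive = (1 - p_fail) ** n
    print('%4d procs -> P(no failure) = %.2f' % (n, p_all_survive))

# 100 -> 0.98, 500 -> 0.90, 1000 -> 0.82, 3000 -> 0.55, matching the ranges above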

fault-tolerance API (TODO)

Second Design

parser + pattern + mix_flag

one k-v table

user-defined update | update vs inc

no replicas | fault handling is the algorithm's responsibility
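
A hedged sketch of 'update vs inc': the default additive increment next to a user-defined merge; the registration mechanism is not shown and the averaged_update helper is hypothetical:

# Hypothetical sketch of 'update vs inc' on the server-side k-v table.
import numpy as np

table = {'theta': np.zeros(3)}

def default_inc(old, delta):
    # default behavior of paralg_inc: additive merge
    return old + delta

def averaged_update(old, delta, n_workers=4):
    # hypothetical user-defined update: scale each contribution by the worker count,
    # mirroring the 'delta / self.splits' trick in the SGD demo below
    return old + delta / n_workers

# the server would apply whichever merge function the user registered
table['theta'] = averaged_update(table['theta'], np.array([0.4, 0.0, -0.4]))
print(table['theta'])   # -> [ 0.1  0.  -0.1]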

Implementation

Python

mpi4py

ZeroMQ

msgpack
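
Not from the slides, but a hedged sketch of how mpi4py could assign server and worker roles across ranks; the one-server split mirrors the 'server 1' setting in the results and is only an assumption about the layout:

# Sketch: splitting MPI ranks into servers and workers (layout is an assumption).
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

n_servers = 1                      # mirrors the 'server 1' setting in the results
if rank < n_servers:
    role = 'server'                # would host a shard of the k-v table
else:
    role = 'worker'                # would load a data partition and run the learning kernel

print('rank %d of %d acting as %s' % (rank, size, role))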

Demo (logistic regression)

import numpy as np

def h(x, theta):
    # logistic (sigmoid) hypothesis
    z = np.exp(np.dot(x, theta))
    return z / (1. + z)

def log_reg_sgd(x, y, alpha, iter):
    m, n = x.shape
    theta = np.random.rand(n)
    z = np.arange(m)
    for it in range(iter):
        z = np.random.permutation(z)
        for i in z:
            theta += alpha * (y[i] - h(x[i], theta)) * x[i]
    return theta
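
For reference, a tiny synthetic run of the serial kernel above; the data and settings are made up for illustration:

# Tiny synthetic usage of the serial kernel above (data is made up).
np.random.seed(0)
x = np.random.randn(200, 3)                      # 200 synthetic samples, 3 features
true_theta = np.array([1.5, -2.0, 0.5])
y = (np.dot(x, true_theta) > 0).astype(float)    # labels from a known separator

theta = log_reg_sgd(x, y, alpha=0.1, iter=20)
print(theta)                                     # should roughly point along true_theta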
    

Work with Parasol

class sgd(paralg):
    def __init__(self): ...

    def sgd_kernel(self):
        # x, y, alpha, iter: this worker's data partition and hyperparameters (slide shorthand)
        m, n = self.sample.shape
        self.theta = np.random.rand(n)
        paralg.paralg_write(self, 'theta', self.theta)
        z = np.arange(m)
        for it in range(iter):
            z = np.random.permutation(z)
            for i in z:
                self.theta = paralg.paralg_read(self, 'theta')
                delta = alpha * (y[i] - h(x[i], self.theta)) * x[i]
                self.theta += delta
                paralg.paralg_inc(self, 'theta', delta)
        self.theta = paralg.paralg_read(self, 'theta')
        return self.theta
    

Work with Parasol

class sgd(paralg):
    def __init__(self): ...

    def sgd_kernel(self):
        # splits: how many workers share the table, used to average the increments
        self.splits = paralg.get_decomp(self)
        m, n = self.sample.shape
        self.theta = np.random.rand(n)
        paralg.paralg_write(self, 'theta', self.theta)
        z = np.arange(m)
        for it in range(iter):
            z = np.random.permutation(z)
            for i in z:
                self.theta = paralg.paralg_read(self, 'theta')
                delta = alpha * (y[i] - h(x[i], self.theta)) * x[i]
                self.theta += delta
                paralg.paralg_inc(self, 'theta', delta / self.splits)
        self.theta = paralg.paralg_read(self, 'theta')
        return self.theta
    

Work with Parasol (ssp)

limit_s = 3
class sgd(paralg):
    def __init__(self): ...

    def sgd_kernel(self):
        self.splits = paralg.get_decomp(self)
        m, n = self.sample.shape
        self.theta = np.random.rand(n)
        paralg.paralg_write(self, 'theta', self.theta)
        z = np.arange(m)
        for it in range(iter):
            z = np.random.permutation(z)
            for i in z:
                self.theta = paralg.paralg_read(self, 'theta')
                delta = alpha * (y[i] - h(x[i], self.theta)) * x[i]
                self.theta += delta
                paralg.paralg_inc(self, 'theta', delta / self.splits)
            # signal the end of this clock/phase; SSP lets peers lag by at most limit_s clocks
            paralg.phase_done(self)
        self.theta = paralg.paralg_read(self, 'theta')
        return self.theta
    

Parasol APIs

class paralg:

    def get_nodeid():

    def get_nodesize():

    def set_phases():

    def sync():

    def loadinput():

    def getlines():

    def get_graph():

    def paralg_read():

    def paralg_write():

    def paralg_inc():

    def phase_done():
    
    ...
    

Optimization

batch_read/write/update

jump read/write/update
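
A hypothetical illustration of the batching idea: collect keys and pay one network round trip instead of one per key; the fake_server and its rpc method are stand-ins, not Parasol's transport:

# Hypothetical illustration of batch_read: one round trip for many keys (stand-in server).
class fake_server(object):
    def __init__(self, table):
        self.table = table
        self.round_trips = 0

    def rpc(self, request):                      # one call == one network round trip
        self.round_trips += 1
        if request['op'] == 'read':
            return self.table[request['key']]
        if request['op'] == 'batch_read':
            return dict((k, self.table[k]) for k in request['keys'])

server = fake_server({'w0': 1.0, 'w1': 2.0, 'w2': 3.0})

# naive: one round trip per key
naive = dict((k, server.rpc({'op': 'read', 'key': k})) for k in ('w0', 'w1', 'w2'))

# batched: a single round trip for all keys
batched = server.rpc({'op': 'batch_read', 'keys': ['w0', 'w1', 'w2']})

print(server.round_trips)                        # 4 = 3 naive reads + 1 batch read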

A Piece of the Results

worker scalability

server 1 | k 50 | netflix_small | 0.005,0.02 | rounds 5 | limit_s 0 | beater7

workers | rmse | pull/round time | learning/round time | inc/round time | total time
1 | 0.8820 | 9s | 470-514s | 1-2s | 3109s
2 | 0.8435 | 6-16s | 275-310s | 6-9s | 1585s
4 | 0.8796 | 7-15s | 130-140s | 3-5s | 780s
8 | 0.8966 | 7-25s | 49-75s | 3-4s | 455s
16 | 0.9163 | 14-26s | 40-69s | 2-4s | 440s

A Piece of the Results

limit_s

server 1 | k 50 | netflix_small | 0.005,0.02 | rounds 5 | worker 16 | beater7

limit_s | rmse | total time
0 | 0.9163 | 440s
1 | 0.9170 | 422s
3 | 0.9171 | 454s
5 | 0.9147 | 407s

More Results

server scalability

mf-v2 to test limit_s

demo

General Framework?

pi-like (word count) | yes

PageRank | graph algorithms | BSP | yes

other | cos?

Future Work

bug fixes

C++ version

optimize hash function (load balance & key balance)

fault tolerance

Conclusion

A distributed framework for learning algorithms

Distributes both the dataset and the parameter space

SSP to mitigate the last-reducer problem

Near-zero pain going from serial to parallel

Two Love Stories

Logo

(0.5,1) - (0, 0.5) - (1,0.5) - (0.5, 1) - (0.5, 0.25) - (0.25, 0.25)
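
If useful, a throwaway matplotlib sketch that draws this polyline; purely illustrative:

# Throwaway sketch: plot the logo polyline from the coordinates above.
import matplotlib.pyplot as plt

points = [(0.5, 1), (0, 0.5), (1, 0.5), (0.5, 1), (0.5, 0.25), (0.25, 0.25)]
xs, ys = zip(*points)

plt.plot(xs, ys, '-o')
plt.axis('equal')
plt.show()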

Reference

Thank You!

contact info