
Embarrassingly Parallel DB Update Using Python (PostGIS/PostgreSQL)

I need to update every record in a spatial database in which I have a data set of points that overlay a data set of polygons. For each point feature I want to assign a key to relate

Solution 1:

Okay this is an answer to my own post. Well done me =D

This produces about a 150% increase in speed (roughly 2.5x) on my system, going from a single thread on one core to multiprocessing on a quad core.

import multiprocessing

import psycopg2


class Consumer(multiprocessing.Process):
    """Worker process: pulls tasks off the queue until it sees a None sentinel."""

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill: no more work for this worker.
                print('Tasks Complete')
                self.task_queue.task_done()
                break
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)


class Task(object):
    """Callable that assigns one city the key of the country containing it."""

    def __init__(self, a):
        self.a = a  # city_id of the row to update

    def __call__(self):
        pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
        pyConn.set_isolation_level(0)  # 0 = autocommit
        pyCursor1 = pyConn.cursor()

        procQuery = ('UPDATE city SET gid_fkey = country.gid FROM country '
                     'WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) '
                     'AND city_id = %s')

        # Let psycopg2 bind the parameters instead of formatting them into the string.
        pyCursor1.execute(procQuery, (self.a, self.a))
        pyConn.close()
        print('What is self?')
        print(self.a)

        return self.a

    def __str__(self):
        return 'ARC'

    def run(self):
        print('IN')

if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    num_consumers = multiprocessing.cpu_count() * 2
    consumers = [Consumer(tasks, results) for i in range(num_consumers)]
    for w in consumers:
        w.start()

    pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
    pyConnX.set_isolation_level(0)
    pyCursorX = pyConnX.cursor()

    # Fetch the id of every city that still lacks a country key.
    pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')
    cityIdList = [row[0] for row in pyCursorX.fetchall()]
    pyConnX.close()

    # Count the fetched ids instead of running a separate count(*) query,
    # so the job count always matches the list.
    num_jobs = len(cityIdList)

    for city_id in cityIdList:
        tasks.put(Task(city_id))

    # One None sentinel per worker so that each one shuts down.
    for i in range(num_consumers):
        tasks.put(None)

    while num_jobs:
        result = results.get()
        print(result)
        num_jobs -= 1

Now I have another question which I have posted here:

Create DB connection and maintain on multiple processes (multiprocessing)

Hopefully we can get rid of some overhead and speed this baby up even more.
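
One way that overhead might be reduced is to open the connection once per worker inside run(), and to put plain city ids on the queue instead of Task objects. The following is only a sketch under assumptions: ConnectedConsumer, open_connection, process_tasks and UPDATE_SQL are hypothetical names, the connection string is copied from the code above, and the UPDATE is written in the join form rather than with the subselect.

```python
import multiprocessing

# Join-style UPDATE; table and column names are taken from the post above.
UPDATE_SQL = (
    "UPDATE city SET gid_fkey = country.gid FROM country "
    "WHERE city.city_id = %s AND ST_Within(city.the_geom, country.the_geom)"
)


def open_connection():
    """Hypothetical factory; the connection string is copied from the post."""
    import psycopg2  # imported here so the module loads without a database driver
    return psycopg2.connect("dbname='geobase_1' host = 'localhost'")


class ConnectedConsumer(multiprocessing.Process):
    """Worker that opens one connection and reuses it for every task."""

    def __init__(self, task_queue, result_queue, connect=open_connection):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        # Must be a module-level function so it can be pickled when the
        # platform starts child processes with "spawn".
        self.connect = connect

    def run(self):
        conn = self.connect()  # opened once, inside the child process
        try:
            self.process_tasks(conn)
        finally:
            conn.close()

    def process_tasks(self, conn):
        cursor = conn.cursor()
        while True:
            city_id = self.task_queue.get()
            if city_id is None:  # poison pill: stop this worker
                self.task_queue.task_done()
                break
            cursor.execute(UPDATE_SQL, (city_id,))
            conn.commit()
            self.task_queue.task_done()
            self.result_queue.put(city_id)
```

The main block would then call tasks.put(city_id) for each id and start ConnectedConsumer(tasks, results) workers in place of Consumer. Whether this gives a measurable gain depends on how expensive connecting is relative to the spatial query itself.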


Solution 2:

In plain SQL one could do something like:

UPDATE city ci
SET gid_fkey = co.gid
FROM country co
WHERE ST_within(ci.the_geom, co.the_geom)
AND ci.city_id = _some_parameter_;

There could be a problem if a city fell within more than one country: the target row would then match several country rows, PostgreSQL would use only one of them for the update, and which one is not predictable. That is probably not the case in your data, though.
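
If the statement is driven from Python, the same join can also handle a batch of cities per round trip instead of one city per call. A minimal sketch, assuming psycopg2 (which adapts a Python list to a PostgreSQL array, so = ANY(%s) matches every id in the list); chunked, update_batch and BATCH_UPDATE_SQL are hypothetical names, not part of either answer.

```python
# Same join as the statement above, but matching a whole list of ids.
BATCH_UPDATE_SQL = """
UPDATE city ci
SET gid_fkey = co.gid
FROM country co
WHERE ST_Within(ci.the_geom, co.the_geom)
AND ci.city_id = ANY(%s)
"""


def chunked(ids, size):
    """Yield consecutive slices of ids, each at most `size` items long."""
    for start in range(0, len(ids), size):
        yield ids[start:start + size]


def update_batch(cursor, city_ids):
    """Run one UPDATE covering every id in city_ids."""
    # The list is passed as a single parameter and bound as an array.
    cursor.execute(BATCH_UPDATE_SQL, (list(city_ids),))
```

Each worker could then receive one chunk from chunked(cityIdList, 500) and call update_batch once per chunk. The batch size of 500 is an arbitrary placeholder, not a tuned value.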

