import socket import os import sys sys.path.append('/home/csee1/bwilson1/BDServer/image/PIL') sys.path.append('/home/csee1/bwilson1/BDServer') import time import glob import signal import marshal import base64 import zlib from random import random from random import seed from random import shuffle from OpenSSL import SSL import PolicyParser import BDtree START_TIME = time.time() NUM_MESSAGES = 0 TOTAL_BYTES_SENT = 0 STATS_COUNT = 0 DOT_PATH = "dot" BLACK_LIST = [] DEBUG = True WS_POLICY = False try: sum except NameError: def sum(l): total = 0 for num in l: total += num return total class WSPolicyMismatch: pass def getloadavg(): return float(open('/proc/loadavg').read().split()[0]) def get_ip(): # return '192.168.0.50' return os.popen('/sbin/ifconfig').read().split()[6].split(':')[1] # Prints out debugging messages to stderr def debug_print(msg): if DEBUG and __name__ == "__main__": sys.stderr.write(str(time.ctime(time.time())) + " ~ " + \ msg + '\n') # Returns the load everage in the past minute. # load of 3.0 = 300% of CPU would have been used # load of .99 = 99% CPU usage def get_load(): return getloadavg() # creates a lock by creating a directory def lock_file(path, blocking=True, ident="someone"): DEAD_LOCK_MAYBE = 10 while True: try: os.mkdir(path + ".LOCK") open(path+".LOCK/"+ident,'w').close() # debug_print(ident + " acquired lock on " + path) return True # Lock acquired except OSError: # debug_print(ident + " failed to acquire lock on " + path) if not blocking: return False DEAD_LOCK_MAYBE -= 1 time.sleep(random()/3 + .05) if DEAD_LOCK_MAYBE < 0: debug_print(ident + " failing to acquire lock. Is there a deadlock?") # releases a lock by deleting a directory def unlock_file(path): locker = glob.glob(path+".LOCK/*")[0] # debug_print(locker + " releasing lock on " + path) try: os.remove(locker) os.rmdir(path + ".LOCK") except OSError: debug_print("No lock to delete?!") # reads in a configuration file of form KEY=VALUE and returns the dictionary def read_config(path): lines = open(path).readlines() config = {} for line in lines: line = line.strip() if len(line) == 0: continue if line[0] == '#': continue key, value = line.split('=') key = key.strip().lower() value = value.strip().lower() # turn it into a float if we can try: value = float(value) except ValueError: pass config[key] = value return config def message(host, port, message, ws_policy="NONE"): global TOTAL_BYTES_SENT global NUM_MESSAGES if ws_policy == "NONE": ws_policy = WS_POLICY if host in BLACK_LIST: raise socket.error sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def noob(a,b,c,d,e): pass # print 'callback',a,b,c,d,e ctx = SSL.Context(SSL.SSLv3_METHOD) ctx.set_verify(SSL.VERIFY_NONE, noob) s = SSL.Connection(ctx,sk) s.connect((str(host), int(port))) #debug_print("Message sent to " + host + ":" + str(port) + " - " + str(message)) # debug_print("WS-POLICY ~ Sending Policy: " + str(ws_policy)) encoded_policy = encode(ws_policy) TOTAL_BYTES_SENT += len(encoded_policy) s.send(encoded_policy) NUM_MESSAGES += 1 raw_data = s.recv(320000) ws_intersect = decode(raw_data) # debug_print("WS-POLICY ~ Got intersection: " + str(ws_intersect)) if ws_policy and (ws_intersect == False): debug_print("WS-POLICY ~ We can't talk to " + host + ":" + str(port) + " - Policy mismatch") raise WSPolicyMismatch encoded_message = encode(message) TOTAL_BYTES_SENT += len(encoded_message) s.send(encoded_message) NUM_MESSAGES += 1 try: tmp = s.recv(10000000) data = decode(tmp) except EOFError: raise socket.error except SSL.SysCallError: raise socket.error # debug_print("Message recieved from " + host + ":" + str(port) + " - " + str(data)) return data # encodes an object by marshaling it then base 64 encoding it def encode(obj): return zlib.compress(marshal.dumps(obj)) #return base64.encodestring(marshal.dumps(obj)) # encodes an object by base 64 decoding it then unmarshaling it def decode(encoded_str): return marshal.loads(zlib.decompress(encoded_str)) #return marshal.loads(zlib.decompress(base64.decodestring(encoded_str))) def package_files(base_dir, file_paths): old_path = os.path.abspath('.') os.chdir(base_dir) package = [] for file_path in file_paths: package.append((file_path, open(file_path, 'rb').read())) os.chdir(old_path) return encode(package) def unpackage_files(base_dir, package): old_path = os.path.abspath('.') os.chdir(base_dir) files = decode(package) for file_path, contents in files: open(file_path, 'wb').write(contents) os.chdir(old_path) def get_wsdl(bdserver_addr, service, ws_policy_file='/home/csee1/bwilson1/BDServer/Policy1.xml'): ip, port = bdserver_addr.split(':') try: response = message(ip, int(port), ["WHEREIS", service, PolicyParser.ParsePolicy(ws_policy_file)], PolicyParser.ParsePolicy(ws_policy_file)) except socket.error: return False except: return False if response[0] == "CANTFIND": return False if response[0] == "POLICYMISMATCH": print "policy mismatch for service: " + service + "
" return False if response[0] == "127.0.0.1" or response[0] == "ME": host = ip else: host = response[0] port = response[1] ws_dir = response[2] url = "http://" + host + ":" + str(port) + "/" + ws_dir + service + "?wsdl" return url def kill_all(bdserver_addr): to_get = [bdserver_addr] data = {} while len(to_get) > 0: next_addr = to_get.pop() if next_addr in data: continue ip, port = next_addr.split(':') try: data[next_addr] = message(ip, int(port), ["STATUS"]) message(ip, int(port), ["KILL"]) to_get += data[next_addr][0].keys() except socket.error: data[next_addr] = False except: pass return len(data) def get_audit_info(bdserver_addr): to_get = [bdserver_addr] data = {} # Format for STATUS incoming message: # [servers, (static_services, dynamic_services), (load, RST, RRT, RT)] while len(to_get) > 0: next_addr = to_get.pop() if next_addr in data: continue ip, port = next_addr.split(':') try: data[next_addr] = message(ip, int(port), ["STATUS"]) to_get += data[next_addr][0].keys() except socket.error: data[next_addr] = False except: pass return data def create_dot_file(audit_info, output_file): output_str = 'digraph bdnodes {\n\tsize="8,15";' edges = [] nodes = [] services = [] dead = [] for key_from in audit_info: if audit_info[key_from] == False: dead.append(key_from) continue nodes.append(key_from) for key_to in audit_info: if key_from == key_to: continue elif key_to not in audit_info[key_from][0].keys(): edges.append('\t"'+key_from+'" -> "'+key_to+'"[color=red];') elif audit_info[key_to] == False and key_to in audit_info[key_from][0].keys(): edges.append('\t"'+key_from+'" -> "'+key_to+'"[color=orange];') elif not PolicyParser.hasIntersection(audit_info[key_from][3], audit_info[key_to][3]): edges.append('\t"'+key_from+'" -> "'+key_to+'"[color=yellow];') else: edges.append('\t"'+key_from+'" -> "'+key_to+'"[color=green];') for service in audit_info[key_from][1][0]: edges.append('\t"'+key_from+'" -> "' +service + \ '";') for service in audit_info[key_from][1][1]: edges.append('\t"'+key_from+'" -> "' + service + \ '" [color=blue];') if len(dead) > 0: output_str += "\n\tnode [color=red, fontsize=40];" for node in dead: output_str += ' "' + node + '"' output_str += ';' output_str += "\n\tnode [color=green, fontsize=40 ];" for node in nodes: output_str += ' "' + node + '"' output_str += ';' output_str += "\n\tnode [color=black, fontsize=30] ; " for edge in edges: output_str += "\n" + edge + "\n" output_str += '\n}\n' output = open(output_file, 'w') output.write(output_str) output.close() def create_dot_png(audit_info, bdserver_addr): create_dot_file(audit_info, '/home/csee1/bwilson1/www/bdnetwork.dot.txt') message(bdserver_addr.split(':')[0], int(bdserver_addr.split(':')[1]), \ ['MAKEIMAGE']) # BDdaemon.create_graph(None, '/home/don/Desktop/test.png') def create_graph(audit_info, output_file, ROW_HEIGHT=20, HOST_WIDTH=165, BAR_HEIGHT=16, WEIGHT_WIDTH=4, TITLE_HEIGHT = 23, MIN_WIDTH=720): sys.stderr = sys.stdout import Image import ImageDraw import ImageFont # Get the list of services services = [ [ service for service in audit_info[site][1][0] ] for site in audit_info if audit_info[site] ] services += [ [ service for service in audit_info[site][1][1] ] for site in audit_info if audit_info[site] ] # Add the services to the color key hash color_key = {} for msite in services: for service in msite: color_key[service] = None # Possible colors colors = [(255,80,80), (80,255,80), (80,80,255), (255,255,80), (255,80,255), \ (80,255,255), (255,128,80),(255,80,128), (128,255,80), (80,255,128), \ (128,80,255), (80,128,255),(255,128,128),(128,255,128),(128,128,255) ] # Assign colors to services services = color_key.keys() services.sort() for service in services: color_key[service] = colors[0] colors = colors[1:] + [colors[0]] NUM_SITES = len(audit_info) # THe height of the image HEIGHT = ROW_HEIGHT*NUM_SITES + ROW_HEIGHT * len(color_key) + TITLE_HEIGHT # Calculate the largest total weight. sites = audit_info.keys() weights = [] site_weights = [] #print audit_info for i in range(len(sites)): site = sites[i] if audit_info[site] == False: continue #print "
DEBUG
", audit_info[site][1][0], "
END DENUBG
" #static_services static_services_weight = sum([ audit_info[site][1][0][service] for service in audit_info[site][1][0].keys()]) #dynamic_services dynamic_services_weight = sum([ audit_info[site][1][1][service] for service in audit_info[site][1][1].keys()]) site_weights.append((static_services_weight + dynamic_services_weight,site)) site_weights.sort() site_weights.reverse() WIDTH = (int(site_weights[0][0] * WEIGHT_WIDTH)) + HOST_WIDTH + 10 if WIDTH < MIN_WIDTH: WIDTH = MIN_WIDTH im = Image.new( "RGB", (WIDTH, HEIGHT), (255,255,255)) try: luxi16 = ImageFont.truetype("/usr/share/fonts/truetype/freefont/FreeSans.ttf", 16) except IOError: # try: luxi16 = ImageFont.truetype("/usr/X11R6/lib/X11/fonts/TTF/luxisr.ttf", 16) # except IOError: # debug_print("No font!!") #return draw = ImageDraw.Draw(im) # Write out top of the table draw.text((0, 0), "Site - host:port", font=luxi16, fill=(0,0,0)) for i in range(int((WIDTH - HOST_WIDTH)/ WEIGHT_WIDTH) - 10): if i % 10 == 0: for y in range(TITLE_HEIGHT - 9, TITLE_HEIGHT-4): im.putpixel((i*WEIGHT_WIDTH + HOST_WIDTH + 3, y),(0,0,0)) draw.text((i*WEIGHT_WIDTH + 3 + HOST_WIDTH, 0), str(i), font=luxi16, fill=(0,0,0)) # Draw horizontal line separating title and rest for x in range(WIDTH): im.putpixel((x,TITLE_HEIGHT-4), (0,0,0)) im.putpixel((x,TITLE_HEIGHT-5), (0,0,0)) # Draw horizontal line separating title and rest for x in range(WIDTH): im.putpixel((x,TITLE_HEIGHT + ROW_HEIGHT*NUM_SITES-1), (0,0,0)) im.putpixel((x,TITLE_HEIGHT + ROW_HEIGHT*NUM_SITES-2), (0,0,0)) # Draw vertical line separating hosts and bars for x in range(TITLE_HEIGHT-5, HEIGHT - ROW_HEIGHT * len(color_key) ): im.putpixel((HOST_WIDTH,x), (0,0,0)) # Draw each row sites = audit_info.keys() sites.sort() i = 0 for i in range(len(sites)): site = sites[i] if audit_info[site] == False: draw.text((0, TITLE_HEIGHT + i*ROW_HEIGHT), site, font=luxi16, fill=(170,170,170)) continue #static_services static_services = [ (audit_info[site][1][0][service], service, False) for service in audit_info[site][1][0] ] #dynamic_services dynamic_services = [ (audit_info[site][1][1][service], service, True) for service in audit_info[site][1][1] ] services = static_services + dynamic_services services.sort() services.reverse() cur_col = HOST_WIDTH + 3 for weight, service, is_dynamic in services: for col in range(int(weight*WEIGHT_WIDTH)): cur_col += 1 for row in range(BAR_HEIGHT): if is_dynamic and ((i*ROW_HEIGHT + row) % BAR_HEIGHT) == (cur_col*2 % BAR_HEIGHT): im.putpixel((cur_col, TITLE_HEIGHT + i*ROW_HEIGHT + row), (0,0,0)) else: im.putpixel((cur_col, TITLE_HEIGHT + i*ROW_HEIGHT + row), color_key[service]) receiver = True for row in range(-2, BAR_HEIGHT + 2): if receiver and im.getpixel((audit_info[site][2] * WEIGHT_WIDTH + HOST_WIDTH + 3, TITLE_HEIGHT + i*ROW_HEIGHT + row)) != (255,255,255): receiver = False im.putpixel((audit_info[site][2] * WEIGHT_WIDTH + HOST_WIDTH + 3, TITLE_HEIGHT + i*ROW_HEIGHT + row), (0,0,0)) if receiver: draw.text((0, TITLE_HEIGHT + i*ROW_HEIGHT), site, font=luxi16, fill=(0,0,0)) else: draw.text((0, TITLE_HEIGHT + i*ROW_HEIGHT), site, font=luxi16, fill=(255,25,25)) services = color_key.keys() services.sort() for service in services: i += 1 draw.text((0, TITLE_HEIGHT + i*ROW_HEIGHT), service, font=luxi16, fill=color_key[service]) im.save(output_file) def strip_img(img_file): import Image img = Image.open(img_file) width,height = img.size min_row = 0 min_col = 0 max_row = height-1 max_col = width-1 stop = False i = 0 for row in range(height): for col in range(width): #print row,col i += 1 if i % 4 != 0: continue if img.getpixel((col,row)) != 15: stop = True break if stop: break min_row += 1 min_row -= 1 img = img.crop((0, min_row, width, height)) width, height = img.size i= 0 stop = False for col in range(width): for row in range(height): i += 1 if i % 4 != 0: continue if img.getpixel((col,row)) != 15: stop = True break if stop: break min_col += 1 min_col -= 1 img = img.crop((min_col, 0, width, height)) width, height = img.size max_row = height - 1 i = 0 stop = False row_idxs = range(height) row_idxs.reverse() for row in row_idxs: for col in range(width): i += 1 if i % 4 != 0: continue if img.getpixel((col,row)) != 15: stop = True break if stop: break max_row -= 1 max_row += 1 img = img.crop((0,0,width, max_row)) width, height = img.size max_col = width - 1 i = 0 stop = False col_idxs = range(width) col_idxs.reverse() for col in col_idxs: for row in range(height): i += 1 if i % 4 != 0: continue if img.getpixel((col,row)) != 15: stop = True break if stop: break max_col -= 1 max_col += 1 img = img.crop((0,0,max_col, height)) img.save(img_file) def print_audit_website(bdserver_addr): sys.stderr = sys.stdout print "Content-type: text/html\n\n"; print "BranDon Distributed WebService Audit
" try: data = get_audit_info(bdserver_addr) except socket.error: print "

Error connecting to"+bdserver_addr+"

" return # SYSTEM GRAPH print "

BranDon SYSTEM STATUS


" print "Query started at node", bdserver_addr, "at time", time.ctime(time.time()), "
" print "
" print "

NETWORK TOPOLOGY

" try: create_dot_png( data, bdserver_addr) except socket.error: print "

Error connecting to"+bdserver_addr+"

" return os.chmod("/home/csee1/bwilson1/www/bdnet.png", 0777) strip_img('/home/csee1/bwilson1/www/bdnet.png') print "
" print "
" # CONNECTIVITY STATISTICS print "

CONNECTIVITY STATISTICS

" print "Table: ROW last got a update from COLUMN at time ENTRY
" print "" # is a row, " data_sorted = data.keys() data_sorted.sort() for key in data_sorted: print "", print "\n" for key_from in data_sorted: print "" print "" for key_to in data_sorted: print "" print "" print "
is a column -- do rows first print "
" + key.split(":")[0]+"
port: "+key.split(":")[1] + "
" + key_from.split(":")[0]+"
port: "+key_from.split(":")[1] + "
" print "" if data[key_from] == False: print "?" elif key_from == key_to: print "-" elif key_to not in data[key_from][0]: print "X" else: print time.ctime(data[key_from][0][key_to]).split()[3] print "
" print "
" ##### WS-POLICY table print "

WS-POLICY INTERSECTIONS

" print "" # is a row, " print "", print "", print "", print "", print "", print "", print "\n" for i in range(len(keys)): for j in range(i + 1, len(keys)): if data[keys[i]] == False or data[keys[j]] == False: continue print "" intersection = PolicyParser.hasIntersection(data[keys[i]][3], data[keys[j]][3]) if not intersection: iopen= "" iclose= "" else: iopen="" iclose="" print "" print "" print "" print "" print "" print "" if not intersection: intersection = ["NULL"] print "" print "" print "
is a column -- do rows first keys = data.keys() keys.sort() print "
Host
Policy
Host
Policy
Intersection
"+iopen+"
"+keys[i]+"
"+iclose+"
"+iopen+"
("+ ") V (".join([ " /\\ ".join(sublist) for sublist in data[keys[i]][3] ]) +")
"+iclose+"
"+iopen+"
"+keys[j]+"
"+iclose+"
"+iopen+"
("+ ") V (".join([ " /\\ ".join(sublist) for sublist in data[keys[j]][3] ]) +")
"+iclose+"
"+iopen+"
"+ " /\\ ".join(intersection)+"
"+iclose+"
" print "
" # GRAPH print "

WEIGHT DISTRIBUTION GRAPH

" create_graph( data, "/home/csee1/bwilson1/www/bd_graph.png") os.chmod("/home/csee1/bwilson1/www/bd_graph.png", 0777) print "
" print "
" # SERVICES INFO counts = {} total = 0 static_total = 0 dynamic_total = 0 total_weight = 0 # [WEIGHT, TOTAL REPLICAS, DYNAMIC REPLICAS, STATIC REPLICAS, HITS] for key in data: if data[key] == False: continue static_services, dynamic_services = data[key][1] for service in static_services: if service not in counts: counts[service] = [0, 0, 0, 0] total += 1 static_total += 1 counts[service][1] += 1 counts[service][3] += 1 counts[service][0] += static_services[service] total_weight += static_services[service] for service in dynamic_services: if service not in counts: counts[service] = [0, 0, 0, 0] total += 1 dynamic_total += 1 counts[service][1] += 1 counts[service][2] += 1 counts[service][0] += dynamic_services[service] total_weight += dynamic_services[service] counts_table = [ tuple(counts[key]) + (key,) for key in counts ] counts_table.sort() counts_table.reverse() print "

WEB SERVICE STATISTICS

" print "", len(counts_table), "different web services
" print "", total, "replicas
" print "", dynamic_total, "dynamic replicas (", float(dynamic_total) / total *100, "% )
" print "", static_total, "static replicas (", float(static_total) / total *100, "% )
" print "", total_weight, "is the weight of the entire system
" print "", total_weight / total, " is the average weight of each service
" print "
" print "" print "" print "" print "" print "" print "" print "" for row in counts_table: print "" print "
Service NameWeight (% of total)# of replicas# of dynamic replicas# of static replicas
", print row[4], print "", print row[0], if total_weight != 0: print "(", row[0]/total_weight*100, "% )" print "", print row[1], print "", print row[2], print "", print row[3], print "
" print "
" # SERVER INFO print "

INDIVIDUAL SERVER STATISTICS

" print "" print "" print "" print "" print "" print "" print "" # [servers, (static_services, dynamic_services), (load, RST, RRT, RT)] d_keys = data.keys() d_keys.sort() for server in d_keys: if data[server] == False: continue static_services, dynamic_services = data[server][1] print "" print "" print "" RT = data[server][2] print "" print "" print "" print "" print "
HostWeightReceiver ThresholdStatic Services (Weight)Dynamic Services (Weight)
", server, "", sum(static_services.values()) + sum(dynamic_services.values()), "", RT, "" s_keys = static_services.keys() s_keys.sort() for service in s_keys: print service, "(", static_services[service], ")", if service != s_keys[-1]: print "
" print "
" d_keys = dynamic_services.keys() d_keys.sort() for service in d_keys: print service, "(", dynamic_services[service], ")", if service != d_keys[-1]: print "
" print "
" print "
" # The BD Daemon class! class BDdaemon: def __init__(self, port, username, services_dir, max_dynamic, \ receiver_threshold, service_log_dir, graveyard_dir, axis_port, axis_web_dir, \ admin_password="password", backlog=5): signal.signal(signal.SIGCHLD, signal.SIG_IGN) self.port = int(port) self.backlog = int(backlog) self.services_dir = str(services_dir) self.username = str(username) self.admin_password = str(admin_password) # self.ws_policy_file = str(ws_policy) self.RT = float(receiver_threshold) self.service_log_dir = str(service_log_dir) self.max_dynamic = int(max_dynamic) self.graveyard_dir = str(graveyard_dir) self.axis_port = int(axis_port) self.axis_web_dir = str(axis_web_dir) self.cache_path = self.username + ".bdcache" self.BUFFER_SIZE = 10000000 self.server_key = self.username+".key" self.server_csr = self.username+".csr" self.server_crt = self.username+".crt" try: [ os.stat(k) for k in [ self.server_key, self.server_csr, self.server_crt ] ] except OSError: print "Missing crypto keys!!" print self.server_key, self.server_csr, self.server_crt sys.exit(1) pid = os.fork() ## Parent discover, child #1 will listen, child #2 will import services # Parent discover, child listens if pid == 0: self.discoverer() else: self.discoverer_pid = pid self.listener_pid = os.getpid() self.listener() # This function/process is in charge of discovering other nodes and # updating cache def discoverer(self): count = 0 STARTUPSYNCS = 3 MAX_DELAY = 30 MIN_DELAY = 10 global STATS_COUNT visited = [] while True: STATS_COUNT += 1 #debug_print("Total number of messages initiated and sent: " + str(NUM_MESSAGES)) #debug_print("Total number of bytes sent: " + str(TOTAL_BYTES_SENT)) if STATS_COUNT % 5 == 0: debug_print("Average bytes/second:" + str(TOTAL_BYTES_SENT/(time.time() - START_TIME))) if count < STARTUPSYNCS: time.sleep(.25) elif self.is_overloaded(): time.sleep(30) time.sleep(random()*5) else: delay = MAX_DELAY / self.RT * self.total_weight() if delay < MIN_DELAY: delay = MIN_DELAY - random() * 2 time.sleep(delay) count += 1 # Try to grab a lock, if we fail, dont sweat it and just # go back to looping if not self.lock_cache(blocking = False, identity = "discoverer"): continue cache = self.load_cache() self.unlock_cache() # Only makes sense to do node synchronization if we know of any nodes idx_to_delete = [] if len(self.talkable_sites(cache)) > 0: # pick a random node, least recent most common for site in self.talkable_sites(cache): if site not in visited: addr = site visited.append(site) break else: for i in range(len(visited)): if not cache.has_key(visited[i]): idx_to_delete.append(i) continue if random() < .9: addr = visited[i] idx_to_delete.append(i) visited.append(addr) break else: i = int(random()*len(visited)) addr = visited[i] idx_to_delete.append(i) visited.append(addr) idx_to_delete.sort() idx_to_delete.reverse() for idx in idx_to_delete: del visited[idx] self.discover(addr) else: debug_print("Warning: I'm lonely - nobody to synchronize with") def discover(self, site): self.lock_cache(blocking = True, identity = "discover") cache = self.load_cache() addr = site host, port = addr.split(':') # formulate the message msg = ["SYNCHRONIZE", self.port, time.time(), self.get_services(), \ self.get_free_dynamic(), self.get_ws_policy()] # don't need to send the guy his own info temp = list(cache[addr]) del cache[addr] msg.append(cache) try: response = message(host, int(port), msg) # If the site doesnt respond to the message, delete it from the cache except socket.error: debug_print(addr + " doesn't respond anymore, updating cache") # Note that we are writing the cache with the popped addr self.write_cache(cache) self.unlock_cache() return except WSPolicyMismatch: # There is a policy mismatch but we dont know his policy cache[addr] = temp cache[addr][3] = True self.write_cache(cache) self.unlock_cache() return debug_print("Synchronizing with " + addr) self.handle_SYNCHRONIZE(response, (host, int(port)), lock=False) self.unlock_cache() # This function/process is in charge of handling and servicing requests def listener(self): # bind and listen #sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ctx = SSL.Context(SSL.SSLv23_METHOD) ctx.use_privatekey_file(self.server_key) ctx.use_certificate_file(self.server_crt) s = SSL.Connection(ctx, socket.socket(socket.AF_INET, socket.SOCK_STREAM)) while True: try: s.bind(('', self.port)) s.listen(self.backlog) break except socket.error: time.sleep(2) debug_print("Failed to acquire bind on port " + str(self.port)) debug_print("Listening on port " + str(self.port)) while True: # Wait for a connection conn, addr = s.accept() # debug_print("Incoming connection from " + addr[0]) if addr[0] in BLACK_LIST: debug_print("Ignoring " + str(addr) + " because it in the black list") conn.close() continue # Once a connection is made, fork pid = os.fork() # Have the parent continue to wait for a request if(pid != 0): continue # Have the child handle the request, then die seed() # Need to seed so that not all children use the # same random numbers self.connection_handler(conn, addr, s) sys.exit(0) # This function is used by the listener to handle requests def connection_handler(self, conn, addr, s): if addr[0] == "127.0.0.1": addr = (get_ip(), addr[1]) other_ws_policy = decode(conn.recv(self.BUFFER_SIZE)) if other_ws_policy: intersection = PolicyParser.hasIntersection(other_ws_policy, self.get_ws_policy()) else: intersection = False conn.send(encode(intersection)) if other_ws_policy and intersection == False: sys.exit(0) data = decode(conn.recv(self.BUFFER_SIZE)) if (not type(data) is list) or (len(data) == 0): debug_print("Invalidly formatted request! " + str(data)) conn.send(encode(["INVALID FORMAT"])) return command = data[0] body = data[1:] if command != "MAKEIMAGE": debug_print("Received " + command + " from " + addr[0]) # Check to see what kind of command it is, then call the associated # handler if command == "WILLYOUEXECUTE": message = self.handle_WILLYOUEXECUTE(body, addr) elif command == "GIVEYOURSERVICES": message = self.handle_GIVEYOURSERVICES() elif command == "SYNCHRONIZE": message = self.handle_SYNCHRONIZE(body, addr) elif command == "WHEREIS": message = self.handle_WHEREIS(body, addr) elif command == "GIVESERVICE": message = self.handle_GIVESERVICE(body, addr) elif command == "YOUWILLEXECUTE": message = self.handle_YOUWILLEXECUTE(body, addr) elif command == "HINT": message = self.handle_HINT(body) elif command == "PING": message = self.handle_PING() elif command == "STATUS": message = self.handle_STATUS() elif command == "WILLYOUREPLICATE": message = self.handle_WILLYOUREPLICATE(body, addr) elif command == "KILL": debug_print("Exiting - no longer servicing requests") os.kill(self.discoverer_pid, signal.SIGKILL) os.kill(self.listener_pid, signal.SIGKILL) s.close() message = "PEACE OUT" elif command == "FIND": message = self.handle_FIND(body, addr) elif command == "MAKEIMAGE": message = self.handle_MAKEIMAGE() elif command == "MASSMESSAGE": message = self.handle_MASSMESSAGE(body) else: message = "NO SUCH COMMAND" # debug_print("Responding to " + addr[0] + " with " + str(message)) conn.send(encode(message)) conn.close() def handle_MAKEIMAGE(self): os.popen('cat /home/csee1/bwilson1/www/bdnetwork.dot.txt | /home/csee1/bwilson1/graphviz/bin/dot -Tps | gs -sDEVICE=png16 -sOutputFile=/home/csee1/bwilson1/www/bdnet.png') return ['OK'] def handle_STATUS(self): # Get servers self.lock_cache() cache = self.load_cache() self.unlock_cache() servers = {} for key in cache: servers[key] = cache[key][0] static_services = self.get_static_services() static_services_weights = {} dynamic_services = self.get_dynamic_services() dynamic_services_weights = {} for service in static_services: static_services_weights[service] = self.service_weight(service, give_hits = False) for service in dynamic_services: dynamic_services_weights[service] = self.service_weight(service, give_hits = False) return [servers, (static_services_weights, dynamic_services_weights) , self.RT, self.get_ws_policy()] def handle_PING(self): return ["PONG"] # This function handles the GIVESERVICE request, the request made by other # daemons to get the stuff needed to replicate stuff. It is common courtesy # for the other side to send a synchronization request when it does replicating. def handle_GIVESERVICE(self, body, addr): service = body[0] if not self.service_exists(service): return ["NO"] package = package_files(self.services_dir, [service, service+".bdcert"] ) return ["YES", package, True] def generate_edges(self): self.lock_cache() cache = self.load_cache() self.unlock_cache() cache['self'] = [ None, None, None, self.get_ws_policy() ] edges = {} sites = cache.keys() for i in range(len(sites)): for j in range(i + 1, len(sites)): site1 = sites[i] site2 = sites[j] if not edges.has_key(site1): edges[site1] = [] if not edges.has_key(site2): edges[site2] = [] if PolicyParser.hasIntersection(cache[site1][3], cache[site2][3]): edges[site1].append(site2) edges[site2].append(site1) return edges def generate_message_plan(self, sites): edges = self.generate_edges() sets = [['self']]+ [ [node] for node in sites ] fringe = [] labels = {} for node in edges: labels[node] = False # first step, mark self for node in sites: labels[node] = [node] fringe.append(node) labels['self'] = ['self'] shuffle(fringe) fringe = ['self'] + fringe edges_used = [] new_fringe = [] while len(sets) > 1: change_made = False if len(fringe) == 0: print "len(fringe) == 0, sanity check failure" break for node in fringe: if not edges.has_key(node): debug_print("Trying to send to a node that doesn't exist?? how?") shuffle(edges[node]) for other_node in edges[node]: # If this spot has not been marked yet, mark it if not labels[other_node]: labels[other_node] = labels[node] + [other_node] new_fringe.append(other_node) change_made = True # If this spot HAS been marked, unify else: side1 = labels[node][0] side2 = labels[other_node][0] # it found itself if side1 == side2: continue node_set = -1 other_node_set = -1 for i in range(len(sets)): if side1 in sets[i]: node_set = i if side2 in sets[i]: other_node_set = i if -1 in (node_set, other_node_set): print "node_set or other_node_set was not found", side1, side2 # They already have been unified if node_set == other_node_set: continue # Merge sets of connectiveness sets[node_set] += sets[other_node_set] del sets[other_node_set] copy = labels[node][:] copy.reverse() path = labels[other_node] + copy for i in range(len(path)-1): edges_used.append((path[i],path[i+1])) change_made = True shuffle(new_fringe) fringe = new_fringe if not change_made: return False shuffle(edges_used) # Now generate the plan tree = {} tree['self'] = [] count = 0 while len(edges_used) > count: idx_to_add = [] for i in range(len(edges_used)): if edges_used[i][0] in tree: idx_to_add.append(i) elif edges_used[i][1] in tree: edges_used[i] = (edges_used[i][1], edges_used[i][0]) idx_to_add.append(i) for i in idx_to_add: tree[edges_used[i][0]].append(edges_used[i][1]) tree[edges_used[i][1]] = [] edges_used[i] = (None, None) count += 1 return tree def random_tree_test(self): # print self.generate_message_plan(['130.85.94.186:9001', '130.85.94.187:9003', '130.85.94.187:9004', '130.85.94.188:9008', '130.85.94.187:9005', '130.85.94.188:9006', '130.85.94.188:9007']) # print self.generate_message_plan(['130.85.94.188:9007', '130.85.94.188:9008']) # print self.generate_message_plan(['130.85.94.188:9007']) pass # This function handles the WHEREIS request, the request made by clients # to get where it should send a web-service request # body[0] is the service # body[1] is the client's ws_policy # return [ip,port, ws_dir] def handle_WHEREIS(self, body, addr): if len(body) == 0: debug_print("Invalidly formatted request! " + str(data)) return [] self.lock_cache() cache = self.load_cache() self.unlock_cache() service, client_ws_policy = body i_have = self.service_exists(service) # Get a list of sites that have the service have_service = [ site for site in cache if service in cache[site][1] ] if len(have_service) == 0 and not i_have: return ["CANTFIND"] # Get a list of sites that DON'T have the service dont_have_service = [ site for site in cache if not service in cache[site][1] ] # Get a list of sites that have a compatable policy with the client policy_match = [ site for site in cache if PolicyParser.hasIntersection(cache[site][3], client_ws_policy) ] # Send a message to all nodes that have the service: have_service_and_policy_match = [] for site in have_service: if site in policy_match: have_service_and_policy_match.append(site) rand_val = int(random() * (len(have_service_and_policy_match) + 1)) if rand_val == 0 and i_have: response = self.handle_WILLYOUEXECUTE([service], addr) if response[0] == "YES": return ["ME"] + response[1:] plan = self.generate_message_plan(have_service_and_policy_match) recipients = have_service_and_policy_match response = self.handle_MASSMESSAGE([plan, recipients, ["WILLYOUEXECUTE", service]]) if response[0] == "YES": return response[1:] if response[0] == "NETFAULT": pass # TODO - HANDLE THIS!! if rand_val > 0 and i_have: response = self.handle_WILLYOUEXECUTE([service], addr) if response[0] == "YES": return ["ME"] + response[1:] # Send a message to all nodes that DO NOT have the service: dont_have_service_and_policy_match = [] for site in dont_have_service: if site in policy_match: dont_have_service_and_policy_match.append(site) rand_val = int(random() * (len(dont_have_service_and_policy_match) + 1)) if rand_val == 0 and not i_have: response = self.handle_WILLYOUREPLICATE([service], addr) if response[0] == "YES": return ["ME"] + response[1:] plan = self.generate_message_plan(dont_have_service_and_policy_match) recipients = dont_have_service_and_policy_match response = self.handle_MASSMESSAGE([plan, recipients, ["WILLYOUREPLICATE", service]]) if response[0] == "YES": return response[1:] if response[0] == "NETFAULT": pass # TODO - HANDLE THIS!! if rand_val > 0 and not i_have: response = self.handle_WILLYOUREPLICATE([service], addr) if response[0] == "YES": return ["ME"] + response[1:] # Send a message to all nodes that have the service TELLING THEM THEY WILL EXCUTE IT!!: have_service_and_policy_match = [] for site in have_service: if site in policy_match: have_service_and_policy_match.append(site) rand_val = int(random() * (len(have_service_and_policy_match) + 1)) if rand_val == 0 and i_have: response = self.handle_YOUWILLEXECUTE([service], addr) if response[0] == "YES": return ["ME"] + response[1:] plan = self.generate_message_plan(have_service_and_policy_match) recipients = have_service_and_policy_match response = self.handle_MASSMESSAGE([plan, recipients, ["YOUWILLEXECUTE", service]]) if response[0] == "YES": return response[1:] if response[0] == "NETFAULT": pass # TODO - HANDLE THIS!! if rand_val > 0 and i_have: response = self.handle_YOUWILLEXECUTE([service], addr) if response[0] == "YES": return ["ME"] + response[1:] return ["CANTFIND"] def handle_MASSMESSAGE(self, body): plan, recipients, messagebody = body have_NO = False plan_sends = plan.keys() shuffle(plan_sends) net_faults = [] # Check to see if anyone next in our steps will do what we want: for recipient in recipients: if recipient in plan['self']: host, port = recipient.split(':') try: response = message(host, int(port), messagebody) except socket.error: debug_print(recipient + " isn't there to respond to our message") net_faults.append(recipient) continue # except: # debug_print("SOMETHING BORKEN WITH MASS_MESSAGE " + str(body)) # net_faults.append(recipient) # continue if response[0] == "YES": if messagebody[0] == "WILLYOUREPLICATE": self.discover(recipient) return ["YES", host, response[1], response[2]] if response[0] == "NO": have_NO = True # Forward the message if we didn't get anything send_to_keys = plan['self'] shuffle(send_to_keys) for send_to in send_to_keys: # make a copy of the plain tailored to the receiver plan_cpy = {} valid = False for site in plan: if site == send_to: plan_cpy['self'] = plan[site] valid = True elif site == 'self': pass else: plan_cpy[site] = plan[site] if not valid: continue host, port = send_to.split(':') try: response = message(host, int(port), ["MASSMESSAGE", plan_cpy, recipients, messagebody]) except socket.error: debug_print(recipient + " isn't there to respond to our message") net_faults.append(send_to) continue # except: # debug_print("SOMETHING BORKEN WITH MASS_MESSAGE " + str(body)) # net_faults.append(send_to) # continue if response[0] == "YES": if messagebody[0] == "WILLYOUREPLICATE": self.discover(send_to) return response if response[0] == "NO": have_NO = True if response[0] == "NETFAULT": net_faults += response[1] # There were SOME netfaults -- this could be why it doesn't work if len(net_faults) > 0: # lets delete all these guys from the cache self.lock_cache() cache = self.load_cache() for site in net_faults: try: debug_print("Deleting from the cache") del cache[site] except KeyError: debug_print("We've already deleted " + site) return ["NETFAULT", net_faults] # Couldn't find anything unfortunately if have_NO: return ["NO"] return ["DONTHAVE"] def handle_YOUWILLEXECUTE(self, body, addr): if len(body) != 1: debug_print("Invalidly formatted response! ") return ["INVALID REQUEST"] self.service_hit(body[0], addr[0]) return ["YES", self.axis_port, self.axis_web_dir] def handle_FIND(self, body, addr): if len(body) != 3: debug_print("Invalidly formatted response! ") return ["INVALID REQUEST"] service_name = str(body[0]) tree = BDtree.decode(body[1]) steps_left = int(body[2]) if self.service_exists(service_name): package = package_files(self.services_dir, [service_name, service_name+".bdcert"] ) debug_print("I HAVE the service " + service_name) return ["YES", package] if steps_left == 0: return ["NO", BDtree.encode(tree)] self.lock_cache() cache = self.load_cache() self.unlock_cache() sites = cache.keys() to_send = [] for site in sites: if site not in self.talkable_sites(cache): continue if tree.has_node(site): continue my_host = get_ip() + ":" + str(self.port) tree.add_child(site, my_host) to_send.append(site) for site in to_send: host, port = site.split(':') try: response = message(host, int(port), ["FIND", service_name, BDtree.encode(tree), steps_left - 1]) except socket.error: debug_print(site + " is not responding, not sending DFS!") continue if response[0] == "YES": debug_print("Yes! I am returning the service " + service_name + " gotten from " + site) return ["YES", response[1]] if response[0] == "NO": debug_print("Nope, " + site + " doesn't have it...") tree = BDtree.decode(response[1]) debug_print("Nobody has " + service_name) return ["NO", BDtree.encode(tree)] def handle_WILLYOUREPLICATE(self, body, addr): if len(body) != 1: debug_print("Invalidly formatted response!") return ["INVALID REQUEST"] service = body[0] if self.service_exists(service): debug_print("Not replicating " + service + " because I already have it, but I will run it") return ["YES", self.axis_port, self.axis_web_dir] # Replicate here # Get a list of sites that have the service self.lock_cache() cache = self.load_cache() self.unlock_cache() have_service = [ site for site in cache if service in cache[site][1] ] if len(have_service) == 0: return ["CANTFIND"] plan = self.generate_message_plan(have_service) recipients = have_service response = self.handle_MASSMESSAGE([plan, recipients, ["GIVESERVICE", service]]) debug_print('sending GIVESERVICE to ' + str(recipients)) if response[0] == "NO" or response[0] == "DONTHAVE": debug_print("Couldn't find " + service + " anywhere (" + response[0] + ")" ) return ["CANTFIND"] debug_print("Replicating service " + service + " from " + response[1]) # Yay! we got the service if self.get_free_dynamic() == 0: # We need to get rid of our lightest dynamic service lightest = self.get_lightest_dynamic_service() os.unlink(self.services_dir + "/" + lightest + ".bdcert") unpackage_files(self.services_dir, response[2]) self.set_bdcert(service, replication_type = "DYNAMIC") self.service_hit(service, addr[0]) return ["YES", self.axis_port, self.axis_web_dir] # This function handles the WILLYOUEXECUTE request, which checks to see # if we are able to execute the specified service at this time # The body only contains the service name def handle_WILLYOUEXECUTE(self, body, addr): if len(body) != 1: debug_print("Invalidly formatted response! ") return ["INVALID REQUEST"] # If the connector is mistaken to think we have a service, let him know if not self.service_exists(body[0]): message = "DONTHAVE" # If we have the service and our load is low, say we will service elif not self.is_overloaded(): message = "YES" # Update the hit counter since we can assume we will be getting # a request soon self.service_hit(body[0], addr[0]) # If we have the service but our load is too high, say we will not else: message = "NO" return [message, self.axis_port, self.axis_web_dir] # This function handles the GIVEYOURSERVICES request, which simply # is asking for a dump of our services. This is used for when a client # has a mistaken cache. # The body is empty/ thus ignored (realize the format is still # GIVEYOUSERVICES:\n) def handle_GIVEYOURSERVICES(self): return [time.time(), self.get_services(), self.get_free_dynamic()] # This function handles the SYNCHRONIZE request, which is when another # site sends us their cache, we send them theirs and then we update # our caches so that they hold the most up to date information on # both sides. # The body is in format (no newlines, uses `): # SYNCHRONIZE:MyPort~MyTimeStamp~MyServices` # IP:Port~TimeStamp~service1 service2 service3 ...` # IP:Port~TimeStamp~serviceA serviceB ...` # ... def handle_SYNCHRONIZE(self, body, addr, lock=True): if lock: if not self.lock_cache(blocking = False, identity = "handle_SYNCHRONIZE"): for i in range(10): has_lock = False if self.lock_cache(blocking = False, identity = "handle_SYNCHRONIZE"): has_lock = True break if not has_lock: time.sleep(3*random()) for i in range(3): has_lock = False if self.lock_cache(blocking = False, identity = "handle_SYNCHRONIZE"): has_lock = True break if not has_lock: return ["NO"] if body[0] == "NO": return ["NO"] cache = self.load_cache() cache2 = self.load_cache() other_port = body[0] time_stamp = body[1] services = body[2] dynamic_left = body[3] others_ws_policy = body[4] others_cache = body[5] # Formulate response msg = [self.port, time.time(), self.get_services(), self.get_free_dynamic(), self.get_ws_policy()] # don't need to send the guy his own info try: del cache[addr[0]+":"+str(other_port)] except KeyError: pass try: del cache2[addr[0]+":"+str(other_port)] except KeyError: pass msg.append(cache2) cache[addr[0]+":"+str(other_port)] = (time_stamp, services, dynamic_left, others_ws_policy) # Merge the sender's data for host in others_cache: ip, port = host.split(':') if ip == "127.0.0.1": modified_host = addr[0] + ":" + port ip = addr[0] else: modified_host = host if modified_host not in cache: try: message(ip, port, ["PING"]) cache[modified_host] = others_cache[host] except socket.error: pass except WSPolicyMismatch: cache[modified_host] = others_cache[host] debug_print("WS-POLICY ~ No policy intersection with " + modified_host) pass except: debug_print(" ***** PROBLEM WITH PING!?!?! *****") elif others_cache[host][0] > cache[modified_host][0]: cache[modified_host] = others_cache[host] for host in cache.keys(): if addr[0] + ":" + str(other_port) == host: continue ip, port = host.split(':') if ip == "127.0.0.1": modified_host = addr[0] + ":" + port ip = addr[0] else: modified_host = host if host not in others_cache.keys(): try: message(ip, port, ["PING"]) except socket.error: del cache[modified_host] except WSPolicyMismatch: debug_print("WS-POLICY ~ No policy intersection with " + modified_host) pass except: debug_print(" ** Sir, there appears to be a problem with THE PING **") self.write_cache(cache) if lock: self.unlock_cache() return msg def handle_HINT(self, body): self.lock_cache() cache = self.load_cache() if body[0] not in cache: cache[body[0]] = (0,[], 0, False) self.write_cache(cache) self.unlock_cache() return ["OK"] # Lock the cache file def lock_cache(self, blocking=True, identity="someone"): return lock_file(self.cache_path, blocking, identity) # Unlock the cache file def unlock_cache(self): unlock_file(self.cache_path) # Loads the cache file and organizes it into a dictionary def load_cache(self): try: cache = decode(open(self.cache_path, 'r').read()) return cache except IOError: return {} # Write the cache file, given the dictionary that is in the format given in load_cache def write_cache(self, cache): bdcache_file = open(self.cache_path, 'w') bdcache_file.write(encode(cache)) bdcache_file.close() # Get the list of services BD can see def get_services(self): old_path = os.path.abspath('.') os.chdir(self.services_dir) # Look for bdcert files in the service subdirectories services = [cert[:-len('.bdcert')] for cert in glob.glob('*.bdcert')] os.chdir(old_path) return services # Returns true if a service exists on our site, false otherwise def service_exists(self, service_name): if service_name in self.get_services(): return True return False # Increments the hit log for the given service def service_hit(self, service_name, ip, hit_type = "HIT"): # Make sure the directory exists try: os.mkdir(self.service_log_dir) except OSError: pass path = os.path.join(self.service_log_dir, (service_name + ".bdlog")) lock_file(path) out_log = open(path, 'a') out_log.write(hit_type + " ~ " + str(int(time.time())) + \ " ~ " + str(time.ctime(time.time())) + " ~ " + ip + "\n") out_log.close() unlock_file(path) def is_overloaded(self): return self.total_weight() > self.RT def total_weight(self): return sum([ self.service_weight(service) for service in self.get_services() ]) # Returns the weight of a service def service_weight(self, service_name, give_hits=False): # Make sure the directory exists try: os.mkdir(self.service_log_dir) return 0 except OSError: pass path = os.path.join(self.service_log_dir, (service_name + ".bdlog")) lock_file(path) # There is no log, so I guess there have been no hits try: in_log = open(path, 'r').readlines() except IOError: unlock_file(path) return 0 unlock_file(path) try: cur_hit_value = self.get_bdcert(service_name)["weight"] except IOError: return 0 weight = 0 time_now = int(time.time()) rep_count = 0 hit_count = 0 in_log.reverse() for line in in_log: line = line.strip() if line.startswith("REPLICATE"): rep_count += 1 # We want to decrease the value after each # replicate so that we dont get a over-flooding # at one point in time cur_hit_value *= .9 elif line.startswith("HIT"): hit_count += 1 seconds = time_now - int(line.split('~')[1].strip()) if seconds < 10: weight += cur_hit_value elif seconds > 60: break else: # After 10 seconds, start to lose some value # per second weight += cur_hit_value * 10.0 / (seconds) if give_hits: return weight, rep_count, hit_count return weight def get_bdcert(self, service_name): return read_config(os.path.join(self.services_dir, service_name + '.bdcert')) def set_bdcert(self, service_name, weight = None, replication_type = None): cert = read_config(os.path.join(self.services_dir, service_name + '.bdcert')) if weight: cert['weight'] = weight if replication_type: cert['type'] = replication_type out_file = open(os.path.join(self.services_dir, service_name + '.bdcert'), 'w') for key in cert: out_file.write(key + "=" + str(cert[key]) + '\n') out_file.close() def get_free_dynamic(self): return self.max_dynamic - len(self.get_dynamic_services()) def get_dynamic_services(self): services = self.get_services() dynamic_services = [] for service in services: if self.get_bdcert(service)["type"] == "dynamic": dynamic_services.append(service) return dynamic_services def get_static_services(self): services = self.get_services() static_services = [] for service in services: if self.get_bdcert(service)["type"] == "static": static_services.append(service) return static_services def get_lightest_dynamic_service(self): min_service = None min_weight = 999999999 for service in self.get_dynamic_services(): weight = self.service_weight(service) debug_print(service + " has a weight of " + str(weight)) if weight < min_weight: min_service = service min_weight = weight return min_service def get_heaviest_service(self): max_service = None max_weight = -1 for service in self.get_services(): weight = self.service_weight(service) debug_print(service + " has a weight of " + str(weight)) if weight > max_weight: max_service = service max_weight = weight return max_service def get_ws_policy(self): return WS_POLICY def talkable_sites(self, cache): talkable = [] for site in cache: if cache[site][3] == True: continue if not cache[site][3] or PolicyParser.hasIntersection(cache[site][3], self.get_ws_policy()) != False: talkable.append(site) return talkable if __name__ == "__main__": TOTAL_BYTES_SENT = 0 config = read_config(sys.argv[1]) try: BLACK_LIST = open(sys.argv[3]).read().split('\n') print BLACK_LIST except: pass if len(sys.argv) >= 3: WS_POLICY = PolicyParser.ParsePolicy(sys.argv[2]) debug_print(str(WS_POLICY)) BDdaemon(port = config['port'], username = config['username'], \ services_dir=config['services_dir'], graveyard_dir = config['graveyard_dir'], \ receiver_threshold = config['receiver_threshold'], axis_web_dir = config['axis_web_dir'], \ admin_password = config['admin_password'], service_log_dir = config['service_log_dir'], \ backlog = config['backlog'], max_dynamic = config['max_dynamic'], axis_port = config['axis_port'])