This repository has been archived by the owner on Oct 31, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 23
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
56 additions
and
40 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
#!/usr/bin/python2.7 | ||
#!/usr/bin/env python2.7 | ||
#! -*- encoding: utf-8 -*- | ||
|
||
|
||
|
@@ -13,13 +13,10 @@ | |
from octopus.dispatcher.model.command import Command | ||
from octopus.dispatcher.model.rendernode import RenderNode | ||
from octopus.dispatcher.model.pool import Pool, PoolShare | ||
from octopus.dispatcher.strategies import createStrategyInstance | ||
from octopus.dispatcher import settings | ||
from octopus.dispatcher.db.pulidb import FolderNodes, TaskNodes, Dependencies, TaskGroups, Rules, Tasks, Commands, Pools, PoolShares, PuliDB, StatDB | ||
from octopus.core.tools import elapsedTimeToString | ||
from octopus.core import singletonconfig | ||
from octopus.dispatcher.db.pulidb import FolderNodes, TaskNodes, Dependencies, TaskGroups, Rules, Tasks, Commands, Pools, PoolShares | ||
|
||
BUFFER_SIZE = 10000 | ||
|
||
BUFFER_SIZE = 1000 | ||
|
||
def deleteElementFromMainDB(table, elementId): | ||
mainConn.query(mainConn.sqlrepr(Delete(table.q, where=(table.q.id==elementId)))) | ||
|
@@ -28,30 +25,32 @@ def insertElementIntoStatDB(table, values): | |
statConn.query(statConn.sqlrepr(Insert(table.q, values=values))) | ||
|
||
def archiveTaskNodesDependencies(taskNodeId): | ||
archiveDependencies(taskNodeId, Dependencies.q.taskNodes) | ||
Dependencies._connection = mainConn | ||
dependencies = Dependencies.select(Dependencies.q.taskNodes == taskNodeId) | ||
for dependency in dependencies: | ||
duplicateDependencyIntoStatDB(dependency) | ||
deleteElementFromMainDB(Dependencies, dependency.id) | ||
|
||
def archiveFolderNodesDependencies(folderNodeId): | ||
archiveDependencies(folderNodeId, Dependencies.q.folderNodes) | ||
|
||
def archiveTaskNodesRules(taskNodeId): | ||
archiveDependencies(taskNodeId, Rules.q.taskNodeId) | ||
|
||
def archiveFolderNodesRules(folderNodeId): | ||
archiveRules(folderNodeId, Rules.q.folderNodeId) | ||
|
||
def archiveDependencies(nodeId, nodeType): | ||
Dependencies._connection = mainConn | ||
dependencies = Dependencies.select(nodeType==nodeId) | ||
dependencies = Dependencies.select(Dependencies.q.folderNodes ==folderNodeId) | ||
for dependency in dependencies: | ||
duplicateDependencyIntoStatDB(dependency) | ||
deleteElementFromMainDB(Dependencies, dependency.id) | ||
|
||
def archiveRules(nodeId, nodeType): | ||
def archiveTaskNodesRules(taskNodeId): | ||
Rules._connection = mainConn | ||
rules = Rules.select(Rules.q.taskNodeId == taskNodeId ) | ||
for rule in rules: | ||
duplicateRuleIntoStatDB(rule) | ||
deleteElementFromMainDB(Rules, rule.id) | ||
|
||
def archiveFolderNodesRules(folderNodeId): | ||
Rules._connection = mainConn | ||
rules = Rules.select(nodeType==nodeId) | ||
rules = Rules.select(Rules.q.folderNodeId == folderNodeId ) | ||
for rule in rules: | ||
duplicateRueIntoStatDB(rule) | ||
deleteElementFromMainDB(Rules, rule.id) | ||
duplicateRuleIntoStatDB(rule) | ||
deleteElementFromMainDB(Rules, rule.id) | ||
|
||
def archivePoolShares(): | ||
PoolShares._connection = mainConn | ||
|
@@ -92,14 +91,17 @@ def archiveFolderNodes(): | |
print "Found " + str(totalItems) + " FolderNodes to archive" | ||
while totalItems > processedItems: | ||
for node in folderNodestoArchive.limit(BUFFER_SIZE): | ||
duplicateFolderNodesIntoStatDB(node) | ||
deleteElementFromMainDB(FolderNodes, node.id) | ||
archiveFolderNodesDependencies(node.id) | ||
archiveFolderNodesRules(node.id) | ||
manageFolderNode(node) | ||
processedItems+=1 | ||
print str(totalItems - processedItems) + " FolderNodes remaining" | ||
print "Finished to archive FolderNodes" | ||
|
||
def manageFolderNode(node): | ||
duplicateFolderNodesIntoStatDB(node) | ||
deleteElementFromMainDB(FolderNodes, node.id) | ||
archiveFolderNodesDependencies(node.id) | ||
archiveFolderNodesRules(node.id) | ||
|
||
def archiveTaskNodes(): | ||
TaskNodes._connection = mainConn | ||
print "Starting to archive TaskNodes" | ||
|
@@ -109,14 +111,18 @@ def archiveTaskNodes(): | |
print "Found " + str(totalItems) + " TaskNodes to archive" | ||
while totalItems > processedItems: | ||
for node in taskNodestoArchive.limit(BUFFER_SIZE): | ||
duplicateTaskNodesIntoStatDB(node) | ||
deleteElementFromMainDB(TaskNodes, node.id) | ||
archiveTaskNodesDependencies(node.id) | ||
archiveTaskNodesRules(node.id) | ||
manageTaskNode(node) | ||
processedItems+=1 | ||
print str(totalItems - processedItems) + " TaskNodes remaining" | ||
print "Finished to archive TaskNodes" | ||
|
||
|
||
def manageTaskNode(node): | ||
duplicateTaskNodesIntoStatDB(node) | ||
deleteElementFromMainDB(TaskNodes, node.id) | ||
archiveTaskNodesDependencies(node.id) | ||
archiveTaskNodesRules(node.id) | ||
|
||
def archiveCommands(): | ||
Commands._connection = mainConn | ||
print "Starting to archive Commands" | ||
|
@@ -162,7 +168,7 @@ def archiveTasks(): | |
print str(totalItems - processedItems) + " tasks remaining" | ||
print "Finished to archive tasks" | ||
|
||
def duplicateRueIntoStatDB(rule): | ||
def duplicateRuleIntoStatDB(rule): | ||
fields = {Rules.q.id.fieldName: rule.id, | ||
Rules.q.name.fieldName: rule.name, | ||
Rules.q.taskNodeId.fieldName: rule.taskNodeId, | ||
|
@@ -298,16 +304,26 @@ def duplicateTaskIntoStatDB(element): | |
Tasks.q.watcherPackages.fieldName: json.dumps(element.watcherPackages)} | ||
insertElementIntoStatDB(Tasks,fields) | ||
|
||
def groupForThread1(): | ||
archivePoolShares() | ||
archivePools() | ||
archiveTaskNodes() | ||
|
||
def groupForThread2(): | ||
archiveTasks() | ||
archiveFolderNodes() | ||
|
||
def groupForThread3(): | ||
archiveTaskGroups() | ||
archiveCommands() | ||
|
||
DB_URL = "mysql://[email protected]/pulidb" | ||
STAT_DB_URL = "mysql://[email protected]/pulistatdb" | ||
|
||
mainConn = connectionForURI(settings.DB_URL) | ||
statConn = StatDB.createConnection() | ||
mainConn = connectionForURI(DB_URL) | ||
statConn = connectionForURI(STAT_DB_URL) | ||
|
||
threading.Thread(target=archiveTasks).start() | ||
threading.Thread(target=archiveTaskGroups).start() | ||
threading.Thread(target=archiveCommands).start() | ||
threading.Thread(target=archiveTaskNodes).start() | ||
threading.Thread(target=archiveFolderNodes).start() | ||
threading.Thread(target=archivePools).start() | ||
threading.Thread(target=archivePoolShares).start() | ||
threading.Thread(target=groupForThread1).start() | ||
threading.Thread(target=groupForThread2).start() | ||
threading.Thread(target=groupForThread3).start() | ||
|