Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/#100 improve performance qoh job #111

Merged
merged 6 commits into from
Oct 4, 2015
Prev Previous commit
Next Next commit
improved performance of batch insert by using groovy Sql withBatch fe…
…ature
  • Loading branch information
jmiranda committed Oct 4, 2015
commit 9bf0838faff2a25105469037bb2c7a214439abea
208 changes: 110 additions & 98 deletions grails-app/services/org/pih/warehouse/inventory/InventoryService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ package org.pih.warehouse.inventory

import grails.plugin.springcache.annotations.Cacheable
import grails.validation.ValidationException
import groovy.sql.Sql
import groovy.time.TimeCategory
import org.apache.commons.lang.StringEscapeUtils
import org.apache.commons.lang.StringUtils
import org.grails.plugins.csv.CSVWriter
import org.hibernate.annotations.Cache
import org.hibernate.criterion.CriteriaSpecification
import org.hibernate.id.UUIDHexGenerator
import org.joda.time.LocalDate
import org.pih.warehouse.auth.AuthService
import org.pih.warehouse.core.Constants
Expand All @@ -38,6 +40,7 @@ import org.springframework.context.ApplicationContextAware
import org.springframework.validation.Errors
import util.InventoryUtil

import java.sql.BatchUpdateException
import java.text.ParseException;
import java.util.Random

Expand All @@ -46,12 +49,8 @@ import java.text.SimpleDateFormat

class InventoryService implements ApplicationContextAware {

def dataSource
def sessionFactory
def propertyInstanceMap = org.codehaus.groovy.grails.plugins.DomainClassGrailsPlugin.PROPERTY_INSTANCE_MAP
def startTime = System.currentTimeMillis()
def lastBatchStarted = startTime


def dataService
def productService
def identifierService
Expand Down Expand Up @@ -3233,7 +3232,7 @@ class InventoryService implements ApplicationContextAware {
def transactionEntries = []
if (date) {
def products = Tag.get(tag?.id)?.products
log.info "Products: " + products
log.info "Get products by tag ${tag?.id}: " + products
transactionEntries = criteria.list {
if (products) {
inventoryItem {
Expand Down Expand Up @@ -3681,15 +3680,8 @@ class InventoryService implements ApplicationContextAware {
date.clearTime()
def locations = getDepotLocations()
locations.each { location ->

def count = InventorySnapshot.countByDateAndLocation(date, location)
if (count == 0) {
log.info "Creating or updating inventory snapshot for date ${date}, location ${location.name} ..."
createOrUpdateInventorySnapshot(date, location)
}
else {
log.warn "Skipping inventory snapshot for date ${date}, location ${location.name} as ${count} inventory snapshots already exist"
}
log.info "Creating or updating inventory snapshot for date ${date}, location ${location.name} ..."
createOrUpdateInventorySnapshot(date, location)
}

println "Created inventory snapshot for ${date} in " + (System.currentTimeMillis() - startTime) + " ms"
Expand All @@ -3698,30 +3690,37 @@ class InventoryService implements ApplicationContextAware {

def createOrUpdateInventorySnapshot(Date date, Location location) {
try {
def inventorySnapshots = InventorySnapshot.countByDateAndLocation(date, location)
log.info "Date ${date}, location ${location}: " + inventorySnapshots
//if (inventorySnapshots == 0) {
log.info "Create or update inventory snapshot for location ${location.name} on date ${date}"
//Location.withSession {
if (!location.isAttached()) {
location.attach()
}
// Only process locations with inventory
if (location?.inventory) {
//def productQuantityMap = getQuantityByProductMap(location.inventory)
def quantityMap = getQuantityOnHandAsOfDate(location, date)
def products = quantityMap.keySet();
log.info "Saving inventory snapshot for ${products?.size()} products"
products.eachWithIndex { product, index ->
def onHandQuantity = quantityMap[product]
updateInventorySnapshot(date, product, location, onHandQuantity)
if (index % 50 == 0) {
cleanUpGorm(index)
}
}
}
//}
//}
log.info "Create or update inventory snapshot for location ${location.name} on date ${date}"
// Only process locations with inventory
if (location?.inventory) {
String dateString = date.format("yyyy-MM-dd HH:mm:ss")
//def productQuantityMap = getQuantityByProductMap(location.inventory)
def quantityMap = getQuantityOnHandAsOfDate(location, date)
def products = quantityMap.keySet();

log.info "Saving inventory snapshots for ${products?.size()} products "
def startTime = System.currentTimeMillis()

def sql = new Sql(dataSource)
if (sql) {
try {
sql.withBatch(1000) { stmt ->
products.eachWithIndex { product, index ->
//log.info "Saving inventory snapshot for product[${index}]: " + product
def onHandQuantity = quantityMap[product]
def insertStmt = "insert into inventory_snapshot(version,date,location_id,product_id,inventory_item_id,quantity_on_hand) " +
"values (0,'${dateString}','${location?.id}','${product?.id}',NULL,${onHandQuantity}) " +
"ON DUPLICATE KEY UPDATE quantity_on_hand=${onHandQuantity}"
stmt.addBatch(insertStmt)
}
stmt.executeBatch()
}
} catch (BatchUpdateException e) {
log.error("Error executing batch update for location ${location.name} " + e.message, e)
}
}
log.info ("Time to execute batch statements " + (System.currentTimeMillis() - startTime) + " ms")
}
log.info "Saved inventory snapshot for products=ALL, location=${location}, date=${date}"
} catch (Exception e) {
log.error("Unable to complete inventory snapshot process", e)
Expand All @@ -3732,13 +3731,9 @@ class InventoryService implements ApplicationContextAware {
try {
def dates = getTransactionDates(location, product)
dates.each { date ->
//def inventorySnapshots = InventorySnapshot.countByDateAndLocation(date, location)
//println "Date ${date}, location ${location}: " + inventorySnapshots
//if (inventorySnapshots == 0) {
def quantity = getQuantity(product, location, date)
log.info "Create or update inventory snapshot for product ${product} at location ${location.name} on date ${date} = ${quantity} ${product.unitOfMeasure}"
updateInventorySnapshot(date, product, location, quantity)
//}
createOrUpdateInventorySnapshot(date, product, location, quantity)
}
log.info "Saved inventory snapshot for product=${product.productCode}, location=${location}, dates=ALL"
} catch (Exception e) {
Expand All @@ -3755,7 +3750,7 @@ class InventoryService implements ApplicationContextAware {
if (inventorySnapshots == 0) {
log.info "Create or update inventory snapshot for location ${location.name} on date ${date}"
def quantity = getQuantity(product, location, date)
updateInventorySnapshot(date, product, location, quantity)
createOrUpdateInventorySnapshot(date, product, location, quantity)
}
log.info "Saved inventory snapshot for product=${product.productCode}, location=${location}, date=${date}"
} catch (Exception e) {
Expand All @@ -3764,10 +3759,10 @@ class InventoryService implements ApplicationContextAware {
}


def updateInventorySnapshot(Date date, Product product, Location location, Integer onHandQuantity) {
//log.info "Updating inventory snapshot for product " + product.name + " @ " + location.name
def createOrUpdateInventorySnapshot(Date date, Product product, Location location, Integer onHandQuantity) {
log.info "Updating inventory snapshot for product " + product.name + " @ " + location.name
try {
def inventorySnapshot = InventorySnapshot.findWhere(date: date, location: location, product:product)
def inventorySnapshot = InventorySnapshot.findWhere(date: date, location: location, product:product)
if (!inventorySnapshot) {
inventorySnapshot = new InventorySnapshot(date: date, location: location, product: product)
}
Expand All @@ -3776,7 +3771,7 @@ class InventoryService implements ApplicationContextAware {
//inventorySnapshot.quantityInbound = pendingQuantity[0]?:0
//inventorySnapshot.quantityOutbound = pendingQuantity[1]?:0
//inventorySnapshot.lastUpdated = new Date()
inventorySnapshot.save()
inventorySnapshot.save(flush:true)
}
catch (Exception e) {
log.error("Error saving inventory snapshot for product " + product.name + " and location " + location.name, e)
Expand All @@ -3785,56 +3780,64 @@ class InventoryService implements ApplicationContextAware {
}




def createOrUpdateInventoryItemSnapshot(Date date) {
def startTime = System.currentTimeMillis()
date.clearTime()
def locations = getDepotLocations()
locations.each { location ->

def count = InventoryItemSnapshot.countByDateAndLocation(date, location)
if (count == 0) {
log.info "Creating or updating inventory item snapshot for date ${date} and location ${location.name} ..."
createOrUpdateInventoryItemSnapshot(date, location)
}
else {
log.warn "Skipping inventory item snapshot for date ${date} and location ${location.name} as ${count} inventory item snapshots already exist"
}
log.info "Creating or updating inventory item snapshot for date ${date} and location ${location.name} ..."
createOrUpdateInventoryItemSnapshot(date, location)
}
println "Created inventory snapshot for ${date} in " + (System.currentTimeMillis() - startTime) + " ms"
}



def createOrUpdateInventoryItemSnapshot(Date date, Location location) {

try {
if (!location.isAttached()) {
location.attach()
}
def inventoryItemSnapshots = InventoryItemSnapshot.countByDateAndLocation(date, location)
log.info "Date ${date}, location ${location}: " + inventoryItemSnapshots
log.info "Create or update inventory snapshot for location ${location.name} on date ${date}"
// Only process locations with inventory
if (location.inventory) {

def onHandQuantityMap = getQuantityForInventory(location.inventory)
def inventoryItems = onHandQuantityMap.keySet();

log.info "Saving inventory snapshot for ${inventoryItems?.size()} inventory items"
// Get quantity on hand for all products at a given location
def onHandQuantityMap = getQuantityForInventory(location.inventory)
def inventoryItems = onHandQuantityMap.keySet();
log.info "Saving inventory item snapshots for ${inventoryItems?.size()} inventory items"

def startTime = System.currentTimeMillis()
def sql = new Sql(dataSource)
if (sql) {
String dateString = date.format("yyyy-MM-dd HH:mm:ss")
sql.withBatch(1000) { stmt ->
inventoryItems.eachWithIndex { inventoryItem, index ->
//log.info "Saving inventory snapshot for product[${index}]: " + product
def onHandQuantity = onHandQuantityMap[inventoryItem]

//stmt.addBatch(date:date.format("yyyy-MM-dd hh:mm:ss"), locationId:location.id, productId:product.id, inventoryItemId:null, quantityOnHand:onHandQuantity)
def insertStmt = "insert into inventory_snapshot(id,version,date,location_id,product_id,inventory_item_id,quantity_on_hand,date_created,last_updated) " +
"values ('${UUID.randomUUID().toString()}', 0,'${dateString}','${location?.id}','${inventoryItem?.product?.id}',NULL,${onHandQuantity},now(),now()) " +
"ON DUPLICATE KEY UPDATE quantity_on_hand=${onHandQuantity},last_updated=now()"
stmt.addBatch(insertStmt)
}
stmt.executeBatch()
}
}
log.info ("Time to execute batch statements " + (System.currentTimeMillis() - startTime) + " ms")

inventoryItems.eachWithIndex { inventoryItem, index ->
def onHandQuantity = onHandQuantityMap[inventoryItem]
updateInventoryItemSnapshot(date, inventoryItem, location, onHandQuantity)
if (index % 50 == 0) {
cleanUpGorm(index)
}
}
}
log.info "Saved inventory snapshot for all products at location=${location.name} on date=${date}"
} catch (Exception e) {
log.error("Unable to complete inventory snapshot process", e)
}
}

def updateInventoryItemSnapshot(Date date, InventoryItem inventoryItem, Location location, Integer quantityOnHand) {
//log.info "Updating inventory snapshot for product " + product.name + " @ " + location.name
def createOrUpdateInventoryItemSnapshot(Date date, InventoryItem inventoryItem, Location location, Integer quantityOnHand) {
log.info "Updating inventory snapshot for product " + inventoryItem?.product?.name + " at location " + location.name
try {
def inventoryItemSnapshot = InventoryItemSnapshot.findWhere(date: date, location: location, product:inventoryItem?.product, inventoryItem: inventoryItem)
if (!inventoryItemSnapshot) {
Expand All @@ -3853,8 +3856,13 @@ class InventoryService implements ApplicationContextAware {
}
}



/**
* Calculate pending quantity for a given product and location.
*
* @param product
* @param location
* @return
*/
def calculatePendingQuantity(product, location) {
def inboundQuantity = 0;
def outboundQuantity = 0;
Expand Down Expand Up @@ -3891,25 +3899,12 @@ class InventoryService implements ApplicationContextAware {
[inboundQuantity, outboundQuantity]
}

def cleanUpGorm(index) {
def session = sessionFactory.currentSession
session.flush()
session.clear()
propertyInstanceMap.get().clear()
printStatus(index)
}

def printStatus(index) {
def batchEnded = System.currentTimeMillis()
def milliseconds = (batchEnded-lastBatchStarted)
//def total = (batchEnded-startTime)/1000
log.info "Flushed last batch ${index} ... took ${milliseconds} ms"
lastBatchStarted = batchEnded
}




/**
* Build quantity map for all products at a given location.
*
* @param location
* @return
*/
def getQuantityMap(Location location) {
def quantityMap = [:]
def products = Product.list()
Expand All @@ -3919,12 +3914,23 @@ class InventoryService implements ApplicationContextAware {
return quantityMap
}

/**
* Calculate the quantity on hand for a given inventory item and location.
*
* @param inventoryItem
* @param location
*/
def calculateQuantityOnHand(InventoryItem inventoryItem, Location location) {
throw new UnsupportedOperationException("Method has not been implemented yet")
}



/**
* Calculate the quantity on hand for a given product and location.
*
* @param product
* @param location
* @return
*/
def calculateQuantityOnHand(Product product, Location location) {
long startTime = System.currentTimeMillis()

Expand All @@ -3949,7 +3955,13 @@ class InventoryService implements ApplicationContextAware {
return quantityOnHand - quantityDebit + quantityCredit
}


/**
* Get most recent product inventory transaction.
*
* @param product
* @param location
* @return
*/
def getMostRecentQuantityOnHand(Product product, Location location) {
def results = Transaction.executeQuery( """
SELECT max(te.transaction.transactionDate), sum(te.quantity)
Expand Down