generated from databricks-industry-solutions/industry-solutions-blueprints
-
Notifications
You must be signed in to change notification settings - Fork 9
/
06_rule.py
164 lines (122 loc) · 5.44 KB
/
06_rule.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# Databricks notebook source
# MAGIC %md This notebook is available at https://github.com/databricks-industry-solutions/smart-claims
# COMMAND ----------
# MAGIC %md
# MAGIC # Rule Engine
# MAGIC * These are pre-defined static checks that can be applied without requiring a human in the loop, thereby speeding up routine cases
# MAGIC * When the reported data does not comply with auto detected info, flags are raised to involve additional human investigation
# MAGIC * Eg. Checks on policy coverage, assessed severity, accident location and speed limit violations
# MAGIC * <b>Input Table:</b> silver_claim_policy_accident
# MAGIC * <b>Rules Table:</b> claims_rules
# MAGIC * <b>Output Table:</b> gold_insights
# COMMAND ----------
# MAGIC %md
# MAGIC ## Dynamic Rules
# MAGIC * Ability to dynamically add/edit rules to meet business requirements around claim processing
# MAGIC * Rules are persisted in claim_rules and applied on new data in a generic pattern as prescribed in the rule definition
# MAGIC * Rule definition includes a
# MAGIC * Unique Rule name/id
# MAGIC * Definition of acceptable and not acceptable data - written as code that can be directly applied
# MAGIC * Severity (HIGH, MEDIUM, LOW)
# MAGIC * is_active (True/False)
# MAGIC * Some common checks include
# MAGIC * Claim date should be within coverage period
# MAGIC * Reported Severity should match ML predicted severity
# MAGIC * Accident Location as reported by telematics data should match the location as reported in claim
# MAGIC * Speed limit as reported by telematics should be within speed limits of that region if there is a dispute on who was on the offense
# COMMAND ----------
# MAGIC %run ./setup/initialize
# COMMAND ----------
# MAGIC %sql
# MAGIC -- Rebuild the rules table from scratch on every run: the DROP gives a
# MAGIC -- clean slate, so the IF NOT EXISTS below is redundant but harmless.
# MAGIC drop table if exists claims_rules;
# MAGIC CREATE TABLE IF NOT EXISTS claims_rules (
# MAGIC rule_id BIGINT GENERATED ALWAYS AS IDENTITY,
# MAGIC rule STRING,
# MAGIC -- check_name becomes the output column name when the rule is applied;
# MAGIC -- check_code holds a raw SQL expression evaluated later via expr()
# MAGIC check_name STRING,
# MAGIC check_code STRING,
# MAGIC check_severity STRING,
# MAGIC is_active Boolean
# MAGIC );
# COMMAND ----------
# MAGIC %md
# MAGIC # Configure Rules
# COMMAND ----------
# MAGIC %md
# MAGIC ## Invalid Policy Date
# COMMAND ----------
# Rule: a claim is only valid when its date falls inside the policy coverage
# window, i.e. pol_eff_date <= claim_date <= pol_expiry_date.
# BUG FIX (review): the original used '<' for BOTH bounds, so a claim filed
# AFTER the policy expired was labelled "VALID". The expiry comparison must be
# '>=' (and the effective-date comparison '<=' to include same-day claims).
invalid_policy_date = '''
CASE WHEN to_date(pol_eff_date, "yyyy-MM-dd") <= to_date(claim_date) and to_date(pol_expiry_date, "yyyy-MM-dd") >= to_date(claim_date) THEN "VALID"
ELSE "NOT VALID"
END
'''
# Register the rule; the SQL snippet is stored verbatim in check_code and is
# later applied to each claim row via expr(). The output column is 'valid_date'
# and release_funds matches the literal "VALID" string — keep it unchanged.
s_sql = "INSERT INTO claims_rules(rule,check_name, check_code, check_severity, is_active) values('invalid policy date', 'valid_date', '" + invalid_policy_date + " ', 'HIGH', TRUE)"
print(s_sql)
spark.sql(s_sql)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Exceeds Policy Amount
# COMMAND ----------
# Rule: flag claims whose total claimed amount exceeds the insured sum.
# Output column is 'valid_amount'.
# NOTE(review): "calim" is a typo, but it is load-bearing — the release_funds
# rule matches this exact string verbatim. Fixing it requires changing both
# rules together; never correct one side alone.
exceeds_policy_amount = '''
CASE WHEN sum_insured >= claim_amount_total
THEN "calim value in the range of premium"
ELSE "claim value more than premium"
END
'''
# Register the rule; check_code is stored as raw SQL and applied later via expr().
s_sql = "INSERT INTO claims_rules(rule,check_name, check_code, check_severity,is_active) values('exceeds policy amount', 'valid_amount','" + exceeds_policy_amount + " ', 'HIGH', TRUE)"
print(s_sql)
spark.sql(s_sql)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Severity Mismatch
# COMMAND ----------
# Rule: cross-check the severity reported on the claim (incident_severity)
# against the ML-predicted severity score — each reported tier must clear a
# minimum predicted score before the report is considered consistent.
# Output column is 'reported_severity_check'; release_funds matches the
# "Severity matches the report" string verbatim.
severity_mismatch = '''
CASE WHEN incident_severity="Total Loss" AND severity > 0.9 THEN "Severity matches the report"
WHEN incident_severity="Major Damage" AND severity > 0.8 THEN "Severity matches the report"
WHEN incident_severity="Minor Damage" AND severity > 0.7 THEN "Severity matches the report"
WHEN incident_severity="Trivial Damage" AND severity > 0.4 THEN "Severity matches the report"
ELSE "Severity does not match"
END
'''
# Persist the rule definition into claims_rules; the snippet above is stored
# verbatim in check_code and applied to each claim via expr() downstream.
s_sql = f"INSERT INTO claims_rules(rule,check_name, check_code, check_severity, is_active) values('severity mismatch', 'reported_severity_check', '{severity_mismatch} ', 'HIGH', TRUE)"
print(s_sql)
spark.sql(s_sql)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Exceeds Speed
# COMMAND ----------
# Rule: classify the telematics-reported speed. 0 < speed <= 45 is normal,
# above 45 is high, and zero/negative readings are treated as invalid sensor
# data. Output column is 'speed_check'.
exceeds_speed = '''
CASE WHEN telematics_speed <= 45 and telematics_speed > 0 THEN "Normal Speed"
WHEN telematics_speed > 45 THEN "High Speed"
ELSE "Invalid speed"
END
'''
# Persist the rule; the snippet above is stored verbatim in check_code and
# evaluated against each claim row later via expr().
s_sql = f"INSERT INTO claims_rules(rule,check_name, check_code, check_severity,is_active) values('exceeds speed', 'speed_check', '{exceeds_speed} ', 'HIGH', TRUE)"
print(s_sql)
spark.sql(s_sql)
# COMMAND ----------
# Composite decision rule: auto-release funds only when every upstream check
# passed. It reads the columns produced by the earlier rules (this works only
# because rules are applied sequentially in rule_id order, so these columns
# already exist when this rule runs). The comparison strings must match the
# other rules' output strings verbatim — including the "calim" typo from the
# exceeds_policy_amount rule; keep them in sync.
release_funds = '''
CASE WHEN reported_severity_check="Severity matches the report" and valid_amount="calim value in the range of premium" and valid_date="VALID" then "release funds"
ELSE "claim needs more investigation"
END
'''
# Register the rule; output column is 'release_funds'.
s_sql = "INSERT INTO claims_rules(rule,check_name, check_code, check_severity,is_active) values('release funds', 'release_funds', '" + release_funds + " ', 'HIGH', TRUE)"
print(s_sql)
spark.sql(s_sql)
# COMMAND ----------
# MAGIC %md
# MAGIC # Dynamic Application of Rules
# COMMAND ----------
from pyspark.sql.functions import *

# Load the joined claim/policy/accident records, then fold every active rule
# over the DataFrame in rule_id order: each rule's check_code SQL snippet is
# evaluated via expr() and materialized as a new column named check_name.
# NOTE: `df` is intentionally module-level — the next cells write/read it.
df = spark.sql("SELECT * FROM silver_claim_policy_accident")
active_rules = spark.sql('SELECT * FROM claims_rules where is_active=True order by rule_id').collect()
for active_rule in active_rules:
    print(active_rule.rule, active_rule.check_code)
    df = df.withColumn(active_rule.check_name, expr(active_rule.check_code))
display(df)
# COMMAND ----------
# Overwrite gold_insights with the newly scored claims. overwriteSchema=true
# lets the table pick up new columns when rules are added/renamed in claims_rules.
df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("gold_insights")
# COMMAND ----------
# Quick visual sanity check: pull just the rule-output columns from
# gold_insights and render them for review.
df = spark.sql("SELECT valid_date, valid_amount,reported_severity_check, release_funds FROM gold_insights")
display(df)