diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index eb5acfd..76edde5 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -15,7 +15,7 @@ jobs: strategy: matrix: haxe-version: [3.4.7] - python-version: [2.7] + python-version: [3.8] node-version: [12.x] steps: @@ -34,6 +34,12 @@ jobs: with: python-version: ${{ matrix.python-version }} + - name: Install numpy and matplotlib + run: | + python -m pip install --upgrade pip + pip install numpy + pip install matplotlib + - name: Set up Haxe ${{ matrix.haxe-version }} uses: krdlab/setup-haxe@v1 with: @@ -47,3 +53,7 @@ jobs: - name: make test run: src/make_without_docker.sh test + + - name: analysis test + run: python analysis/tests_runner.py + diff --git a/analysis/decoding_cosmicos.py b/analysis/decoding_cosmicos.py deleted file mode 100755 index 3d5784d..0000000 --- a/analysis/decoding_cosmicos.py +++ /dev/null @@ -1,577 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -""" -Created on Mon Jun 27 21:16:12 2016 - -@author: joha2 - -The complexity of this decoder is IMHO a measure -for the simplicity of the message. -Format was taken from an old form of message. 
- -""" - -import re -import sys -import math -import numpy as np -import matplotlib.pyplot as plt - -class DecoderClass(object): - - def __init__(self): - self.datadict = {} - self.datacounter = 0 - self.commanddict = {} - self.commandcounter = 0 - self.defdict = {} - self.defcounter = 0 - - - def generateRandomMessage(self, limit=10000): - print('---------') - print("Generating random message with limit %d characters" % (limit,)) - np.random.seed(1337) - preliminary = [str(i) for i in list(np.random.random_integers(0, 3, (limit,)))] - self.origmsgtext = ''.join(preliminary) - self.msgtext = self.origmsgtext - return (limit, self.msgtext) - - def generateBinomialRandomMessage(self, p=0.5, limit=10000): - print('---------') - print("Generating binomial distributed message with limit %d characters" % (limit,)) - np.random.seed(1337) - preliminary = [str(i) for i in list(np.random.binomial(3, p, (limit,)))] - self.origmsgtext = ''.join(preliminary) - self.msgtext = self.origmsgtext - return (limit, self.msgtext) - - - - def readStandardTextFromFile(self, filename, limit=10000): - print('---------') - print("Reading text from file %s with limit %d characters" % (filename, limit)) - - fl = open(filename,'r') - s = fl.read() - s.lower() - fl.close() - if limit > 0: - s = s[0:limit] - lenmsg = len(s) - print('---------') - return (lenmsg, s) - - - def readMessage(self, filename, limit=10000): - print('---------') - print("Reading message from file %s with limit %d characters" % (filename, limit)) - - fl = open(filename,'r') - self.origmsgtext = fl.read() - fl.close() - self.msgtext = re.sub(r'[\n]+', '', self.origmsgtext) # remove end line symbols - if limit > 0: - self.msgtext = self.msgtext[0:limit] - lenmsg = len(self.msgtext) - print('---------') - return lenmsg - - - def performStatistics(self, msgtext, lets, maxlen=2): - - print("msglen %d" % (len(msgtext),)) - - printentropies = False - - letters = r'['+lets+']' - numletters = len(lets) - worddict = {} - ngramlist 
= [] - for k in range(maxlen): - worddictngram = {} - kp = k + 1 - pattern = re.compile(letters+"{"+str(kp)+"}") - matchedpattern = re.findall(pattern, msgtext) - - if matchedpattern == []: - print('empty matching for %s at length %d' % (letters, kp)) - numpatterns = len(matchedpattern) - - for w in matchedpattern: - if worddict.get(w) == None: - worddict[w] = 1.0/numpatterns - else: - worddict[w] += 1.0/numpatterns - - if worddictngram.get(w) == None: - worddictngram[w] = 1.0/numpatterns - else: - worddictngram[w] += 1.0/numpatterns - - ngramlist.append(worddictngram) - - if printentropies: - digrams = [] - monograms = [] - for (gr, hgr) in worddict.items(): - if len(gr) == 2: - digrams.append((gr, hgr)) - if len(gr) == 1: - monograms.append((gr, hgr)) - - hsum = 0.0 - for (mon, hmon) in monograms: - print("h(\'%c\') = %f" % (mon, -hmon*math.log(hmon, numletters))) - hsum += -hmon*math.log(hmon, numletters) - print("hsum = %f" % (hsum,)) - - numdigrams = len(digrams) - hsumdi = 0.0 - for (di, hdi) in digrams: - print("h(\'%s\') = %f" % (di, -hdi*math.log(hdi, numdigrams))) - hsumdi += -hdi*math.log(hdi, numdigrams) - print("hsumdi = %f" % (hsumdi,)) - - entropyngramlist = [] - for (ind, wd) in enumerate(ngramlist): - - hsumn = 0.0 - numngrams = len(wd) - - for (ngram, hn) in wd.items(): - sn = 0.0 - if numngrams > 1 and hn != 0: - sn = -hn*math.log(hn, numngrams) - hsumn += sn - - if hsumn < 1e-6: - print(wd) - - print("%d %f" % (ind+1, hsumn)) - entropyngramlist.append([ind+1, hsumn]) - return(entropyngramlist) - - def preparePyPM(self, outputfile): - outputmsgtext = re.sub(r'2233', '\n', self.msgtext) - - outputmsgtext = re.sub(r'([0123]{1})', r'\1 ', outputmsgtext) - - fo = open(outputfile, 'w') - fo.write(outputmsgtext) - fo.close() - - - def performFrequencyRankOrderingAndFit(self, msgtext, delimsymbols, wordre, rankcutoff=100): - modifiedmsg = re.sub(delimsymbols, ' ', msgtext) - lenmodifiedmsg = len(modifiedmsg) - pwords = re.compile(wordre) # usually \w+ 
but we have digits instead of letters - wordlist = pwords.findall(modifiedmsg) - - worddict = {} - for w in wordlist: - if worddict.get(w) == None: - worddict[w] = 1 - else: - worddict[w] += 1 - print(sorted(worddict.items())) - ranklist = [pair for pair in sorted(worddict.items(), key=lambda (word,rank): rank, reverse=True)] - - if rankcutoff > 0: - ranklist = ranklist[0:rankcutoff] - - - - freqranking = np.array([(k+1, float(i)/float(lenmodifiedmsg)) - for (k, (w, i)) in enumerate(ranklist)]) - - log10freqranking = np.log10(freqranking) - - [decreasing, intersection] = np.lib.polynomial.polyfit(log10freqranking[:,0],log10freqranking[:,1],1) - - return (freqranking, decreasing, intersection) - - - def doesItObeyZipfsLaw(self, textlist, delimiterlist, wordrelist, colorlist_points, colorlist_fits, rankcutoff=100): - - print("Printing word frequency over ordered by frequency rank.") - print("This obviously relies on the correct choice of delimiter symbols.") - print("This should give a power law according to Zipf\'s law.") - - fig = plt.figure(1) - ax = fig.add_subplot(111) - - ax.axis('equal') - - ax.set_yscale('log') - ax.set_xscale('log') - - ax.set_xlabel('rank # according to frequency (-> decreasing frequency)') - ax.set_ylabel('word frequency') - - texts_to_analyse = [self.msgtext] + textlist - delimiters_to_use = [r'[23]+'] + delimiterlist - wordres_to_use = [r'[01]+'] + wordrelist - colorlist_points_to_use = ['r'] + colorlist_points - colorlist_fits_to_use = ['r'] + colorlist_fits - - for (text, delimiters, wordre, color_points, color_fits) in zip(texts_to_analyse, delimiters_to_use, wordres_to_use, colorlist_points_to_use, colorlist_fits_to_use): - - (freqranking, decreasing, intersection) = self.performFrequencyRankOrderingAndFit(text, delimiters, wordre, rankcutoff) - - # formulas for the log-log plot - # y = a*x^b - # log10 y = log10 a + b*log10 x - - xfit = np.linspace(freqranking[0, 0], freqranking[-1, 0], 100) - yfit = 10.0**intersection*np.power(xfit, 
decreasing) - - - ax.set_title('Zipf\'s Law y = a*x^b') - - print('a = %f, b = %f' % (10.0**intersection, decreasing)) - print(freqranking) - - ax.plot(freqranking[:, 0], freqranking[:, 1], color_points+'.', xfit, yfit, color_fits) - - - try: - plt.show() - except ValueError: - print('something wrong with values in log plot') - - - def showGraphicalRepresentation(self, width=128): - - msgtext = self.msgtext - lenmsg = len(msgtext) - - numlines = lenmsg/width - numoverhead = lenmsg % width - padding = width - numoverhead - - msgtext += "".join(['X' for i in range(padding)]) - - floatmsg = [] - for c in msgtext: - if c != 'X': - floatmsg.append(float(c)) - else: - floatmsg.append(np.NaN) - - nummsgtext = np.array(floatmsg) - - - Data = nummsgtext.reshape((numlines+1, width)) - - - nx, ny = width, numlines+1 - x = range(nx) - y = range(ny) - - X, Y = np.meshgrid(x, y) - - fig = plt.figure() - ax = fig.add_subplot(111) - - ax.set_xlabel('width') - ax.set_ylabel('lines') - - ax.set_title('Graphical Representation of Message') - - ax.imshow(Data, interpolation='None') - - def showGraphicalRepresentationLineTerminal(self, terminalsymbol='2233', maxlen=1000): - - msgtext = self.msgtext - - # split msg at terminalsymbol - # fill all lines up with X until length of longest line - # convert into image - - msgtext = re.split(terminalsymbol, msgtext) - - howmanylines = len(msgtext) - - sortedlines = sorted(msgtext, key=lambda line: len(line)) - - lengths = sorted([len(l) for l in msgtext]) - - print("last 20 lengths: %s" % str(lengths[-20:-1])) - - longestline = sortedlines[-1] - - if maxlen == -1: - width = len(longestline) - else: - width = maxlen - - howmanypixel = int(math.ceil(float(howmanylines)/float(width))) - # to correct the aspect ratio between length and width - - paddedfloatlines = [] - for (linnum, line) in enumerate(msgtext): - paddedline = line - if len(line) < width: - paddedline = line + (''.join(['X' for i in range(width - len(line))])) - elif len(line) > 
width: - print("truncated line %d" % (linnum,)) - paddedline = line[:width] - tmplist = [] - for c in paddedline: - if c != 'X': - tmplist = tmplist + [float(c) for i in range(howmanypixel)] - else: - tmplist = tmplist + [np.NaN for i in range(howmanypixel)] - paddedfloatlines.append(tmplist) - numlines = len(paddedfloatlines) - newwidth = len(paddedfloatlines[0]) - - - - - Data = np.array(paddedfloatlines) - - nx, ny = newwidth, numlines+1 - x = range(nx) - y = range(ny) - - X, Y = np.meshgrid(x, y) - - fig = plt.figure() - ax = fig.add_subplot(111) - - ax.set_xlabel('width') - ax.set_ylabel('lines') - - ax.set_title('Linewise Graphical Representation of Message') - - ax.imshow(Data, interpolation='None') - - - - def guessShortControlSymbols(self, maxlen=5): - print('---------') - print('Guessing control symbols by counting') - print('every occurence of strings of fixed') - print('length up to %d characters. Are these (nearly)' % (maxlen,)) - print('identical for two strings, there is a high chance') - print('that these are delimiters of blocks. They may not occur at a higher') - print('level together, to be delimiters. 
A single occurence') - print('will not be shown.') - for k in range(maxlen): - wordlength = k+1 - pk = re.compile(r'[0-3]{'+str(wordlength)+'}') - nk = pk.findall(self.msgtext) - - nkdict = {} - for wk in nk: - try: - nkdict[wk] += 1 - except KeyError: - nkdict[wk] = 1 - - sortednkdict = sorted(nkdict.items(), key=lambda pair: pair[1]) - occur = [] - for (w, i) in sortednkdict: - if i > 1: - occur.append((w,i)) - if occur != []: - print("%d-char strings: %s" % (wordlength, occur)) - print('--------') - - - def decodeLine(self, linetext, leftdelimiter='', rightdelimiter=''): - - scanner=re.Scanner([ - (r"2032[01]+3", lambda scanner, token: ("DEFINITION", token[4:-1])), - (r"2[01]+3*", lambda scanner, token: ("DATA", token[1:-1])), - (r"2[0123]+3*", lambda scanner, token: ("NESTED_COMMAND", token[1:-1])), - (r"023", lambda scanner, token: ("HASPROPERTY", token)) - ]) - - (results, remain) = scanner.scan(linetext) - - # the first data cell in the line is typically a command - - if linetext != '': - if results[0][0] == 'DATA': - results[0] = ('COMMAND', results[0][1]) - - # now add commands and definitions to dictionaries - - for w in results: - if w[0] == 'DEFINITION': - if self.defdict.get(w[1]) == None: - self.defdict[w[1]] = 'DEFINITION' + str(self.defcounter) - self.defcounter += 1 - if w[0] == 'COMMAND': - if self.defdict.get(w[1]) == None: - print("ERROR COMMAND NOT DEFINED (%s); INSERTING INTO DEFINITION DICT\n" % (w[1],)) - self.defdict[w[1]] = 'DEFINITION' + str(self.defcounter) - self.defcounter += 1 - if w[0] == 'DATA': - if self.datadict.get(w[1]) == None: - self.datadict[w[1]] = 'DATA' + str(self.datacounter) - self.datacounter += 1 - - - # is there something remaining which is not covered by our pattern matching? 
- if remain != '': - print('CANNOT INTERPRET %s \n' % (remain,)) - - return results - - def parseBlock(self, leftdelimiter='', rightdelimiter='', eol=''): - print('--------') - print("Using leftdelimiter '%s', rightdelimiter '%s', EOL '%s'" % (leftdelimiter, rightdelimiter, eol)) - - modifiedmsg = self.msgtext - - if eol!='': - modifiedmsg = re.sub(eol, '\n', modifiedmsg) - - modifiedmsg = modifiedmsg.split('\n') - - decoded = [self.decodeLine(line, leftdelimiter, rightdelimiter) for line in modifiedmsg] - - - - print('--------') - return decoded - - def decodeBlock(self, parsedBlocks): - print('decoding blocks ...') - lines = [] - for line in parsedBlocks: - linestring = '' - - for parsedPair in line: - if parsedPair[0] == 'DEFINITION': - linestring += 'DEFINITION ' + self.defdict[parsedPair[1]] + ' ' - if parsedPair[0] == 'COMMAND': - linestring += 'COMMAND ' + self.defdict[parsedPair[1]] + ' ' - if parsedPair[0] == 'DATA': - linestring += self.datadict[parsedPair[1]] + ' ' - if parsedPair[0] == 'HASPROPERTY': - linestring += 'HASPROPERTY ' - if parsedPair[0] == 'NESTED_COMMAND': - linestring += 'NESTED_COMMAND ' + parsedPair[1] - lines.append(linestring) - return lines - - - def plotNGramEntropy(self, entlengtharrays, colors, labels): - fig = plt.figure(1) - ax = fig.add_subplot(111) - - ax.set_xlabel('n-gram length') - ax.set_ylabel('Shannon-Boltzmann entropy') - - for (data, color, l) in zip(entlengtharrays, colors, labels): - - ax.plot(data[:, 0], data[:, 1], '-', color=color) - - ax.legend(labels, loc='lower right') - - plt.show() - - -def main(argv): - - # TODO: rewrite main function into different analysis - # steps per command line to clean up this section - - if len(argv) != 2: - print("Shows a few properties of the message:") - print("string frequencies -- to determine delimiters") - print("graphics -- fulfillment of Zipf's law") - print("preliminary decoding -- as far as possible") - print("dictionaries -- of commands, data, ...") - print("%s 
msgfile\n" % (argv[0],)) - return - - d = DecoderClass() - - (msglen, randomtext) = d.generateRandomMessage(limit=600000) - (mlenb1, binomialtext1) = d.generateBinomialRandomMessage(p=0.1, limit=600000) - #(mlenb2, binomialtext2) = d.generateBinomialRandomMessage(p=0.2, limit=600000) - #(mlenb3, binomialtext3) = d.generateBinomialRandomMessage(p=0.3, limit=600000) - #(mlenb4, binomialtext4) = d.generateBinomialRandomMessage(p=0.5, limit=600000) - - (txtlen, mobytext) = d.readStandardTextFromFile("../moby_dick.txt", limit=0) - (metilen, metitext) = d.readStandardTextFromFile("../meti.txt", limit=0) - d.readMessage(argv[1], limit=0) - - #d.doesItObeyZipfsLaw([randomtext], [r'[23]+'], [r'[01]+'], ['g'], ['g']) - # check various texts or messages for their ranked frequency content - - #d.showGraphicalRepresentation(width=512) - #d.showGraphicalRepresentationLineTerminal(maxlen=128) - #d.guessShortControlSymbols(maxlen=2) - #res = d.parseBlock(leftdelimiter='2', rightdelimiter='3', eol='2233') - #d.decodeBlock(res) - - mobytext = re.sub(r'\n', '', mobytext) # remove punctuation - - metitext = re.sub(r'[ \n]+', '', metitext) - - wmeti = metitext.split() - encodedmeti = '' - metidict = {} - count = 0 - for w in wmeti: - cdstr = '' - if metidict.get(w) == None: - metidict[w] = count - cdstr = hex(count)[2:] - count += 1 - else: - cdstr = hex(metidict[w])[2:] - lcdstr = len(cdstr) - if lcdstr < 4: - cdstr = (''.join(['0' for i in range(4-lcdstr)])) + cdstr - encodedmeti += cdstr - - Srnd = np.array(d.performStatistics(randomtext, '0123', maxlen=100)) - Sbinomial1 = np.array(d.performStatistics(binomialtext1, '0123', maxlen=100)) - #Sbinomial2 = np.array(d.performStatistics(binomialtext2, '0123', maxlen=100)) - #Sbinomial3 = np.array(d.performStatistics(binomialtext3, '0123', maxlen=100)) - #Sbinomial4 = np.array(d.performStatistics(binomialtext4, '0123', maxlen=100)) - Scos = np.array(d.performStatistics(d.msgtext, '0123', maxlen=100)) - Smoby = 
np.array(d.performStatistics(mobytext, '0123456789abcdefghijklmnopqrstuvwxyz', maxlen=100)) - Smeti = np.array(d.performStatistics(metitext, '01234567', maxlen=100)) - - d.plotNGramEntropy([Srnd, - Sbinomial1, - #Sbinomial2, - #Sbinomial3, - #Sbinomial4, - Scos, - Smoby, - Smeti], - ['r', - r'#000000', - #r'#220000', - #r'#440000', - #r'#880000', - 'g', - 'b', - 'm'], - ['Random text (uniformly distributed 0123)', - 'Random text (binomial distributed 0123 p=0.1)', - #'Random text (binomial distributed 0123 p=0.2)', - #'Random text (binomial distributed 0123 p=0.3)', - #'Random text (binomial distributed 0123 p=0.5)', - 'CosmicOS', - 'Moby Dick (lowercase + numbers)', 'METI (dearet.org, removed space and \\n)']) - - # used later for automated process analysis - #d.preparePyPM('lm.txt') - -if __name__ == '__main__': - main(sys.argv) - - - - diff --git a/analysis/statistical_graphical_analysis.py b/analysis/statistical_graphical_analysis.py new file mode 100755 index 0000000..c510fc1 --- /dev/null +++ b/analysis/statistical_graphical_analysis.py @@ -0,0 +1,569 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +""" +Created on Mon Jun 27 21:16:12 2016 + +@author: joha2 + +The complexity of this decoder is IMHO a measure +for the simplicity of the message. +Format was taken from an old form of message. 
+ +20220415: + - removed decoding stuff (only statistical and graphical analysis needed) + - added command line switches + - cleaned up file +20160627: + - played around a bit with message + - added Zipf's law analysis + - added N-gram entropy analysis + +""" + +import re +import sys +import math +import logging +from operator import itemgetter +from collections import Counter +import argparse + +import numpy as np +import matplotlib.pyplot as plt + + +class DecoderClass: + + def __init__(self, logger): + self.logger = logger + + def convert_args_to_string(self, *args): + return " ".join([str(a) for a in args]) + + def info(self, *args): + self.logger.info(self.convert_args_to_string(*args)) + + def debug(self, *args): + self.logger.debug(self.convert_args_to_string(*args)) + + def error(self, *args): + self.logger.error(self.convert_args_to_string(*args)) + + + def generateRandomMessage(self, seed=1337, limit=10000): + self.info('---------') + self.info("Generating random message with limit %d characters" % (limit,)) + rng = np.random.default_rng(seed) + preliminary = [str(c) + for c in rng.integers(3+1, size=limit).tolist()] + result = ''.join(preliminary) + return (limit, result, {"type": "random", "seed": seed}) + + def generateBinomialRandomMessage(self, p=0.5, seed=1337, limit=10000): + self.info('---------') + self.info("Generating binomial distributed message with limit %d characters" % (limit,)) + rng = np.random.default_rng(seed) + preliminary = [str(c) for c in rng.binomial(3, p, size=limit).tolist()] + result = ''.join(preliminary) + return (limit, result, {"type": "binomial", "seed": seed, "p": p}) + + + + def readStandardTextFromFile(self, filename, limit=10000): + self.info('---------') + self.info("Reading text from file %s with limit %d characters" % (filename, limit)) + + with open(filename,'r') as fl: + s = fl.read() + s = " ".join(s.lower().split()) # substituted all \n by spaces + if limit > 0: + s = s[0:limit] + lenmsg = len(s) + 
self.info('---------') + return (lenmsg, s, {"type": "standardtext", "filename": filename}) + + + def readMessage(self, filename, limit=10000): + self.info('---------') + self.info("Reading message from file %s with limit %d characters" % (filename, limit)) + + with open(filename,'rt') as fl: + self.origmsgtext = fl.read() + + self.msgtext = "".join(self.origmsgtext.split()) # remove end line symbols + if limit > 0: + self.msgtext = self.msgtext[0:limit] + lenmsg = len(self.msgtext) + self.info('---------') + return (lenmsg, self.msgtext, {"type": "message", "filename": filename}) + + + def performStatistics(self, msgtext, lets, maxlen=2): + + self.info("msglen %d" % (len(msgtext),)) + + letters = r'['+lets+']' + + list_entropy_ngrams = [] + for n_gram_length in range(1, maxlen + 1): + pattern = re.compile(letters+"{"+str(n_gram_length)+"}") + matched_patterns = re.findall(pattern, msgtext) + + if matched_patterns == []: + self.debug('empty matching for %s at length %d' % (letters, n_gram_length)) + + ngram_counter = Counter(matched_patterns) # count ngrams in patterns + ngrams_found = len(ngram_counter) # how many ngrams found? + if ngrams_found > 0: + ngram_counts = np.array(list(ngram_counter.values())) + ngram_overall_count = np.sum(ngram_counts) + ps = ngram_counts/ngram_overall_count # relative counts + if np.abs(np.log(ngrams_found)) > 0: + Hs = -ps*np.log(ps)/np.log(ngrams_found) + # Notice: reference of ngrams_found as "size of the alphabet" + # leads to entropy limit of 1 for long ngrams because every + # found ngram appears only once. While choosing lets**n_gram_length + # as reference would lead to a decay of the entropy to zero, + # because only few of the large ngrams compared to the + # large pool are found in the message (which is comparable + # to the only one letter in the stream limit). + # Therefore we go by the first choice. + else: + Hs = np.zeros_like(ps) + Hngram = float(np.sum(Hs)) # calculate entropy + else: + Hngram = 0. 
+ + list_entropy_ngrams.append([n_gram_length, Hngram]) + return list_entropy_ngrams + + + def performFrequencyRankOrderingAndFit(self, msgtext, delimsymbols, wordre, rankcutoff=100): + modifiedmsg = re.sub(delimsymbols, ' ', msgtext) + + pwords = re.compile(wordre) + # usually \w+ but we have digits instead of letters + + wordlist = pwords.findall(modifiedmsg) # find all words + len_wordlist = len(wordlist) + worddict = Counter(wordlist) # count them + self.debug(sorted(worddict.items())) + ranklist = [pair for pair in sorted(worddict.items(), + key=itemgetter(1), reverse=True)] + # sort by rank + + if rankcutoff > 0: + ranklist = ranklist[0:rankcutoff] + + freqranking = np.array([(rank+1, float(counter)/float(len_wordlist)) + for (rank, (_, counter)) in enumerate(ranklist)]) # add frequencies + + log10freqranking = np.log10(freqranking) # perform logarithm + + decreasing, intersection =\ + np.lib.polynomial.polyfit(log10freqranking[:,0], + log10freqranking[:,1],1) # fit loglog + + return (freqranking, decreasing, intersection) + + + def doesItObeyZipfsLaw(self, textlist, + delimiterlist, + wordrelist, + colorlist_points, + colorlist_fits, + labels, + rankcutoff=100): + + self.info("Printing word frequency over ordered by frequency rank.") + self.info("This obviously relies on the correct choice of delimiter symbols.") + self.info("This should give a power law according to Zipf\'s law.") + + fig = plt.figure() + ax = fig.add_subplot(111) + + ax.axis('equal') + + ax.set_yscale('log') + ax.set_xscale('log') + + ax.set_xlabel('rank # according to frequency (-> decreasing frequency)') + ax.set_ylabel('word frequency') + + # self.msgtext: [r'[23]+'], [r'[01]+'], r, r + texts_to_analyse = textlist + delimiters_to_use = delimiterlist + wordres_to_use = wordrelist + colorlist_points_to_use = colorlist_points + colorlist_fits_to_use = colorlist_fits + + for (text, delimiters, wordre, color_points, color_fits) \ + in zip(texts_to_analyse, + delimiters_to_use, + 
wordres_to_use, + colorlist_points_to_use, + colorlist_fits_to_use): + + (freqranking, decreasing, intersection) =\ + self.performFrequencyRankOrderingAndFit(text, + delimiters, + wordre, + rankcutoff) + + # formulas for the log-log plot + # y = a*x^b + # log10 y = log10 a + b*log10 x + + xfit = np.linspace(freqranking[0, 0], freqranking[-1, 0], 100) + yfit = 10.0**intersection*np.power(xfit, decreasing) + + + ax.set_title('Zipf\'s Law y = a*x^b') + + self.info('a = %f, b = %f' % (10.0**intersection, decreasing)) + self.debug(freqranking) + + ax.scatter(freqranking[:, 0], freqranking[:, 1], color=color_points) + ax.plot(xfit, yfit, color_fits) + + ax.legend(labels, loc='lower right') + + + try: + plt.show() + except ValueError: + self.error('something wrong with values in log plot') + + + def showGraphicalRepresentation(self, msgtext, width=128): + + lenmsg = len(msgtext) + + numlines = lenmsg//width + numoverhead = lenmsg % width + padding = width - numoverhead + + msgtext += "".join(['X' for i in range(padding)]) + + floatmsg = [] + for c in msgtext: + if c != 'X' and c != " ": + floatmsg.append(float(c)) + else: + floatmsg.append(np.NaN) + + nummsgtext = np.array(floatmsg) + + + Data = nummsgtext.reshape((numlines+1, width)) + + + nx, ny = width, numlines+1 + x = range(nx) + y = range(ny) + + X, Y = np.meshgrid(x, y) + + fig = plt.figure() + ax = fig.add_subplot(111) + + ax.set_xlabel('width') + ax.set_ylabel('lines') + + ax.set_title('Graphical Representation of Message') + + ax.imshow(Data, interpolation='None') + + def showGraphicalRepresentationLineTerminal(self, msgtext, terminalsymbol='2233', maxlen=1000): + + # split msg at terminalsymbol + # fill all lines up with X until length of longest line + # convert into image + + msgtext = re.split(terminalsymbol, msgtext) + + howmanylines = len(msgtext) + + sortedlines = sorted(msgtext, key=lambda line: len(line)) + + lengths = sorted([len(l) for l in msgtext]) + + print("last 20 lengths: %s" % 
str(lengths[-20:-1])) + + longestline = sortedlines[-1] + + if maxlen == -1: + width = len(longestline) + else: + width = maxlen + + howmanypixel = int(math.ceil(float(howmanylines)/float(width))) + # to correct the aspect ratio between length and width + + paddedfloatlines = [] + for (linnum, line) in enumerate(msgtext): + paddedline = line + if len(line) < width: + paddedline = line + (''.join(['X' for i in range(width - len(line))])) + elif len(line) > width: + self.debug("truncated line %d" % (linnum,)) + paddedline = line[:width] + tmplist = [] + for c in paddedline: + if c != 'X' and c != " ": + tmplist = tmplist + [float(c) for i in range(howmanypixel)] + else: + tmplist = tmplist + [np.NaN for i in range(howmanypixel)] + paddedfloatlines.append(tmplist) + numlines = len(paddedfloatlines) + newwidth = len(paddedfloatlines[0]) + + + + + Data = np.array(paddedfloatlines) + + nx, ny = newwidth, numlines+1 + x = range(nx) + y = range(ny) + + X, Y = np.meshgrid(x, y) + + fig = plt.figure() + ax = fig.add_subplot(111) + + ax.set_xlabel('width') + ax.set_ylabel('lines') + + ax.set_title('Linewise Graphical Representation of Message') + + ax.imshow(Data, interpolation='None') + + def guessShortControlSymbols(self, text, maxlen=5): + self.info('---------') + self.info('Guessing control symbols by counting') + self.info('every occurence of strings of fixed') + self.info('length up to %d characters. Are these (nearly)' % (maxlen,)) + self.info('identical for two strings, there is a high chance') + self.info('that these are delimiters of blocks. They may not occur at a higher') + self.info('level together, to be delimiters. 
A single occurence') + self.info('will not be shown.') + for wordlength in range(1, maxlen + 1): + pk = re.compile(r'[0-3]{'+str(wordlength)+'}') + nk = pk.findall(text) + + nkdict = Counter(nk) + + sortednkdict = sorted(nkdict.items(), key=lambda pair: pair[1]) + occur = [] + for (w, i) in sortednkdict: + if i > 1: + occur.append((w,i)) + if occur != []: + self.info("%d-char strings: %s" % (wordlength, occur)) + self.info('--------') + + def plotNGramEntropy(self, entlengtharrays, colors, labels): + fig = plt.figure() + ax = fig.add_subplot(111) + + ax.set_xlabel('n-gram length') + ax.set_ylabel('Shannon-Boltzmann entropy') + + for (data, color, l) in zip(entlengtharrays, colors, labels): + + ax.plot(data[:, 0], data[:, 1], '-', color=color) + + ax.legend(labels, loc='lower right') + + plt.show() + +def parse_generate_parameter(generate_parameter): + if generate_parameter is None: + return [] + else: + if isinstance(generate_parameter, list): + return [tuple(x.strip().split()) + for x in generate_parameter] + +def main(args_from_argsparse): + + print(args_from_argsparse) + + generate_lengthlimit = args_from_argsparse.genmsglength + file_lengthlimit = args_from_argsparse.filmsglength + generate_list = parse_generate_parameter(args_from_argsparse.generate) + message_file_no = args_from_argsparse.messagenumber + + MAX_NGRAM_LENGTH = 100 + analyse_chars_list = args_from_argsparse.messagechars.split() + verbose = args_from_argsparse.verbose + if verbose: + logging.basicConfig(level=logging.DEBUG) + else: + logging.basicConfig(level=logging.INFO) + + showgraphical = args_from_argsparse.graphical + + d = DecoderClass(logging.getLogger("analysis")) + + statistics_list = [] # list of texts to analyze + + # first grab texts from articially generated ones + for generate_tuple in generate_list: + if len(generate_tuple) > 0: + type_of_generation = generate_tuple[0].lower() + if type_of_generation == "random": + if len(generate_tuple) == 1: + seed = 1337 + else: + (_, seed, 
*rest) = generate_tuple + try: + seed = int(seed) + except ValueError: + print("ERROR: value " + seed + " is no valid float number!") + statistics_list.append( + d.generateRandomMessage(limit=generate_lengthlimit, + seed=seed)) + elif type_of_generation == "binomial": + if len(generate_tuple) > 1: + (_, p, *rest) = generate_tuple + if len(rest) >= 1: + (seed, *_) = rest + else: + seed = 1337 + try: + p = float(p) + seed = int(seed) + except ValueError: + print("ERROR: value " + str(p) + " is no valid float number " + + "or value " + str(seed) + " is no valid int number!") + else: + statistics_list.append( + d.generateBinomialRandomMessage( + p=p, limit=generate_lengthlimit, seed=seed)) + else: + print("ERROR: unknown type \"" + type_of_generation + "\"") + + # second grab texts from provided text files + for (parsed_filename_no, parsed_filename) in enumerate(args_from_argsparse.files): + try: + if parsed_filename_no == message_file_no: + statistics_list.append( + d.readMessage(parsed_filename, + limit=file_lengthlimit)) + else: + statistics_list.append( + d.readStandardTextFromFile(parsed_filename, + limit=file_lengthlimit)) + except FileNotFoundError: + print("ERROR: file not found: " + parsed_filename) + + if len(analyse_chars_list) == 1: + analyse_chars_list *= len(statistics_list) + + # generate colors and labels for unified labelling and coloring + plot_colors = [] + plot_labels = [] + + for ((length, _, props), analyse_chars) in zip(statistics_list, analyse_chars_list): + color = "#" + "".join( + [hex(x)[2:].zfill(2) + for x in np.random.randint(256, size=3).tolist()]) + label = " ".join([k + ": " + str(v) for (k, v) in props.items()]).strip() + label += " (" + analyse_chars + ")" + plot_colors.append(color) + plot_labels.append(label) + + if showgraphical: + for (length, text, props) in statistics_list: + d.showGraphicalRepresentation(text, + width=args_from_argsparse.linerep) + d.showGraphicalRepresentationLineTerminal(text, maxlen=128) + + if 
args_from_argsparse.guesscontrolsymbols: + for (_, text, _) in statistics_list: + d.guessShortControlSymbols(text, maxlen=2) + + if args_from_argsparse.ngram: + + plot_ngram_entropy_plots = [] + for ((length, text, typedict), analyse_chars) in zip(statistics_list, analyse_chars_list): + Splot = np.array( + d.performStatistics(text, + analyse_chars, + maxlen=MAX_NGRAM_LENGTH)) + plot_ngram_entropy_plots.append(Splot) + + d.plotNGramEntropy(plot_ngram_entropy_plots, + plot_colors, + plot_labels) + + + if args_from_argsparse.zipf: + d.doesItObeyZipfsLaw([text + for (length, text, props) in statistics_list], + [r'[23]+']*len(statistics_list), + [r'[01]+']*len(statistics_list), + plot_colors, + plot_colors, + plot_labels) + + + # check various texts or messages for their ranked frequency content + # analyse chars list can be submitted by arg to the programm + # but beware: while for 4-char text 0123 is sufficient, for meti + # you have to use 01234567 and for a normal text e.g. + # 0123456789abcdefghijklmnopqrstuvwxyz + + # TODO: use PyPM later for automated process analysis +
+if __name__ == '__main__':
+    # Command line entry point: build the argument parser, parse the
+    # command line and hand the resulting namespace over to main().
+    program_description = """
+    Analyse different text files (including the message file from
+    CosmicOS) from the perspective of different measures of information.
+    This is to be thought as a naive investigation whether such a
+    piece of information contains a message and if possible to derive
+    the format of the message.
+    """
+    parser = argparse.ArgumentParser(description=program_description)
+    parser.add_argument("files", metavar="file", type=str, nargs="+",
+                        help="a file to be analysed")
+    parser.add_argument("--ngram", action="store_true",
+                        help="show ngram entropy for files")
+    parser.add_argument("--zipf", action="store_true",
+                        help="show whether files obey Zipf's law")
+    parser.add_argument("--linerep", type=int, default=512,
+                        help="show line representation of files (length int)")
+    # --generate may be given several times (action="append").  main()
+    # reads an optional seed after the distribution type ("random") or
+    # after the probability p ("binomial"), so the help mentions it.
+    parser.add_argument("--generate",
+                        action="append",
+                        type=str,
+                        help="""
+show generated distributions together with files:
+
+    --generate "random [seed]"
+    --generate "binomial p [seed]"
+""")
+    parser.add_argument("--genmsglength", type=int, default=10000,
+                        help="Cutoff length of generated message")
+    parser.add_argument("--filmsglength", type=int, default=0,
+                        help="Cutoff length of file message")
+    parser.add_argument("--messagenumber", type=int, default=0,
+                        help="Which filename in the list is the message?")
+    parser.add_argument("--messagechars", type=str, default="0123",
+                        help="Which chars can occur in the message?" +
+                        " Either \"--messagechars 0123\" or " +
+                        "space separated strings \"--messagechars 0123 0123" +
+                        " ... 01234567\" (no of texts). " +
+                        "First generated then files.")
+    parser.add_argument("--verbose", action="store_true",
+                        help="Increases the debug output.")
+    parser.add_argument("--graphical", action="store_true",
+                        help="Show graphical representations.")
+    parser.add_argument("--guesscontrolsymbols", action="store_true",
+                        help="Guess control symbols (aka data delimiters)")
+
+    args = parser.parse_args()
+
+    main(args)
+
diff --git a/analysis/tests_analysis.py b/analysis/tests_analysis.py
new file mode 100755
index 0000000..87ad874
--- /dev/null
+++ b/analysis/tests_analysis.py
@@ -0,0 +1,160 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Sat Apr 23 15:58:14 2022
+
+@author: joha2
+
+Some tests of the statistical analysis file.
+"""
+
+from statistical_graphical_analysis import DecoderClass
+
+import unittest
+import random
+import logging
+import numpy as np
+
+
+NUM_TESTS = 10
+
+class TestMessagesContainEveryCharacter(unittest.TestCase):
+    """
+    Checks whether the different random messages contain every symbol.
+
+    Each test draws NUM_TESTS random (seed, length) pairs, generates a
+    message from them and checks that the message has the requested length
+    and contains each of the symbols "0", "1", "2" and "3".
+    """
+
+    # symbols every generated message is expected to contain
+    SYMBOLS = "0123"
+
+    def setUp(self):
+        self.d = DecoderClass(logging.getLogger("random msg test"))
+
+    def _check_message(self, message, length):
+        """
+        Assert that message is length characters long and contains every
+        symbol of SYMBOLS at least once.
+        """
+        # unittest assertions instead of bare assert statements: they are
+        # not stripped by "python -O" and they report the offending values
+        self.assertEqual(len(message), length)
+        missing = [c for c in self.SYMBOLS if c not in message]
+        self.assertEqual(missing, [], "symbols missing in generated message")
+
+    def test_uniform_random_message(self):
+        for _ in range(NUM_TESTS):
+            seed = random.randint(0, 1000)
+            length = random.randint(1000, 20000)
+            # subTest puts seed and length into the failure report, so a
+            # failing random draw can be reproduced
+            with self.subTest(seed=seed, length=length):
+                (_, message, _) = self.d.generateRandomMessage(seed=seed,
+                                                               limit=length)
+                self._check_message(message, length)
+
+    def test_binomial_random_message(self):
+        p = 0.5
+        for _ in range(NUM_TESTS):
+            seed = random.randint(0, 1000)
+            length = random.randint(1000, 20000)
+            # subTest puts p, seed and length into the failure report
+            with self.subTest(p=p, seed=seed, length=length):
+                (_, message, _) = self.d.generateBinomialRandomMessage(
+                    p=p, seed=seed, limit=length)
+                self._check_message(message, length)
+
+
+class TestMessagesEntropy(unittest.TestCase):
+    """
+    Checks certain limiting cases for the entropy.
+    The same symbol over and over again in the stream should lead to zero
+    entropy. A random stream of symbols should lead to an entropy near one.
+    """
+
+    def setUp(self):
+        self.d = DecoderClass(logging.getLogger("entropy test"))
+
+    def _check_word_lengths(self, statistics, max_length):
+        """
+        Assert that statistics holds one (word length, entropy) entry for
+        every word length 1, ..., max_length, in this order.
+        """
+        self.assertEqual(tuple(number for (number, _) in statistics),
+                         tuple(range(1, max_length+1)))
+
+    def test_empty_text_word_lengths(self):
+        # unittest assertions instead of bare assert statements: they are
+        # not stripped by "python -O" and they report the offending values
+
+        # empty message and zero word length gives empty list
+        self.assertEqual(
+            len(self.d.performStatistics("", "0123", maxlen=0)), 0)
+        # some message and zero word length gives empty list
+        self.assertEqual(
+            len(self.d.performStatistics("0000", "0123", maxlen=0)), 0)
+        # empty message and non-zero word length gives non-empty list
+        self.assertEqual(
+            len(self.d.performStatistics("", "0123", maxlen=10)), 10)
+
+    def test_zero_entropy(self):
+        for _ in range(NUM_TESTS):
+            text_length = random.randint(0, 100)
+            max_length = random.randint(0, 20)
+            # subTest reports the drawn parameters if a check fails
+            with self.subTest(text_length=text_length,
+                              max_length=max_length):
+                statistics = self.d.performStatistics("0"*text_length,
+                                                      "0123",
+                                                      maxlen=max_length)
+                # check numbers of word lengths
+                self._check_word_lengths(statistics, max_length)
+                # check entropy zero
+                for (number, value) in statistics:
+                    self.assertLess(abs(value), 1e-15,
+                                    "word length %s" % (number,))
+
+    def test_high_entropy(self):
+        for _ in range(NUM_TESTS):
+            text_length = random.randint(1000, 10000)
+            max_length = random.randint(0, 20)
+            # subTest reports the drawn parameters if a check fails
+            with self.subTest(text_length=text_length,
+                              max_length=max_length):
+                (_, text, _) = self.d.generateRandomMessage(
+                    limit=text_length)
+                statistics = self.d.performStatistics(text,
+                                                      "0123",
+                                                      maxlen=max_length)
+                # check numbers of word lengths
+                self._check_word_lengths(statistics, max_length)
+                # check entropy above 0.95
+                for (number, value) in statistics:
+                    self.assertGreater(abs(value), 0.95,
+                                       "word length %s" % (number,))
+
+
+class TestMessageZipf(unittest.TestCase):
+    """
+    Test whether the function calculating the ranks and frequencies
+    gives the correct results.
+    """
+
+    def setUp(self):
+        self.d = DecoderClass(logging.getLogger("zipf test"))
+        self.letters = "ABCD"
+        self.delimiter = "XX"
+        self.text_length_words = 20000
+        self.max_word_length = 8
+        self.max_number_words = 100
+        # vocabulary of random words with 3 to max_word_length letters;
+        # the same word may occur more than once in the vocabulary
+        self.words = tuple(
+            "".join(random.choice(self.letters)
+                    for _ in range(random.randint(3, self.max_word_length)))
+            for _ in range(self.max_number_words))
+
+    def _rank_frequency(self, text):
+        """
+        Return the rank/frequency table which the decoder computes for
+        text, using the delimiter and the letters of this test case.
+        """
+        (rank_frequency, _, _) = self.d.performFrequencyRankOrderingAndFit(
+            text, self.delimiter, "["+self.letters+"]+")
+        return rank_frequency
+
+    def test_short_frequency_distribution(self):
+        # six words: AAA three times, BB twice and C once
+        text = "AAAXXAAAXXAAAXXBBXXBBXXCXX"
+        rank_frequency = self._rank_frequency(text)
+        # assertTrue instead of a bare assert, which "python -O" strips
+        self.assertTrue(np.allclose(np.array(rank_frequency),
+                                    np.array([[1, 1/2],
+                                              [2, 1/3],
+                                              [3, 1/6]])))
+
+    def test_long_frequency_distribution(self):
+        # choose the words of the text and count how often each one occurs
+        chosen_words = random.choices(self.words, k=self.text_length_words)
+        wordcount = {}
+        for word in chosen_words:
+            wordcount[word] = wordcount.get(word, 0) + 1
+        # join builds the text in one pass; every word, including the last
+        # one, is followed by the delimiter
+        text = "".join(word + self.delimiter for word in chosen_words)
+
+        # only the counts enter the expected table: rank 1 belongs to the
+        # most frequent word
+        sorted_ranked_word_counts = [
+            [rank, count/self.text_length_words]
+            for (rank, count) in enumerate(
+                sorted(wordcount.values(), reverse=True), start=1)]
+        rank_frequency = self._rank_frequency(text)
+        # a wrong number of ranks shall be reported as a test failure and
+        # not as a broadcasting error inside of np.allclose
+        self.assertEqual(len(rank_frequency), len(sorted_ranked_word_counts))
+        self.assertTrue(np.allclose(rank_frequency,
+                                    sorted_ranked_word_counts))
\ No newline at end of file
diff --git a/analysis/tests_runner.py b/analysis/tests_runner.py
new file mode 100755
index 0000000..83276e7
--- /dev/null
+++ b/analysis/tests_runner.py
@@ -0,0 +1,21 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Sat Apr 23 16:03:36 2022
+
+@author: joha2
+
+Boilerplate code for unittest
+"""
+
+import unittest
+
+import tests_analysis
+
+loader = unittest.TestLoader()
+suite = unittest.TestSuite()
+
+suite.addTests(loader.loadTestsFromModule(tests_analysis))
+
+runner = unittest.TextTestRunner(verbosity=3)
+result = runner.run(suite)
+
+# Report the outcome through the exit status (0: all tests passed,
+# 1: at least one failure or error).  Without this the script always
+# exits with status 0, so a CI step running it could never fail.
+raise SystemExit(not result.wasSuccessful())
\ No newline at end of file