Skip to content

Commit

Permalink
Add naive station aggregation. Baseline 2.39s, naive with mmap 8.9s
Browse files Browse the repository at this point in the history
  • Loading branch information
clarkezone committed Mar 9, 2024
1 parent e8e7bdf commit 7bfbda2
Showing 1 changed file with 238 additions and 4 deletions.
242 changes: 238 additions & 4 deletions 1BRC/fast/src/main.zig
Original file line number Diff line number Diff line change
@@ -1,5 +1,199 @@
const std = @import("std");

/// Running count / sum / min / max of the temperature readings for one station.
///
/// The aggregate owns a private copy of the station name: `init` duplicates it
/// with `ally` and `deinit` frees that copy.
const StationAgregate = struct {
    /// Allocator used to duplicate (and later free) `name`.
    ally: std.mem.Allocator,
    /// Station name; copy owned by this aggregate.
    name: []const u8,
    /// Number of readings recorded so far.
    valuecount: u32, //unclear if this will overflow based on number of readings
    /// Sum of every recorded reading.
    temperaturetotal: f64, //unclear if this will overflow based on number of readings. For multithreaded case, 31M rows * max temp would need to be stored assuming split over 32 cores
    /// Smallest reading seen; only meaningful once `valuecount > 0`.
    temperaturemin: f64,
    /// Largest reading seen; only meaningful once `valuecount > 0`.
    temperaturemax: f64,

    /// Creates an empty aggregate for `name`.
    /// `name` is copied, so the caller's slice does not need to outlive the
    /// aggregate. Fails only if the copy cannot be allocated.
    pub fn init(ally: std.mem.Allocator, name: []const u8) !StationAgregate {
        return StationAgregate{
            .ally = ally,
            .name = try ally.dupe(u8, name),
            .valuecount = 0,
            .temperaturetotal = 0,
            .temperaturemin = 0,
            .temperaturemax = 0,
        };
    }

    /// Frees the name copy made by `init`.
    pub fn deinit(self: *StationAgregate) void {
        self.ally.free(self.name);
    }

    /// Returns the arithmetic mean of the recorded readings, or 0.0 when
    /// nothing has been recorded yet.
    pub fn averageTemperature(self: StationAgregate) f64 {
        if (self.valuecount == 0) {
            return 0.0;
        }
        // if denominator is f32, rounding is broken
        return self.temperaturetotal / @as(f64, @floatFromInt(self.valuecount));
    }

    /// Folds one reading into the aggregate.
    pub fn recordTemperature(self: *StationAgregate, value: f64) void {
        if (self.valuecount == 0) {
            // The first reading seeds both extremes. Using the count rather
            // than a `== 0` sentinel on min/max keeps a genuine 0.0 reading
            // from being overwritten by the next value.
            self.temperaturemin = value;
            self.temperaturemax = value;
        } else {
            self.temperaturemin = @min(self.temperaturemin, value);
            self.temperaturemax = @max(self.temperaturemax, value);
        }
        self.temperaturetotal += value;
        self.valuecount += 1;
    }
};

// Checks the running average and the min/max tracking of a single aggregate.
test "StationAgregate" {
    const ally = std.testing.allocator;
    var station = try StationAgregate.init(ally, "test");
    // Deferred so the name copy is released even when an expectation below
    // fails; otherwise the testing allocator reports a leak on top of the
    // real failure.
    defer station.deinit();

    try std.testing.expectEqual(@as(f64, 0.0), station.averageTemperature());

    station.recordTemperature(10.0);
    try std.testing.expectEqual(@as(f64, 10.0), station.averageTemperature());

    station.recordTemperature(20.0);
    try std.testing.expectEqual(@as(f64, 15.0), station.averageTemperature());
    try std.testing.expectEqual(@as(f64, 10.0), station.temperaturemin);
    try std.testing.expectEqual(@as(f64, 20.0), station.temperaturemax);
}

/// Table of per-station aggregates keyed by station name.
///
/// Every key slice stored in `stations` is the `name` owned by the matching
/// `StationAgregate`, so names passed to `Store` need not outlive the call.
const Stations = struct {
    /// Allocator used for the station-name copies and for `PrintAll`'s scratch list.
    ally: std.mem.Allocator,
    /// Station name -> aggregate.
    stations: std.StringArrayHashMap(StationAgregate),
    /// Total number of readings accepted by `Store`.
    resultcount: u64,

    //TODO error return causes deinit not to be found
    /// Creates an empty table backed by `ally`.
    pub fn init(ally: std.mem.Allocator) Stations {
        var tracker = std.StringArrayHashMap(StationAgregate).init(ally);
        // Pre-sizing is purely an optimisation (and measured as "no perf
        // improvement"), so a failure here is deliberately ignored: the map is
        // still valid and a later `Store` reports out-of-memory through its
        // own error return instead of this function hitting `unreachable`.
        tracker.ensureTotalCapacity(10000) catch {};
        return Stations{ .ally = ally, .stations = tracker, .resultcount = 0 };
    }

    /// Frees every owned station name and then the map itself.
    pub fn deinit(self: *Stations) void {
        for (self.stations.values()) |aggregate| {
            self.ally.free(aggregate.name);
        }
        self.stations.deinit();
    }

    /// Records one reading for `name`, creating the station on first sight.
    /// Fails if the map cannot grow or the name cannot be copied; in both
    /// cases the table is left as it was before the call.
    pub fn Store(self: *Stations, name: []const u8, temp: f32) !void {
        const entry = try self.stations.getOrPut(name);
        if (!entry.found_existing) {
            entry.value_ptr.* = StationAgregate.init(self.ally, name) catch |err| {
                // Roll back the slot getOrPut just created. Leaving it would
                // keep an uninitialised aggregate (and a key borrowed from the
                // caller) in the map, which `deinit` would later try to free.
                _ = self.stations.swapRemove(name);
                return err;
            };

            //doesn't change hash but replaces storage from name passed in to name stored / allocated
            entry.key_ptr.* = entry.value_ptr.name;
        }
        entry.value_ptr.recordTemperature(temp);
        self.resultcount += 1;
    }

    /// Prints the number of stored readings and distinct stations via `std.debug.print`.
    pub fn PrintSummary(self: *Stations) void {
        std.debug.print("Rowscancount{} Storagecount {}\n", .{ self.resultcount, self.stations.count() });
    }

    /// Byte-wise "less than" used to order station names in `PrintAll`.
    fn compareStrings(_: void, lhs: []const u8, rhs: []const u8) bool {
        return std.mem.lessThan(u8, lhs, rhs);
    }

    /// Returns a copy of the aggregate stored for `name`, or null if unknown.
    /// The copy's `name` still points at memory owned by this table.
    pub fn GetStation(self: *Stations, name: []const u8) ?StationAgregate {
        return self.stations.get(name);
    }

    /// Writes `name=min/avg/max` for one station to stdout; prints nothing
    /// when the station is unknown.
    pub fn PrintSpecific(self: *Stations, name: []const u8) !void {
        const out = std.io.getStdOut().writer();

        if (self.GetStation(name)) |station| {
            try out.print("{s}={d}/{d}/{d}\n", .{
                station.name,
                station.temperaturemin,
                station.averageTemperature(),
                station.temperaturemax,
            });
        }
    }

    /// Writes every station, ordered by name, to stdout as
    /// `{name=min/avg/max, ...}` with one decimal place per value.
    pub fn PrintAll(self: *const Stations) !void {
        const out = std.io.getStdOut().writer();

        // Sort a private copy of the key slices so the map's own insertion
        // order is left untouched.
        const names = try self.ally.dupe([]const u8, self.stations.keys());
        defer self.ally.free(names);
        std.mem.sort([]const u8, names, {}, compareStrings);

        try out.print("{{", .{});
        std.debug.print("sorted: ", .{});
        for (names) |station_name| {
            if (self.stations.get(station_name)) |station| {
                try out.print("{s}={d:.1}/{d:.1}/{d:.1}, ", .{
                    station.name,
                    station.temperaturemin,
                    station.averageTemperature(),
                    station.temperaturemax,
                });
            }
        }
        try out.print("}}\n", .{});
    }
};

// Stores three readings across two stations, then checks the reading count
// and the average of the station that received two of them.
test "hashmap stations" {
    const Reading = struct { station: []const u8, temp: f32 };
    const readings = [_]Reading{
        .{ .station = "foo", .temp = 32 },
        .{ .station = "foo", .temp = 10 },
        .{ .station = "bar", .temp = 15 },
    };

    var table = Stations.init(std.testing.allocator);
    defer table.deinit();

    for (readings) |reading| {
        try table.Store(reading.station, reading.temp);
    }

    try std.testing.expect(table.resultcount == 3);

    const foo_station = table.GetStation("foo");
    try std.testing.expect(foo_station.?.averageTemperature() == 21.0);
}

/// Errors `parseLine` returns for lines that carry no measurement.
const passError = error{ DelimiterNotFound, LineIsComment };

/// Splits one `name;temperature` line into its station name and value.
///
/// `buff` is the line without its trailing newline. The returned `name`
/// aliases `buff`; nothing is allocated.
///
/// Errors:
/// - `LineIsComment` when the line starts with `#`.
/// - `DelimiterNotFound` when the line has no `;` (this includes an empty line).
/// - whatever `std.fmt.parseFloat` returns when the text after the first `;`
///   is not a valid float.
pub fn parseLine(buff: []const u8) !struct { name: []const u8, value: f32 } {
    // Check the length before looking at the first byte: indexing an empty
    // slice is an out-of-bounds access, and blank lines do occur in input.
    if (buff.len > 0 and buff[0] == '#') return passError.LineIsComment;

    const splitindex = std.mem.indexOfScalar(u8, buff, ';') orelse
        return passError.DelimiterNotFound;

    const temp = try std.fmt.parseFloat(f32, buff[splitindex + 1 ..]);
    return .{ .name = buff[0..splitindex], .value = temp };
}

// Covers the three outcomes of parseLine: a parsed reading, a missing
// delimiter, and a comment line.
test "parseLine" {
    // Well-formed line: name before the ';', value after it.
    const reading = try parseLine("foobar;2.444");
    try std.testing.expect(std.mem.eql(u8, reading.name, "foobar"));
    try std.testing.expect(reading.value == 2.444);

    // No ';' anywhere in the line.
    try std.testing.expectError(
        passError.DelimiterNotFound,
        parseLine("foobar2.444\n"),
    );

    // Leading '#' marks a comment.
    try std.testing.expectError(
        passError.LineIsComment,
        parseLine("# Adapted from https://simplemaps.com/data/world-cities"),
    );
}

const scanconfig = struct {
start: u64,
end: u64,
Expand All @@ -9,7 +203,43 @@ const scanconfig = struct {
waitgroup: *std.Thread.WaitGroup,
};

fn dobaslineoperation(mapper: []u8, state: *scanconfig) void {
/// Walks the newline-terminated lines of `mapper` that begin at or after
/// `state.start` and before `state.end`, parses each with `parseLine`, and
/// folds the readings into a function-local `Stations` table.
///
/// `state.start` is advanced past every consumed line and `state.linecount`
/// is incremented once per stored reading. Comment lines and blank lines are
/// skipped without being counted. Any other parse error, or an allocation
/// failure while storing, is returned to the caller.
///
/// NOTE(review): when `state.start != 0` the line containing `state.start` is
/// always skipped, even if `state.start` is exactly the first byte of a line.
/// Whether that can drop a line depends on how the caller picks chunk
/// boundaries, which is not visible here; confirm against the splitter.
fn dostationoperation(mapper: []u8, state: *scanconfig) !void {
    //TODO fix this
    //TODO agregate stats
    // The table is discarded on return (see the TODOs above), so release it
    // and its allocator here instead of leaking both on every call.
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    var stats = Stations.init(gpa.allocator());
    defer stats.deinit();

    if (state.start != 0) {
        //attempt fast forward.
        // No newline after the start means there is no complete line that
        // begins in this chunk, so there is nothing to scan.
        const first_break = std.mem.indexOfScalarPos(u8, mapper, state.start, '\n') orelse return;
        // don't count the first line
        state.start = first_break + 1; // always increment start to ensure loop not entered if first seek takes out of bounds
    }

    while (state.start < state.end) {
        // A trailing line without a newline terminator is not processed.
        const line_end = std.mem.indexOfScalarPos(u8, mapper, state.start, '\n') orelse break;
        const line = mapper[state.start..line_end];

        // Advance before any `continue`: skipping a line without moving
        // `state.start` would rescan the same line forever.
        state.start = line_end + 1;

        // Blank lines carry no reading.
        if (line.len == 0) continue;

        const vals = parseLine(line) catch |err| switch (err) {
            error.LineIsComment => continue,
            else => return err,
        };
        try stats.Store(vals.name, vals.value);
        state.linecount += 1;
    }
}

fn dobaslineoperation(mapper: []u8, state: *scanconfig) !void {
if (state.start != 0) {
//attempt fast forward.
var returnpos = std.mem.indexOfScalarPos(u8, mapper, state.start, '\n');
Expand All @@ -32,15 +262,19 @@ fn dobaslineoperation(mapper: []u8, state: *scanconfig) void {
}

fn scanfilesegment(state: *scanconfig) void {
std.debug.print("Thread start: {} end: {}\n", .{ state.start, state.end });
//std.debug.print("Thread start: {} end: {}\n", .{ state.start, state.end });
//TODO std.os.MAP.SHARED not working in 0.12
//TODO can we map portions of the file to save memory?
//const mapper = std.os.mmap(null, state.*.end, std.os.PROT.READ, std.os.MAP.SHARED, state.*.file.handle, state.*.start) catch |err| {
//const mapper = stdobaslineoperationd.os.mmap(null, state.*.end, std.os.PROT.READ, std.os.MAP.SHARED, state.*.file.handle, state.*.start) catch |err| {
const mapper = std.os.mmap(null, state.*.length, std.os.PROT.READ, std.os.MAP.SHARED, state.*.file.handle, 0) catch |err| {
std.debug.print("Error: {}\n", .{err});
return;
};
dobaslineoperation(mapper, state);
//dostationoperation(mapper, state) catch |err| {
dobaslineoperation(mapper, state) catch |err| {
std.debug.print("Error: {}\n", .{err});
unreachable;
};
defer std.os.munmap(mapper);
std.debug.print("Thread Stop with linecount {}\n", .{state.linecount});
state.*.waitgroup.finish();
Expand Down

0 comments on commit 7bfbda2

Please sign in to comment.