Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add callback in perf_reader to handle lost sample events #1092

Merged
merged 1 commit into from
Apr 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/reference_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -897,9 +897,9 @@ These are equivalent.

### 2. open_perf_buffer()

Syntax: ```table.open_perf_buffers(callback, page_cnt=N)```
Syntax: ```table.open_perf_buffers(callback, page_cnt=N, lost_cb=None)```

This operates on a table as defined in BPF as BPF_PERF_OUTPUT(), and associates the callback Python function ```callback``` to be called when data is available in the perf ring buffer. This is part of the recommended mechanism for transferring per-event data from kernel to user space. The size of the perf ring buffer can be specified via the ```page_cnt``` parameter, which must be a power of two number of pages and defaults to 8.
This operates on a table as defined in BPF as BPF_PERF_OUTPUT(), and associates the callback Python function ```callback``` to be called when data is available in the perf ring buffer. This is part of the recommended mechanism for transferring per-event data from kernel to user space. The size of the perf ring buffer can be specified via the ```page_cnt``` parameter, which must be a power of two number of pages and defaults to 8. If the callback is not processing data fast enough, some submitted data may be lost. ```lost_cb``` will be called to log / monitor the lost count. If ```lost_cb``` is the default ```None``` value, it will just print a line of message to ```stderr```.

Example:

Expand Down
6 changes: 4 additions & 2 deletions src/cc/BPF.cc
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,9 @@ StatusTuple BPF::detach_perf_event(uint32_t ev_type, uint32_t ev_config) {
}

StatusTuple BPF::open_perf_buffer(const std::string& name,
perf_reader_raw_cb cb, void* cb_cookie,
perf_reader_raw_cb cb,
perf_reader_lost_cb lost_cb,
void* cb_cookie,
int page_cnt) {
if (perf_buffers_.find(name) == perf_buffers_.end()) {
TableStorage::iterator it;
Expand All @@ -406,7 +408,7 @@ StatusTuple BPF::open_perf_buffer(const std::string& name,
if ((page_cnt & (page_cnt - 1)) != 0)
return StatusTuple(-1, "open_perf_buffer page_cnt must be a power of two");
auto table = perf_buffers_[name];
TRY2(table->open_all_cpu(cb, cb_cookie, page_cnt));
TRY2(table->open_all_cpu(cb, lost_cb, cb_cookie, page_cnt));
return StatusTuple(0);
}

Expand Down
4 changes: 3 additions & 1 deletion src/cc/BPF.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ class BPF {
return BPFStackTable({});
}

StatusTuple open_perf_buffer(const std::string& name, perf_reader_raw_cb cb,
StatusTuple open_perf_buffer(const std::string& name,
perf_reader_raw_cb cb,
perf_reader_lost_cb lost_cb = nullptr,
void* cb_cookie = nullptr,
int page_cnt = DEFAULT_PERF_BUFFER_PAGE_CNT);
StatusTuple close_perf_buffer(const std::string& name);
Expand Down
14 changes: 8 additions & 6 deletions src/cc/BPFTable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,14 @@ std::vector<std::string> BPFStackTable::get_stack_symbol(int stack_id,
return res;
}

StatusTuple BPFPerfBuffer::open_on_cpu(perf_reader_raw_cb cb, int cpu,
void* cb_cookie, int page_cnt) {
StatusTuple BPFPerfBuffer::open_on_cpu(perf_reader_raw_cb cb,
perf_reader_lost_cb lost_cb,
int cpu, void* cb_cookie, int page_cnt) {
if (cpu_readers_.find(cpu) != cpu_readers_.end())
return StatusTuple(-1, "Perf buffer already open on CPU %d", cpu);

auto reader = static_cast<perf_reader*>(
bpf_open_perf_buffer(cb, cb_cookie, -1, cpu, page_cnt));
bpf_open_perf_buffer(cb, lost_cb, cb_cookie, -1, cpu, page_cnt));
if (reader == nullptr)
return StatusTuple(-1, "Unable to construct perf reader");

Expand All @@ -98,8 +99,9 @@ StatusTuple BPFPerfBuffer::open_on_cpu(perf_reader_raw_cb cb, int cpu,
return StatusTuple(0);
}

StatusTuple BPFPerfBuffer::open_all_cpu(perf_reader_raw_cb cb, void* cb_cookie,
int page_cnt) {
StatusTuple BPFPerfBuffer::open_all_cpu(perf_reader_raw_cb cb,
perf_reader_lost_cb lost_cb,
void* cb_cookie, int page_cnt) {
if (cpu_readers_.size() != 0 || epfd_ != -1)
return StatusTuple(-1, "Previously opened perf buffer not cleaned");

Expand All @@ -108,7 +110,7 @@ StatusTuple BPFPerfBuffer::open_all_cpu(perf_reader_raw_cb cb, void* cb_cookie,
epfd_ = epoll_create1(EPOLL_CLOEXEC);

for (int i : cpus) {
auto res = open_on_cpu(cb, i, cb_cookie, page_cnt);
auto res = open_on_cpu(cb, lost_cb, i, cb_cookie, page_cnt);
if (res.code() != 0) {
TRY2(close_all_cpu());
return res;
Expand Down
8 changes: 4 additions & 4 deletions src/cc/BPFTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,14 @@ class BPFPerfBuffer : protected BPFTableBase<int, int> {
: BPFTableBase<int, int>(desc), epfd_(-1) {}
~BPFPerfBuffer();

StatusTuple open_all_cpu(perf_reader_raw_cb cb, void* cb_cookie,
int page_cnt);
StatusTuple open_all_cpu(perf_reader_raw_cb cb, perf_reader_lost_cb lost_cb,
void* cb_cookie, int page_cnt);
StatusTuple close_all_cpu();
void poll(int timeout);

private:
StatusTuple open_on_cpu(perf_reader_raw_cb cb, int cpu, void* cb_cookie,
int page_cnt);
StatusTuple open_on_cpu(perf_reader_raw_cb cb, perf_reader_lost_cb lost_cb,
int cpu, void* cb_cookie, int page_cnt);
StatusTuple close_on_cpu(int cpu);

std::map<int, perf_reader*> cpu_readers_;
Expand Down
13 changes: 7 additions & 6 deletions src/cc/libbpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ void * bpf_attach_kprobe(int progfd, enum bpf_probe_attach_type attach_type, con
int n;

snprintf(new_name, sizeof(new_name), "%s_bcc_%d", ev_name, getpid());
reader = perf_reader_new(cb, NULL, cb_cookie, probe_perf_reader_page_cnt);
reader = perf_reader_new(cb, NULL, NULL, cb_cookie, probe_perf_reader_page_cnt);
if (!reader)
goto error;

Expand Down Expand Up @@ -417,7 +417,7 @@ void * bpf_attach_uprobe(int progfd, enum bpf_probe_attach_type attach_type, con
int n;

snprintf(new_name, sizeof(new_name), "%s_bcc_%d", ev_name, getpid());
reader = perf_reader_new(cb, NULL, cb_cookie, probe_perf_reader_page_cnt);
reader = perf_reader_new(cb, NULL, NULL, cb_cookie, probe_perf_reader_page_cnt);
if (!reader)
goto error;

Expand Down Expand Up @@ -503,7 +503,7 @@ void * bpf_attach_tracepoint(int progfd, const char *tp_category,
char buf[256];
struct perf_reader *reader = NULL;

reader = perf_reader_new(cb, NULL, cb_cookie, probe_perf_reader_page_cnt);
reader = perf_reader_new(cb, NULL, NULL, cb_cookie, probe_perf_reader_page_cnt);
if (!reader)
goto error;

Expand All @@ -525,13 +525,14 @@ int bpf_detach_tracepoint(const char *tp_category, const char *tp_name) {
return 0;
}

void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb, void *cb_cookie, int pid,
int cpu, int page_cnt) {
void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb,
perf_reader_lost_cb lost_cb, void *cb_cookie,
int pid, int cpu, int page_cnt) {
int pfd;
struct perf_event_attr attr = {};
struct perf_reader *reader = NULL;

reader = perf_reader_new(NULL, raw_cb, cb_cookie, page_cnt);
reader = perf_reader_new(NULL, raw_cb, lost_cb, cb_cookie, page_cnt);
if (!reader)
goto error;

Expand Down
6 changes: 4 additions & 2 deletions src/cc/libbpf.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ int bpf_open_raw_sock(const char *name);
typedef void (*perf_reader_cb)(void *cb_cookie, int pid, uint64_t callchain_num,
void *callchain);
typedef void (*perf_reader_raw_cb)(void *cb_cookie, void *raw, int raw_size);
typedef void (*perf_reader_lost_cb)(uint64_t lost);

void * bpf_attach_kprobe(int progfd, enum bpf_probe_attach_type attach_type,
const char *ev_name, const char *fn_name,
Expand All @@ -68,8 +69,9 @@ void * bpf_attach_tracepoint(int progfd, const char *tp_category,
int group_fd, perf_reader_cb cb, void *cb_cookie);
int bpf_detach_tracepoint(const char *tp_category, const char *tp_name);

void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb, void *cb_cookie, int pid,
int cpu, int page_cnt);
void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb,
perf_reader_lost_cb lost_cb, void *cb_cookie,
int pid, int cpu, int page_cnt);

/* attached a prog expressed by progfd to the device specified in dev_name */
int bpf_attach_xdp(const char *dev_name, int progfd);
Expand Down
13 changes: 11 additions & 2 deletions src/cc/perf_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
struct perf_reader {
perf_reader_cb cb;
perf_reader_raw_cb raw_cb;
perf_reader_lost_cb lost_cb;
void *cb_cookie; // to be returned in the cb
void *buf; // for keeping segmented data
size_t buf_size;
Expand All @@ -41,12 +42,15 @@ struct perf_reader {
};

struct perf_reader * perf_reader_new(perf_reader_cb cb,
perf_reader_raw_cb raw_cb, void *cb_cookie, int page_cnt) {
perf_reader_raw_cb raw_cb,
perf_reader_lost_cb lost_cb,
void *cb_cookie, int page_cnt) {
struct perf_reader *reader = calloc(1, sizeof(struct perf_reader));
if (!reader)
return NULL;
reader->cb = cb;
reader->raw_cb = raw_cb;
reader->lost_cb = lost_cb;
reader->cb_cookie = cb_cookie;
reader->fd = -1;
reader->page_size = getpagesize();
Expand Down Expand Up @@ -235,7 +239,12 @@ void perf_reader_event_read(struct perf_reader *reader) {
}

if (e->type == PERF_RECORD_LOST) {
fprintf(stderr, "Lost %lu samples\n", *(uint64_t *)(ptr + sizeof(*e)));
uint64_t lost = *(uint64_t *)(ptr + sizeof(*e));
if (reader->lost_cb) {
reader->lost_cb(lost);
} else {
fprintf(stderr, "Possibly lost %llu samples\n", lost);
}
} else if (e->type == PERF_RECORD_SAMPLE) {
if (reader->type == PERF_TYPE_TRACEPOINT)
parse_tracepoint(reader, ptr, e->size);
Expand Down
4 changes: 3 additions & 1 deletion src/cc/perf_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ extern "C" {
struct perf_reader;

struct perf_reader * perf_reader_new(perf_reader_cb cb,
perf_reader_raw_cb raw_cb, void *cb_cookie, int page_cnt);
perf_reader_raw_cb raw_cb,
perf_reader_lost_cb lost_cb,
void *cb_cookie, int page_cnt);
void perf_reader_free(void *ptr);
int perf_reader_mmap(struct perf_reader *reader, unsigned type, unsigned long sample_type);
void perf_reader_event_read(struct perf_reader *reader);
Expand Down
4 changes: 2 additions & 2 deletions src/lua/bcc/libbcc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ int bpf_open_raw_sock(const char *name);

typedef void (*perf_reader_cb)(void *cb_cookie, int pid, uint64_t callchain_num, void *callchain);
typedef void (*perf_reader_raw_cb)(void *cb_cookie, void *raw, int raw_size);
typedef void (*perf_reader_lost_cb)(uint64_t lost);

void * bpf_attach_kprobe(int progfd, int attach_type, const char *ev_name,
const char *fn_name,
Expand All @@ -54,7 +55,7 @@ void * bpf_attach_uprobe(int progfd, int attach_type, const char *ev_name,

int bpf_detach_uprobe(const char *ev_name);

void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb, void *cb_cookie, int pid, int cpu, int page_cnt);
void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb, perf_reader_lost_cb lost_cb, void *cb_cookie, int pid, int cpu, int page_cnt);
]]

ffi.cdef[[
Expand Down Expand Up @@ -98,7 +99,6 @@ int bpf_table_leaf_sscanf(void *program, size_t id, const char *buf, void *leaf)
ffi.cdef[[
struct perf_reader;

struct perf_reader * perf_reader_new(perf_reader_cb cb, perf_reader_raw_cb raw_cb, void *cb_cookie);
void perf_reader_free(void *ptr);
int perf_reader_mmap(struct perf_reader *reader, unsigned type, unsigned long sample_type);
int perf_reader_poll(int num_readers, struct perf_reader **readers, int timeout);
Expand Down
16 changes: 12 additions & 4 deletions src/lua/bcc/table.lua
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,22 @@ local function _perf_id(id, cpu)
return string.format("bcc:perf_event_array:%d:%d", tonumber(id), cpu or 0)
end

function PerfEventArray:_open_perf_buffer(cpu, callback, ctype, page_cnt)
function PerfEventArray:_open_perf_buffer(cpu, callback, ctype, page_cnt, lost_cb)
local _cb = ffi.cast("perf_reader_raw_cb",
function (cookie, data, size)
callback(cpu, ctype(data)[0])
end)

local _lost_cb = nil
if lost_cb then
_lost_cb = ffi.cast("perf_reader_lost_cb",
function (lost)
lost_cb(lost)
end)
end

-- default to 8 pages per buffer
local reader = libbcc.bpf_open_perf_buffer(_cb, nil, -1, cpu, page_cnt or 8)
local reader = libbcc.bpf_open_perf_buffer(_cb, nil, _lost_cb, -1, cpu, page_cnt or 8)
assert(reader, "failed to open perf buffer")

local fd = libbcc.perf_reader_fd(reader)
Expand All @@ -259,11 +267,11 @@ function PerfEventArray:_open_perf_buffer(cpu, callback, ctype, page_cnt)
self._callbacks[cpu] = _cb
end

function PerfEventArray:open_perf_buffer(callback, data_type, data_params, page_cnt)
function PerfEventArray:open_perf_buffer(callback, data_type, data_params, page_cnt, lost_cb)
assert(data_type, "a data type is needed for callback conversion")
local ctype = ffi.typeof(data_type.."*", unpack(data_params or {}))
for i = 0, Posix.cpu_count() - 1 do
self:_open_perf_buffer(i, callback, ctype, page_cnt)
self:_open_perf_buffer(i, callback, ctype, page_cnt, lost_cb)
end
end

Expand Down
3 changes: 2 additions & 1 deletion src/python/bcc/libbcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
_CB_TYPE = ct.CFUNCTYPE(None, ct.py_object, ct.c_int,
ct.c_ulonglong, ct.POINTER(ct.c_ulonglong))
_RAW_CB_TYPE = ct.CFUNCTYPE(None, ct.py_object, ct.c_void_p, ct.c_int)
_LOST_CB_TYPE = ct.CFUNCTYPE(None, ct.c_ulonglong)
lib.bpf_attach_kprobe.argtypes = [ct.c_int, ct.c_int, ct.c_char_p, ct.c_char_p, ct.c_int,
ct.c_int, ct.c_int, _CB_TYPE, ct.py_object]
lib.bpf_detach_kprobe.restype = ct.c_int
Expand All @@ -102,7 +103,7 @@
lib.bpf_detach_tracepoint.restype = ct.c_int
lib.bpf_detach_tracepoint.argtypes = [ct.c_char_p, ct.c_char_p]
lib.bpf_open_perf_buffer.restype = ct.c_void_p
lib.bpf_open_perf_buffer.argtypes = [_RAW_CB_TYPE, ct.py_object, ct.c_int, ct.c_int, ct.c_int]
lib.bpf_open_perf_buffer.argtypes = [_RAW_CB_TYPE, _LOST_CB_TYPE, ct.py_object, ct.c_int, ct.c_int, ct.c_int]
lib.bpf_open_perf_event.restype = ct.c_int
lib.bpf_open_perf_event.argtypes = [ct.c_uint, ct.c_ulonglong, ct.c_int, ct.c_int]
lib.perf_reader_poll.restype = ct.c_int
Expand Down
11 changes: 6 additions & 5 deletions src/python/bcc/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import multiprocessing
import os

from .libbcc import lib, _RAW_CB_TYPE
from .libbcc import lib, _RAW_CB_TYPE, _LOST_CB_TYPE
from .perf import Perf
from .utils import get_online_cpus
from .utils import get_possible_cpus
Expand Down Expand Up @@ -507,7 +507,7 @@ def __delitem__(self, key):
super(PerfEventArray, self).__delitem__(key)
self.close_perf_buffer(key)

def open_perf_buffer(self, callback, page_cnt=8):
def open_perf_buffer(self, callback, page_cnt=8, lost_cb=None):
"""open_perf_buffers(callback)

Opens a set of per-cpu ring buffer to receive custom perf event
Expand All @@ -521,11 +521,12 @@ def open_perf_buffer(self, callback, page_cnt=8):
raise Exception("Perf buffer page_cnt must be a power of two")

for i in get_online_cpus():
self._open_perf_buffer(i, callback, page_cnt)
self._open_perf_buffer(i, callback, page_cnt, lost_cb)

def _open_perf_buffer(self, cpu, callback, page_cnt):
def _open_perf_buffer(self, cpu, callback, page_cnt, lost_cb):
fn = _RAW_CB_TYPE(lambda _, data, size: callback(cpu, data, size))
reader = lib.bpf_open_perf_buffer(fn, None, -1, cpu, page_cnt)
lost_fn = _LOST_CB_TYPE(lambda lost: lost_cb(lost) if lost_cb else -1)
reader = lib.bpf_open_perf_buffer(fn, lost_fn, None, -1, cpu, page_cnt)
if not reader:
raise Exception("Could not open perf buffer")
fd = lib.perf_reader_fd(reader)
Expand Down