diff --git a/.gitignore b/.gitignore index fcb4314e..d4c78d92 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,4 @@ vendor/ # Shell repo -repo/ \ No newline at end of file +.threads/ \ No newline at end of file diff --git a/go.mod b/go.mod index 29a2105c..5178b5af 100644 --- a/go.mod +++ b/go.mod @@ -26,10 +26,12 @@ require ( github.com/libp2p/go-libp2p v0.4.0 github.com/libp2p/go-libp2p-connmgr v0.1.1 github.com/libp2p/go-libp2p-core v0.2.3 + github.com/libp2p/go-libp2p-crypto v0.1.0 github.com/libp2p/go-libp2p-gostream v0.2.0 github.com/libp2p/go-libp2p-kad-dht v0.2.1 github.com/libp2p/go-libp2p-peerstore v0.1.3 github.com/libp2p/go-libp2p-pubsub v0.1.1 + github.com/libp2p/go-libp2p-swarm v0.2.2 github.com/mitchellh/go-homedir v1.1.0 github.com/multiformats/go-multiaddr v0.1.1 github.com/multiformats/go-multibase v0.0.1 diff --git a/shell/main.go b/shell/main.go index 6495df1d..bb5dd407 100644 --- a/shell/main.go +++ b/shell/main.go @@ -4,10 +4,15 @@ import ( "context" "flag" "fmt" + "io/ioutil" + "os" "path" + "path/filepath" "strings" "time" + swarm "github.com/libp2p/go-libp2p-swarm" + "github.com/chzyer/readline" "github.com/fatih/color" ipfslite "github.com/hsanjuan/ipfs-lite" @@ -68,7 +73,7 @@ type msg struct { } func main() { - repo := flag.String("repo", "threads", "repo location") + repo := flag.String("repo", ".threads", "repo location") port := flag.Int("port", 4006, "host port") flag.Parse() @@ -76,33 +81,34 @@ func main() { if err != nil { panic(err) } + if err = os.MkdirAll(repop, os.ModePerm); err != nil { + panic(err) + } var cancel context.CancelFunc ctx, cancel = context.WithCancel(context.Background()) defer cancel() // Build an IPFS-Lite peer + priv := loadKey(filepath.Join(repop, "key")) + ds, err = ipfslite.BadgerDatastore(repop) if err != nil { panic(err) } - priv, _, err := ic.GenerateKeyPair(ic.Ed25519, 0) - if err != nil { - panic(err) - } + listen, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", *port)) if err != nil { panic(err) } - connman := connmgr.NewConnManager(100, 400, time.Minute) var h host.Host h, dht, err = ipfslite.SetupLibp2p( ctx, priv, nil, []ma.Multiaddr{listen}, - libp2p.ConnectionManager(connman), + libp2p.ConnectionManager(connmgr.NewConnManager(100, 400, time.Minute)), ) if err != nil { panic(err) @@ -245,14 +251,16 @@ func handleLine(line string) (out string, err error) { if strings.HasPrefix(line, ":") { args := strings.Split(line[1:], " ") switch args[0] { - case "use": - return useCmd(args[1:]) - case "link": - return linkCmd() - case "follower": - return followerCmd(args[1:]) + case "address": + return addressCmd() + case "thread": + return threadCmd(args[1:]) + case "thread-address": + return threadAddressCmd() + case "add-follower": + return addFollowerCmd(args[1:]) default: - err = fmt.Errorf("unknown command: %s\n", args[0]) + err = fmt.Errorf("unknown command: %s", args[0]) return } } @@ -261,16 +269,52 @@ func handleLine(line string) (out string, err error) { return } -func useCmd(args []string) (out string, err error) { +func addressCmd() (out string, err error) { + pro := ma.ProtocolWithCode(ma.P_P2P).Name + addr, err := ma.NewMultiaddr("/" + pro + "/" + api.Host().ID().String()) + if err != nil { + return + } + addrs := api.Host().Addrs() + for i, a := range addrs { + a = a.Encapsulate(addr) + out += a.String() + if i != len(addrs)-1 { + out += "\n" + } + } + return +} + +func threadCmd(args []string) (out string, err error) { if len(args) == 0 { err = fmt.Errorf("enter a thread name to use") return } name := args[0] + var addr ma.Multiaddr + if len(args) > 1 { + addr, err = ma.NewMultiaddr(args[1]) + if err != nil { + return + } + if !canDial(addr) { + return "", fmt.Errorf("address is not dialable") + } + } + x, err := ds.Get(datastore.NewKey("/names/" + name)) if err == datastore.ErrNotFound { - threadID = thread.NewIDV1(thread.Raw, 32) + if addr != nil { + info, err := api.AddThread(ctx, addr) + if err != nil { + return "", err + } + threadID = info.ID + } else { + threadID = thread.NewIDV1(thread.Raw, 32) + } err = ds.Put(datastore.NewKey("/names/"+name), threadID.Bytes()) if err != nil { return @@ -288,42 +332,93 @@ func useCmd(args []string) (out string, err error) { return } -func linkCmd() (out string, err error) { +func threadAddressCmd() (out string, err error) { if !threadID.Defined() { - err = fmt.Errorf("choose a thread to use with `:use`") + err = fmt.Errorf("choose a thread to use with `:thread`") return } - // get own log in thread - // find the "most public" address (ideally a follower) - // return "/ip4/x.x.x.x/tcp//p2p//thread/" + lg, err := util.GetOwnLog(api, threadID) + if err != nil { + return + } + ta, err := ma.NewMultiaddr("/" + t.Thread + "/" + threadID.String()) + if err != nil { + return + } + + var addrs []ma.Multiaddr + for _, la := range lg.Addrs { + p2p, err := la.ValueForProtocol(ma.P_P2P) + if err != nil { + return "", err + } + pid, err := peer.IDB58Decode(p2p) + if err != nil { + return "", err + } + + var paddrs []ma.Multiaddr + if pid.String() == api.Host().ID().String() { + paddrs = api.Host().Addrs() + } else { + paddrs = api.Host().Peerstore().Addrs(pid) + } + for _, pa := range paddrs { + addrs = append(addrs, pa.Encapsulate(la).Encapsulate(ta)) + } + } + + for i, a := range addrs { + out += a.String() + if i != len(addrs)-1 { + out += "\n" + } + } + return } -func followerCmd(args []string) (out string, err error) { +func addFollowerCmd(args []string) (out string, err error) { if !threadID.Defined() { - err = fmt.Errorf("choose a thread to use with `:use`") + err = fmt.Errorf("choose a thread to use with `:thread`") return } if len(args) == 0 { - err = fmt.Errorf("enter a thread name to use") + err = fmt.Errorf("enter a peer address") return } - p := args[0] - pid, err := peer.IDB58Decode(p) + addr, err := ma.NewMultiaddr(args[0]) if err != nil { return } - - pctx, cancel := context.WithTimeout(ctx, findPeerTimeout) - defer cancel() - pinfo, err := dht.FindPeer(pctx, pid) + p2p, err := addr.ValueForProtocol(ma.P_P2P) if err != nil { return } - api.Host().Peerstore().AddAddrs(pid, pinfo.Addrs, peerstore.PermanentAddrTTL) + pid, err := peer.IDB58Decode(p2p) + if err != nil { + return + } + dialable, err := getDialable(addr) + if err != nil { + return + } + + if dialable != nil { + api.Host().Peerstore().AddAddr(pid, dialable, peerstore.PermanentAddrTTL) + } + if len(api.Host().Peerstore().Addrs(pid)) == 0 { + pctx, cancel := context.WithTimeout(ctx, findPeerTimeout) + defer cancel() + pinfo, err := dht.FindPeer(pctx, pid) + if err != nil { + return "", err + } + api.Host().Peerstore().AddAddrs(pid, pinfo.Addrs, peerstore.PermanentAddrTTL) + } err = api.AddFollower(ctx, threadID, pid) return @@ -331,7 +426,7 @@ func followerCmd(args []string) (out string, err error) { func sendMessage(txt string) error { if !threadID.Defined() { - return fmt.Errorf("choose a thread to use with `:use`") + return fmt.Errorf("choose a thread to use with `:thread`") } mctx, cancel := context.WithTimeout(ctx, msgTimeout) @@ -345,6 +440,36 @@ func sendMessage(txt string) error { return err } +func loadKey(pth string) ic.PrivKey { + var priv ic.PrivKey + _, err := os.Stat(pth) + if os.IsNotExist(err) { + priv, _, err = ic.GenerateKeyPair(ic.Ed25519, 0) + if err != nil { + panic(err) + } + key, err := ic.MarshalPrivateKey(priv) + if err != nil { + panic(err) + } + if err = ioutil.WriteFile(pth, key, 0400); err != nil { + panic(err) + } + } else if err != nil { + panic(err) + } else { + key, err := ioutil.ReadFile(pth) + if err != nil { + panic(err) + } + priv, err = ic.UnmarshalPrivateKey(key) + if err != nil { + panic(err) + } + } + return priv +} + func parseBootstrapPeers(addrs []string) ([]peer.AddrInfo, error) { maddrs := make([]ma.Multiaddr, len(addrs)) for i, addr := range addrs { @@ -357,6 +482,18 @@ func parseBootstrapPeers(addrs []string) ([]peer.AddrInfo, error) { return peer.AddrInfosFromP2pAddrs(maddrs...) } +func getDialable(addr ma.Multiaddr) (ma.Multiaddr, error) { + parts := strings.Split(addr.String(), "/"+ma.ProtocolWithCode(ma.P_P2P).Name) + return ma.NewMultiaddr(parts[0]) +} + +func canDial(addr ma.Multiaddr) bool { + parts := strings.Split(addr.String(), "/"+ma.ProtocolWithCode(ma.P_P2P).Name) + addr, _ = ma.NewMultiaddr(parts[0]) + tr := api.Host().Network().(*swarm.Swarm).TransportForDialing(addr) + return tr != nil && tr.CanDial(addr) +} + func logError(err error) { fmt.Println(red("Error: " + err.Error())) } diff --git a/test/threads_suite.go b/test/threads_suite.go index 094a53c8..411c0caf 100644 --- a/test/threads_suite.go +++ b/test/threads_suite.go @@ -85,7 +85,7 @@ func testAddPull(ts1, _ tserv.Threadservice) func(t *testing.T) { }() ctx := context.Background() - tid := thread.NewIDV1(thread.Raw, 32) + id := thread.NewIDV1(thread.Raw, 32) body, err := cbornode.WrapObject(map[string]interface{}{ "foo": "bar", @@ -93,13 +93,13 @@ func testAddPull(ts1, _ tserv.Threadservice) func(t *testing.T) { }, mh.SHA2_256, -1) check(t, err) - r1, err := ts1.AddRecord(ctx, body, tserv.AddOpt.ThreadID(tid)) + r1, err := ts1.AddRecord(ctx, body, tserv.AddOpt.ThreadID(id)) check(t, err) if r1.Value() == nil { t.Fatalf("expected node to not be nil") } - r2, err := ts1.AddRecord(ctx, body, tserv.AddOpt.ThreadID(tid)) + r2, err := ts1.AddRecord(ctx, body, tserv.AddOpt.ThreadID(id)) check(t, err) if r2.Value() == nil { t.Fatalf("expected node to not be nil") @@ -110,20 +110,20 @@ func testAddPull(ts1, _ tserv.Threadservice) func(t *testing.T) { } // Pull from the origin - err = ts1.PullThread(ctx, tid) + err = ts1.PullThread(ctx, id) check(t, err) time.Sleep(time.Second) if rcount != 2 { t.Fatalf("expected 2 records got %d", rcount) } - r1b, err := ts1.GetRecord(ctx, tid, r1.LogID(), r1.Value().Cid()) + r1b, err := ts1.GetRecord(ctx, id, r1.LogID(), r1.Value().Cid()) check(t, err) event, err := cbor.GetEvent(ctx, ts1, r1b.BlockID()) check(t, err) - lg, err := ts1.Store().LogInfo(tid, r1.LogID()) + lg, err := ts1.Store().LogInfo(id, r1.LogID()) check(t, err) back, err := event.GetBody(ctx, ts1, lg.ReadKey) check(t, err) @@ -137,16 +137,16 @@ func testAddPull(ts1, _ tserv.Threadservice) func(t *testing.T) { func testAddPeer(ts1, ts2 tserv.Threadservice) func(t *testing.T) { return func(t *testing.T) { ctx := context.Background() - tid := thread.NewIDV1(thread.Raw, 32) + id := thread.NewIDV1(thread.Raw, 32) body, err := cbornode.WrapObject(map[string]interface{}{ "msg": "yo!", }, mh.SHA2_256, -1) check(t, err) - _, err = ts1.AddRecord(ctx, body, tserv.AddOpt.ThreadID(tid)) + _, err = ts1.AddRecord(ctx, body, tserv.AddOpt.ThreadID(id)) check(t, err) - addr, err := ma.NewMultiaddr("/p2p/" + ts1.Host().ID().String() + "/thread/" + tid.String()) + addr, err := ma.NewMultiaddr("/p2p/" + ts1.Host().ID().String() + "/thread/" + id.String()) check(t, err) info, err := ts2.AddThread(ctx, addr) @@ -159,10 +159,10 @@ func testAddPeer(ts1, ts2 tserv.Threadservice) func(t *testing.T) { "msg": "yo back!", }, mh.SHA2_256, -1) check(t, err) - _, err = ts2.AddRecord(ctx, body2, tserv.AddOpt.ThreadID(tid)) + _, err = ts2.AddRecord(ctx, body2, tserv.AddOpt.ThreadID(id)) check(t, err) - info2, err := ts1.Store().ThreadInfo(tid) + info2, err := ts1.Store().ThreadInfo(id) check(t, err) if info2.Logs.Len() != 2 { t.Fatalf("expected 2 logs got %d", info2.Logs.Len()) @@ -173,32 +173,32 @@ func testAddPeer(ts1, ts2 tserv.Threadservice) func(t *testing.T) { func testAddFollower(ts1, ts2 tserv.Threadservice) func(t *testing.T) { return func(t *testing.T) { ctx := context.Background() - tid := thread.NewIDV1(thread.Raw, 32) + id := thread.NewIDV1(thread.Raw, 32) body, err := cbornode.WrapObject(map[string]interface{}{ "msg": "yo!", }, mh.SHA2_256, -1) check(t, err) - r, err := ts1.AddRecord(ctx, body, tserv.AddOpt.ThreadID(tid)) + r, err := ts1.AddRecord(ctx, body, tserv.AddOpt.ThreadID(id)) check(t, err) t.Logf("adding follower %s", ts2.Host().ID().String()) - err = ts1.AddFollower(ctx, tid, ts2.Host().ID()) + err = ts1.AddFollower(ctx, id, ts2.Host().ID()) check(t, err) - info, err := ts2.Store().ThreadInfo(tid) + info, err := ts2.Store().ThreadInfo(id) check(t, err) if info.Logs.Len() != 1 { t.Fatalf("expected 1 log got %d", info.Logs.Len()) } - addrs, err := ts1.Store().Addrs(tid, r.LogID()) + addrs, err := ts1.Store().Addrs(id, r.LogID()) check(t, err) if len(addrs) != 2 { t.Fatalf("expected 2 addresses got %d", len(addrs)) } - addrs2, err := ts2.Store().Addrs(tid, r.LogID()) + addrs2, err := ts2.Store().Addrs(id, r.LogID()) check(t, err) if len(addrs2) != 2 { t.Fatalf("expected 2 addresses got %d", len(addrs2)) diff --git a/threads.go b/threads.go index 1a35c574..9a9ad9a5 100644 --- a/threads.go +++ b/threads.go @@ -28,6 +28,10 @@ import ( "google.golang.org/grpc" ) +func init() { + ma.SwapToP2pMultiaddrs() // /ipfs -> /p2p for peer addresses +} + var log = logging.Logger("threads") // MaxPullLimit is the maximum page size for pulling records. @@ -67,7 +71,6 @@ func NewThreads( err = setLogLevels(map[string]logger.Level{ "threads": logger.DEBUG, "threadstore": logger.DEBUG, - //"ipfslite": logger.DEBUG, }, writer, true) if err != nil { return nil, err