Compare commits

...

20 Commits

Author SHA1 Message Date
braginini
0aa5eaaf5b chore: cleanup 2021-11-20 23:04:45 +01:00
braginini
8642b87a28 chore: cleanup 2021-11-17 19:33:23 +01:00
braginini
23781c1305 chore: cleanup 2021-11-16 14:59:37 +01:00
braginini
b35dcd21df chore: make wireguard work over webrtc 2021-11-16 14:36:24 +01:00
braginini
f0a0888046 feature: successful wireguard connection 2021-11-15 22:08:44 +01:00
braginini
2ba9b59e9b feature: successful wireguard connection 2021-11-15 21:31:28 +01:00
braginini
c29632e7d1 feature: successful WebRTC connection in wireguard bind 2021-11-15 20:44:48 +01:00
braginini
f5e52eb1d9 fix: minor refactor to consider signal status 2021-11-15 15:51:54 +01:00
braginini
ed63dd516c Merge remote-tracking branch 'origin/main' into braginini/wasm
# Conflicts:
#	signal/client/client.go
2021-11-15 15:45:37 +01:00
braginini
6457c48281 chore: wasm further work 2021-11-14 18:08:33 +01:00
braginini
9587b9a930 Merge remote-tracking branch 'origin/main' into braginini/wasm 2021-11-07 13:12:09 +01:00
braginini
04de743dff merge main 2021-11-07 13:05:21 +01:00
braginini
b9aa2aa329 Merge remote-tracking branch 'origin/main' into braginini/wasm
# Conflicts:
#	signal/client/client.go
2021-11-07 12:46:57 +01:00
braginini
f71a46d27d chore: add webrtc bind base 2021-11-04 08:43:52 +01:00
braginini
c9b5a0e5fd chore: add wireguard html frontend 2021-11-03 15:23:45 +01:00
braginini
f519049e63 feature: finalize signal websocket client 2021-11-01 23:59:54 +01:00
braginini
d564400884 feature: add base for the signal websocket client 2021-11-01 23:12:44 +01:00
braginini
19408678cc fix: compilation of the client app tests 2021-11-01 23:11:38 +01:00
braginini
962b8ebc67 Merge remote-tracking branch 'origin/main' into braginini/wasm 2021-11-01 23:04:13 +01:00
braginini
ab79f544b7 feat: add signal websocket support as an alternative to gRPC 2021-11-01 22:40:07 +01:00
23 changed files with 2928 additions and 333 deletions

3
browser/Makefile Normal file
View File

@@ -0,0 +1,3 @@
run:
GOOS=js GOARCH=wasm go build -o assets/client.wasm ./client/
go run main.go

34
browser/assets/index.html Normal file
View File

@@ -0,0 +1,34 @@
<html>
<head>
<meta charset="utf-8"/>
<script src="wasm_exec.js"></script>
<script>
const go = new Go();
WebAssembly.instantiateStreaming(fetch("client.wasm"), go.importObject).then((result) => {
go.run(result.instance);
});
</script>
</head>
<body>
<form>
<input type=button value="Generate Public Key" onclick='generateWireguardKey()'>
</p>
<label for="wgPrivateKey">Wireguard private key:</label>
<input id="wgPrivateKey" type=input size="50" value="qJi7zSrgdokeoXE27fbca2hvMlgg1NQIW6KbrTJhhmc=">
</p>
<label for="publicKey">Wireguard Public Key:</label>
<input id="publicKey" type=input size="50" value="6M9O7PRhKMEOiboBp9cX6rNrLBevtHX7H0O2FMXUkFI=">
<p/>
<label for="wgIp">Wireguard private IP:</label>
<input id="wgIp" type=input size="50" value="10.0.0.2/24">
<p/>
<label for="peerKey">Wireguard Peer Public key:</label>
<input id="peerKey" type=input size="50" value="RFuT84MDhIvmgQndwMkxQPjG195poq713EMJZv1XPEw=">
<p/>
<label for="peerAllowedIPs">Wireguard Peer AllowedIPs:</label>
<input id="peerAllowedIPs" type=input size="50" value="Paste other peer AllowedIPs">
<p/>
<input type=button value="start" onclick='connect()'>
</form>
</body>
</html>

636
browser/assets/wasm_exec.js Normal file
View File

@@ -0,0 +1,636 @@
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
(() => {
// Map multiple JavaScript environments to a single common API,
// preferring web standards over Node.js API.
//
// Environments considered:
// - Browsers
// - Node.js
// - Electron
// - Parcel
// - Webpack
if (typeof global !== "undefined") {
// global already exists
} else if (typeof window !== "undefined") {
window.global = window;
} else if (typeof self !== "undefined") {
self.global = self;
} else {
throw new Error("cannot export Go (neither global, window nor self is defined)");
}
if (!global.require && typeof require !== "undefined") {
global.require = require;
}
if (!global.fs && global.require) {
const fs = require("fs");
if (typeof fs === "object" && fs !== null && Object.keys(fs).length !== 0) {
global.fs = fs;
}
}
const enosys = () => {
const err = new Error("not implemented");
err.code = "ENOSYS";
return err;
};
if (!global.fs) {
let outputBuf = "";
global.fs = {
constants: { O_WRONLY: -1, O_RDWR: -1, O_CREAT: -1, O_TRUNC: -1, O_APPEND: -1, O_EXCL: -1 }, // unused
writeSync(fd, buf) {
outputBuf += decoder.decode(buf);
const nl = outputBuf.lastIndexOf("\n");
if (nl != -1) {
console.log(outputBuf.substr(0, nl));
outputBuf = outputBuf.substr(nl + 1);
}
return buf.length;
},
write(fd, buf, offset, length, position, callback) {
if (offset !== 0 || length !== buf.length || position !== null) {
callback(enosys());
return;
}
const n = this.writeSync(fd, buf);
callback(null, n);
},
chmod(path, mode, callback) { callback(enosys()); },
chown(path, uid, gid, callback) { callback(enosys()); },
close(fd, callback) { callback(enosys()); },
fchmod(fd, mode, callback) { callback(enosys()); },
fchown(fd, uid, gid, callback) { callback(enosys()); },
fstat(fd, callback) { callback(enosys()); },
fsync(fd, callback) { callback(null); },
ftruncate(fd, length, callback) { callback(enosys()); },
lchown(path, uid, gid, callback) { callback(enosys()); },
link(path, link, callback) { callback(enosys()); },
lstat(path, callback) { callback(enosys()); },
mkdir(path, perm, callback) { callback(enosys()); },
open(path, flags, mode, callback) { callback(enosys()); },
read(fd, buffer, offset, length, position, callback) { callback(enosys()); },
readdir(path, callback) { callback(enosys()); },
readlink(path, callback) { callback(enosys()); },
rename(from, to, callback) { callback(enosys()); },
rmdir(path, callback) { callback(enosys()); },
stat(path, callback) { callback(enosys()); },
symlink(path, link, callback) { callback(enosys()); },
truncate(path, length, callback) { callback(enosys()); },
unlink(path, callback) { callback(enosys()); },
utimes(path, atime, mtime, callback) { callback(enosys()); },
};
}
if (!global.process) {
global.process = {
getuid() { return -1; },
getgid() { return -1; },
geteuid() { return -1; },
getegid() { return -1; },
getgroups() { throw enosys(); },
pid: -1,
ppid: -1,
umask() { throw enosys(); },
cwd() { throw enosys(); },
chdir() { throw enosys(); },
}
}
if (!global.crypto && global.require) {
const nodeCrypto = require("crypto");
global.crypto = {
getRandomValues(b) {
nodeCrypto.randomFillSync(b);
},
};
}
if (!global.crypto) {
throw new Error("global.crypto is not available, polyfill required (getRandomValues only)");
}
if (!global.performance) {
global.performance = {
now() {
const [sec, nsec] = process.hrtime();
return sec * 1000 + nsec / 1000000;
},
};
}
if (!global.TextEncoder && global.require) {
global.TextEncoder = require("util").TextEncoder;
}
if (!global.TextEncoder) {
throw new Error("global.TextEncoder is not available, polyfill required");
}
if (!global.TextDecoder && global.require) {
global.TextDecoder = require("util").TextDecoder;
}
if (!global.TextDecoder) {
throw new Error("global.TextDecoder is not available, polyfill required");
}
// End of polyfills for common API.
const encoder = new TextEncoder("utf-8");
const decoder = new TextDecoder("utf-8");
global.Go = class {
constructor() {
this.argv = ["js"];
this.env = {};
this.exit = (code) => {
if (code !== 0) {
console.warn("exit code:", code);
}
};
this._exitPromise = new Promise((resolve) => {
this._resolveExitPromise = resolve;
});
this._pendingEvent = null;
this._scheduledTimeouts = new Map();
this._nextCallbackTimeoutID = 1;
const setInt64 = (addr, v) => {
this.mem.setUint32(addr + 0, v, true);
this.mem.setUint32(addr + 4, Math.floor(v / 4294967296), true);
}
const getInt64 = (addr) => {
const low = this.mem.getUint32(addr + 0, true);
const high = this.mem.getInt32(addr + 4, true);
return low + high * 4294967296;
}
const loadValue = (addr) => {
const f = this.mem.getFloat64(addr, true);
if (f === 0) {
return undefined;
}
if (!isNaN(f)) {
return f;
}
const id = this.mem.getUint32(addr, true);
return this._values[id];
}
const storeValue = (addr, v) => {
const nanHead = 0x7FF80000;
if (typeof v === "number" && v !== 0) {
if (isNaN(v)) {
this.mem.setUint32(addr + 4, nanHead, true);
this.mem.setUint32(addr, 0, true);
return;
}
this.mem.setFloat64(addr, v, true);
return;
}
if (v === undefined) {
this.mem.setFloat64(addr, 0, true);
return;
}
let id = this._ids.get(v);
if (id === undefined) {
id = this._idPool.pop();
if (id === undefined) {
id = this._values.length;
}
this._values[id] = v;
this._goRefCounts[id] = 0;
this._ids.set(v, id);
}
this._goRefCounts[id]++;
let typeFlag = 0;
switch (typeof v) {
case "object":
if (v !== null) {
typeFlag = 1;
}
break;
case "string":
typeFlag = 2;
break;
case "symbol":
typeFlag = 3;
break;
case "function":
typeFlag = 4;
break;
}
this.mem.setUint32(addr + 4, nanHead | typeFlag, true);
this.mem.setUint32(addr, id, true);
}
const loadSlice = (addr) => {
const array = getInt64(addr + 0);
const len = getInt64(addr + 8);
return new Uint8Array(this._inst.exports.mem.buffer, array, len);
}
const loadSliceOfValues = (addr) => {
const array = getInt64(addr + 0);
const len = getInt64(addr + 8);
const a = new Array(len);
for (let i = 0; i < len; i++) {
a[i] = loadValue(array + i * 8);
}
return a;
}
const loadString = (addr) => {
const saddr = getInt64(addr + 0);
const len = getInt64(addr + 8);
return decoder.decode(new DataView(this._inst.exports.mem.buffer, saddr, len));
}
const timeOrigin = Date.now() - performance.now();
this.importObject = {
go: {
// Go's SP does not change as long as no Go code is running. Some operations (e.g. calls, getters and setters)
// may synchronously trigger a Go event handler. This makes Go code get executed in the middle of the imported
// function. A goroutine can switch to a new stack if the current stack is too small (see morestack function).
// This changes the SP, thus we have to update the SP used by the imported function.
// func wasmExit(code int32)
"runtime.wasmExit": (sp) => {
sp >>>= 0;
const code = this.mem.getInt32(sp + 8, true);
this.exited = true;
delete this._inst;
delete this._values;
delete this._goRefCounts;
delete this._ids;
delete this._idPool;
this.exit(code);
},
// func wasmWrite(fd uintptr, p unsafe.Pointer, n int32)
"runtime.wasmWrite": (sp) => {
sp >>>= 0;
const fd = getInt64(sp + 8);
const p = getInt64(sp + 16);
const n = this.mem.getInt32(sp + 24, true);
fs.writeSync(fd, new Uint8Array(this._inst.exports.mem.buffer, p, n));
},
// func resetMemoryDataView()
"runtime.resetMemoryDataView": (sp) => {
sp >>>= 0;
this.mem = new DataView(this._inst.exports.mem.buffer);
},
// func nanotime1() int64
"runtime.nanotime1": (sp) => {
sp >>>= 0;
setInt64(sp + 8, (timeOrigin + performance.now()) * 1000000);
},
// func walltime() (sec int64, nsec int32)
"runtime.walltime": (sp) => {
sp >>>= 0;
const msec = (new Date).getTime();
setInt64(sp + 8, msec / 1000);
this.mem.setInt32(sp + 16, (msec % 1000) * 1000000, true);
},
// func scheduleTimeoutEvent(delay int64) int32
"runtime.scheduleTimeoutEvent": (sp) => {
sp >>>= 0;
const id = this._nextCallbackTimeoutID;
this._nextCallbackTimeoutID++;
this._scheduledTimeouts.set(id, setTimeout(
() => {
this._resume();
while (this._scheduledTimeouts.has(id)) {
// for some reason Go failed to register the timeout event, log and try again
// (temporary workaround for https://github.com/golang/go/issues/28975)
console.warn("scheduleTimeoutEvent: missed timeout event");
this._resume();
}
},
getInt64(sp + 8) + 1, // setTimeout has been seen to fire up to 1 millisecond early
));
this.mem.setInt32(sp + 16, id, true);
},
// func clearTimeoutEvent(id int32)
"runtime.clearTimeoutEvent": (sp) => {
sp >>>= 0;
const id = this.mem.getInt32(sp + 8, true);
clearTimeout(this._scheduledTimeouts.get(id));
this._scheduledTimeouts.delete(id);
},
// func getRandomData(r []byte)
"runtime.getRandomData": (sp) => {
sp >>>= 0;
crypto.getRandomValues(loadSlice(sp + 8));
},
// func finalizeRef(v ref)
"syscall/js.finalizeRef": (sp) => {
sp >>>= 0;
const id = this.mem.getUint32(sp + 8, true);
this._goRefCounts[id]--;
if (this._goRefCounts[id] === 0) {
const v = this._values[id];
this._values[id] = null;
this._ids.delete(v);
this._idPool.push(id);
}
},
// func stringVal(value string) ref
"syscall/js.stringVal": (sp) => {
sp >>>= 0;
storeValue(sp + 24, loadString(sp + 8));
},
// func valueGet(v ref, p string) ref
"syscall/js.valueGet": (sp) => {
sp >>>= 0;
const result = Reflect.get(loadValue(sp + 8), loadString(sp + 16));
sp = this._inst.exports.getsp() >>> 0; // see comment above
storeValue(sp + 32, result);
},
// func valueSet(v ref, p string, x ref)
"syscall/js.valueSet": (sp) => {
sp >>>= 0;
Reflect.set(loadValue(sp + 8), loadString(sp + 16), loadValue(sp + 32));
},
// func valueDelete(v ref, p string)
"syscall/js.valueDelete": (sp) => {
sp >>>= 0;
Reflect.deleteProperty(loadValue(sp + 8), loadString(sp + 16));
},
// func valueIndex(v ref, i int) ref
"syscall/js.valueIndex": (sp) => {
sp >>>= 0;
storeValue(sp + 24, Reflect.get(loadValue(sp + 8), getInt64(sp + 16)));
},
// valueSetIndex(v ref, i int, x ref)
"syscall/js.valueSetIndex": (sp) => {
sp >>>= 0;
Reflect.set(loadValue(sp + 8), getInt64(sp + 16), loadValue(sp + 24));
},
// func valueCall(v ref, m string, args []ref) (ref, bool)
"syscall/js.valueCall": (sp) => {
sp >>>= 0;
try {
const v = loadValue(sp + 8);
const m = Reflect.get(v, loadString(sp + 16));
const args = loadSliceOfValues(sp + 32);
const result = Reflect.apply(m, v, args);
sp = this._inst.exports.getsp() >>> 0; // see comment above
storeValue(sp + 56, result);
this.mem.setUint8(sp + 64, 1);
} catch (err) {
sp = this._inst.exports.getsp() >>> 0; // see comment above
storeValue(sp + 56, err);
this.mem.setUint8(sp + 64, 0);
}
},
// func valueInvoke(v ref, args []ref) (ref, bool)
"syscall/js.valueInvoke": (sp) => {
sp >>>= 0;
try {
const v = loadValue(sp + 8);
const args = loadSliceOfValues(sp + 16);
const result = Reflect.apply(v, undefined, args);
sp = this._inst.exports.getsp() >>> 0; // see comment above
storeValue(sp + 40, result);
this.mem.setUint8(sp + 48, 1);
} catch (err) {
sp = this._inst.exports.getsp() >>> 0; // see comment above
storeValue(sp + 40, err);
this.mem.setUint8(sp + 48, 0);
}
},
// func valueNew(v ref, args []ref) (ref, bool)
"syscall/js.valueNew": (sp) => {
sp >>>= 0;
try {
const v = loadValue(sp + 8);
const args = loadSliceOfValues(sp + 16);
const result = Reflect.construct(v, args);
sp = this._inst.exports.getsp() >>> 0; // see comment above
storeValue(sp + 40, result);
this.mem.setUint8(sp + 48, 1);
} catch (err) {
sp = this._inst.exports.getsp() >>> 0; // see comment above
storeValue(sp + 40, err);
this.mem.setUint8(sp + 48, 0);
}
},
// func valueLength(v ref) int
"syscall/js.valueLength": (sp) => {
sp >>>= 0;
setInt64(sp + 16, parseInt(loadValue(sp + 8).length));
},
// valuePrepareString(v ref) (ref, int)
"syscall/js.valuePrepareString": (sp) => {
sp >>>= 0;
const str = encoder.encode(String(loadValue(sp + 8)));
storeValue(sp + 16, str);
setInt64(sp + 24, str.length);
},
// valueLoadString(v ref, b []byte)
"syscall/js.valueLoadString": (sp) => {
sp >>>= 0;
const str = loadValue(sp + 8);
loadSlice(sp + 16).set(str);
},
// func valueInstanceOf(v ref, t ref) bool
"syscall/js.valueInstanceOf": (sp) => {
sp >>>= 0;
this.mem.setUint8(sp + 24, (loadValue(sp + 8) instanceof loadValue(sp + 16)) ? 1 : 0);
},
// func copyBytesToGo(dst []byte, src ref) (int, bool)
"syscall/js.copyBytesToGo": (sp) => {
sp >>>= 0;
const dst = loadSlice(sp + 8);
const src = loadValue(sp + 32);
if (!(src instanceof Uint8Array || src instanceof Uint8ClampedArray)) {
this.mem.setUint8(sp + 48, 0);
return;
}
const toCopy = src.subarray(0, dst.length);
dst.set(toCopy);
setInt64(sp + 40, toCopy.length);
this.mem.setUint8(sp + 48, 1);
},
// func copyBytesToJS(dst ref, src []byte) (int, bool)
"syscall/js.copyBytesToJS": (sp) => {
sp >>>= 0;
const dst = loadValue(sp + 8);
const src = loadSlice(sp + 16);
if (!(dst instanceof Uint8Array || dst instanceof Uint8ClampedArray)) {
this.mem.setUint8(sp + 48, 0);
return;
}
const toCopy = src.subarray(0, dst.length);
dst.set(toCopy);
setInt64(sp + 40, toCopy.length);
this.mem.setUint8(sp + 48, 1);
},
"debug": (value) => {
console.log(value);
},
}
};
}
async run(instance) {
if (!(instance instanceof WebAssembly.Instance)) {
throw new Error("Go.run: WebAssembly.Instance expected");
}
this._inst = instance;
this.mem = new DataView(this._inst.exports.mem.buffer);
this._values = [ // JS values that Go currently has references to, indexed by reference id
NaN,
0,
null,
true,
false,
global,
this,
];
this._goRefCounts = new Array(this._values.length).fill(Infinity); // number of references that Go has to a JS value, indexed by reference id
this._ids = new Map([ // mapping from JS values to reference ids
[0, 1],
[null, 2],
[true, 3],
[false, 4],
[global, 5],
[this, 6],
]);
this._idPool = []; // unused ids that have been garbage collected
this.exited = false; // whether the Go program has exited
// Pass command line arguments and environment variables to WebAssembly by writing them to the linear memory.
let offset = 4096;
const strPtr = (str) => {
const ptr = offset;
const bytes = encoder.encode(str + "\0");
new Uint8Array(this.mem.buffer, offset, bytes.length).set(bytes);
offset += bytes.length;
if (offset % 8 !== 0) {
offset += 8 - (offset % 8);
}
return ptr;
};
const argc = this.argv.length;
const argvPtrs = [];
this.argv.forEach((arg) => {
argvPtrs.push(strPtr(arg));
});
argvPtrs.push(0);
const keys = Object.keys(this.env).sort();
keys.forEach((key) => {
argvPtrs.push(strPtr(`${key}=${this.env[key]}`));
});
argvPtrs.push(0);
const argv = offset;
argvPtrs.forEach((ptr) => {
this.mem.setUint32(offset, ptr, true);
this.mem.setUint32(offset + 4, 0, true);
offset += 8;
});
// The linker guarantees global data starts from at least wasmMinDataAddr.
// Keep in sync with cmd/link/internal/ld/data.go:wasmMinDataAddr.
const wasmMinDataAddr = 4096 + 4096;
if (offset >= wasmMinDataAddr) {
throw new Error("command line too long");
}
this._inst.exports.run(argc, argv);
if (this.exited) {
this._resolveExitPromise();
}
await this._exitPromise;
}
_resume() {
if (this.exited) {
throw new Error("Go program has already exited");
}
this._inst.exports.resume();
if (this.exited) {
this._resolveExitPromise();
}
}
_makeFuncWrapper(id) {
const go = this;
return function () {
const event = { id: id, this: this, args: arguments };
go._pendingEvent = event;
go._resume();
return event.result;
};
}
}
if (
typeof module !== "undefined" &&
global.require &&
global.require.main === module &&
global.process &&
global.process.versions &&
!global.process.versions.electron
) {
if (process.argv.length < 3) {
console.error("usage: go_js_wasm_exec [wasm binary] [arguments]");
process.exit(1);
}
const go = new Go();
go.argv = process.argv.slice(2);
go.env = Object.assign({ TMPDIR: require("os").tmpdir() }, process.env);
go.exit = process.exit;
WebAssembly.instantiate(fs.readFileSync(process.argv[2]), go.importObject).then((result) => {
process.on("exit", (code) => { // Node.js exits if no event handler is pending
if (code === 0 && !go.exited) {
// deadlock, make Go print error and stack traces
go._pendingEvent = { id: 0 };
go._resume();
}
});
return go.run(result.instance);
}).catch((err) => {
console.error(err);
process.exit(1);
});
}
})();

88
browser/client/client.go Normal file
View File

@@ -0,0 +1,88 @@
package main
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"github.com/wiretrustee/wiretrustee/browser/conn"
"github.com/wiretrustee/wiretrustee/signal/client"
"golang.zx2c4.com/wireguard/device"
"golang.zx2c4.com/wireguard/tun/netstack"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"io"
"log"
"net"
"net/http"
"time"
)
//my private key qJi7zSrgdokeoXE27fbca2hvMlgg1NQIW6KbrTJhhmc=
//remote private key KLuBc6tM/NRV1071bfPiNUxZmMhGBCXfxoDg+A+J7ns=
func main() {
key, err := wgtypes.ParseKey("qJi7zSrgdokeoXE27fbca2hvMlgg1NQIW6KbrTJhhmc=")
if err != nil {
panic(err)
}
log.Printf("my public key: %s", key.PublicKey().String())
remoteKey, err := wgtypes.ParseKey("RFuT84MDhIvmgQndwMkxQPjG195poq713EMJZv1XPEw=")
if err != nil {
panic(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
signal, err := client.NewWebsocketClient(ctx, "ws://localhost:80/signal", key)
time.Sleep(5 * time.Second)
tun, tnet, err := netstack.CreateNetTUN(
[]net.IP{net.ParseIP("1s00.0.2.2")},
[]net.IP{net.ParseIP("8.8.8.8")},
1420)
b := conn.NewWebRTCBind("chann-1", signal, key.PublicKey().String(), remoteKey.String())
dev := device.NewDevice(tun, b, device.NewLogger(device.LogLevelVerbose, ""))
err = dev.IpcSet(fmt.Sprintf("private_key=%s\npublic_key=%s\npersistent_keepalive_interval=5\nendpoint=webrtc://datachannel\nallowed_ip=0.0.0.0/0",
hex.EncodeToString(key[:]),
hex.EncodeToString(remoteKey[:]),
))
dev.Up()
if err != nil {
panic(err)
}
client := http.Client{
Transport: &http.Transport{
DialContext: tnet.DialContext,
},
}
time.Sleep(5 * time.Second)
//go func() {
log.Printf("request")
req, _ := http.NewRequest("POST", "http://100.0.2.1", bytes.NewBufferString("fdffffffffffffffffffffffffffffffdsdfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"))
req.Header.Set("js.fetch:mode", "no-cors")
resp, err := client.Do(req)
if err != nil {
log.Panic(err)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Panic(err)
}
log.Printf(string(body))
log.Printf(resp.Status)
//}()
select {}
}

347
browser/conn/bind_webrtc.go Normal file
View File

@@ -0,0 +1,347 @@
package conn
import (
"context"
"encoding/json"
"fmt"
"github.com/pion/webrtc/v3"
signal "github.com/wiretrustee/wiretrustee/signal/client"
"github.com/wiretrustee/wiretrustee/signal/proto"
"golang.zx2c4.com/wireguard/conn"
"log"
"net"
"sync"
"time"
)
const initDataChannelName = "wiretrustee-init"
func (*WebRTCBind) makeReceive(dcConn net.Conn) conn.ReceiveFunc {
return func(buff []byte) (int, conn.Endpoint, error) {
n, err := dcConn.Read(buff)
if err != nil {
return 0, nil, err
}
//addr := dcConn.RemoteAddr().(DataChannelAddr)
return n, &WebRTCEndpoint{}, err
}
}
// WebRTCBind is an implementation of Wireguard Bind interface backed by WebRTC data channel
type WebRTCBind struct {
id string
pc *webrtc.PeerConnection
conn net.Conn
incoming chan *webrtc.DataChannel
mu sync.Mutex
signal signal.Client
key string
remoteKey string
closeCond *Cond
closeErr error
}
func NewWebRTCBind(id string, signal signal.Client, pubKey string, remotePubKey string) conn.Bind {
return &WebRTCBind{
id: id,
pc: nil,
conn: nil,
signal: signal,
mu: sync.Mutex{},
key: pubKey,
remoteKey: remotePubKey,
closeCond: NewCond(),
incoming: make(chan *webrtc.DataChannel, 1),
}
}
// acceptDC accepts a datachannel over opened WebRTC connection and wraps it into net.Conn
// blocks until channel was successfully opened
func (bind *WebRTCBind) acceptDC() (stream net.Conn, err error) {
for dc := range bind.incoming {
if dc.Label() == initDataChannelName {
continue
}
stream, err := WrapDataChannel(dc)
if err != nil {
dc.Close()
return nil, err
}
log.Printf("accepted datachannel connection %s", dc.Label())
return stream, nil
}
return nil, context.Canceled
}
// openDC creates datachannel over opened WebRTC connection and wraps it into net.Conn
// blocks until channel was successfully opened
func (bind *WebRTCBind) openDC() (stream net.Conn, err error) {
dc, err := bind.pc.CreateDataChannel(bind.id, nil)
if err != nil {
return nil, fmt.Errorf("failed to open RTCDataChannel: %w", err)
}
stream, err = WrapDataChannel(dc)
if err != nil {
dc.Close()
return nil, err
}
log.Printf("opened datachannel connection %s", dc.Label())
return stream, err
}
func newPeerConnection() (*webrtc.PeerConnection, error) {
config := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
}
pc, err := webrtc.NewPeerConnection(config)
if err != nil {
return nil, err
}
return pc, nil
}
func (bind *WebRTCBind) Open(port uint16) (fns []conn.ReceiveFunc, actualPort uint16, err error) {
log.Printf("opening WebRTCBind connection")
connected := NewCond()
bind.pc, err = newPeerConnection()
if err != nil {
bind.pc.Close()
return nil, 0, err
}
bind.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
switch state {
case webrtc.ICEConnectionStateConnected:
connected.Signal()
case webrtc.ICEConnectionStateClosed:
log.Printf("WebRTC connection closed")
//TODO
}
})
bind.pc.OnDataChannel(func(dc *webrtc.DataChannel) {
log.Printf("received channel %s %v", dc.Label(), dc)
bind.incoming <- dc
})
controlling := bind.key < bind.remoteKey
// decision who is creating an offer
if controlling {
_, err = bind.pc.CreateDataChannel(initDataChannelName, nil)
if err != nil {
return nil, 0, err
}
offer, err := bind.pc.CreateOffer(nil)
if err != nil {
return nil, 0, err
}
if err := bind.pc.SetLocalDescription(offer); err != nil {
return nil, 0, err
}
// Create channel that is blocked until ICE Gathering is complete
gatherComplete := webrtc.GatheringCompletePromise(bind.pc)
select {
case <-gatherComplete:
case <-bind.closeCond.C:
return nil, 0, fmt.Errorf("closed while waiting for WebRTC candidates")
}
log.Printf("candidates gathered")
err = bind.signal.Send(&proto.Message{
Key: bind.key,
RemoteKey: bind.remoteKey,
Body: &proto.Body{
Type: proto.Body_OFFER,
Payload: Encode(bind.pc.LocalDescription()),
},
})
if err != nil {
return nil, 0, err
}
log.Printf("sent an offer to a remote peer")
//answerCh := make(chan webrtc.SessionDescription, 1)
go bind.signal.Receive(func(msg *proto.Message) error {
log.Printf("received a message from %v -> %v", msg.RemoteKey, msg.Body.Payload)
if msg.GetBody().Type == proto.Body_ANSWER {
log.Printf("received answer")
err := setRemoteDescription(bind.pc, msg.GetBody().GetPayload())
if err != nil {
log.Printf("%v", err)
return err
}
}
return nil
})
} else {
gatherComplete := webrtc.GatheringCompletePromise(bind.pc)
go bind.signal.Receive(func(msg *proto.Message) error {
log.Printf("received a message from %v -> %v", msg.RemoteKey, msg.Body.Payload)
if msg.GetBody().Type == proto.Body_OFFER {
log.Printf("received offer")
err = setRemoteDescription(bind.pc, msg.GetBody().GetPayload())
if err != nil {
log.Printf("%v", err)
return err
}
sdp, err := bind.pc.CreateAnswer(nil)
if err != nil {
log.Printf("%v", err)
return err
}
if err := bind.pc.SetLocalDescription(sdp); err != nil {
log.Printf("%v", err)
return err
}
select {
case <-gatherComplete:
case <-bind.closeCond.C:
return nil
}
log.Printf("candidates gathered")
err = bind.signal.Send(&proto.Message{
Key: bind.key,
RemoteKey: bind.remoteKey,
Body: &proto.Body{
Type: proto.Body_ANSWER,
Payload: Encode(bind.pc.LocalDescription()),
},
})
if err != nil {
return err
}
log.Printf("sent an answer to a remote peer")
}
return nil
})
}
select {
case <-time.After(10 * time.Second):
return nil, 0, fmt.Errorf("failed to connect in time: %w", err)
case <-connected.C:
}
log.Printf("WebRTC connection has opened successfully")
//once WebRTC has been established we can now create a datachannel and resume
var dcConn net.Conn
if controlling {
dcConn, err = bind.openDC()
if err != nil {
return nil, 0, err
}
} else {
dcConn, err = bind.acceptDC()
if err != nil {
return nil, 0, err
}
}
bind.conn = dcConn
fns = append(fns, bind.makeReceive(bind.conn))
return fns, 0, nil
}
func setRemoteDescription(pc *webrtc.PeerConnection, payload string) error {
descr, err := Decode(payload)
if err != nil {
return err
}
err = pc.SetRemoteDescription(*descr)
if err != nil {
return err
}
log.Printf("parsed SDP %s", descr.SDP)
return nil
}
func Decode(in string) (*webrtc.SessionDescription, error) {
descr := &webrtc.SessionDescription{}
err := json.Unmarshal([]byte(in), descr)
if err != nil {
return nil, err
}
return descr, nil
}
func Encode(obj interface{}) string {
b, err := json.Marshal(obj)
if err != nil {
panic(err)
}
return string(b)
}
func (*WebRTCBind) Close() error {
return nil
}
func (*WebRTCBind) SetMark(mark uint32) error {
return nil
}
func (bind *WebRTCBind) Send(b []byte, ep conn.Endpoint) error {
n, err := bind.conn.Write(b)
if err != nil {
return err
}
log.Printf("wrote %d bytes", n)
return nil
}
func (*WebRTCBind) ParseEndpoint(s string) (conn.Endpoint, error) {
log.Printf("peer endpoint %s", s)
return &WebRTCEndpoint{}, nil
}
// WebRTCEndpoint is an implementation of Wireguard's Endpoint interface backed by WebRTC
type WebRTCEndpoint DataChannelAddr
func (e *WebRTCEndpoint) ClearSrc() {
}
func (e *WebRTCEndpoint) SrcToString() string {
return ""
}
func (e *WebRTCEndpoint) DstToString() string {
return (*DataChannelAddr)(e).String()
}
func (e *WebRTCEndpoint) DstToBytes() []byte {
port := 31234
out := net.IP{127, 0, 0, 1}
out = append(out, byte(port&0xff))
out = append(out, byte((port>>8)&0xff))
return out
}
func (e *WebRTCEndpoint) DstIP() net.IP {
return net.IP{127, 0, 0, 1}
}
func (e *WebRTCEndpoint) SrcIP() net.IP {
return nil
}

34
browser/conn/cond.go Normal file
View File

@@ -0,0 +1,34 @@
package conn
// credits to https://github.com/rtctunnel/rtctunnel
import "sync"
// A Cond is a condition variable like sync.Cond, but using a channel so we can use select.
type Cond struct {
once sync.Once
C chan struct{}
}
// NewCond creates a new condition variable.
func NewCond() *Cond {
return &Cond{C: make(chan struct{})}
}
// Do runs f if the condition hasn't been signaled yet. Afterwards it will be signaled.
func (c *Cond) Do(f func()) {
c.once.Do(func() {
f()
close(c.C)
})
}
// Signal closes the condition variable channel.
func (c *Cond) Signal() {
c.Do(func() {})
}
// Wait waits for the condition variable channel to close.
func (c *Cond) Wait() {
<-c.C
}

220
browser/conn/conn.go Normal file
View File

@@ -0,0 +1,220 @@
package conn
// credits to https://github.com/rtctunnel/rtctunnel
import (
"context"
"errors"
"github.com/pion/webrtc/v3"
"io"
"log"
"net"
"time"
)
var ErrClosedByPeer = errors.New("closed by peer")
type DataChannelAddr struct{}
func (addr DataChannelAddr) Network() string {
return "webrtc"
}
func (addr DataChannelAddr) String() string {
return "webrtc://datachannel"
}
// A DataChannelConn implements the net.Conn interface over a webrtc data channel
type DataChannelConn struct {
dc *webrtc.DataChannel
rr ContextReadCloser
rw ContextWriteCloser
openCond *Cond
closeCond *Cond
closeErr error
}
// WrapDataChannel wraps an rtc data channel and implements the net.Conn
// interface
func WrapDataChannel(rtcDataChannel *webrtc.DataChannel) (*DataChannelConn, error) {
rr, rw := io.Pipe()
conn := &DataChannelConn{
dc: rtcDataChannel,
rr: ContextReadCloser{Context: context.Background(), ReadCloser: rr},
rw: ContextWriteCloser{Context: context.Background(), WriteCloser: rw},
openCond: NewCond(),
closeCond: NewCond(),
}
conn.dc.OnClose(func() {
_ = conn.closeWithError(ErrClosedByPeer)
})
conn.dc.OnOpen(func() {
// for reasons I don't understand, when opened the data channel is not immediately available for use
time.Sleep(50 * time.Millisecond)
conn.openCond.Signal()
})
conn.dc.OnMessage(func(msg webrtc.DataChannelMessage) {
log.Printf("received message from data channel %d", len(msg.Data))
if rw != nil {
_, err := rw.Write(msg.Data)
if err != nil {
_ = conn.closeWithError(err)
rw = nil
}
}
})
select {
case <-conn.closeCond.C:
err := conn.closeErr
if err == nil {
err = errors.New("datachannel closed for unknown reasons")
}
return nil, err
case <-conn.openCond.C:
}
return conn, nil
}
func (dc *DataChannelConn) Read(b []byte) (n int, err error) {
return dc.rr.Read(b)
}
func (dc *DataChannelConn) Write(b []byte) (n int, err error) {
log.Printf("writing buffer of size %d", len(b))
err = dc.dc.Send(b)
if err != nil {
return 0, err
}
return len(b), nil
}
func (dc *DataChannelConn) Close() error {
return dc.closeWithError(nil)
}
func (dc *DataChannelConn) LocalAddr() net.Addr {
return DataChannelAddr{}
}
func (dc *DataChannelConn) RemoteAddr() net.Addr {
return DataChannelAddr{}
}
func (dc *DataChannelConn) SetDeadline(t time.Time) error {
var err error
if e := dc.SetReadDeadline(t); e != nil {
err = e
}
if e := dc.SetWriteDeadline(t); e != nil {
err = e
}
return err
}
func (dc *DataChannelConn) SetReadDeadline(t time.Time) error {
return dc.rr.SetReadDeadline(t)
}
func (dc *DataChannelConn) SetWriteDeadline(t time.Time) error {
return dc.rw.SetWriteDeadline(t)
}
func (dc *DataChannelConn) closeWithError(err error) error {
dc.closeCond.Do(func() {
e := dc.rr.Close()
if err == nil {
err = e
}
e = dc.rw.Close()
if err == nil {
err = e
}
e = dc.dc.Close()
if err == nil {
err = e
}
dc.closeErr = err
})
return err
}
type ContextReadCloser struct {
context.Context
io.ReadCloser
cancel func()
}
func (cr ContextReadCloser) Close() error {
err := cr.ReadCloser.Close()
if cr.cancel != nil {
cr.cancel()
cr.cancel = nil
}
return err
}
func (cr ContextReadCloser) SetReadDeadline(t time.Time) error {
if cr.cancel != nil {
cr.cancel()
cr.cancel = nil
}
cr.Context, cr.cancel = context.WithDeadline(context.Background(), t)
return nil
}
func (cr ContextReadCloser) Read(p []byte) (n int, err error) {
done := make(chan struct{})
go func() {
n, err = cr.ReadCloser.Read(p)
close(done)
}()
select {
case <-done:
return n, err
case <-cr.Context.Done():
return 0, cr.Context.Err()
}
}
type ContextWriteCloser struct {
context.Context
io.WriteCloser
cancel func()
}
func (cw ContextWriteCloser) Close() error {
err := cw.WriteCloser.Close()
if cw.cancel != nil {
cw.cancel()
cw.cancel = nil
}
return err
}
func (cw ContextWriteCloser) SetWriteDeadline(t time.Time) error {
if cw.cancel != nil {
cw.cancel()
cw.cancel = nil
}
cw.Context, cw.cancel = context.WithDeadline(context.Background(), t)
return nil
}
func (cw ContextWriteCloser) Write(p []byte) (n int, err error) {
done := make(chan struct{})
go func() {
n, err = cw.WriteCloser.Write(p)
close(done)
}()
select {
case <-done:
return n, err
case <-cw.Context.Done():
return 0, cw.Context.Err()
}
}

18
browser/main.go Normal file
View File

@@ -0,0 +1,18 @@
package main
import (
"fmt"
"log"
"net/http"
)
func main() {
log.Print("listening on http://localhost:9090")
err := http.ListenAndServe(":9090", http.FileServer(http.Dir("./assets")))
//err := http.ListenAndServe(":9090", http.FileServer(http.Dir("/home/braginini/Documents/projects/my/wiretrustee/rtctunnel/examples/browser-http/dist")))
if err != nil {
fmt.Println("Failed to start server", err)
return
}
}

123
browser/server/server.go Normal file
View File

@@ -0,0 +1,123 @@
package main
import (
"context"
"encoding/hex"
"flag"
"fmt"
"github.com/wiretrustee/wiretrustee/browser/conn"
"github.com/wiretrustee/wiretrustee/signal/client"
"golang.zx2c4.com/wireguard/device"
"golang.zx2c4.com/wireguard/tun/netstack"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"io"
"log"
"net"
"net/http"
"time"
)
//my private key qJi7zSrgdokeoXE27fbca2hvMlgg1NQIW6KbrTJhhmc=
//remote private key KLuBc6tM/NRV1071bfPiNUxZmMhGBCXfxoDg+A+J7ns=
//./server --key KLuBc6tM/NRV1071bfPiNUxZmMhGBCXfxoDg+A+J7ns= --remote-key 6M9O7PRhKMEOiboBp9cX6rNrLBevtHX7H0O2FMXUkFI= --signal-endpoint ws://0.0.0.0:80/signal --ip 100.0.2.1 --remote-ip 100.0.2.2
func main() {
keyFlag := flag.String("key", "", "a Wireguard private key")
remoteKeyFlag := flag.String("remote-key", "", "a Wireguard remote peer public key")
signalEndpoint := flag.String("signal-endpoint", "ws://apitest.wiretrustee.com:80/signal", "a Signal service Websocket endpoint")
cl := flag.Bool("client", false, "indicates whether the program is a client")
ip := flag.String("ip", "", "Wireguard IP")
remoteIP := flag.String("remote-ip", "", "Wireguard IP")
flag.Parse()
key, err := wgtypes.ParseKey(*keyFlag)
if err != nil {
panic(err)
}
log.Printf("my public key: %s", key.PublicKey().String())
remoteKey, err := wgtypes.ParseKey(*remoteKeyFlag)
if err != nil {
panic(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
signal, err := client.NewWebsocketClient(ctx, *signalEndpoint, key)
time.Sleep(5 * time.Second)
tun, tnet, err := netstack.CreateNetTUN(
[]net.IP{net.ParseIP(*ip)},
[]net.IP{net.ParseIP("8.8.8.8")},
1420)
b := conn.NewWebRTCBind("chann-1", signal, key.PublicKey().String(), remoteKey.String())
dev := device.NewDevice(tun, b, device.NewLogger(device.LogLevelVerbose, ""))
allowedIPs := *remoteIP + "/32"
if *cl {
allowedIPs = "0.0.0.0/0"
}
err = dev.IpcSet(fmt.Sprintf("private_key=%s\npublic_key=%s\npersistent_keepalive_interval=100\nendpoint=webrtc://datachannel\nallowed_ip=%s",
hex.EncodeToString(key[:]),
hex.EncodeToString(remoteKey[:]),
allowedIPs,
))
dev.Up()
if err != nil {
panic(err)
}
client := http.Client{
Transport: &http.Transport{
DialContext: tnet.DialContext,
},
}
time.Sleep(2 * time.Second)
if *cl {
req, _ := http.NewRequest("GET", "http://"+*remoteIP, nil)
//req.Header.Set("js.fetch:mode", "no-cors")
resp, err := client.Do(req)
if err != nil {
log.Panic(err)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Panic(err)
}
log.Printf(string(body))
log.Printf(resp.Status)
} else {
listener, err := tnet.ListenTCP(&net.TCPAddr{Port: 80})
if err != nil {
log.Panicln(err)
}
http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
writer.Header().Set("Access-Control-Allow-Origin", "*")
writer.Header().Set("Access-Control-Allow-Headers", "Content-Type")
if (*request).Method == "OPTIONS" {
return
}
log.Printf("> %s - %s - %s", request.RemoteAddr, request.URL.String(), request.UserAgent())
io.WriteString(writer, "HELOOOOOOOOOOOOOOOOOOOO")
})
err = http.Serve(listener, nil)
if err != nil {
log.Panicln(err)
}
}
select {}
}

60
browser/test/test.go Normal file
View File

@@ -0,0 +1,60 @@
package main
import (
"encoding/hex"
"fmt"
"io"
"log"
"net"
"net/http"
"time"
"golang.zx2c4.com/wireguard/conn"
"golang.zx2c4.com/wireguard/device"
"golang.zx2c4.com/wireguard/tun/netstack"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
)
func main() {
tun, tnet, err := netstack.CreateNetTUN(
[]net.IP{net.ParseIP("10.100.0.2")},
[]net.IP{net.ParseIP("8.8.8.8")},
1420)
if err != nil {
log.Panic(err)
}
clientKey, _ := wgtypes.ParseKey("WI+uoQD9jGi+nyifmFwmswQu5r0uWFH31WeSmfU0snI=")
publicServerkey, _ := wgtypes.ParseKey("Xp2HRQ1AJ1WbSrHV1NNHAIcmirLUjUh9jz3K3n4OcgQ=")
fmt.Printf(clientKey.PublicKey().String())
dev := device.NewDevice(tun, conn.NewDefaultBind(), device.NewLogger(device.LogLevelVerbose, ""))
err = dev.IpcSet(fmt.Sprintf("private_key=%s\npublic_key=%s\npersistent_keepalive_interval=1\nendpoint=65.21.255.241:51820\nallowed_ip=0.0.0.0/0",
hex.EncodeToString(clientKey[:]),
hex.EncodeToString(publicServerkey[:]),
))
if err != nil {
log.Panic(err)
}
err = dev.Up()
if err != nil {
log.Panic(err)
}
client := http.Client{
Transport: &http.Transport{
DialContext: tnet.DialContext,
},
}
resp, err := client.Get("https://httpbin.org/ip")
if err != nil {
log.Panic(err)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Panic(err)
}
log.Println(string(body))
time.Sleep(30 * time.Second)
}

View File

@@ -3,6 +3,7 @@ package cmd
import (
mgmtProto "github.com/wiretrustee/wiretrustee/management/proto"
mgmt "github.com/wiretrustee/wiretrustee/management/server"
"github.com/wiretrustee/wiretrustee/signal/peer"
sigProto "github.com/wiretrustee/wiretrustee/signal/proto"
sig "github.com/wiretrustee/wiretrustee/signal/server"
"google.golang.org/grpc"
@@ -16,7 +17,7 @@ func startSignal(t *testing.T) (*grpc.Server, net.Listener) {
t.Fatal(err)
}
s := grpc.NewServer()
sigProto.RegisterSignalExchangeServer(s, sig.NewServer())
sigProto.RegisterSignalExchangeServer(s, sig.NewServer(peer.NewRegistry()))
go func() {
if err := s.Serve(lis); err != nil {
panic(err)

View File

@@ -72,7 +72,7 @@ func createEngineConfig(key wgtypes.Key, config *internal.Config, peerConfig *mg
}
// connectToSignal creates Signal Service client and established a connection
func connectToSignal(ctx context.Context, wtConfig *mgmProto.WiretrusteeConfig, ourPrivateKey wgtypes.Key) (*signal.Client, error) {
func connectToSignal(ctx context.Context, wtConfig *mgmProto.WiretrusteeConfig, ourPrivateKey wgtypes.Key) (signal.Client, error) {
var sigTLSEnabled bool
if wtConfig.Signal.Protocol == mgmProto.HostConfig_HTTPS {
sigTLSEnabled = true

View File

@@ -35,7 +35,7 @@ type EngineConfig struct {
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
type Engine struct {
// signal is a Signal Service client
signal *signal.Client
signal signal.Client
// mgmClient is a Management Service client
mgmClient *mgm.Client
// conns is a collection of remote peer connections indexed by local public key of the remote peers
@@ -68,7 +68,7 @@ type Peer struct {
}
// NewEngine creates a new Connection Engine
func NewEngine(signalClient *signal.Client, mgmClient *mgm.Client, config *EngineConfig, cancel context.CancelFunc, ctx context.Context) *Engine {
func NewEngine(signalClient signal.Client, mgmClient *mgm.Client, config *EngineConfig, cancel context.CancelFunc, ctx context.Context) *Engine {
return &Engine{
signal: signalClient,
mgmClient: mgmClient,
@@ -262,7 +262,7 @@ func (e *Engine) openPeerConnection(wgPort int, myKey wgtypes.Key, peer Peer) (*
return conn, nil
}
func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client) error {
func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s signal.Client) error {
err := s.Send(&sProto.Message{
Key: myKey.PublicKey().String(),
RemoteKey: remoteKey.String(),
@@ -280,7 +280,7 @@ func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtyp
return nil
}
func signalAuth(uFrag string, pwd string, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client, isAnswer bool) error {
func signalAuth(uFrag string, pwd string, myKey wgtypes.Key, remoteKey wgtypes.Key, s signal.Client, isAnswer bool) error {
var t sProto.Body_Type
if isAnswer {

16
go.mod
View File

@@ -6,23 +6,27 @@ require (
github.com/cenkalti/backoff/v4 v4.1.0
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.2.0
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/kardianos/service v1.2.1-0.20210728001519-a323c3813bc7
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.13.0
github.com/pion/ice/v2 v2.1.7
github.com/pion/ice/v2 v2.1.13
github.com/pion/webrtc/v3 v3.1.7
github.com/rs/cors v1.8.0
github.com/sirupsen/logrus v1.7.0
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/vishvananda/netlink v1.1.0
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
golang.zx2c4.com/wireguard v0.0.0-20210805125648-3957e9b9dd19
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359
golang.zx2c4.com/wireguard v0.0.0-20211026125340-e42c6c4bc2d0
golang.zx2c4.com/wireguard/tun/netstack v0.0.0-20211026125340-e42c6c4bc2d0
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20210803171230-4253848d036c
golang.zx2c4.com/wireguard/windows v0.4.5
google.golang.org/grpc v1.32.0
google.golang.org/grpc v1.39.0-dev.0.20210518002758-2713b77e8526
google.golang.org/protobuf v1.26.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
nhooyr.io/websocket v1.8.7
)

720
go.sum

File diff suppressed because it is too large Load Diff

View File

@@ -1,25 +1,11 @@
package client
import (
"context"
"crypto/tls"
"fmt"
"github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/encryption"
"github.com/wiretrustee/wiretrustee/signal/proto"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"io"
"strings"
"sync"
"time"
)
// A set of tools to exchange connection details (Wireguard endpoints) with the remote peer.
@@ -30,228 +16,29 @@ type Status string
const StreamConnected Status = "Connected"
const StreamDisconnected Status = "Disconnected"
// Client Wraps the Signal Exchange Service gRpc client
type Client struct {
key wgtypes.Key
realClient proto.SignalExchangeClient
signalConn *grpc.ClientConn
ctx context.Context
stream proto.SignalExchange_ConnectStreamClient
// connectedCh used to notify goroutines waiting for the connection to the Signal stream
connectedCh chan struct{}
mux sync.Mutex
// StreamConnected indicates whether this client is StreamConnected to the Signal stream
status Status
}
func (c *Client) GetStatus() Status {
return c.status
}
// Close Closes underlying connections to the Signal Exchange
func (c *Client) Close() error {
return c.signalConn.Close()
}
// NewClient creates a new Signal client
func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled bool) (*Client, error) {
transportOption := grpc.WithInsecure()
if tlsEnabled {
transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
}
sigCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
conn, err := grpc.DialContext(
sigCtx,
addr,
transportOption,
grpc.WithBlock(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 15 * time.Second,
Timeout: 10 * time.Second,
}))
if err != nil {
log.Errorf("failed to connect to the signalling server %v", err)
return nil, err
}
return &Client{
realClient: proto.NewSignalExchangeClient(conn),
ctx: ctx,
signalConn: conn,
key: key,
mux: sync.Mutex{},
status: StreamDisconnected,
}, nil
}
//defaultBackoff is a basic backoff mechanism for general issues
func defaultBackoff(ctx context.Context) backoff.BackOff {
return backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 10 * time.Second,
MaxElapsedTime: 12 * time.Hour, //stop after 12 hours of trying, the error will be propagated to the general retry of the client
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
}
// Receive Connects to the Signal Exchange message stream and starts receiving messages.
// The messages will be handled by msgHandler function provided.
// This function is blocking and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart)
// The connection retry logic will try to reconnect for 30 min and if wasn't successful will propagate the error to the function caller.
func (c *Client) Receive(msgHandler func(msg *proto.Message) error) error {
var backOff = defaultBackoff(c.ctx)
operation := func() error {
c.notifyStreamDisconnected()
log.Debugf("signal connection state %v", c.signalConn.GetState())
if !c.ready() {
return fmt.Errorf("no connection to signal")
}
// connect to Signal stream identifying ourselves with a public Wireguard key
// todo once the key rotation logic has been implemented, consider changing to some other identifier (received from management)
stream, err := c.connect(c.key.PublicKey().String())
if err != nil {
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
return err
}
c.notifyStreamConnected()
log.Infof("connected to the Signal Service stream")
// start receiving messages from the Signal stream (from other peers through signal)
err = c.receive(stream, msgHandler)
if err != nil {
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
backOff.Reset()
return err
}
return nil
}
err := backoff.Retry(operation, backOff)
if err != nil {
log.Errorf("exiting Signal Service connection retry loop due to unrecoverable error: %s", err)
return err
}
return nil
}
func (c *Client) notifyStreamDisconnected() {
c.mux.Lock()
defer c.mux.Unlock()
c.status = StreamDisconnected
}
func (c *Client) notifyStreamConnected() {
c.mux.Lock()
defer c.mux.Unlock()
c.status = StreamConnected
if c.connectedCh != nil {
// there are goroutines waiting on this channel -> release them
close(c.connectedCh)
c.connectedCh = nil
}
}
func (c *Client) getStreamStatusChan() <-chan struct{} {
c.mux.Lock()
defer c.mux.Unlock()
if c.connectedCh == nil {
c.connectedCh = make(chan struct{})
}
return c.connectedCh
}
func (c *Client) connect(key string) (proto.SignalExchange_ConnectStreamClient, error) {
c.stream = nil
// add key fingerprint to the request header to be identified on the server side
md := metadata.New(map[string]string{proto.HeaderId: key})
ctx := metadata.NewOutgoingContext(c.ctx, md)
stream, err := c.realClient.ConnectStream(ctx, grpc.WaitForReady(true))
c.stream = stream
if err != nil {
return nil, err
}
// blocks
header, err := c.stream.Header()
if err != nil {
return nil, err
}
registered := header.Get(proto.HeaderRegistered)
if len(registered) == 0 {
return nil, fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
}
return stream, nil
}
// ready indicates whether the client is okay and ready to be used
// for now it just checks whether gRPC connection to the service is in state Ready
func (c *Client) ready() bool {
return c.signalConn.GetState() == connectivity.Ready
}
// WaitStreamConnected waits until the client is connected to the Signal stream
func (c *Client) WaitStreamConnected() {
if c.status == StreamConnected {
return
}
ch := c.getStreamStatusChan()
select {
case <-c.ctx.Done():
case <-ch:
}
}
// SendToStream sends a message to the remote Peer through the Signal Exchange using established stream connection to the Signal Server
// The Client.Receive method must be called before sending messages to establish initial connection to the Signal Exchange
// Client.connWg can be used to wait
func (c *Client) SendToStream(msg *proto.EncryptedMessage) error {
if !c.ready() {
return fmt.Errorf("no connection to signal")
}
if c.stream == nil {
return fmt.Errorf("connection to the Signal Exchnage has not been established yet. Please call Client.Receive before sending messages")
}
err := c.stream.Send(msg)
if err != nil {
log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err)
return err
}
return nil
// Client is an interface describing Signal client
type Client interface {
// Receive handles incoming messages from the Signal service
Receive(msgHandler func(msg *proto.Message) error) error
Close() error
// Send sends a message to the Signal service (just one time rpc call, not stream)
Send(msg *proto.Message) error
// SendToStream sends a message to the Signal service through a connected stream
SendToStream(msg *proto.EncryptedMessage) error
// WaitStreamConnected blocks until client is connected to the Signal stream
WaitStreamConnected()
GetStatus() Status
}
// decryptMessage decrypts the body of the msg using Wireguard private key and Remote peer's public key
func (c *Client) decryptMessage(msg *proto.EncryptedMessage) (*proto.Message, error) {
func decryptMessage(msg *proto.EncryptedMessage, wgPrivateKey wgtypes.Key) (*proto.Message, error) {
remoteKey, err := wgtypes.ParseKey(msg.GetKey())
if err != nil {
return nil, err
}
body := &proto.Body{}
err = encryption.DecryptMessage(remoteKey, c.key, msg.GetBody(), body)
err = encryption.DecryptMessage(remoteKey, wgPrivateKey, msg.GetBody(), body)
if err != nil {
return nil, err
}
@@ -264,14 +51,14 @@ func (c *Client) decryptMessage(msg *proto.EncryptedMessage) (*proto.Message, er
}
// encryptMessage encrypts the body of the msg using Wireguard private key and Remote peer's public key
func (c *Client) encryptMessage(msg *proto.Message) (*proto.EncryptedMessage, error) {
func encryptMessage(msg *proto.Message, wgPrivateKey wgtypes.Key) (*proto.EncryptedMessage, error) {
remoteKey, err := wgtypes.ParseKey(msg.RemoteKey)
if err != nil {
return nil, err
}
encryptedBody, err := encryption.EncryptMessage(remoteKey, c.key, msg.Body)
encryptedBody, err := encryption.EncryptMessage(remoteKey, wgPrivateKey, msg.Body)
if err != nil {
return nil, err
}
@@ -283,60 +70,6 @@ func (c *Client) encryptMessage(msg *proto.Message) (*proto.EncryptedMessage, er
}, nil
}
// Send sends a message to the remote Peer through the Signal Exchange.
func (c *Client) Send(msg *proto.Message) error {
if !c.ready() {
return fmt.Errorf("no connection to signal")
}
encryptedMessage, err := c.encryptMessage(msg)
if err != nil {
return err
}
_, err = c.realClient.Send(context.TODO(), encryptedMessage)
if err != nil {
//log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err)
return err
}
return nil
}
// receive receives messages from other peers coming through the Signal Exchange
func (c *Client) receive(stream proto.SignalExchange_ConnectStreamClient,
msgHandler func(msg *proto.Message) error) error {
for {
msg, err := stream.Recv()
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
log.Warnf("stream canceled (usually indicates shutdown)")
return err
} else if s.Code() == codes.Unavailable {
log.Warnf("Signal Service is unavailable")
return err
} else if err == io.EOF {
log.Warnf("Signal Service stream closed by server")
return err
} else if err != nil {
return err
}
log.Debugf("received a new message from Peer [fingerprint: %s]", msg.Key)
decryptedMessage, err := c.decryptMessage(msg)
if err != nil {
log.Errorf("failed decrypting message of Peer [key: %s] error: [%s]", msg.Key, err.Error())
}
err = msgHandler(decryptedMessage)
if err != nil {
log.Errorf("error while handling message of Peer [key: %s] error: [%s]", msg.Key, err.Error())
//todo send something??
}
}
}
// UnMarshalCredential parses the credentials from the message and returns a Credential instance
func UnMarshalCredential(msg *proto.Message) (*Credential, error) {

View File

@@ -5,6 +5,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/signal/peer"
sigProto "github.com/wiretrustee/wiretrustee/signal/proto"
"github.com/wiretrustee/wiretrustee/signal/server"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
@@ -159,7 +160,7 @@ var _ = Describe("Client", func() {
})
func createSignalClient(addr string, key wgtypes.Key) *Client {
func createSignalClient(addr string, key wgtypes.Key) Client {
var sigTLSEnabled = false
client, err := NewClient(context.Background(), addr, key, sigTLSEnabled)
if err != nil {
@@ -189,7 +190,7 @@ func startSignal() (*grpc.Server, net.Listener) {
panic(err)
}
s := grpc.NewServer()
sigProto.RegisterSignalExchangeServer(s, server.NewServer())
sigProto.RegisterSignalExchangeServer(s, server.NewServer(peer.NewRegistry()))
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)

289
signal/client/grpc.go Normal file
View File

@@ -0,0 +1,289 @@
package client
import (
"context"
"crypto/tls"
"fmt"
"github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/signal/proto"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"io"
"sync"
"time"
)
// A set of tools to exchange connection details (Wireguard endpoints) with the remote peer.
// GrpcClient Wraps the Signal Exchange Service gRpc client
type GrpcClient struct {
key wgtypes.Key
realClient proto.SignalExchangeClient
signalConn *grpc.ClientConn
ctx context.Context
stream proto.SignalExchange_ConnectStreamClient
// connectedCh used to notify goroutines waiting for the connection to the Signal stream
connectedCh chan struct{}
mux sync.Mutex
// streamConnected indicates whether this GrpcClient is streamConnected to the Signal stream
status Status
}
func (c *GrpcClient) GetStatus() Status {
return c.status
}
// Close Closes underlying connections to the Signal Exchange
func (c *GrpcClient) Close() error {
return c.signalConn.Close()
}
// NewClient creates a new Signal GrpcClient
func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled bool) (*GrpcClient, error) {
transportOption := grpc.WithInsecure()
if tlsEnabled {
transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
}
sigCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
conn, err := grpc.DialContext(
sigCtx,
addr,
transportOption,
grpc.WithBlock(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 15 * time.Second,
Timeout: 10 * time.Second,
}))
if err != nil {
log.Errorf("failed to connect to the signalling server %v", err)
return nil, err
}
return &GrpcClient{
realClient: proto.NewSignalExchangeClient(conn),
ctx: ctx,
signalConn: conn,
key: key,
mux: sync.Mutex{},
status: StreamDisconnected,
}, nil
}
//defaultBackoff is a basic backoff mechanism for general issues
func defaultBackoff(ctx context.Context) backoff.BackOff {
return backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 10 * time.Second,
MaxElapsedTime: 12 * time.Hour, //stop after 12 hours of trying, the error will be propagated to the general retry of the client
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
}
// Receive Connects to the Signal Exchange message stream and starts receiving messages.
// The messages will be handled by msgHandler function provided.
// This function is blocking and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart)
// The connection retry logic will try to reconnect for 30 min and if wasn't successful will propagate the error to the function caller.
func (c *GrpcClient) Receive(msgHandler func(msg *proto.Message) error) error {
var backOff = defaultBackoff(c.ctx)
operation := func() error {
c.notifyStreamDisconnected()
log.Debugf("signal connection state %v", c.signalConn.GetState())
if !c.ready() {
return fmt.Errorf("no connection to signal")
}
// connect to Signal stream identifying ourselves with a public Wireguard key
// todo once the key rotation logic has been implemented, consider changing to some other identifier (received from management)
stream, err := c.connect(c.key.PublicKey().String())
if err != nil {
log.Warnf("streamDisconnected from the Signal Exchange due to an error: %v", err)
return err
}
c.notifyStreamConnected()
log.Infof("streamConnected to the Signal Service stream")
// start receiving messages from the Signal stream (from other peers through signal)
err = c.receive(stream, msgHandler)
if err != nil {
log.Warnf("streamDisconnected from the Signal Exchange due to an error: %v", err)
backOff.Reset()
return err
}
return nil
}
err := backoff.Retry(operation, backOff)
if err != nil {
log.Errorf("exiting Signal Service connection retry loop due to unrecoverable error: %s", err)
return err
}
return nil
}
func (c *GrpcClient) notifyStreamDisconnected() {
c.mux.Lock()
defer c.mux.Unlock()
c.status = StreamDisconnected
}
func (c *GrpcClient) notifyStreamConnected() {
c.mux.Lock()
defer c.mux.Unlock()
c.status = StreamConnected
if c.connectedCh != nil {
// there are goroutines waiting on this channel -> release them
close(c.connectedCh)
c.connectedCh = nil
}
}
func (c *GrpcClient) getStreamStatusChan() <-chan struct{} {
c.mux.Lock()
defer c.mux.Unlock()
if c.connectedCh == nil {
c.connectedCh = make(chan struct{})
}
return c.connectedCh
}
func (c *GrpcClient) connect(key string) (proto.SignalExchange_ConnectStreamClient, error) {
c.stream = nil
// add key fingerprint to the request header to be identified on the server side
md := metadata.New(map[string]string{proto.HeaderId: key})
ctx := metadata.NewOutgoingContext(c.ctx, md)
stream, err := c.realClient.ConnectStream(ctx, grpc.WaitForReady(true))
c.stream = stream
if err != nil {
return nil, err
}
// blocks
header, err := c.stream.Header()
if err != nil {
return nil, err
}
registered := header.Get(proto.HeaderRegistered)
if len(registered) == 0 {
return nil, fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
}
return stream, nil
}
// ready indicates whether the client is okay and ready to be used
// for now it just checks whether gRPC connection to the service is in state Ready
func (c *GrpcClient) ready() bool {
return c.signalConn.GetState() == connectivity.Ready
}
// WaitStreamConnected waits until the client is connected to the Signal stream
func (c *GrpcClient) WaitStreamConnected() {
if c.status == StreamConnected {
return
}
ch := c.getStreamStatusChan()
select {
case <-c.ctx.Done():
case <-ch:
}
}
// SendToStream sends a message to the remote Peer through the Signal Exchange using established stream connection to the Signal Server
// The Client.Receive method must be called before sending messages to establish initial connection to the Signal Exchange
// Client.connWg can be used to wait
func (c *GrpcClient) SendToStream(msg *proto.EncryptedMessage) error {
if !c.ready() {
return fmt.Errorf("no connection to signal")
}
if c.stream == nil {
return fmt.Errorf("connection to the Signal Exchnage has not been established yet. Please call Client.Receive before sending messages")
}
err := c.stream.Send(msg)
if err != nil {
log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err)
return err
}
return nil
}
// Send sends a message to the remote Peer through the Signal Exchange.
func (c *GrpcClient) Send(msg *proto.Message) error {
if !c.ready() {
return fmt.Errorf("no connection to signal")
}
encryptedMessage, err := encryptMessage(msg, c.key)
if err != nil {
return err
}
_, err = c.realClient.Send(c.ctx, encryptedMessage)
if err != nil {
return err
}
return nil
}
// receive receives messages from other peers coming through the Signal Exchange
func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient,
msgHandler func(msg *proto.Message) error) error {
for {
msg, err := stream.Recv()
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
log.Warnf("stream canceled (usually indicates shutdown)")
return err
} else if s.Code() == codes.Unavailable {
log.Warnf("Signal Service is unavailable")
return err
} else if err == io.EOF {
log.Warnf("Signal Service stream closed by server")
return err
} else if err != nil {
return err
}
log.Debugf("received a new message from Peer [fingerprint: %s]", msg.Key)
decryptedMessage, err := decryptMessage(msg, c.key)
if err != nil {
log.Errorf("failed decrypting message of Peer [key: %s] error: [%s]", msg.Key, err.Error())
}
err = msgHandler(decryptedMessage)
if err != nil {
log.Errorf("error while handling message of Peer [key: %s] error: [%s]", msg.Key, err.Error())
//todo send something??
}
}
}

114
signal/client/websocket.go Normal file
View File

@@ -0,0 +1,114 @@
package client
import (
"context"
"encoding/base64"
pb "github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/signal/proto"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"nhooyr.io/websocket"
"sync"
"time"
)
//WebsocketClient is a Signal server websocket client (alternative to the original gRPC Client)
type WebsocketClient struct {
key wgtypes.Key
ctx context.Context
conn *websocket.Conn
status Status
mu sync.Mutex
}
func (c *WebsocketClient) GetStatus() Status {
return c.status
}
func NewWebsocketClient(ctx context.Context, endpoint string, wgPrivateKey wgtypes.Key) (*WebsocketClient, error) {
sigCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
// generate peer identifier from our public key and base64 encode it because it will be transferred via URL param
peerId := base64.URLEncoding.EncodeToString([]byte(wgPrivateKey.PublicKey().String()))
conn, res, err := websocket.Dial(sigCtx, endpoint+"?id="+peerId, &websocket.DialOptions{})
if err != nil {
log.Errorf("failed to connect to the Signal Websocket server %v - %v", err, res)
return nil, err
}
return &WebsocketClient{
key: wgPrivateKey,
ctx: ctx,
conn: conn,
status: StreamConnected,
mu: sync.Mutex{},
}, nil
}
func (c *WebsocketClient) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
c.status = StreamDisconnected
return c.conn.Close(websocket.StatusNormalClosure, "close")
}
func (c *WebsocketClient) Receive(msgHandler func(msg *proto.Message) error) error {
for {
_, byteMsg, err := c.conn.Read(c.ctx)
if err != nil {
log.Errorf("failed reading message from Signal Websocket %v", err)
time.Sleep(2 * time.Second)
//todo propagate to the upper layer and retry
return err
}
encryptedMsg := &proto.EncryptedMessage{}
err = pb.Unmarshal(byteMsg, encryptedMsg)
if err != nil {
log.Errorf("failed unmarshalling message from Signal Websocket %v", err)
continue
}
remotePubKey := encryptedMsg.Key
log.Debugf("received a new message from Peer %s received via Websocket", remotePubKey)
decryptedMsg, err := decryptMessage(encryptedMsg, c.key)
if err != nil {
log.Errorf("failed decrypting a message from peer %s received via Websocket %v", remotePubKey, err)
}
err = msgHandler(decryptedMsg)
if err != nil {
log.Errorf("error while handling message from peer %s %v", remotePubKey, err)
//todo send something??
}
}
}
func (c *WebsocketClient) SendToStream(msg *proto.EncryptedMessage) error {
bytesMsg, err := pb.Marshal(msg)
if err != nil {
log.Errorf("failed marshalling message %v", err)
return err
}
return c.conn.Write(c.ctx, websocket.MessageBinary, bytesMsg)
}
func (c *WebsocketClient) Send(msg *proto.Message) error {
encryptedMessage, err := encryptMessage(msg, c.key)
if err != nil {
return err
}
return c.SendToStream(encryptedMessage)
}
func (c *WebsocketClient) WaitStreamConnected() {
}

View File

@@ -1,20 +1,21 @@
package cmd
import (
"context"
"flag"
"fmt"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/wiretrustee/wiretrustee/encryption"
"github.com/wiretrustee/wiretrustee/signal/peer"
"github.com/wiretrustee/wiretrustee/signal/proto"
"github.com/wiretrustee/wiretrustee/signal/server"
"github.com/wiretrustee/wiretrustee/signal/server/http"
"github.com/wiretrustee/wiretrustee/util"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"net"
"net/http"
"os"
"time"
)
@@ -45,25 +46,20 @@ var (
log.Fatalf("failed initializing log %v", err)
}
registry := peer.NewRegistry()
var opts []grpc.ServerOption
var httpServer *http.Server
if signalLetsencryptDomain != "" {
if _, err := os.Stat(signalSSLDir); os.IsNotExist(err) {
err = os.MkdirAll(signalSSLDir, os.ModeDir)
if err != nil {
log.Fatalf("failed creating datadir: %s: %v", signalSSLDir, err)
}
}
//automatically generate a new certificate with Let's Encrypt
certManager := encryption.CreateCertManager(signalSSLDir, signalLetsencryptDomain)
transportCredentials := credentials.NewTLS(certManager.TLSConfig())
opts = append(opts, grpc.Creds(transportCredentials))
listener := certManager.Listener()
log.Infof("http server listening on %s", listener.Addr())
go func() {
if err := http.Serve(listener, certManager.HTTPHandler(nil)); err != nil {
log.Errorf("failed to serve https server: %v", err)
}
}()
httpServer = http.NewHttpsServer("0.0.0.0:443", certManager, registry)
} else {
httpServer = http.NewHttpServer("0.0.0.0:80", registry)
}
opts = append(opts, signalKaep, signalKasp)
@@ -78,15 +74,34 @@ var (
log.Fatalf("failed to listen: %v", err)
}
proto.RegisterSignalExchangeServer(grpcServer, server.NewServer())
log.Printf("started server: localhost:%v", signalPort)
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
proto.RegisterSignalExchangeServer(grpcServer, server.NewServer(registry))
log.Printf("gRPC server listening on 0.0.0.0:%v", signalPort)
go func() {
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
go func() {
err = httpServer.Start()
if err != nil {
log.Fatalf("failed to serve http server: %v", err)
}
}()
SetupCloseHandler()
<-stopCh
log.Println("Receive signal to stop running the Signal server")
log.Println("received signal to stop running the Signal server")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err = httpServer.Stop(ctx)
if err != nil {
log.Fatalf("failed stopping the http server %v", err)
}
grpcServer.Stop()
},
}
)

View File

@@ -1,22 +1,46 @@
package peer
import (
pb "github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/signal/proto"
"sync"
)
//Channel abstracts transport that Peer is using to communicate with teh Signal server.
//There are 2 types channels so far: gRPC- and websocket-based.
type Channel interface {
Send(msg *proto.EncryptedMessage) error
}
type WebsocketChannel struct {
conn *websocket.Conn
}
func NewWebsocketChannel(conn *websocket.Conn) *WebsocketChannel {
return &WebsocketChannel{conn: conn}
}
func (c *WebsocketChannel) Send(msg *proto.EncryptedMessage) error {
b, err := pb.Marshal(msg)
if err != nil {
return err
}
return c.conn.WriteMessage(websocket.BinaryMessage, b)
}
// Peer representation of a connected Peer
type Peer struct {
// a unique id of the Peer (e.g. sha256 fingerprint of the Wireguard public key)
Id string
//a gRpc connection stream to the Peer
Stream proto.SignalExchange_ConnectStreamServer
//a connection stream to the Peer (gRPC or websocket)
Stream Channel
}
// NewPeer creates a new instance of a connected Peer
func NewPeer(id string, stream proto.SignalExchange_ConnectStreamServer) *Peer {
// NewPeer creates a new instance of a Peer connected with gRPC
func NewPeer(id string, stream Channel) *Peer {
return &Peer{
Id: id,
Stream: stream,

View File

@@ -0,0 +1,149 @@
package http
import (
"context"
"encoding/base64"
pb "github.com/golang/protobuf/proto" //nolint
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/signal/peer"
"github.com/wiretrustee/wiretrustee/signal/proto"
"golang.org/x/crypto/acme/autocert"
"net/http"
"time"
)
type Server struct {
server *http.Server
certManager *autocert.Manager
registry *peer.Registry
}
func NewHttpsServer(addr string, certManager *autocert.Manager, registry *peer.Registry) *Server {
server := &http.Server{
Addr: addr,
WriteTimeout: time.Second * 15,
ReadTimeout: time.Second * 15,
IdleTimeout: time.Second * 60,
}
return &Server{
server: server,
certManager: certManager,
registry: registry,
}
}
func NewHttpServer(addr string, registry *peer.Registry) *Server {
return NewHttpsServer(addr, nil, registry)
}
// Stop stops the http server
func (s *Server) Stop(ctx context.Context) error {
err := s.server.Shutdown(ctx)
if err != nil {
return err
}
return nil
}
func (s *Server) Start() error {
r := mux.NewRouter()
r.HandleFunc("/signal", func(w http.ResponseWriter, r *http.Request) {
s.serveWs(w, r)
})
http.Handle("/", r)
if s.certManager != nil {
// if HTTPS is enabled we reuse the listener from the cert manager
listener := s.certManager.Listener()
log.Infof("HTTPs server listening on %s with Let's Encrypt autocert configured", listener.Addr())
if err := http.Serve(listener, s.certManager.HTTPHandler(r)); err != nil {
log.Errorf("failed to serve https server: %v", err)
return err
}
} else {
log.Infof("HTTP server listening on %s", s.server.Addr)
if err := s.server.ListenAndServe(); err != nil {
log.Errorf("failed to serve http server: %v", err)
return err
}
}
return nil
}
// serveWs handles websocket requests from the peer.
func (s *Server) serveWs(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true //TODO not good to allow everything
},
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Errorf("failed upgrading Websocket request %v", err)
//http.Error(w, "failed upgrading Websocket request", http.StatusInternalServerError)
return
}
params := r.URL.Query()
peerId := params.Get("id")
if peerId == "" {
log.Warn("required Websocket query id parameter is missing")
//http.Error(w, "required Websocket query id parameter is missing", http.StatusBadRequest)
conn.Close()
return
}
decodeString, err := base64.URLEncoding.DecodeString(peerId)
if err != nil {
conn.Close()
return
}
channel := peer.NewWebsocketChannel(conn)
p := peer.NewPeer(string(decodeString), channel)
s.registry.Register(p)
defer func() {
s.registry.Deregister(p)
conn.Close()
}()
conn.SetReadLimit(1024 * 1024 * 3)
for {
_, byteMsg, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Errorf("error: %v", err)
}
break
}
msg := &proto.EncryptedMessage{}
err = pb.Unmarshal(byteMsg, msg)
if err != nil {
//todo
return
}
if dstPeer, found := s.registry.Get(msg.RemoteKey); found {
//forward the message to the target peer
err := dstPeer.Stream.Send(msg)
if err != nil {
log.Errorf("error while forwarding message from peer [%s] to peer [%s]", p.Id, msg.RemoteKey)
//todo respond to the sender?
} else {
log.Debugf("forwarded message from peer %s to peer %s", msg.Key, msg.RemoteKey)
}
} else {
log.Warnf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", p.Id, msg.RemoteKey)
//todo respond to the sender?
}
}
}

View File

@@ -19,9 +19,9 @@ type Server struct {
}
// NewServer creates a new Signal server
func NewServer() *Server {
func NewServer(registry *peer.Registry) *Server {
return &Server{
registry: peer.NewRegistry(),
registry: registry,
}
}