Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions cmd/fmsgd/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -1396,6 +1396,14 @@ func downloadMessage(c net.Conn, r io.Reader, h *FMsgHeader, skipData bool) erro
return fmt.Errorf("%w actual hash: %s mismatch challenge response: %s", ErrProtocolViolation, actualHashStr, challengeHashStr)
}

// Duplicate detection keys on the message's canonical identity (msg.sha256),
// which for an add-to delivery is its original-form hash — not the add-to
// variant verified against the challenge above.
dupHash, err := canonicalMsgHash(h)
if err != nil {
return err
}

// pid/add-to validation is handled during header exchange in readHeader().

// determine file extension from MIME type
Expand Down Expand Up @@ -1423,7 +1431,7 @@ func downloadMessage(c net.Conn, r io.Reader, h *FMsgHeader, skipData bool) erro
acceptedAddTo := []FMsgAddress{}
var primaryFilepath string
for i, addr := range addrs {
code, err := validateMsgRecvForAddr(h, &addr, msgHash)
code, err := validateMsgRecvForAddr(h, &addr, dupHash)
if err != nil {
return err
}
Expand Down Expand Up @@ -1552,7 +1560,14 @@ func handleConn(c net.Conn) {
if header.ChallengeCompleted && header.InitialResponseCode != AcceptCodeAddTo {
addrs := localRecipients(header)
var err error
allLocalDup, err = allLocalRecipientsHaveMessageHash(header.ChallengeHash[:], addrs)
// Duplicate detection keys on msg.sha256 (the canonical original-form
// hash). For an add-to delivery that is header.Pid; the challenge hash
// is the add-to variant and would not match a stored row.
dupHash := header.ChallengeHash[:]
if header.Flags&FlagHasAddTo != 0 {
dupHash = header.Pid
}
allLocalDup, err = allLocalRecipientsHaveMessageHash(dupHash, addrs)
if err != nil {
log.Printf("ERROR: duplicate check failed for %s: %s", c.RemoteAddr().String(), err)
if err := sendCode(c, RejectCodeUndisclosed); err != nil {
Expand Down
15 changes: 4 additions & 11 deletions cmd/fmsgd/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,18 +368,11 @@ func deliverMessage(target pendingTarget) {

// Ensure sha256 is populated for this message so future pid lookups
// (e.g. add-to notifications or replies referencing it) can find it.
// For an add-to message the row being delivered IS the shared message,
// whose original-form hash loadMsg has already placed in h.Pid; calling
// h.GetMessageHash() here would instead hash the add-to variant.
var msgHash []byte
if h.Flags&FlagHasAddTo != 0 {
msgHash = h.Pid
} else {
msgHash, err = h.GetMessageHash()
if err != nil {
log.Printf("ERROR: sender: computing message hash for msg %d: %s", target.MsgID, err)
return
}
msgHash, err = canonicalMsgHash(h)
if err != nil {
log.Printf("ERROR: sender: computing message hash for msg %d: %s", target.MsgID, err)
return
}
if _, err := tx.Exec(`UPDATE msg SET sha256 = $1 WHERE id = $2 AND sha256 IS NULL`,
msgHash, target.MsgID); err != nil {
Expand Down
118 changes: 112 additions & 6 deletions cmd/fmsgd/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,29 @@ func wirePidForLoadedMessage(storedParentHash []byte, msgHash []byte, hasAddTo b
return storedParentHash
}

// canonicalMsgHash yields the original-form hash that serves as a message's
// stable identity — the value kept in msg.sha256 and the one replies carry as
// their wire pid.
//
// A message without the add-to flag is hashed via GetMessageHash(). An add-to
// message is the shared message itself, and its original-form hash already
// sits in msg.Pid (the add-to wire pid, SPEC §12), so Pid is returned as-is:
// GetMessageHash() would hash the add-to variant instead and also needs the
// message payload, which the code-11 path never downloads.
func canonicalMsgHash(msg *FMsgHeader) ([]byte, error) {
	if msg.Flags&FlagHasAddTo == 0 {
		return msg.GetMessageHash()
	}
	return msg.Pid, nil
}

// relationalParentHash reports the hash of the message that msg replies to.
// It returns msg.Pid for messages without the add-to flag (which is nil when
// the message has no pid) and always nil for add-to messages: there Pid is
// the message's own identity rather than a parent pointer (SPEC §12), so it
// must never be resolved as a parent.
func relationalParentHash(msg *FMsgHeader) []byte {
	if msg.Flags&FlagHasAddTo == 0 {
		return msg.Pid
	}
	return nil
}

// getMsgByID loads a message and all its recipients from the database by msg ID.
// Returns the full FMsgHeader or nil if the message doesn't exist.
func getMsgByID(msgID int64) (*FMsgHeader, error) {
Expand Down Expand Up @@ -204,6 +227,63 @@ func getMsgByID(msgID int64) (*FMsgHeader, error) {
return h, nil
}

// existingMsgIDForAddTo looks up, for an add-to delivery, the id of a stored
// msg row whose sha256 equals msgHash.
//
// It returns (0, nil) without touching tx when msg lacks the add-to flag or
// msgHash is empty, and also when the query finds no row; in all of those
// cases the caller falls through to a normal INSERT. Non-add-to messages never
// take the attach path: a colliding sha256 there is a genuine duplicate, not a
// shared message. Any other query error is returned to the caller.
func existingMsgIDForAddTo(tx *sql.Tx, msg *FMsgHeader, msgHash []byte) (int64, error) {
	isAddTo := msg.Flags&FlagHasAddTo != 0
	if !isAddTo || len(msgHash) == 0 {
		return 0, nil
	}

	var existingID int64
	row := tx.QueryRow("SELECT id FROM msg WHERE sha256 = $1", msgHash)
	switch scanErr := row.Scan(&existingID); scanErr {
	case sql.ErrNoRows:
		// Not stored yet: signal "no existing row" rather than an error.
		return 0, nil
	default:
		return existingID, scanErr
	}
}

// attachAddToRecipients grows the recipient list of the already-stored message
// msgID with the recipients carried by an add-to delivery. It is the add-to
// counterpart of an INSERT: when a host already holds the shared message, the
// delivery must extend that row instead of inserting a second one under the
// same unique canonical sha256 (SPEC §12).
//
// Every address in msg.To is inserted into msg_to and every address in
// msg.AddTo into msg_add_to. Rows for addresses on our Domain get
// time_delivered set to the current timestamp; all others get NULL. Rows that
// already exist for (msg_id, addr) are left untouched. Finally, when
// msg.AddToFrom is set, msg.add_to_from is filled in only if it is currently
// NULL or empty. The first failing statement aborts and its error is returned.
func attachAddToRecipients(tx *sql.Tx, msgID int64, msg *FMsgHeader) error {
	deliveredAt := timeutil.TimestampNow().Float64()

	// insertRecipient adds one recipient row to the given table.
	insertRecipient := func(table string, addr FMsgAddress) error {
		// A nil interface is stored as NULL; only local recipients are
		// stamped as delivered.
		var stamp interface{}
		if addr.Domain == Domain {
			stamp = deliveredAt
		}
		_, execErr := tx.Exec(`insert into `+table+` (msg_id, addr, time_delivered)
values ($1, $2, $3)
on conflict (msg_id, addr) do nothing`, msgID, addr.ToString(), stamp)
		return execErr
	}

	for _, recipient := range msg.To {
		if err := insertRecipient("msg_to", recipient); err != nil {
			return err
		}
	}
	for _, recipient := range msg.AddTo {
		if err := insertRecipient("msg_add_to", recipient); err != nil {
			return err
		}
	}

	if msg.AddToFrom == nil {
		return nil
	}

	// Record who performed the add-to, without overwriting an existing value.
	_, err := tx.Exec(`update msg set add_to_from = $1
where id = $2 and (add_to_from is null or add_to_from = '')`,
		msg.AddToFrom.ToString(), msgID)
	return err
}

func storeMsgDetail(msg *FMsgHeader) error {

db, err := sql.Open("postgres", "")
Expand All @@ -218,10 +298,23 @@ func storeMsgDetail(msg *FMsgHeader) error {
}
defer tx.Rollback()

msgHash, err := msg.GetMessageHash()
msgHash, err := canonicalMsgHash(msg)
if err != nil {
return err
}
parentHash := relationalParentHash(msg)

// An add-to delivery for a message this host already holds extends the
// existing row's recipient list; inserting again would collide on the
// unique canonical sha256 (SPEC §12).
if existingID, err := existingMsgIDForAddTo(tx, msg, msgHash); err != nil {
return err
} else if existingID != 0 {
if err := attachAddToRecipients(tx, existingID, msg); err != nil {
return err
}
return tx.Commit()
}

var addToFrom interface{}
if msg.AddToFrom != nil {
Expand Down Expand Up @@ -254,7 +347,7 @@ returning id`,
msg.Topic,
msg.Type,
msgHash,
msg.Pid,
parentHash,
int(msg.Size),
msg.Filepath).Scan(&msgID)
if err != nil {
Expand Down Expand Up @@ -316,7 +409,7 @@ values ($1, $2, $3, $4, $5, $6, $7)`)
}
}

if err := resolveMsgParentLinks(tx, msgID, msgHash, msg.Pid, requiresStoredParent(msg)); err != nil {
if err := resolveMsgParentLinks(tx, msgID, msgHash, parentHash, requiresStoredParent(msg)); err != nil {
return err
}

Expand All @@ -340,10 +433,23 @@ func storeMsgHeaderOnly(msg *FMsgHeader) error {
}
defer tx.Rollback()

msgHash, err := msg.GetMessageHash()
msgHash, err := canonicalMsgHash(msg)
if err != nil {
return err
}
parentHash := relationalParentHash(msg)

// An add-to delivery for a message this host already holds extends the
// existing row's recipient list; inserting again would collide on the
// unique canonical sha256 (SPEC §12).
if existingID, err := existingMsgIDForAddTo(tx, msg, msgHash); err != nil {
return err
} else if existingID != 0 {
if err := attachAddToRecipients(tx, existingID, msg); err != nil {
return err
}
return tx.Commit()
}

var addToFrom interface{}
if msg.AddToFrom != nil {
Expand Down Expand Up @@ -376,7 +482,7 @@ returning id`,
msg.Topic,
msg.Type,
msgHash,
msg.Pid,
parentHash,
int(msg.Size),
"").Scan(&msgID)
if err != nil {
Expand Down Expand Up @@ -425,7 +531,7 @@ values ($1, $2, $3, $4, $5, $6, $7)`)
}
}

if err := resolveMsgParentLinks(tx, msgID, msgHash, msg.Pid, requiresStoredParent(msg)); err != nil {
if err := resolveMsgParentLinks(tx, msgID, msgHash, parentHash, requiresStoredParent(msg)); err != nil {
return err
}

Expand Down
44 changes: 44 additions & 0 deletions cmd/fmsgd/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,47 @@ func TestWirePidForLoadedMessageReplyKeepsParentHash(t *testing.T) {
t.Fatalf("reply wire pid = %v, want parent hash %v", got, parentHash)
}
}

// An add-to message's row IS the shared message; its canonical hash is the
// original-form hash carried in Pid, not the add-to variant. Storing the
// variant would make replies reference a hash the origin host never knows.
func TestCanonicalMsgHashAddToUsesPid(t *testing.T) {
origHash := []byte{9, 8, 7, 6}

got, err := canonicalMsgHash(&FMsgHeader{Flags: FlagHasPid | FlagHasAddTo, Pid: origHash})
if err != nil {
t.Fatalf("canonicalMsgHash returned error: %v", err)
}
if !bytes.Equal(got, origHash) {
t.Fatalf("add-to canonical hash = %v, want original-form hash %v", got, origHash)
}
}

// A non-add-to message must never take the add-to attach path: a colliding
// sha256 there is a genuine duplicate, not the shared message being extended.
// existingMsgIDForAddTo short-circuits before touching the database for it.
func TestExistingMsgIDForAddToSkipsNonAddTo(t *testing.T) {
id, err := existingMsgIDForAddTo(nil, &FMsgHeader{Flags: FlagHasPid}, []byte{1, 2, 3})
if err != nil {
t.Fatalf("existingMsgIDForAddTo returned error: %v", err)
}
if id != 0 {
t.Fatalf("non-add-to message returned existing id %d, want 0", id)
}
}

// An add-to message's Pid identifies the message itself, not a parent, so it
// must not be resolved as a relational parent. A plain reply's Pid is a parent.
func TestRelationalParentHashAddToHasNoParent(t *testing.T) {
pid := []byte{1, 2, 3}

if got := relationalParentHash(&FMsgHeader{Flags: FlagHasPid | FlagHasAddTo, Pid: pid}); got != nil {
t.Fatalf("add-to relational parent = %v, want nil", got)
}
if got := relationalParentHash(&FMsgHeader{Flags: FlagHasPid, Pid: pid}); !bytes.Equal(got, pid) {
t.Fatalf("reply relational parent = %v, want %v", got, pid)
}
if got := relationalParentHash(&FMsgHeader{}); got != nil {
t.Fatalf("new-thread relational parent = %v, want nil", got)
}
}
35 changes: 27 additions & 8 deletions dd.sql
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,26 @@ create trigger trg_msg_prevent_unreferenceable_parent
before update of time_sent, sha256 on msg
for each row execute function prevent_referenced_msg_from_becoming_unreferenceable();

-- notify when a new msg_to row is inserted with null time_delivered so the
-- sender can pick it up immediately instead of waiting for the next poll.
create or replace function notify_msg_to_insert() returns trigger as $$
-- Notify the sender's outgoing worker (channel new_msg_to) whenever new
-- delivery work appears. One function serves all three triggers, dispatching
-- on the table it fired for:
-- * msg -- a draft message transitions to sent (time_sent set
-- for the first time); notify every recipient.
-- * msg_to/msg_add_to -- a recipient row is inserted against an already-sent
-- message (recipients added via add-to after the
-- message was sent); notify that recipient.
-- The payload is advisory only: the worker re-polls fully on any wake-up.
create or replace function notify_msg_sent() returns trigger as $$
begin
if NEW.time_delivered is null then
if TG_TABLE_NAME = 'msg' then
if OLD.time_sent is null and NEW.time_sent is not null then
perform pg_notify('new_msg_to', NEW.id::text || ',' || addr)
from msg_to where msg_id = NEW.id;

perform pg_notify('new_msg_to', NEW.id::text || ',' || addr)
from msg_add_to where msg_id = NEW.id;
end if;
elsif NEW.time_delivered is null then
perform pg_notify('new_msg_to', NEW.msg_id::text || ',' || NEW.addr)
from msg where id = NEW.msg_id and time_sent is not null;
end if;
Expand All @@ -147,11 +162,15 @@ $$ language plpgsql;
drop trigger if exists trg_msg_to_insert on msg_to;
create trigger trg_msg_to_insert
after insert on msg_to
for each row execute function notify_msg_to_insert();
for each row execute function notify_msg_sent();

-- notify when a new msg_add_to row is inserted with null time_delivered so the
-- sender can pick it up immediately instead of waiting for the next poll.
drop trigger if exists trg_msg_add_to_insert on msg_add_to;
create trigger trg_msg_add_to_insert
after insert on msg_add_to
for each row execute function notify_msg_to_insert();
for each row execute function notify_msg_sent();

drop trigger if exists trg_msg_sent on msg;
create trigger trg_msg_sent
after update on msg
for each row execute function notify_msg_sent();

Loading