diff --git a/cmd/fmsgd/host.go b/cmd/fmsgd/host.go index 194fed9..0235dbe 100644 --- a/cmd/fmsgd/host.go +++ b/cmd/fmsgd/host.go @@ -1396,6 +1396,14 @@ func downloadMessage(c net.Conn, r io.Reader, h *FMsgHeader, skipData bool) erro return fmt.Errorf("%w actual hash: %s mismatch challenge response: %s", ErrProtocolViolation, actualHashStr, challengeHashStr) } + // Duplicate detection keys on the message's canonical identity (msg.sha256), + // which for an add-to delivery is its original-form hash — not the add-to + // variant verified against the challenge above. + dupHash, err := canonicalMsgHash(h) + if err != nil { + return err + } + // pid/add-to validation is handled during header exchange in readHeader(). // determine file extension from MIME type @@ -1423,7 +1431,7 @@ func downloadMessage(c net.Conn, r io.Reader, h *FMsgHeader, skipData bool) erro acceptedAddTo := []FMsgAddress{} var primaryFilepath string for i, addr := range addrs { - code, err := validateMsgRecvForAddr(h, &addr, msgHash) + code, err := validateMsgRecvForAddr(h, &addr, dupHash) if err != nil { return err } @@ -1552,7 +1560,14 @@ func handleConn(c net.Conn) { if header.ChallengeCompleted && header.InitialResponseCode != AcceptCodeAddTo { addrs := localRecipients(header) var err error - allLocalDup, err = allLocalRecipientsHaveMessageHash(header.ChallengeHash[:], addrs) + // Duplicate detection keys on msg.sha256 (the canonical original-form + // hash). For an add-to delivery that is header.Pid; the challenge hash + // is the add-to variant and would not match a stored row. 
+ dupHash := header.ChallengeHash[:] + if header.Flags&FlagHasAddTo != 0 { + dupHash = header.Pid + } + allLocalDup, err = allLocalRecipientsHaveMessageHash(dupHash, addrs) if err != nil { log.Printf("ERROR: duplicate check failed for %s: %s", c.RemoteAddr().String(), err) if err := sendCode(c, RejectCodeUndisclosed); err != nil { diff --git a/cmd/fmsgd/sender.go b/cmd/fmsgd/sender.go index fb30d3a..1f0a933 100644 --- a/cmd/fmsgd/sender.go +++ b/cmd/fmsgd/sender.go @@ -368,18 +368,11 @@ func deliverMessage(target pendingTarget) { // Ensure sha256 is populated for this message so future pid lookups // (e.g. add-to notifications or replies referencing it) can find it. - // For an add-to message the row being delivered IS the shared message, - // whose original-form hash loadMsg has already placed in h.Pid; calling - // h.GetMessageHash() here would instead hash the add-to variant. var msgHash []byte - if h.Flags&FlagHasAddTo != 0 { - msgHash = h.Pid - } else { - msgHash, err = h.GetMessageHash() - if err != nil { - log.Printf("ERROR: sender: computing message hash for msg %d: %s", target.MsgID, err) - return - } + msgHash, err = canonicalMsgHash(h) + if err != nil { + log.Printf("ERROR: sender: computing message hash for msg %d: %s", target.MsgID, err) + return } if _, err := tx.Exec(`UPDATE msg SET sha256 = $1 WHERE id = $2 AND sha256 IS NULL`, msgHash, target.MsgID); err != nil { diff --git a/cmd/fmsgd/store.go b/cmd/fmsgd/store.go index 825f7f1..830157d 100644 --- a/cmd/fmsgd/store.go +++ b/cmd/fmsgd/store.go @@ -176,6 +176,29 @@ func wirePidForLoadedMessage(storedParentHash []byte, msgHash []byte, hasAddTo b return storedParentHash } +// canonicalMsgHash returns the original-form message hash that is a message's +// stable identity: it is stored in msg.sha256 and is what replies carry as +// their wire pid. 
For an add-to message the row IS the shared message and its +// original-form hash is already in msg.Pid (the add-to wire pid, SPEC §12); +// GetMessageHash() there would instead hash the add-to variant and also needs +// the message payload, which the code-11 path never downloads. +func canonicalMsgHash(msg *FMsgHeader) ([]byte, error) { + if msg.Flags&FlagHasAddTo != 0 { + return msg.Pid, nil + } + return msg.GetMessageHash() +} + +// relationalParentHash returns the hash of the message this one is a reply to, +// or nil when there is none. An add-to message's Pid is its own identity, not +// a parent pointer (SPEC §12), so it must never be resolved as a parent. +func relationalParentHash(msg *FMsgHeader) []byte { + if msg.Flags&FlagHasAddTo != 0 { + return nil + } + return msg.Pid +} + // getMsgByID loads a message and all its recipients from the database by msg ID. // Returns the full FMsgHeader or nil if the message doesn't exist. func getMsgByID(msgID int64) (*FMsgHeader, error) { @@ -204,6 +227,63 @@ func getMsgByID(msgID int64) (*FMsgHeader, error) { return h, nil } +// existingMsgIDForAddTo returns the id of an already-stored message row whose +// canonical sha256 matches msgHash, for an add-to delivery. It returns 0 when +// the message is not an add-to message, msgHash is empty, or no row matches; +// the caller then falls through to a normal INSERT. Non-add-to messages never +// attach: a colliding sha256 there is a true duplicate, not a shared message. +func existingMsgIDForAddTo(tx *sql.Tx, msg *FMsgHeader, msgHash []byte) (int64, error) { + if msg.Flags&FlagHasAddTo == 0 || len(msgHash) == 0 { + return 0, nil + } + var id int64 + err := tx.QueryRow("SELECT id FROM msg WHERE sha256 = $1", msgHash).Scan(&id) + if err == sql.ErrNoRows { // no stored row is not an error: report 0 so the caller inserts + return 0, nil + } + return id, err +} + +// attachAddToRecipients extends an already-stored message with the recipients +// carried by an add-to delivery, marking those on our domain delivered. 
It is +// the add-to counterpart of an INSERT: when a host already holds the shared +// message, an add-to delivery must grow its recipient list rather than insert +// a second row under the same unique canonical sha256 (SPEC §12). +func attachAddToRecipients(tx *sql.Tx, msgID int64, msg *FMsgHeader) error { + now := timeutil.TimestampNow().Float64() + + add := func(table string, addr FMsgAddress) error { + var delivered interface{} // stays nil (SQL NULL) unless the recipient is on our Domain + if addr.Domain == Domain { + delivered = now + } + _, err := tx.Exec(`insert into `+table+` (msg_id, addr, time_delivered) +values ($1, $2, $3) +on conflict (msg_id, addr) do nothing`, msgID, addr.ToString(), delivered) // an existing recipient row is left untouched, including its time_delivered + return err + } + + for _, addr := range msg.To { + if err := add("msg_to", addr); err != nil { + return err + } + } + for _, addr := range msg.AddTo { + if err := add("msg_add_to", addr); err != nil { + return err + } + } + + if msg.AddToFrom != nil { // fill add_to_from only when the stored row has none yet + if _, err := tx.Exec(`update msg set add_to_from = $1 +where id = $2 and (add_to_from is null or add_to_from = '')`, + msg.AddToFrom.ToString(), msgID); err != nil { + return err + } + } + return nil +} + func storeMsgDetail(msg *FMsgHeader) error { db, err := sql.Open("postgres", "") @@ -218,10 +298,23 @@ func storeMsgDetail(msg *FMsgHeader) error { } defer tx.Rollback() - msgHash, err := msg.GetMessageHash() + msgHash, err := canonicalMsgHash(msg) if err != nil { return err } + parentHash := relationalParentHash(msg) + + // An add-to delivery for a message this host already holds extends the + // existing row's recipient list; inserting again would collide on the + // unique canonical sha256 (SPEC §12). 
+ if existingID, err := existingMsgIDForAddTo(tx, msg, msgHash); err != nil { + return err + } else if existingID != 0 { + if err := attachAddToRecipients(tx, existingID, msg); err != nil { + return err + } + return tx.Commit() + } var addToFrom interface{} if msg.AddToFrom != nil { @@ -254,7 +347,7 @@ returning id`, msg.Topic, msg.Type, msgHash, - msg.Pid, + parentHash, int(msg.Size), msg.Filepath).Scan(&msgID) if err != nil { @@ -316,7 +409,7 @@ values ($1, $2, $3, $4, $5, $6, $7)`) } } - if err := resolveMsgParentLinks(tx, msgID, msgHash, msg.Pid, requiresStoredParent(msg)); err != nil { + if err := resolveMsgParentLinks(tx, msgID, msgHash, parentHash, requiresStoredParent(msg)); err != nil { return err } @@ -340,10 +433,23 @@ func storeMsgHeaderOnly(msg *FMsgHeader) error { } defer tx.Rollback() - msgHash, err := msg.GetMessageHash() + msgHash, err := canonicalMsgHash(msg) if err != nil { return err } + parentHash := relationalParentHash(msg) + + // An add-to delivery for a message this host already holds extends the + // existing row's recipient list; inserting again would collide on the + // unique canonical sha256 (SPEC §12). 
+ if existingID, err := existingMsgIDForAddTo(tx, msg, msgHash); err != nil { + return err + } else if existingID != 0 { + if err := attachAddToRecipients(tx, existingID, msg); err != nil { + return err + } + return tx.Commit() + } var addToFrom interface{} if msg.AddToFrom != nil { @@ -376,7 +482,7 @@ returning id`, msg.Topic, msg.Type, msgHash, - msg.Pid, + parentHash, int(msg.Size), "").Scan(&msgID) if err != nil { @@ -425,7 +531,7 @@ values ($1, $2, $3, $4, $5, $6, $7)`) } } - if err := resolveMsgParentLinks(tx, msgID, msgHash, msg.Pid, requiresStoredParent(msg)); err != nil { + if err := resolveMsgParentLinks(tx, msgID, msgHash, parentHash, requiresStoredParent(msg)); err != nil { return err } diff --git a/cmd/fmsgd/store_test.go b/cmd/fmsgd/store_test.go index 6bfdf98..98a5687 100644 --- a/cmd/fmsgd/store_test.go +++ b/cmd/fmsgd/store_test.go @@ -137,3 +137,47 @@ func TestWirePidForLoadedMessageReplyKeepsParentHash(t *testing.T) { t.Fatalf("reply wire pid = %v, want parent hash %v", got, parentHash) } } + +// An add-to message's row IS the shared message; its canonical hash is the +// original-form hash carried in Pid, not the add-to variant. Storing the +// variant would make replies reference a hash the origin host never knows. +func TestCanonicalMsgHashAddToUsesPid(t *testing.T) { + origHash := []byte{9, 8, 7, 6} + + got, err := canonicalMsgHash(&FMsgHeader{Flags: FlagHasPid | FlagHasAddTo, Pid: origHash}) + if err != nil { + t.Fatalf("canonicalMsgHash returned error: %v", err) + } + if !bytes.Equal(got, origHash) { + t.Fatalf("add-to canonical hash = %v, want original-form hash %v", got, origHash) + } +} + +// A non-add-to message must never take the add-to attach path: a colliding +// sha256 there is a genuine duplicate, not the shared message being extended. +// existingMsgIDForAddTo short-circuits before touching the database for it. 
+func TestExistingMsgIDForAddToSkipsNonAddTo(t *testing.T) { + id, err := existingMsgIDForAddTo(nil, &FMsgHeader{Flags: FlagHasPid}, []byte{1, 2, 3}) + if err != nil { + t.Fatalf("existingMsgIDForAddTo returned error: %v", err) + } + if id != 0 { + t.Fatalf("non-add-to message returned existing id %d, want 0", id) + } +} + +// An add-to message's Pid identifies the message itself, not a parent, so it +// must not be resolved as a relational parent. A plain reply's Pid is a parent. +func TestRelationalParentHashAddToHasNoParent(t *testing.T) { + pid := []byte{1, 2, 3} + + if got := relationalParentHash(&FMsgHeader{Flags: FlagHasPid | FlagHasAddTo, Pid: pid}); got != nil { + t.Fatalf("add-to relational parent = %v, want nil", got) + } + if got := relationalParentHash(&FMsgHeader{Flags: FlagHasPid, Pid: pid}); !bytes.Equal(got, pid) { + t.Fatalf("reply relational parent = %v, want %v", got, pid) + } + if got := relationalParentHash(&FMsgHeader{}); got != nil { + t.Fatalf("new-thread relational parent = %v, want nil", got) + } +} diff --git a/dd.sql b/dd.sql index ebc6507..6d4c07c 100644 --- a/dd.sql +++ b/dd.sql @@ -132,11 +132,26 @@ create trigger trg_msg_prevent_unreferenceable_parent before update of time_sent, sha256 on msg for each row execute function prevent_referenced_msg_from_becoming_unreferenceable(); --- notify when a new msg_to row is inserted with null time_delivered so the --- sender can pick it up immediately instead of waiting for the next poll. -create or replace function notify_msg_to_insert() returns trigger as $$ +-- Notify the sender's outgoing worker (channel new_msg_to) whenever new +-- delivery work appears. One function serves all three triggers, dispatching +-- on the table it fired for: +-- * msg -- a draft message transitions to sent (time_sent set +-- for the first time); notify every recipient. 
+-- * msg_to/msg_add_to -- an undelivered recipient row (null time_delivered) is +-- inserted against an already-sent message (recipients added +-- via add-to after the message was sent); notify that recipient. +-- The payload is advisory only: the worker re-polls fully on any wake-up. +create or replace function notify_msg_sent() returns trigger as $$ begin - if NEW.time_delivered is null then + if TG_TABLE_NAME = 'msg' then + if OLD.time_sent is null and NEW.time_sent is not null then + perform pg_notify('new_msg_to', NEW.id::text || ',' || addr) + from msg_to where msg_id = NEW.id; + + perform pg_notify('new_msg_to', NEW.id::text || ',' || addr) + from msg_add_to where msg_id = NEW.id; + end if; + elsif NEW.time_delivered is null then perform pg_notify('new_msg_to', NEW.msg_id::text || ',' || NEW.addr) from msg where id = NEW.msg_id and time_sent is not null; end if; @@ -147,11 +162,17 @@ $$ language plpgsql; drop trigger if exists trg_msg_to_insert on msg_to; create trigger trg_msg_to_insert after insert on msg_to - for each row execute function notify_msg_to_insert(); + for each row execute function notify_msg_sent(); --- notify when a new msg_add_to row is inserted with null time_delivered so the --- sender can pick it up immediately instead of waiting for the next poll. drop trigger if exists trg_msg_add_to_insert on msg_add_to; create trigger trg_msg_add_to_insert after insert on msg_add_to - for each row execute function notify_msg_to_insert(); + for each row execute function notify_msg_sent(); + +drop trigger if exists trg_msg_sent on msg; +-- fire only on the draft -> sent transition; other msg updates skip the call. +create trigger trg_msg_sent + after update on msg + for each row when (OLD.time_sent is null and NEW.time_sent is not null) + execute function notify_msg_sent(); +